support stream read

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-09-13 15:17:10 +08:00
parent 266d5618ae
commit c25cf508ff
5 changed files with 188 additions and 97 deletions

View File

@@ -720,18 +720,18 @@ pub mod node_service_client {
/// rpc Append(AppendRequest) returns (AppendResponse) {};
pub async fn read_at(
&mut self,
request: impl tonic::IntoRequest<super::ReadAtRequest>,
) -> std::result::Result<tonic::Response<super::ReadAtResponse>, tonic::Status> {
request: impl tonic::IntoStreamingRequest<Message = super::ReadAtRequest>,
) -> std::result::Result<tonic::Response<tonic::codec::Streaming<super::ReadAtResponse>>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAt");
let mut req = request.into_request();
let mut req = request.into_streaming_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ReadAt"));
self.inner.unary(req, path, codec).await
self.inner.streaming(req, path, codec).await
}
pub async fn list_dir(
&mut self,
@@ -986,11 +986,15 @@ pub mod node_service_server {
&self,
request: tonic::Request<tonic::Streaming<super::WriteRequest>>,
) -> std::result::Result<tonic::Response<Self::WriteStreamStream>, tonic::Status>;
/// Server streaming response type for the ReadAt method.
type ReadAtStream: tonic::codegen::tokio_stream::Stream<Item = std::result::Result<super::ReadAtResponse, tonic::Status>>
+ Send
+ 'static;
/// rpc Append(AppendRequest) returns (AppendResponse) {};
async fn read_at(
&self,
request: tonic::Request<super::ReadAtRequest>,
) -> std::result::Result<tonic::Response<super::ReadAtResponse>, tonic::Status>;
request: tonic::Request<tonic::Streaming<super::ReadAtRequest>>,
) -> std::result::Result<tonic::Response<Self::ReadAtStream>, tonic::Status>;
async fn list_dir(
&self,
request: tonic::Request<super::ListDirRequest>,
@@ -1426,10 +1430,11 @@ pub mod node_service_server {
"/node_service.NodeService/ReadAt" => {
#[allow(non_camel_case_types)]
struct ReadAtSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::ReadAtRequest> for ReadAtSvc<T> {
impl<T: NodeService> tonic::server::StreamingService<super::ReadAtRequest> for ReadAtSvc<T> {
type Response = super::ReadAtResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::ReadAtRequest>) -> Self::Future {
type ResponseStream = T::ReadAtStream;
type Future = BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
fn call(&mut self, request: tonic::Request<tonic::Streaming<super::ReadAtRequest>>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::read_at(&inner, request).await };
Box::pin(fut)
@@ -1446,7 +1451,7 @@ pub mod node_service_server {
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
let res = grpc.streaming(method, req).await;
Ok(res)
};
Box::pin(fut)

View File

@@ -311,7 +311,7 @@ service NodeService {
rpc Write(WriteRequest) returns (WriteResponse) {};
rpc WriteStream(stream WriteRequest) returns (stream WriteResponse) {};
// rpc Append(AppendRequest) returns (AppendResponse) {};
rpc ReadAt(ReadAtRequest) returns (ReadAtResponse) {};
rpc ReadAt(stream ReadAtRequest) returns (stream ReadAtResponse) {};
rpc ListDir(ListDirRequest) returns (ListDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {};
rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {};

View File

@@ -20,7 +20,9 @@ use crate::{
};
use bytes::Bytes;
use futures::StreamExt;
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ReadAtRequest, WriteRequest};
use protos::proto_gen::node_service::{
node_service_client::NodeServiceClient, ReadAtRequest, ReadAtResponse, WriteRequest, WriteResponse,
};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc};
use time::OffsetDateTime;
@@ -30,7 +32,7 @@ use tokio::{
sync::mpsc::{self, Sender},
};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Channel, Request};
use tonic::{transport::Channel, Streaming};
use tracing::info;
use uuid::Uuid;
@@ -354,10 +356,11 @@ pub struct RemoteFileWriter {
pub path: String,
pub is_append: bool,
tx: Sender<WriteRequest>,
resp_stream: Streaming<WriteResponse>,
}
impl RemoteFileWriter {
pub fn new(
pub async fn new(
root: PathBuf,
volume: String,
path: String,
@@ -367,26 +370,9 @@ impl RemoteFileWriter {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
tokio::spawn(async move {
let response = client.write_stream(in_stream).await.unwrap();
let response = client.write_stream(in_stream).await.unwrap();
let mut resp_stream = response.into_inner();
while let Some(resp) = resp_stream.next().await {
match resp {
Ok(resp) => {
if resp.success {
info!("write stream success");
} else {
info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
}
}
Err(_err) => {
break;
}
}
}
});
let resp_stream = response.into_inner();
Ok(Self {
root,
@@ -394,6 +380,7 @@ impl RemoteFileWriter {
path,
is_append,
tx,
resp_stream,
})
}
}
@@ -408,7 +395,35 @@ impl Write for RemoteFileWriter {
is_append: self.is_append,
data: buf.to_vec(),
};
let _response = self.tx.send(request).await?;
self.tx.send(request).await?;
if let Some(resp) = self.resp_stream.next().await {
// match resp {
// Ok(resp) => {
// if resp.success {
// info!("write stream success");
// } else {
// info!("write stream failed: {}", resp.error_info.unwrap_or("".to_string()));
// }
// }
// Err(_err) => {
// }
// }
let resp = resp?;
if resp.success {
info!("write stream success");
} else {
let error_info = resp.error_info.unwrap_or("".to_string());
info!("write stream failed: {}", error_info);
return Err(Error::from_string(error_info));
}
} else {
let error_info = "can not get response";
info!("write stream failed: {}", error_info);
return Err(Error::from_string(error_info));
}
Ok(())
}
}
@@ -460,32 +475,55 @@ pub struct RemoteFileReader {
pub root: PathBuf,
pub volume: String,
pub path: String,
client: NodeServiceClient<Channel>,
tx: Sender<ReadAtRequest>,
resp_stream: Streaming<ReadAtResponse>,
}
impl RemoteFileReader {
pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient<Channel>) -> Self {
Self {
pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeServiceClient<Channel>) -> Result<Self> {
let (tx, rx) = mpsc::channel(128);
let in_stream = ReceiverStream::new(rx);
let response = client.read_at(in_stream).await.unwrap();
let resp_stream = response.into_inner();
Ok(Self {
root,
volume,
path,
client,
}
tx,
resp_stream,
})
}
}
#[async_trait::async_trait]
impl ReadAt for RemoteFileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
let request = Request::new(ReadAtRequest {
let request = ReadAtRequest {
disk: self.root.to_string_lossy().to_string(),
volume: self.volume.to_string(),
path: self.path.to_string(),
offset: offset.try_into().unwrap(),
length: length.try_into().unwrap(),
});
let response = self.client.read_at(request).await?.into_inner();
};
self.tx.send(request).await?;
Ok((response.data, response.read_size.try_into().unwrap()))
if let Some(resp) = self.resp_stream.next().await {
let resp = resp?;
if resp.success {
info!("read at stream success");
Ok((resp.data, resp.read_size.try_into().unwrap()))
} else {
let error_info = resp.error_info.unwrap_or("".to_string());
info!("read at stream failed: {}", error_info);
Err(Error::from_string(error_info))
}
} else {
let error_info = "can not get response";
info!("read at stream failed: {}", error_info);
Err(Error::from_string(error_info))
}
}
}

View File

@@ -196,34 +196,31 @@ impl DiskAPI for RemoteDisk {
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
info!("create_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client_v2().await?,
)?))
Ok(FileWriter::Remote(
RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
false,
self.get_client_v2().await?,
)
.await?,
))
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
info!("append_file");
Ok(FileWriter::Remote(RemoteFileWriter::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
true,
self.get_client_v2().await?,
)?))
Ok(FileWriter::Remote(
RemoteFileWriter::new(self.root.clone(), volume.to_string(), path.to_string(), true, self.get_client_v2().await?)
.await?,
))
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
info!("read_file");
Ok(FileReader::Remote(RemoteFileReader::new(
self.root.clone(),
volume.to_string(),
path.to_string(),
self.get_client_v2().await?,
)))
Ok(FileReader::Remote(
RemoteFileReader::new(self.root.clone(), volume.to_string(), path.to_string(), self.get_client_v2().await?).await?,
))
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {

View File

@@ -452,44 +452,95 @@ impl Node for NodeService {
Ok(tonic::Response::new(Box::pin(out_stream)))
}
async fn read_at(&self, request: Request<ReadAtRequest>) -> Result<Response<ReadAtResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {
match disk.read_file(&request.volume, &request.path).await {
Ok(mut file_reader) => {
match file_reader
.read_at(request.offset.try_into().unwrap(), request.length.try_into().unwrap())
type ReadAtStream = ResponseStream<ReadAtResponse>;
async fn read_at(&self, request: Request<Streaming<ReadAtRequest>>) -> Result<Response<Self::ReadAtStream>, Status> {
info!("read_at");
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
let mut file_ref = None;
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => {
match file_ref.as_ref() {
Some(_) => (),
None => {
if let Some(disk) = find_local_disk(&v.disk).await {
match disk.read_file(&v.volume, &v.path).await {
Ok(file_reader) => file_ref = Some(file_reader),
Err(err) => {
tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some(err.to_string()),
read_size: -1,
}))
.await
.expect("working rx");
break;
}
}
} else {
tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some("can not find disk".to_string()),
read_size: -1,
}))
.await
.expect("working rx");
break;
}
}
};
match file_ref
.as_mut()
.unwrap()
.read_at(v.offset.try_into().unwrap(), v.length.try_into().unwrap())
.await
{
Ok((data, read_size)) => tx.send(Ok(ReadAtResponse {
success: true,
data,
read_size: read_size.try_into().unwrap(),
error_info: None,
})),
Err(err) => tx.send(Ok(ReadAtResponse {
success: false,
data: Vec::new(),
error_info: Some(err.to_string()),
read_size: -1,
})),
}
.await
{
Ok((data, read_size)) => Ok(tonic::Response::new(ReadAtResponse {
success: true,
data,
read_size: read_size.try_into().unwrap(),
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
.unwrap();
}
Err(err) => {
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
// here you can handle special case when client
// disconnected in unexpected way
eprintln!("\tclient disconnected: broken pipe");
break;
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was dropped
}
}
}
Err(err) => Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some(err.to_string()),
})),
}
} else {
Ok(tonic::Response::new(ReadAtResponse {
success: false,
data: Vec::new(),
read_size: -1,
error_info: Some("can not find disk".to_string()),
}))
}
println!("\tstream ended");
});
let out_stream = ReceiverStream::new(rx);
Ok(tonic::Response::new(Box::pin(out_stream)))
}
async fn list_dir(&self, request: Request<ListDirRequest>) -> Result<Response<ListDirResponse>, Status> {