diff --git a/Cargo.lock b/Cargo.lock index 9d8a9643..5b1f4da8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -762,6 +762,7 @@ version = "0.0.1" dependencies = [ "ecstore", "flatbuffers", + "futures", "lazy_static", "lock", "madmin", diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 58974bad..0a8d98d5 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] ecstore.workspace = true flatbuffers.workspace = true +futures.workspace = true lazy_static.workspace = true lock.workspace = true protos.workspace = true diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs index 7f912870..cc81204d 100644 --- a/e2e_test/src/reliant/node_interact_test.rs +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -1,7 +1,8 @@ #![cfg(test)] use ecstore::disk::{MetaCacheEntry, VolumeInfo, WalkDirOptions}; -use ecstore::metacache::writer::MetacacheWriter; +use ecstore::metacache::writer::{MetacacheReader, MetacacheWriter}; +use futures::future::join_all; use protos::proto_gen::node_service::WalkDirRequest; use protos::{ models::{PingBody, PingBodyBuilder}, @@ -14,6 +15,7 @@ use rmp_serde::Deserializer; use serde::Deserialize; use std::{error::Error, io::Cursor}; use tokio::io::AsyncWrite; +use tokio::spawn; use tokio::sync::mpsc; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::StreamExt; @@ -95,7 +97,7 @@ async fn list_volumes() -> Result<(), Box> { } #[tokio::test] -async fn walk_dir() -> Result<(), Box> { +async fn walk_dir() -> Result<(), Box> { println!("walk_dir"); // TODO: use writer let opts = WalkDirOptions { @@ -105,30 +107,47 @@ async fn walk_dir() -> Result<(), Box> ..Default::default() }; let (rd, mut wr) = tokio::io::duplex(1024); - let mut out = MetacacheWriter::new(&mut wr); let walk_dir_options = serde_json::to_string(&opts)?; let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; let request = Request::new(WalkDirRequest { - disk: "data".to_string(), + disk: "/home/dandan/code/rust/s3-rustfs/target/debug/data".to_string(), walk_dir_options, }); let mut response = client.walk_dir(request).await?.into_inner(); - loop { - match response.next().await { - Some(Ok(resp)) => { - if !resp.success { - println!("{}", resp.error_info.unwrap_or("".to_string())); + let job1 = spawn(async move { + let mut out = MetacacheWriter::new(&mut wr); + loop { + match response.next().await { + Some(Ok(resp)) => { + if !resp.success { + println!("{}", resp.error_info.unwrap_or("".to_string())); + } + let entry = serde_json::from_str::(&resp.meta_cache_entry) + .map_err(|e| ecstore::error::Error::from_string(format!("Unexpected response: {:?}", response))) + .unwrap(); + out.write_obj(&entry).await.unwrap(); + } + None => { + let _ = out.close().await; + break; + } + _ => { + println!("Unexpected response: {:?}", response); + let _ = out.close().await; + break; } - let entry = serde_json::from_str::(&resp.meta_cache_entry) - .map_err(|e| Err(ecstore::error::Error::from_string(format!("Unexpected response: {:?}", response))))?; - out.write_obj(&entry)?; } - None => break, - _ => println!("Unexpected response: {:?}", response), } - } + }); + let job2 = spawn(async move { + let mut reader = MetacacheReader::new(rd); + while let Ok(Some(entry)) = reader.peek().await { + println!("{:?}", entry); + } + }); + join_all(vec![job1, job2]).await; Ok(()) } diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 67caf5a1..1d3693e9 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -373,7 +373,7 @@ impl DiskAPI for RemoteDisk { return Err(Error::from_string(resp.error_info.unwrap_or("".to_string()))); } let entry = serde_json::from_str::(&resp.meta_cache_entry) - .map_err(|e| Error::from_string(format!("Unexpected response: {:?}", response)))?; + .map_err(|_| Error::from_string(format!("Unexpected response: {:?}", response)))?; out.write_obj(&entry).await?; } None => break, diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 63ae2489..bae37e14 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -796,7 +796,7 @@ impl Node for NodeService { join_all(vec![job1, job2]).await; }); } else { - return Err(Status::invalid_argument("invalid disk")); + return Err(Status::invalid_argument(format!("invalid disk, all disk: {:?}", self.all_disk().await))); } let out_stream = ReceiverStream::new(rx);