add test case

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-12-23 16:21:27 +08:00
parent ab26b7c1ca
commit c078a4e04c
5 changed files with 38 additions and 17 deletions

1
Cargo.lock generated
View File

@@ -762,6 +762,7 @@ version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"futures",
"lazy_static",
"lock",
"madmin",

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
futures.workspace = true
lazy_static.workspace = true
lock.workspace = true
protos.workspace = true

View File

@@ -1,7 +1,8 @@
#![cfg(test)]
use ecstore::disk::{MetaCacheEntry, VolumeInfo, WalkDirOptions};
use ecstore::metacache::writer::MetacacheWriter;
use ecstore::metacache::writer::{MetacacheReader, MetacacheWriter};
use futures::future::join_all;
use protos::proto_gen::node_service::WalkDirRequest;
use protos::{
models::{PingBody, PingBodyBuilder},
@@ -14,6 +15,7 @@ use rmp_serde::Deserializer;
use serde::Deserialize;
use std::{error::Error, io::Cursor};
use tokio::io::AsyncWrite;
use tokio::spawn;
use tokio::sync::mpsc;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::StreamExt;
@@ -95,7 +97,7 @@ async fn list_volumes() -> Result<(), Box<dyn Error>> {
}
#[tokio::test]
async fn walk_dir<W: AsyncWrite + Unpin + Send>() -> Result<(), Box<dyn Error>> {
async fn walk_dir() -> Result<(), Box<dyn Error>> {
println!("walk_dir");
// TODO: use writer
let opts = WalkDirOptions {
@@ -105,30 +107,47 @@ async fn walk_dir<W: AsyncWrite + Unpin + Send>() -> Result<(), Box<dyn Error>>
..Default::default()
};
let (rd, mut wr) = tokio::io::duplex(1024);
let mut out = MetacacheWriter::new(&mut wr);
let walk_dir_options = serde_json::to_string(&opts)?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(WalkDirRequest {
disk: "data".to_string(),
disk: "/home/dandan/code/rust/s3-rustfs/target/debug/data".to_string(),
walk_dir_options,
});
let mut response = client.walk_dir(request).await?.into_inner();
loop {
match response.next().await {
Some(Ok(resp)) => {
if !resp.success {
println!("{}", resp.error_info.unwrap_or("".to_string()));
let job1 = spawn(async move {
let mut out = MetacacheWriter::new(&mut wr);
loop {
match response.next().await {
Some(Ok(resp)) => {
if !resp.success {
println!("{}", resp.error_info.unwrap_or("".to_string()));
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|e| ecstore::error::Error::from_string(format!("Unexpected response: {:?}", response)))
.unwrap();
out.write_obj(&entry).await.unwrap();
}
None => {
let _ = out.close().await;
break;
}
_ => {
println!("Unexpected response: {:?}", response);
let _ = out.close().await;
break;
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|e| Err(ecstore::error::Error::from_string(format!("Unexpected response: {:?}", response))))?;
out.write_obj(&entry)?;
}
None => break,
_ => println!("Unexpected response: {:?}", response),
}
}
});
let job2 = spawn(async move {
let mut reader = MetacacheReader::new(rd);
while let Ok(Some(entry)) = reader.peek().await {
println!("{:?}", entry);
}
});
join_all(vec![job1, job2]).await;
Ok(())
}

View File

@@ -373,7 +373,7 @@ impl DiskAPI for RemoteDisk {
return Err(Error::from_string(resp.error_info.unwrap_or("".to_string())));
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|e| Error::from_string(format!("Unexpected response: {:?}", response)))?;
.map_err(|_| Error::from_string(format!("Unexpected response: {:?}", response)))?;
out.write_obj(&entry).await?;
}
None => break,

View File

@@ -796,7 +796,7 @@ impl Node for NodeService {
join_all(vec![job1, job2]).await;
});
} else {
return Err(Status::invalid_argument("invalid disk"));
return Err(Status::invalid_argument(format!("invalid disk, all disk: {:?}", self.all_disk().await)));
}
let out_stream = ReceiverStream::new(rx);