Merge branch 'list-objects-rpc' into list-objects

This commit is contained in:
weisd
2024-12-24 20:02:12 +08:00
8 changed files with 1074 additions and 3230 deletions

1
Cargo.lock generated
View File

@@ -762,6 +762,7 @@ version = "0.0.1"
dependencies = [
"ecstore",
"flatbuffers",
"futures",
"lazy_static",
"lock",
"madmin",

View File

@@ -1,10 +1,9 @@
// automatically generated by the FlatBuffers compiler, do not modify
// @generated
use core::mem;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
@@ -12,112 +11,114 @@ use self::flatbuffers::{EndianScalar, Follow};
#[allow(unused_imports, dead_code)]
pub mod models {
use core::mem;
use core::cmp::Ordering;
use core::cmp::Ordering;
use core::mem;
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub enum PingBodyOffset {}
#[derive(Copy, Clone, PartialEq)]
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self { _tab: flatbuffers::Table::new(buf, loc) }
}
}
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload { builder.add_payload(x); }
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe { self._tab.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(
v: &mut flatbuffers::Verifier, pos: usize
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs {
payload: None,
pub struct PingBody<'a> {
pub _tab: flatbuffers::Table<'a>,
}
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b , u8>>) {
self.fbb_.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
impl<'a> flatbuffers::Follow<'a> for PingBody<'a> {
type Inner = PingBody<'a>;
#[inline]
unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner {
Self {
_tab: flatbuffers::Table::new(buf, loc),
}
}
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models
impl<'a> PingBody<'a> {
pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4;
pub const fn get_fully_qualified_name() -> &'static str {
"models.PingBody"
}
#[inline]
pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self {
PingBody { _tab: table }
}
#[allow(unused_mut)]
pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>(
_fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>,
args: &'args PingBodyArgs<'args>,
) -> flatbuffers::WIPOffset<PingBody<'bldr>> {
let mut builder = PingBodyBuilder::new(_fbb);
if let Some(x) = args.payload {
builder.add_payload(x);
}
builder.finish()
}
#[inline]
pub fn payload(&self) -> Option<flatbuffers::Vector<'a, u8>> {
// Safety:
// Created from valid Table for this object
// which contains a valid value in this slot
unsafe {
self._tab
.get::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'a, u8>>>(PingBody::VT_PAYLOAD, None)
}
}
}
impl flatbuffers::Verifiable for PingBody<'_> {
#[inline]
fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>("payload", Self::VT_PAYLOAD, false)?
.finish();
Ok(())
}
}
pub struct PingBodyArgs<'a> {
pub payload: Option<flatbuffers::WIPOffset<flatbuffers::Vector<'a, u8>>>,
}
impl<'a> Default for PingBodyArgs<'a> {
#[inline]
fn default() -> Self {
PingBodyArgs { payload: None }
}
}
pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> {
fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>,
start_: flatbuffers::WIPOffset<flatbuffers::TableUnfinishedWIPOffset>,
}
impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> {
#[inline]
pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset<flatbuffers::Vector<'b, u8>>) {
self.fbb_
.push_slot_always::<flatbuffers::WIPOffset<_>>(PingBody::VT_PAYLOAD, payload);
}
#[inline]
pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> {
let start = _fbb.start_table();
PingBodyBuilder {
fbb_: _fbb,
start_: start,
}
}
#[inline]
pub fn finish(self) -> flatbuffers::WIPOffset<PingBody<'a>> {
let o = self.fbb_.end_table(self.start_);
flatbuffers::WIPOffset::new(o.value())
}
}
impl core::fmt::Debug for PingBody<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut ds = f.debug_struct("PingBody");
ds.field("payload", &self.payload());
ds.finish()
}
}
} // pub mod models

File diff suppressed because it is too large Load Diff

View File

@@ -204,12 +204,12 @@ message ListDirResponse {
message WalkDirRequest {
string disk = 1; // indicate which one in the disks
string walk_dir_options = 2;
bytes walk_dir_options = 2;
}
message WalkDirResponse {
bool success = 1;
repeated string meta_cache_entry = 2;
string meta_cache_entry = 2;
optional string error_info = 3;
}
@@ -757,7 +757,7 @@ service NodeService {
// rpc Append(AppendRequest) returns (AppendResponse) {};
rpc ReadAt(stream ReadAtRequest) returns (stream ReadAtResponse) {};
rpc ListDir(ListDirRequest) returns (ListDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {};
rpc WalkDir(WalkDirRequest) returns (stream WalkDirResponse) {};
rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {};
rpc MakeVolumes(MakeVolumesRequest) returns (MakeVolumesResponse) {};
rpc MakeVolume(MakeVolumeRequest) returns (MakeVolumeResponse) {};

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies]
ecstore.workspace = true
flatbuffers.workspace = true
futures.workspace = true
lazy_static.workspace = true
lock.workspace = true
protos.workspace = true

View File

@@ -1,6 +1,9 @@
#![cfg(test)]
use ecstore::disk::VolumeInfo;
use ecstore::disk::{MetaCacheEntry, VolumeInfo, WalkDirOptions};
use ecstore::metacache::writer::{MetacacheReader, MetacacheWriter};
use futures::future::join_all;
use protos::proto_gen::node_service::WalkDirRequest;
use protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
@@ -8,9 +11,14 @@ use protos::{
ListVolumesRequest, LocalStorageInfoRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
};
use rmp_serde::Deserializer;
use serde::Deserialize;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use std::{error::Error, io::Cursor};
use tokio::io::AsyncWrite;
use tokio::spawn;
use tokio::sync::mpsc;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::Request;
const CLUSTER_ADDR: &str = "http://localhost:9000";
@@ -88,6 +96,62 @@ async fn list_volumes() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[tokio::test]
async fn walk_dir() -> Result<(), Box<dyn Error>> {
println!("walk_dir");
// TODO: use writer
let opts = WalkDirOptions {
bucket: "dandan".to_owned(),
base_dir: "".to_owned(),
recursive: true,
..Default::default()
};
let (rd, mut wr) = tokio::io::duplex(1024);
let mut buf = Vec::new();
opts.serialize(&mut Serializer::new(&mut buf))?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let request = Request::new(WalkDirRequest {
disk: "/home/dandan/code/rust/s3-rustfs/target/debug/data".to_string(),
walk_dir_options: buf,
});
let mut response = client.walk_dir(request).await?.into_inner();
let job1 = spawn(async move {
let mut out = MetacacheWriter::new(&mut wr);
loop {
match response.next().await {
Some(Ok(resp)) => {
if !resp.success {
println!("{}", resp.error_info.unwrap_or("".to_string()));
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|e| ecstore::error::Error::from_string(format!("Unexpected response: {:?}", response)))
.unwrap();
out.write_obj(&entry).await.unwrap();
}
None => {
let _ = out.close().await;
break;
}
_ => {
println!("Unexpected response: {:?}", response);
let _ = out.close().await;
break;
}
}
}
});
let job2 = spawn(async move {
let mut reader = MetacacheReader::new(rd);
while let Ok(Some(entry)) = reader.peek().await {
println!("{:?}", entry);
}
});
join_all(vec![job1, job2]).await;
Ok(())
}
#[tokio::test]
async fn read_all() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;

View File

@@ -7,9 +7,11 @@ use protos::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest,
ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequst,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest,
},
};
use rmp_serde::Serializer;
use serde::Serialize;
use tokio::{
io::AsyncWrite,
sync::mpsc::{self, Sender},
@@ -34,6 +36,7 @@ use crate::{
},
store_api::{FileInfo, RawFileInfo},
};
use crate::{disk::MetaCacheEntry, metacache::writer::MetacacheWriter};
use protos::proto_gen::node_service::RenamePartRequst;
#[derive(Debug)]
@@ -351,32 +354,37 @@ impl DiskAPI for RemoteDisk {
}
// FIXME: TODO: use writer
async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, _opts: WalkDirOptions, _wr: &mut W) -> Result<()> {
async fn walk_dir<W: AsyncWrite + Unpin + Send>(&self, opts: WalkDirOptions, wr: &mut W) -> Result<()> {
info!("walk_dir");
// TODO: use writer
unimplemented!()
// let walk_dir_options = serde_json::to_string(&opts)?;
// let mut client = node_service_time_out_client(&self.addr)
// .await
// .map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
// let request = Request::new(WalkDirRequest {
// disk: self.endpoint.to_string(),
// walk_dir_options,
// });
let mut wr = wr;
let mut out = MetacacheWriter::new(&mut wr);
let mut buf = Vec::new();
opts.serialize(&mut Serializer::new(&mut buf))?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::from_string(format!("can not get client, err: {}", err)))?;
let request = Request::new(WalkDirRequest {
disk: self.endpoint.to_string(),
walk_dir_options: buf,
});
let mut response = client.walk_dir(request).await?.into_inner();
// let response = client.walk_dir(request).await?.into_inner();
loop {
match response.next().await {
Some(Ok(resp)) => {
if !resp.success {
return Err(Error::from_string(resp.error_info.unwrap_or("".to_string())));
}
let entry = serde_json::from_str::<MetaCacheEntry>(&resp.meta_cache_entry)
.map_err(|_| Error::from_string(format!("Unexpected response: {:?}", response)))?;
out.write_obj(&entry).await?;
}
None => break,
_ => return Err(Error::from_string(format!("Unexpected response: {:?}", response))),
}
}
// if !response.success {
// return Err(Error::from_string(response.error_info.unwrap_or("".to_string())));
// }
// let entries = response
// .meta_cache_entry
// .into_iter()
// .filter_map(|json_str| serde_json::from_str::<MetaCacheEntry>(&json_str).ok())
// .collect();
// Ok(entries)
Ok(())
}
async fn rename_data(

View File

@@ -24,9 +24,12 @@ use ecstore::{
store_api::{BucketOptions, DeleteBucketOptions, FileInfo, MakeBucketOptions, StorageAPI},
};
use futures::{Stream, StreamExt};
use futures_util::future::join_all;
use lock::{lock_args::LockArgs, Locker, GLOBAL_LOCAL_SERVER};
use common::globals::GLOBAL_Local_Node_Name;
use ecstore::disk::error::is_err_eof;
use ecstore::metacache::writer::MetacacheReader;
use madmin::health::{
get_cpus, get_mem_info, get_os_info, get_partitions, get_proc_info, get_sys_config, get_sys_errors, get_sys_services,
};
@@ -37,6 +40,7 @@ use protos::{
};
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use tokio::spawn;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
@@ -730,46 +734,74 @@ impl Node for NodeService {
}
}
async fn walk_dir(&self, request: Request<WalkDirRequest>) -> Result<Response<WalkDirResponse>, Status> {
// TODO: use writer
unimplemented!()
// let request = request.into_inner();
// if let Some(disk) = self.find_disk(&request.disk).await {
// let opts = match serde_json::from_str::<WalkDirOptions>(&request.walk_dir_options) {
// Ok(options) => options,
// Err(_) => {
// return Ok(tonic::Response::new(WalkDirResponse {
// success: false,
// meta_cache_entry: Vec::new(),
// error_info: Some("can not decode DeleteOptions".to_string()),
// }));
// }
// };
// match disk.walk_dir(opts, &mut ecstore::io::Writer::NotUse).await {
// Ok(entries) => {
// let entries = entries
// .into_iter()
// .filter_map(|entry| serde_json::to_string(&entry).ok())
// .collect();
// Ok(tonic::Response::new(WalkDirResponse {
// success: true,
// meta_cache_entry: entries,
// error_info: None,
// }))
// }
// Err(err) => Ok(tonic::Response::new(WalkDirResponse {
// success: false,
// meta_cache_entry: Vec::new(),
// error_info: Some(err.to_string()),
// })),
// }
// } else {
// Ok(tonic::Response::new(WalkDirResponse {
// success: false,
// meta_cache_entry: Vec::new(),
// error_info: Some("can not find disk".to_string()),
// }))
// }
type WalkDirStream = ResponseStream<WalkDirResponse>;
async fn walk_dir(&self, request: Request<WalkDirRequest>) -> Result<Response<Self::WalkDirStream>, Status> {
info!("walk_dir");
let request = request.into_inner();
let (tx, rx) = mpsc::channel(128);
if let Some(disk) = self.find_disk(&request.disk).await {
let mut buf = Deserializer::new(Cursor::new(request.walk_dir_options));
let opts = match Deserialize::deserialize(&mut buf) {
Ok(options) => options,
Err(_) => {
return Err(Status::invalid_argument("invalid WalkDirOptions"));
}
};
spawn(async {
let (rd, mut wr) = tokio::io::duplex(64);
let job1 = spawn(async move {
if let Err(err) = disk.walk_dir(opts, &mut wr).await {
println!("walk_dir err {:?}", err);
}
});
let job2 = spawn(async move {
let mut reader = MetacacheReader::new(rd);
loop {
match reader.peek().await {
Ok(res) => {
if let Some(info) = res {
match serde_json::to_string(&info) {
Ok(meta_cache_entry) => tx
.send(Ok(WalkDirResponse {
success: true,
meta_cache_entry,
error_info: None,
}))
.await
.expect("working rx"),
Err(e) => tx
.send(Ok(WalkDirResponse {
success: false,
meta_cache_entry: "".to_string(),
error_info: Some(e.to_string()),
}))
.await
.expect("working rx"),
}
} else {
break;
}
}
Err(err) => {
if is_err_eof(&err) {
break;
}
println!("get err {:?}", err);
break;
}
}
}
});
join_all(vec![job1, job2]).await;
});
} else {
return Err(Status::invalid_argument(format!("invalid disk, all disk: {:?}", self.all_disk().await)));
}
let out_stream = ReceiverStream::new(rx);
Ok(tonic::Response::new(Box::pin(out_stream)))
}
async fn rename_data(&self, request: Request<RenameDataRequest>) -> Result<Response<RenameDataResponse>, Status> {