diff --git a/Cargo.lock b/Cargo.lock index f283234a..df15b64e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -407,8 +407,10 @@ dependencies = [ name = "e2e_test" version = "0.0.1" dependencies = [ + "ecstore", "flatbuffers", "protos", + "serde_json", "tokio", "tonic", ] @@ -1550,6 +1552,7 @@ dependencies = [ "protobuf", "protos", "s3s", + "serde_json", "time", "tokio", "tonic", diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index 55b5b01a..13e968bf 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -38,6 +38,377 @@ pub struct MakeBucketResponse { #[prost(string, optional, tag = "2")] pub error_info: ::core::option::Option<::prost::alloc::string::String>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadAllRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadAllResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(bytes = "vec", tag = "2")] + pub data: ::prost::alloc::vec::Vec, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteAllRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "4")] + pub data: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteAllResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub options: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RenameFileRequst { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub src_volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub src_path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub dst_volume: ::prost::alloc::string::String, + #[prost(string, tag = "5")] + pub dst_path: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RenameFileResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(bool, tag = "4")] + pub is_append: bool, + #[prost(bytes = "vec", tag = "5")] + pub data: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadAtRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(int64, tag = "4")] + pub offset: i64, + #[prost(int64, tag = "5")] + pub length: i64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadAtResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(bytes = "vec", tag = "2")] + pub data: ::prost::alloc::vec::Vec, + #[prost(int64, tag = "3")] + pub read_size: i64, + #[prost(string, optional, tag = "4")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListDirRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListDirResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, repeated, tag = "2")] + pub volumes: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WalkDirRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub walk_dir_options: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WalkDirResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, repeated, tag = "2")] + pub meta_cache_entry: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RenameDataRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub src_volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub src_path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub file_info: ::prost::alloc::string::String, + #[prost(string, tag = "5")] + pub dst_volume: ::prost::alloc::string::String, + #[prost(string, tag = "6")] + pub dst_path: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RenameDataResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub rename_data_resp: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeVolumesRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "2")] + pub volumes: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeVolumesResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeVolumeRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MakeVolumeResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListVolumesRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListVolumesResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, repeated, tag = "2")] + pub volume_infos: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StatVolumeRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct StatVolumeResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub volume_info: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteMetadataRequest { + /// indicate which one in the disks + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub file_info: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct WriteMetadataResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadVersionRequest { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub version_id: ::prost::alloc::string::String, + #[prost(string, tag = "5")] + pub opts: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadVersionResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub file_info: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadXlRequest { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub path: ::prost::alloc::string::String, + #[prost(bool, tag = "4")] + pub read_data: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadXlResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub raw_file_info: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadMultipleRequest { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub read_multiple_req: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadMultipleResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, repeated, tag = "2")] + pub read_multiple_resps: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteVolumeRequest { + #[prost(string, tag = "1")] + pub disk: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub volume: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteVolumeResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} /// Generated client implementations. pub mod node_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] @@ -116,6 +487,7 @@ pub mod node_service_client { self.inner = self.inner.max_encoding_message_size(limit); self } + /// -------------------------------meta service-------------------------- pub async fn ping( &mut self, request: impl tonic::IntoRequest, @@ -146,6 +518,277 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "MakeBucket")); self.inner.unary(req, path, codec).await } + pub async fn read_all( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAll"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadAll")); + self.inner.unary(req, path, codec).await + } + pub async fn write_all( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteAll"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "WriteAll")); + self.inner.unary(req, path, codec).await + } + pub async fn delete( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Delete"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "Delete")); + self.inner.unary(req, path, codec).await + } + pub async fn rename_file( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameFile"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "RenameFile")); + self.inner.unary(req, path, codec).await + } + pub async fn write( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Write"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "Write")); + self.inner.unary(req, path, codec).await + } + /// rpc Append(AppendRequest) returns (AppendResponse) {}; + pub async fn read_at( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAt"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadAt")); + self.inner.unary(req, path, codec).await + } + pub async fn list_dir( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListDir"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ListDir")); + self.inner.unary(req, path, codec).await + } + pub async fn walk_dir( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WalkDir"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "WalkDir")); + self.inner.unary(req, path, codec).await + } + pub async fn rename_data( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameData"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "RenameData")); + self.inner.unary(req, path, codec).await + } + pub async fn make_volumes( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolumes"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "MakeVolumes")); + self.inner.unary(req, path, codec).await + } + pub async fn make_volume( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolume"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "MakeVolume")); + self.inner.unary(req, path, codec).await + } + pub async fn list_volumes( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListVolumes"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ListVolumes")); + self.inner.unary(req, path, codec).await + } + pub async fn stat_volume( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/StatVolume"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "StatVolume")); + self.inner.unary(req, path, codec).await + } + pub async fn write_metadata( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteMetadata"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "WriteMetadata")); + self.inner.unary(req, path, codec).await + } + pub async fn read_version( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadVersion"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadVersion")); + self.inner.unary(req, path, codec).await + } + pub async fn read_xl( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadXL"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadXL")); + self.inner.unary(req, path, codec).await + } + pub async fn read_multiple( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadMultiple"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ReadMultiple")); + self.inner.unary(req, path, codec).await + } + pub async fn delete_volume( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVolume"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "DeleteVolume")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -155,6 +798,7 @@ pub mod node_service_server { /// Generated trait containing gRPC methods that should be implemented for use with NodeServiceServer. #[async_trait] pub trait NodeService: Send + Sync + 'static { + /// -------------------------------meta service-------------------------- async fn ping( &self, request: tonic::Request, @@ -163,6 +807,79 @@ pub mod node_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn read_all( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn write_all( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn delete( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn rename_file( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn write( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// rpc Append(AppendRequest) returns (AppendResponse) {}; + async fn read_at( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn list_dir( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn walk_dir( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn rename_data( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn make_volumes( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn make_volume( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn list_volumes( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn stat_volume( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn write_metadata( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn read_version( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn read_xl( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn read_multiple( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn delete_volume( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct NodeServiceServer { @@ -290,6 +1007,510 @@ pub mod node_service_server { }; Box::pin(fut) } + "/node_service.NodeService/ReadAll" => { + #[allow(non_camel_case_types)] + struct ReadAllSvc(pub Arc); + impl tonic::server::UnaryService for ReadAllSvc { + type Response = super::ReadAllResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_all(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadAllSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/WriteAll" => { + #[allow(non_camel_case_types)] + struct WriteAllSvc(pub Arc); + impl tonic::server::UnaryService for WriteAllSvc { + type Response = super::WriteAllResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::write_all(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WriteAllSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/Delete" => { + #[allow(non_camel_case_types)] + struct DeleteSvc(pub Arc); + impl tonic::server::UnaryService for DeleteSvc { + type Response = super::DeleteResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::delete(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/RenameFile" => { + #[allow(non_camel_case_types)] + struct RenameFileSvc(pub Arc); + impl tonic::server::UnaryService for RenameFileSvc { + type Response = super::RenameFileResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::rename_file(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RenameFileSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/Write" => { + #[allow(non_camel_case_types)] + struct WriteSvc(pub Arc); + impl tonic::server::UnaryService for WriteSvc { + type Response = super::WriteResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::write(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WriteSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ReadAt" => { + #[allow(non_camel_case_types)] + struct ReadAtSvc(pub Arc); + impl tonic::server::UnaryService for ReadAtSvc { + type Response = super::ReadAtResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_at(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadAtSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ListDir" => { + #[allow(non_camel_case_types)] + struct ListDirSvc(pub Arc); + impl tonic::server::UnaryService for ListDirSvc { + type Response = super::ListDirResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::list_dir(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListDirSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/WalkDir" => { + #[allow(non_camel_case_types)] + struct WalkDirSvc(pub Arc); + impl tonic::server::UnaryService for WalkDirSvc { + type Response = super::WalkDirResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::walk_dir(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WalkDirSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/RenameData" => { + #[allow(non_camel_case_types)] + struct RenameDataSvc(pub Arc); + impl tonic::server::UnaryService for RenameDataSvc { + type Response = super::RenameDataResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::rename_data(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RenameDataSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/MakeVolumes" => { + #[allow(non_camel_case_types)] + struct MakeVolumesSvc(pub Arc); + impl tonic::server::UnaryService for MakeVolumesSvc { + type Response = super::MakeVolumesResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::make_volumes(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = MakeVolumesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/MakeVolume" => { + #[allow(non_camel_case_types)] + struct MakeVolumeSvc(pub Arc); + impl tonic::server::UnaryService for MakeVolumeSvc { + type Response = super::MakeVolumeResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::make_volume(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = MakeVolumeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ListVolumes" => { + #[allow(non_camel_case_types)] + struct ListVolumesSvc(pub Arc); + impl tonic::server::UnaryService for ListVolumesSvc { + type Response = super::ListVolumesResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::list_volumes(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListVolumesSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/StatVolume" => { + #[allow(non_camel_case_types)] + struct StatVolumeSvc(pub Arc); + impl tonic::server::UnaryService for StatVolumeSvc { + type Response = super::StatVolumeResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::stat_volume(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = StatVolumeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/WriteMetadata" => { + #[allow(non_camel_case_types)] + struct WriteMetadataSvc(pub Arc); + impl tonic::server::UnaryService for WriteMetadataSvc { + type Response = super::WriteMetadataResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::write_metadata(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WriteMetadataSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ReadVersion" => { + #[allow(non_camel_case_types)] + struct ReadVersionSvc(pub Arc); + impl tonic::server::UnaryService for ReadVersionSvc { + type Response = super::ReadVersionResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_version(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadVersionSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ReadXL" => { + #[allow(non_camel_case_types)] + struct ReadXLSvc(pub Arc); + impl tonic::server::UnaryService for ReadXLSvc { + type Response = super::ReadXlResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_xl(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadXLSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/ReadMultiple" => { + #[allow(non_camel_case_types)] + struct ReadMultipleSvc(pub Arc); + impl tonic::server::UnaryService for ReadMultipleSvc { + type Response = super::ReadMultipleResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::read_multiple(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadMultipleSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/DeleteVolume" => { + #[allow(non_camel_case_types)] + struct DeleteVolumeSvc(pub Arc); + impl tonic::server::UnaryService for DeleteVolumeSvc { + type Response = super::DeleteVolumeResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::delete_volume(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteVolumeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { Ok(http::Response::builder() .status(200) diff --git a/common/protos/src/node.proto b/common/protos/src/node.proto index c3d6cd29..cc3ed1be 100644 --- a/common/protos/src/node.proto +++ b/common/protos/src/node.proto @@ -26,9 +26,259 @@ message MakeBucketResponse { optional string error_info = 2; } +message ReadAllRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; +} + +message ReadAllResponse { + bool success = 1; + bytes data = 2; + optional string error_info = 3; +} + +message WriteAllRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + bytes data = 4; +} + +message WriteAllResponse { + bool success = 1; + optional string error_info = 2; +} + +message DeleteRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + string options = 4; +} + +message DeleteResponse { + bool success = 1; + optional string error_info = 2; +} + +message RenameFileRequst { + string disk = 1; + string src_volume = 2; + string src_path = 3; + string dst_volume = 4; + string dst_path = 5; +} + +message RenameFileResponse { + bool success = 1; + optional string error_info = 2; +} + +message WriteRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + bool is_append = 4; + bytes data = 5; +} + +message WriteResponse { + bool success = 1; + optional string error_info = 2; +} + +// message AppendRequest { +// string disk = 1; // indicate which one in the disks +// string volume = 2; +// string path = 3; +// bytes data = 4; +// } +// +// message AppendResponse { +// bool success = 1; +// optional string error_info = 2; +// } + +message ReadAtRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + int64 offset = 4; + int64 length = 5; +} + +message ReadAtResponse { + bool success = 1; + bytes data = 2; + int64 read_size = 3; + optional string error_info = 4; +} + +message ListDirRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; +} + +message ListDirResponse { + bool success = 1; + repeated string volumes = 2; + optional string error_info = 3; +} + +message WalkDirRequest { + string disk = 1; // indicate which one in the disks + string walk_dir_options = 2; +} + +message WalkDirResponse { + bool success = 1; + repeated string meta_cache_entry = 2; + optional string error_info = 3; +} + +message RenameDataRequest { + string disk = 1; // indicate which one in the disks + string src_volume = 2; + string src_path = 3; + string file_info = 4; + string dst_volume = 5; + string dst_path = 6; +} + +message RenameDataResponse { + bool success = 1; + string rename_data_resp = 2; + optional string error_info = 3; +} + +message MakeVolumesRequest { + string disk = 1; // indicate which one in the disks + repeated string volumes = 2; +} + +message MakeVolumesResponse { + bool success = 1; + optional string error_info = 2; +} + +message MakeVolumeRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; +} + +message MakeVolumeResponse { + bool success = 1; + optional string error_info = 2; +} + +message ListVolumesRequest { + string disk = 1; // indicate which one in the disks +} + +message ListVolumesResponse { + bool success = 1; + repeated string volume_infos = 2; + optional string error_info = 3; +} + +message StatVolumeRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; +} + +message StatVolumeResponse { + bool success = 1; + string volume_info = 2; + optional string error_info = 3; +} + +message WriteMetadataRequest { + string disk = 1; // indicate which one in the disks + string volume = 2; + string path = 3; + string file_info = 4; +} + +message WriteMetadataResponse { + bool success = 1; + optional string error_info = 2; +} + +message ReadVersionRequest { + string disk = 1; + string volume = 2; + string path = 3; + string version_id = 4; + string opts = 5; +} + +message ReadVersionResponse { + bool success = 1; + string file_info = 2; + optional string error_info = 3; +} + +message ReadXLRequest { + string disk = 1; + string volume = 2; + string path = 3; + bool read_data = 4; +} + +message ReadXLResponse { + bool success = 1; + string raw_file_info = 2; + optional string error_info = 3; +} + +message ReadMultipleRequest { + string disk = 1; + string read_multiple_req = 2; +} + +message ReadMultipleResponse { + bool success = 1; + repeated string read_multiple_resps = 2; + optional string error_info = 3; +} + +message DeleteVolumeRequest { + string disk = 1; + string volume = 2; +} + +message DeleteVolumeResponse { + bool success = 1; + optional string error_info = 2; +} + /* -------------------------------------------------------------------- */ service NodeService { +/* -------------------------------meta service-------------------------- */ rpc Ping(PingRequest) returns (PingResponse) {}; rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {}; -} \ No newline at end of file + +/* -------------------------------disk service-------------------------- */ + + rpc ReadAll(ReadAllRequest) returns (ReadAllResponse) {}; + rpc WriteAll(WriteAllRequest) returns (WriteAllResponse) {}; + rpc Delete(DeleteRequest) returns (DeleteResponse) {}; + rpc RenameFile(RenameFileRequst) returns (RenameFileResponse) {}; + rpc Write(WriteRequest) returns (WriteResponse) {}; +// rpc Append(AppendRequest) returns (AppendResponse) {}; + rpc ReadAt(ReadAtRequest) returns (ReadAtResponse) {}; + rpc ListDir(ListDirRequest) returns (ListDirResponse) {}; + rpc WalkDir(WalkDirRequest) returns (WalkDirResponse) {}; + rpc RenameData(RenameDataRequest) returns (RenameDataResponse) {}; + rpc MakeVolumes(MakeVolumesRequest) returns (MakeVolumesResponse) {}; + rpc MakeVolume(MakeVolumeRequest) returns (MakeVolumeResponse) {}; + rpc ListVolumes(ListVolumesRequest) returns (ListVolumesResponse) {}; + rpc StatVolume(StatVolumeRequest) returns (StatVolumeResponse) {}; + rpc WriteMetadata(WriteMetadataRequest) returns (WriteMetadataResponse) {}; + rpc ReadVersion(ReadVersionRequest) returns (ReadVersionResponse) {}; + rpc ReadXL(ReadXLRequest) returns (ReadXLResponse) {}; + rpc ReadMultiple(ReadMultipleRequest) returns (ReadMultipleResponse) {}; + rpc DeleteVolume(DeleteVolumeRequest) returns (DeleteVolumeResponse) {}; +} diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index a1e30a55..90f99158 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -9,7 +9,9 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ecstore.workspace = true flatbuffers.workspace = true protos.workspace = true +serde_json.workspace = true tonic = { version = "0.12.1", features = ["gzip"] } tokio = { workspace = true } \ No newline at end of file diff --git a/e2e_test/src/reliant/node_interact_test.rs b/e2e_test/src/reliant/node_interact_test.rs index b2f97e13..ca6930e8 100644 --- a/e2e_test/src/reliant/node_interact_test.rs +++ b/e2e_test/src/reliant/node_interact_test.rs @@ -1,14 +1,21 @@ #![cfg(test)] +use ecstore::disk::VolumeInfo; use protos::{ models::{PingBody, PingBodyBuilder}, - proto_gen::node_service::{node_service_client::NodeServiceClient, PingRequest, PingResponse}, + proto_gen::node_service::{ + node_service_client::NodeServiceClient, ListVolumesRequest, MakeVolumeRequest, PingRequest, PingResponse, + }, }; use std::error::Error; use tonic::Request; +async fn get_client() -> Result, Box> { + Ok(NodeServiceClient::connect("http://localhost:9000").await?) +} + #[tokio::test] -async fn main() -> Result<(), Box> { +async fn ping() -> Result<(), Box> { let mut fbb = flatbuffers::FlatBufferBuilder::new(); let payload = fbb.create_vector(b"hello world"); @@ -44,3 +51,38 @@ async fn main() -> Result<(), Box> { Ok(()) } + +#[tokio::test] +async fn make_volume() -> Result<(), Box> { + let mut client = get_client().await?; + let request = Request::new(MakeVolumeRequest { + disk: "data".to_string(), + volume: "dandan".to_string(), + }); + + let response = client.make_volume(request).await?.into_inner(); + if response.success { + println!("success"); + } else { + println!("failed: {:?}", response.error_info); + } + Ok(()) +} + +#[tokio::test] +async fn list_volumes() -> Result<(), Box> { + let mut client = get_client().await?; + let request = Request::new(ListVolumesRequest { + disk: "data".to_string(), + }); + + let response = client.list_volumes(request).await?.into_inner(); + let volume_infos: Vec = response + .volume_infos + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + println!("{:?}", volume_infos); + Ok(()) +} diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 9de369ac..aae55759 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -3,7 +3,7 @@ use super::{ DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions, }; -use crate::disk::STORAGE_FORMAT_FILE; +use crate::disk::{LocalFileReader, LocalFileWriter, STORAGE_FORMAT_FILE}; use crate::{ error::{Error, Result}, file_meta::FileMeta, @@ -344,6 +344,10 @@ impl DiskAPI for LocalDisk { self.id } + fn path(&self) -> PathBuf { + self.root.clone() + } + #[must_use] async fn read_all(&self, volume: &str, path: &str) -> Result { let p = self.get_object_path(volume, path)?; @@ -443,7 +447,8 @@ impl DiskAPI for LocalDisk { let file = File::create(&fpath).await?; - Ok(FileWriter::new(file)) + Ok(FileWriter::Local(LocalFileWriter::new(file))) + // Ok(FileWriter::new(file)) // let mut writer = BufWriter::new(file); @@ -469,7 +474,8 @@ impl DiskAPI for LocalDisk { .open(&p) .await?; - Ok(FileWriter::new(file)) + Ok(FileWriter::Local(LocalFileWriter::new(file))) + // Ok(FileWriter::new(file)) // let mut writer = BufWriter::new(file); @@ -486,7 +492,7 @@ impl DiskAPI for LocalDisk { debug!("read_file {:?}", &p); let file = File::options().read(true).open(&p).await?; - Ok(FileReader::new(file)) + Ok(FileReader::Local(LocalFileReader::new(file))) // file.seek(SeekFrom::Start(offset as u64)).await?; @@ -683,9 +689,7 @@ impl DiskAPI for LocalDisk { .await?; } - Ok(RenameDataResp { - old_data_dir: old_data_dir, - }) + Ok(RenameDataResp { old_data_dir }) } async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index b694f99b..2fc1cf3c 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -2,6 +2,7 @@ pub mod endpoint; pub mod error; pub mod format; mod local; +mod remote; pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; @@ -12,18 +13,22 @@ pub const FORMAT_CONFIG_FILE: &str = "format.json"; const STORAGE_FORMAT_FILE: &str = "xl.meta"; use crate::{ - erasure::ReadAt, + erasure::{ReadAt, Write}, error::Result, file_meta::FileMeta, store_api::{FileInfo, RawFileInfo}, }; use bytes::Bytes; -use std::{fmt::Debug, io::SeekFrom, pin::Pin, sync::Arc}; +use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ReadAtRequest, WriteRequest}; +use serde::{Deserialize, Serialize}; +use std::{fmt::Debug, io::SeekFrom, path::PathBuf, sync::Arc}; use time::OffsetDateTime; use tokio::{ fs::File, - io::{AsyncReadExt, AsyncSeekExt, AsyncWrite}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; +use tonic::{transport::Channel, Request}; +use tower::timeout::Timeout; use uuid::Uuid; pub type DiskStore = Arc>; @@ -33,8 +38,8 @@ pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result Result bool; fn id(&self) -> Uuid; + fn path(&self) -> PathBuf; async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()>; async fn read_all(&self, volume: &str, path: &str) -> Result; @@ -82,7 +88,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct WalkDirOptions { // Bucket to scanner pub bucket: String, @@ -109,7 +115,7 @@ pub struct WalkDirOptions { pub disk_id: String, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct MetaCacheEntry { // name is the full name of the object including prefixes pub name: String, @@ -184,17 +190,18 @@ pub struct DiskOption { pub health_check: bool, } +#[derive(Serialize, Deserialize)] pub struct RenameDataResp { pub old_data_dir: Option, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct DeleteOptions { pub recursive: bool, pub immediate: bool, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ReadMultipleReq { pub bucket: String, pub prefix: String, @@ -205,7 +212,7 @@ pub struct ReadMultipleReq { pub max_results: usize, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ReadMultipleResp { pub bucket: String, pub prefix: String, @@ -230,65 +237,160 @@ pub struct ReadMultipleResp { // } // } +#[derive(Debug, Deserialize, Serialize)] pub struct VolumeInfo { pub name: String, pub created: Option, } +#[derive(Deserialize, Serialize)] pub struct ReadOptions { pub read_data: bool, pub healing: bool, } -pub struct FileWriter { - pub inner: Pin>, +// pub struct FileWriter { +// pub inner: Pin>, +// } + +// impl AsyncWrite for FileWriter { +// fn poll_write( +// mut self: Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// buf: &[u8], +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner).poll_write(cx, buf) +// } + +// fn poll_flush( +// mut self: Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner).poll_flush(cx) +// } + +// fn poll_shutdown( +// mut self: Pin<&mut Self>, +// cx: &mut std::task::Context<'_>, +// ) -> std::task::Poll> { +// Pin::new(&mut self.inner).poll_shutdown(cx) +// } +// } + +// impl FileWriter { +// pub fn new(inner: W) -> Self +// where +// W: AsyncWrite + Send + Sync + 'static, +// { +// Self { inner: Box::pin(inner) } +// } +// } + +pub enum FileWriter { + Local(LocalFileWriter), + Remote(RemoteFileWriter), } -impl AsyncWrite for FileWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner).poll_shutdown(cx) +#[async_trait::async_trait] +impl Write for FileWriter { + async fn write(&mut self, buf: &[u8]) -> Result<()> { + match self { + Self::Local(local_file_writer) => local_file_writer.write(buf).await, + Self::Remote(remote_file_writer) => remote_file_writer.write(buf).await, + } } } -impl FileWriter { - pub fn new(inner: W) -> Self - where - W: AsyncWrite + Send + Sync + 'static, - { - Self { inner: Box::pin(inner) } - } -} - -#[derive(Debug)] -pub struct FileReader { +pub struct LocalFileWriter { pub inner: File, } -impl FileReader { +impl LocalFileWriter { pub fn new(inner: File) -> Self { Self { inner } } } +#[async_trait::async_trait] +impl Write for LocalFileWriter { + async fn write(&mut self, buf: &[u8]) -> Result<()> { + self.inner.write(buf).await?; + self.inner.flush().await?; + + Ok(()) + } +} + +pub struct RemoteFileWriter { + pub root: PathBuf, + pub volume: String, + pub path: String, + pub is_append: bool, + client: NodeServiceClient>, +} + +impl RemoteFileWriter { + pub fn new( + root: PathBuf, + volume: String, + path: String, + is_append: bool, + client: NodeServiceClient>, + ) -> Self { + Self { + root, + volume, + path, + is_append, + client, + } + } +} + +#[async_trait::async_trait] +impl Write for RemoteFileWriter { + async fn write(&mut self, buf: &[u8]) -> Result<()> { + let request = Request::new(WriteRequest { + disk: self.root.to_string_lossy().to_string(), + volume: self.volume.to_string(), + path: self.path.to_string(), + is_append: self.is_append, + data: buf.to_vec(), + }); + let _response = self.client.write(request).await?.into_inner(); + Ok(()) + } +} + +#[derive(Debug)] +pub enum FileReader { + Local(LocalFileReader), + Remote(RemoteFileReader), +} + +#[async_trait::async_trait] impl ReadAt for FileReader { + async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { + match self { + Self::Local(local_file_writer) => local_file_writer.read_at(offset, length).await, + Self::Remote(remote_file_writer) => remote_file_writer.read_at(offset, length).await, + } + } +} + +#[derive(Debug)] +pub struct LocalFileReader { + pub inner: File, +} + +impl LocalFileReader { + pub fn new(inner: File) -> Self { + Self { inner } + } +} + +#[async_trait::async_trait] +impl ReadAt for LocalFileReader { async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { self.inner.seek(SeekFrom::Start(offset as u64)).await?; @@ -301,3 +403,38 @@ impl ReadAt for FileReader { Ok((buffer, bytes_read)) } } + +#[derive(Debug)] +pub struct RemoteFileReader { + pub root: PathBuf, + pub volume: String, + pub path: String, + client: NodeServiceClient>, +} + +impl RemoteFileReader { + pub fn new(root: PathBuf, volume: String, path: String, client: NodeServiceClient>) -> Self { + Self { + root, + volume, + path, + client, + } + } +} + +#[async_trait::async_trait] +impl ReadAt for RemoteFileReader { + async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { + let request = Request::new(ReadAtRequest { + disk: self.root.to_string_lossy().to_string(), + volume: self.volume.to_string(), + path: self.path.to_string(), + offset: offset.try_into().unwrap(), + length: length.try_into().unwrap(), + }); + let response = self.client.read_at(request).await?.into_inner(); + + Ok((response.data, response.read_size.try_into().unwrap())) + } +} diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs new file mode 100644 index 00000000..d1803d21 --- /dev/null +++ b/ecstore/src/disk/remote.rs @@ -0,0 +1,356 @@ +use std::{path::PathBuf, time::Duration}; + +use bytes::Bytes; +use protos::{ + node_service_time_out_client, + proto_gen::node_service::{ + node_service_client::NodeServiceClient, DeleteRequest, DeleteVolumeRequest, ListDirRequest, ListVolumesRequest, + MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, + RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest, + }, + DEFAULT_GRPC_SERVER_MESSAGE_LEN, +}; +use tokio::fs; +use tonic::{ + transport::{Channel, Endpoint as tonic_Endpoint}, + Request, +}; +use tower::timeout::Timeout; +use uuid::Uuid; + +use crate::{ + error::Result, + store_api::{FileInfo, RawFileInfo}, +}; + +use super::{ + endpoint::Endpoint, DeleteOptions, DiskAPI, DiskOption, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, + ReadMultipleResp, ReadOptions, RemoteFileReader, RemoteFileWriter, RenameDataResp, VolumeInfo, WalkDirOptions, +}; + +#[derive(Debug)] +pub struct RemoteDisk { + channel: Channel, + pub root: PathBuf, +} + +impl RemoteDisk { + pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result { + let connector = tonic_Endpoint::from_shared(format!( + "{}://{}{}", + ep.url.scheme(), + ep.url.host_str().unwrap(), + ep.url.port().unwrap() + )) + .unwrap(); + let channel = tokio::runtime::Runtime::new().unwrap().block_on(connector.connect()).unwrap(); + + let root = fs::canonicalize(ep.url.path()).await?; + + Ok(Self { channel, root }) + } + + fn get_client(&self) -> NodeServiceClient> { + node_service_time_out_client( + self.channel.clone(), + Duration::new(30, 0), // TODO: use config setting + DEFAULT_GRPC_SERVER_MESSAGE_LEN, + // grpc_enable_gzip, + false, // TODO: use config setting + ) + } +} + +// TODO: all api need to handle errors +#[async_trait::async_trait] +impl DiskAPI for RemoteDisk { + fn is_local(&self) -> bool { + false + } + + fn id(&self) -> Uuid { + Uuid::nil() + } + + fn path(&self) -> PathBuf { + self.root.clone() + } + + async fn read_all(&self, volume: &str, path: &str) -> Result { + let mut client = self.get_client(); + let request = Request::new(ReadAllRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + }); + + let response = client.read_all(request).await?.into_inner(); + + Ok(Bytes::from(response.data)) + } + + async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()> { + let mut client = self.get_client(); + let request = Request::new(WriteAllRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + data, + }); + + let _response = client.write_all(request).await?.into_inner(); + + Ok(()) + } + + async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { + let options = serde_json::to_string(&opt)?; + let mut client = self.get_client(); + let request = Request::new(DeleteRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + options, + }); + + let _response = client.delete(request).await?.into_inner(); + + Ok(()) + } + + async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { + let mut client = self.get_client(); + let request = Request::new(RenameFileRequst { + disk: self.root.to_string_lossy().to_string(), + src_volume: src_volume.to_string(), + src_path: src_path.to_string(), + dst_volume: dst_volume.to_string(), + dst_path: dst_path.to_string(), + }); + + let _response = client.rename_file(request).await?.into_inner(); + + Ok(()) + } + + async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result { + Ok(FileWriter::Remote(RemoteFileWriter::new( + self.root.clone(), + volume.to_string(), + path.to_string(), + false, + self.get_client(), + ))) + } + + async fn append_file(&self, volume: &str, path: &str) -> Result { + Ok(FileWriter::Remote(RemoteFileWriter::new( + self.root.clone(), + volume.to_string(), + path.to_string(), + true, + self.get_client(), + ))) + } + + async fn read_file(&self, volume: &str, path: &str) -> Result { + Ok(FileReader::Remote(RemoteFileReader::new( + self.root.clone(), + volume.to_string(), + path.to_string(), + self.get_client(), + ))) + } + + async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result> { + let mut client = self.get_client(); + let request = Request::new(ListDirRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + }); + + let response = client.list_dir(request).await?.into_inner(); + + Ok(response.volumes) + } + + async fn walk_dir(&self, opts: WalkDirOptions) -> Result> { + let walk_dir_options = serde_json::to_string(&opts)?; + let mut client = self.get_client(); + let request = Request::new(WalkDirRequest { + disk: self.root.to_string_lossy().to_string(), + walk_dir_options, + }); + + let response = client.walk_dir(request).await?.into_inner(); + let entries = response + .meta_cache_entry + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(entries) + } + + async fn rename_data( + &self, + src_volume: &str, + src_path: &str, + fi: FileInfo, + dst_volume: &str, + dst_path: &str, + ) -> Result { + let file_info = serde_json::to_string(&fi)?; + let mut client = self.get_client(); + let request = Request::new(RenameDataRequest { + disk: self.root.to_string_lossy().to_string(), + src_volume: src_volume.to_string(), + src_path: src_path.to_string(), + file_info, + dst_volume: dst_volume.to_string(), + dst_path: dst_path.to_string(), + }); + + let response = client.rename_data(request).await?.into_inner(); + let rename_data_resp = serde_json::from_str::(&response.rename_data_resp)?; + + Ok(rename_data_resp) + } + + async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { + let mut client = self.get_client(); + let request = Request::new(MakeVolumesRequest { + disk: self.root.to_string_lossy().to_string(), + volumes: volumes.iter().map(|s| (*s).to_string()).collect(), + }); + + let _response = client.make_volumes(request).await?.into_inner(); + + Ok(()) + } + + async fn make_volume(&self, volume: &str) -> Result<()> { + let mut client = self.get_client(); + let request = Request::new(MakeVolumeRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + }); + + let _response = client.make_volume(request).await?.into_inner(); + + Ok(()) + } + + async fn list_volumes(&self) -> Result> { + let mut client = self.get_client(); + let request = Request::new(ListVolumesRequest { + disk: self.root.to_string_lossy().to_string(), + }); + + let response = client.list_volumes(request).await?.into_inner(); + let infos = response + .volume_infos + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(infos) + } + + async fn stat_volume(&self, volume: &str) -> Result { + let mut client = self.get_client(); + let request = Request::new(StatVolumeRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + }); + + let response = client.stat_volume(request).await?.into_inner(); + let volume_info = serde_json::from_str::(&response.volume_info)?; + + Ok(volume_info) + } + + async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { + let file_info = serde_json::to_string(&fi)?; + let mut client = self.get_client(); + let request = Request::new(WriteMetadataRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + file_info, + }); + + let _response = client.write_metadata(request).await?.into_inner(); + + Ok(()) + } + + async fn read_version( + &self, + _org_volume: &str, + volume: &str, + path: &str, + version_id: &str, + opts: &ReadOptions, + ) -> Result { + let opts = serde_json::to_string(opts)?; + let mut client = self.get_client(); + let request = Request::new(ReadVersionRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + version_id: version_id.to_string(), + opts, + }); + + let response = client.read_version(request).await?.into_inner(); + let file_info = serde_json::from_str::(&response.file_info)?; + + Ok(file_info) + } + + async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { + let mut client = self.get_client(); + let request = Request::new(ReadXlRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + path: path.to_string(), + read_data, + }); + + let response = client.read_xl(request).await?.into_inner(); + let raw_file_info = serde_json::from_str::(&response.raw_file_info)?; + + Ok(raw_file_info) + } + + async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { + let read_multiple_req = serde_json::to_string(&req)?; + let mut client = self.get_client(); + let request = Request::new(ReadMultipleRequest { + disk: self.root.to_string_lossy().to_string(), + read_multiple_req, + }); + + let response = client.read_multiple(request).await?.into_inner(); + let read_multiple_resps = response + .read_multiple_resps + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(read_multiple_resps) + } + + async fn delete_volume(&self, volume: &str) -> Result<()> { + let mut client = self.get_client(); + let request = Request::new(DeleteVolumeRequest { + disk: self.root.to_string_lossy().to_string(), + volume: volume.to_string(), + }); + + let _response = client.delete_volume(request).await?.into_inner(); + + Ok(()) + } +} diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 6c23842a..13cb5af1 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures::future::join_all; use futures::{Stream, StreamExt}; use reed_solomon_erasure::galois_8::ReedSolomon; -use tokio::io::AsyncWrite; +use std::fmt::Debug; use tokio::io::AsyncWriteExt; use tokio::io::DuplexStream; use tracing::debug; @@ -13,7 +13,7 @@ use uuid::Uuid; use crate::chunk_stream::ChunkedStream; use crate::disk::error::DiskError; -use crate::disk::FileReader; +use crate::disk::{FileReader, FileWriter}; pub struct Erasure { data_shards: usize, @@ -43,17 +43,16 @@ impl Erasure { } } - pub async fn encode( + pub async fn encode( &self, body: S, - writers: &mut [W], + writers: &mut [FileWriter], // block_size: usize, total_size: usize, _write_quorum: usize, ) -> Result where S: Stream> + Send + Sync + 'static, - W: AsyncWrite + Unpin, { let mut stream = ChunkedStream::new(body, total_size, self.block_size, false); let mut total: usize = 0; @@ -85,7 +84,7 @@ impl Erasure { let mut errs = Vec::new(); for (i, w) in writers.iter_mut().enumerate() { - match w.write_all(blocks[i].as_ref()).await { + match w.write(blocks[i].as_ref()).await { Ok(_) => errs.push(None), Err(e) => errs.push(Some(e)), } @@ -318,6 +317,12 @@ impl Erasure { } } +#[async_trait::async_trait] +pub trait Write { + async fn write(&mut self, buf: &[u8]) -> Result<()>; +} + +#[async_trait::async_trait] pub trait ReadAt { async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)>; } diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 82741b44..beec2d9e 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -3,7 +3,7 @@ mod chunk_stream; pub mod disk; mod disks_layout; mod endpoints; -mod erasure; +pub mod erasure; pub mod error; mod file_meta; pub mod peer; diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index a1556b10..f0f0e673 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -197,6 +197,7 @@ pub struct ObjectPartInfo { // } // } +#[derive(Serialize, Deserialize)] pub struct RawFileInfo { pub buf: Vec, } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index a155cc22..89bd92a7 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -28,6 +28,7 @@ prost-types.workspace = true protos.workspace = true protobuf.workspace = true s3s.workspace = true +serde_json.workspace = true tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting"] } tokio = { workspace = true, features = [ diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 890e6cba..7fa7d5de 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -1,4 +1,10 @@ -use ecstore::{disk::DiskStore, peer::LocalPeerS3Client}; +use ecstore::{ + disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions}, + erasure::{ReadAt, Write}, + peer::{LocalPeerS3Client, PeerS3Client}, + store_api::{FileInfo, MakeBucketOptions}, +}; +use tokio::fs; use tonic::{Request, Response, Status}; use tracing::{debug, error, info}; @@ -6,7 +12,13 @@ use protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{ node_service_server::{NodeService as Node, NodeServiceServer as NodeServer}, - MakeBucketRequest, MakeBucketResponse, PingRequest, PingResponse, + DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse, ListDirRequest, ListDirResponse, + ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, + MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, + ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest, + ReadXlResponse, RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, StatVolumeRequest, + StatVolumeResponse, WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest, + WriteMetadataResponse, WriteRequest, WriteResponse, }, }; @@ -20,6 +32,24 @@ pub fn make_server(local_disks: Vec) -> NodeServer { NodeServer::new(NodeService { local_peer }) } +impl NodeService { + async fn find_disk(&self, disk_path: &String) -> Option { + let disk_path = match fs::canonicalize(disk_path).await { + Ok(disk_path) => disk_path, + Err(_) => return None, + }; + self.local_peer.local_disks.iter().find(|&x| x.path() == disk_path).cloned() + } + + fn all_disk(&self) -> Vec { + self.local_peer + .local_disks + .iter() + .map(|disk| disk.path().to_string_lossy().to_string()) + .collect() + } +} + #[tonic::async_trait] impl Node for NodeService { async fn ping(&self, request: Request) -> Result, Status> { @@ -52,9 +82,578 @@ impl Node for NodeService { async fn make_bucket(&self, request: Request) -> Result, Status> { debug!("make bucket"); - let _req = request.into_inner(); + let make_bucket_request = request.into_inner(); + let options = if let Some(opts) = make_bucket_request.options { + MakeBucketOptions { + force_create: opts.force_create, + } + } else { + MakeBucketOptions::default() + }; + match self.local_peer.make_bucket(&make_bucket_request.name, &options).await { + Ok(_) => Ok(tonic::Response::new(MakeBucketResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(MakeBucketResponse { + success: false, + error_info: Some(format!("make failed: {}", err.to_string())), + })), + } + } - // match self.local_peer.make_bucket(&name, &MakeBucketOptions::default()).await {} - unimplemented!() + async fn read_all(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.read_all(&request.volume, &request.path).await { + Ok(data) => Ok(tonic::Response::new(ReadAllResponse { + success: true, + data: data.to_vec(), + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(ReadAllResponse { + success: false, + data: Vec::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ReadAllResponse { + success: false, + data: Vec::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn write_all(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.write_all(&request.volume, &request.path, request.data).await { + Ok(_) => Ok(tonic::Response::new(WriteAllResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(WriteAllResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(WriteAllResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn delete(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let options = match serde_json::from_str::(&request.options) { + Ok(options) => options, + Err(_) => { + return Ok(tonic::Response::new(DeleteResponse { + success: false, + error_info: Some("can not decode DeleteOptions".to_string()), + })); + } + }; + match disk.delete(&request.volume, &request.path, options).await { + Ok(_) => Ok(tonic::Response::new(DeleteResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(DeleteResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(DeleteResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn rename_file(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk + .rename_file(&request.src_volume, &request.src_path, &request.dst_volume, &request.dst_path) + .await + { + Ok(_) => Ok(tonic::Response::new(RenameFileResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(RenameFileResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(RenameFileResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn write(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let file_writer = if request.is_append { + disk.append_file(&request.volume, &request.path).await + } else { + disk.create_file("", &request.volume, &request.path, 0).await + }; + + match file_writer { + Ok(mut file_writer) => match file_writer.write(&request.data).await { + Ok(_) => Ok(tonic::Response::new(WriteResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(WriteResponse { + success: false, + error_info: Some(err.to_string()), + })), + }, + Err(err) => Ok(tonic::Response::new(WriteResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(WriteResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn read_at(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.read_file(&request.volume, &request.path).await { + Ok(mut file_reader) => { + match file_reader + .read_at(request.offset.try_into().unwrap(), request.length.try_into().unwrap()) + .await + { + Ok((data, read_size)) => Ok(tonic::Response::new(ReadAtResponse { + success: true, + data, + read_size: read_size.try_into().unwrap(), + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(ReadAtResponse { + success: false, + data: Vec::new(), + read_size: -1, + error_info: Some(err.to_string()), + })), + } + } + Err(err) => Ok(tonic::Response::new(ReadAtResponse { + success: false, + data: Vec::new(), + read_size: -1, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ReadAtResponse { + success: false, + data: Vec::new(), + read_size: -1, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn list_dir(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.list_dir("", &request.volume, "", 0).await { + Ok(volumes) => Ok(tonic::Response::new(ListDirResponse { + success: true, + volumes, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(ListDirResponse { + success: false, + volumes: Vec::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ListDirResponse { + success: false, + volumes: Vec::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn walk_dir(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let opts = match serde_json::from_str::(&request.walk_dir_options) { + Ok(options) => options, + Err(_) => { + return Ok(tonic::Response::new(WalkDirResponse { + success: false, + meta_cache_entry: Vec::new(), + error_info: Some("can not decode DeleteOptions".to_string()), + })); + } + }; + match disk.walk_dir(opts).await { + Ok(entries) => { + let entries = entries + .into_iter() + .filter_map(|entry| serde_json::to_string(&entry).ok()) + .collect(); + Ok(tonic::Response::new(WalkDirResponse { + success: true, + meta_cache_entry: entries, + error_info: None, + })) + } + Err(err) => Ok(tonic::Response::new(WalkDirResponse { + success: false, + meta_cache_entry: Vec::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(WalkDirResponse { + success: false, + meta_cache_entry: Vec::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn rename_data(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let file_info = match serde_json::from_str::(&request.file_info) { + Ok(file_info) => file_info, + Err(_) => { + return Ok(tonic::Response::new(RenameDataResponse { + success: false, + rename_data_resp: String::new(), + error_info: Some("can not decode DeleteOptions".to_string()), + })); + } + }; + match disk + .rename_data(&request.src_volume, &request.src_path, file_info, &request.dst_volume, &request.dst_path) + .await + { + Ok(rename_data_resp) => { + let rename_data_resp = match serde_json::to_string(&rename_data_resp) { + Ok(file_info) => file_info, + Err(_) => { + return Ok(tonic::Response::new(RenameDataResponse { + success: false, + rename_data_resp: String::new(), + error_info: Some("can not encode RenameDataResp".to_string()), + })); + } + }; + Ok(tonic::Response::new(RenameDataResponse { + success: true, + rename_data_resp, + error_info: None, + })) + } + Err(err) => Ok(tonic::Response::new(RenameDataResponse { + success: false, + rename_data_resp: String::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(RenameDataResponse { + success: false, + rename_data_resp: String::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn make_volumes(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.make_volumes(request.volumes.iter().map(|s| &**s).collect()).await { + Ok(_) => Ok(tonic::Response::new(MakeVolumesResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(MakeVolumesResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(MakeVolumesResponse { + success: false, + error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())), + })) + } + } + + async fn make_volume(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.make_volume(&request.volume).await { + Ok(_) => Ok(tonic::Response::new(MakeVolumeResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(MakeVolumeResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(MakeVolumeResponse { + success: false, + error_info: Some(format!("can not find disk, all disks: {:?}", self.all_disk())), + })) + } + } + + async fn list_volumes(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.list_volumes().await { + Ok(volume_infos) => { + let volume_infos = volume_infos + .into_iter() + .filter_map(|volume_info| serde_json::to_string(&volume_info).ok()) + .collect(); + Ok(tonic::Response::new(ListVolumesResponse { + success: true, + volume_infos, + error_info: None, + })) + } + Err(err) => Ok(tonic::Response::new(ListVolumesResponse { + success: false, + volume_infos: Vec::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ListVolumesResponse { + success: false, + volume_infos: Vec::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn stat_volume(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.stat_volume(&request.volume).await { + Ok(volume_info) => match serde_json::to_string(&volume_info) { + Ok(volume_info) => Ok(tonic::Response::new(StatVolumeResponse { + success: true, + volume_info, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(StatVolumeResponse { + success: false, + volume_info: String::new(), + error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())), + })), + }, + Err(err) => Ok(tonic::Response::new(StatVolumeResponse { + success: false, + volume_info: String::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(StatVolumeResponse { + success: false, + volume_info: String::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn write_metadata(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let file_info = match serde_json::from_str::(&request.file_info) { + Ok(file_info) => file_info, + Err(err) => { + return Ok(tonic::Response::new(WriteMetadataResponse { + success: false, + error_info: Some(format!("decode FileInfo failed, {}", err.to_string())), + })); + } + }; + match disk.write_metadata("", &request.volume, &request.path, file_info).await { + Ok(_) => Ok(tonic::Response::new(WriteMetadataResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(WriteMetadataResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(WriteMetadataResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn read_version(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let opts = match serde_json::from_str::(&request.opts) { + Ok(options) => options, + Err(_) => { + return Ok(tonic::Response::new(ReadVersionResponse { + success: false, + file_info: String::new(), + error_info: Some("can not decode DeleteOptions".to_string()), + })); + } + }; + match disk + .read_version("", &request.volume, &request.path, &request.version_id, &opts) + .await + { + Ok(file_info) => match serde_json::to_string(&file_info) { + Ok(file_info) => Ok(tonic::Response::new(ReadVersionResponse { + success: true, + file_info, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(ReadVersionResponse { + success: false, + file_info: String::new(), + error_info: Some(format!("encode VolumeInfo failed, {}", err.to_string())), + })), + }, + Err(err) => Ok(tonic::Response::new(ReadVersionResponse { + success: false, + file_info: String::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ReadVersionResponse { + success: false, + file_info: String::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn read_xl(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.read_xl(&request.volume, &request.path, request.read_data).await { + Ok(raw_file_info) => match serde_json::to_string(&raw_file_info) { + Ok(raw_file_info) => Ok(tonic::Response::new(ReadXlResponse { + success: true, + raw_file_info, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(ReadXlResponse { + success: false, + raw_file_info: String::new(), + error_info: Some(format!("encode RawFileInfo failed, {}", err.to_string())), + })), + }, + Err(err) => Ok(tonic::Response::new(ReadXlResponse { + success: false, + raw_file_info: String::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ReadXlResponse { + success: false, + raw_file_info: String::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn read_multiple(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + let read_multiple_req = match serde_json::from_str::(&request.read_multiple_req) { + Ok(read_multiple_req) => read_multiple_req, + Err(_) => { + return Ok(tonic::Response::new(ReadMultipleResponse { + success: false, + read_multiple_resps: Vec::new(), + error_info: Some("can not decode ReadMultipleReq".to_string()), + })); + } + }; + match disk.read_multiple(read_multiple_req).await { + Ok(read_multiple_resps) => { + let read_multiple_resps = read_multiple_resps + .into_iter() + .filter_map(|read_multiple_resp| serde_json::to_string(&read_multiple_resp).ok()) + .collect(); + + Ok(tonic::Response::new(ReadMultipleResponse { + success: true, + read_multiple_resps, + error_info: None, + })) + } + Err(err) => Ok(tonic::Response::new(ReadMultipleResponse { + success: false, + read_multiple_resps: Vec::new(), + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(ReadMultipleResponse { + success: false, + read_multiple_resps: Vec::new(), + error_info: Some("can not find disk".to_string()), + })) + } + } + + async fn delete_volume(&self, request: Request) -> Result, Status> { + let request = request.into_inner(); + if let Some(disk) = self.find_disk(&request.disk).await { + match disk.delete_volume(&request.volume).await { + Ok(_) => Ok(tonic::Response::new(DeleteVolumeResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(DeleteVolumeResponse { + success: false, + error_info: Some(err.to_string()), + })), + } + } else { + Ok(tonic::Response::new(DeleteVolumeResponse { + success: false, + error_info: Some("can not find disk".to_string()), + })) + } } }