From 29f4eaf94f7201be27b934ab2a4b2275b2292653 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Thu, 29 Aug 2024 19:29:13 +0800 Subject: [PATCH] support remote peer sys Signed-off-by: junxiang Mu <1948535941@qq.com> --- .../src/generated/proto_gen/node_service.rs | 193 +++++++++++++++++- common/protos/src/node.proto | 35 +++- ecstore/src/peer.rs | 67 ++++-- ecstore/src/store_api.rs | 5 +- rustfs/src/grpc.rs | 113 +++++++++- 5 files changed, 380 insertions(+), 33 deletions(-) diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index 13e968bf..f72dbb10 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -17,18 +17,28 @@ pub struct PingResponse { pub body: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct MakeBucketOptions { +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListBucketRequest { + #[prost(string, tag = "1")] + pub options: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListBucketResponse { #[prost(bool, tag = "1")] - pub force_create: bool, + pub success: bool, + #[prost(string, repeated, tag = "2")] + pub bucket_infos: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct MakeBucketRequest { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] - pub options: ::core::option::Option, + #[prost(string, tag = "2")] + pub options: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -40,6 +50,38 @@ pub struct MakeBucketResponse { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetBucketInfoRequest { + #[prost(string, tag = "1")] + pub bucket: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub options: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetBucketInfoResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, tag = "2")] + pub bucket_info: ::prost::alloc::string::String, + #[prost(string, optional, tag = "3")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteBucketRequest { + #[prost(string, tag = "1")] + pub bucket: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DeleteBucketResponse { + #[prost(bool, tag = "1")] + pub success: bool, + #[prost(string, optional, tag = "2")] + pub error_info: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ReadAllRequest { /// indicate which one in the disks #[prost(string, tag = "1")] @@ -503,6 +545,21 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "Ping")); self.inner.unary(req, path, codec).await } + pub async fn list_bucket( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListBucket"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "ListBucket")); + self.inner.unary(req, path, codec).await + } pub async fn make_bucket( &mut self, request: impl tonic::IntoRequest, @@ -518,6 +575,36 @@ pub mod node_service_client { .insert(GrpcMethod::new("node_service.NodeService", "MakeBucket")); self.inner.unary(req, path, codec).await } + pub async fn get_bucket_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/GetBucketInfo"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "GetBucketInfo")); + self.inner.unary(req, path, codec).await + } + pub async fn delete_bucket( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteBucket"); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("node_service.NodeService", "DeleteBucket")); + self.inner.unary(req, path, codec).await + } pub async fn read_all( &mut self, request: impl tonic::IntoRequest, @@ -803,10 +890,22 @@ pub mod node_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn list_bucket( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; async fn make_bucket( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn get_bucket_info( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn delete_bucket( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; async fn read_all( &self, request: tonic::Request, @@ -979,6 +1078,34 @@ pub mod node_service_server { }; Box::pin(fut) } + "/node_service.NodeService/ListBucket" => { + #[allow(non_camel_case_types)] + struct ListBucketSvc(pub Arc); + impl tonic::server::UnaryService for ListBucketSvc { + type Response = super::ListBucketResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::list_bucket(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListBucketSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/node_service.NodeService/MakeBucket" => { #[allow(non_camel_case_types)] struct MakeBucketSvc(pub Arc); @@ -1007,6 +1134,62 @@ pub mod node_service_server { }; Box::pin(fut) } + "/node_service.NodeService/GetBucketInfo" => { + #[allow(non_camel_case_types)] + struct GetBucketInfoSvc(pub Arc); + impl tonic::server::UnaryService for GetBucketInfoSvc { + type Response = super::GetBucketInfoResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::get_bucket_info(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetBucketInfoSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/node_service.NodeService/DeleteBucket" => { + #[allow(non_camel_case_types)] + struct DeleteBucketSvc(pub Arc); + impl tonic::server::UnaryService for DeleteBucketSvc { + type Response = super::DeleteBucketResponse; + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { ::delete_bucket(&inner, request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteBucketSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config(accept_compression_encodings, send_compression_encodings) + .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/node_service.NodeService/ReadAll" => { #[allow(non_camel_case_types)] struct ReadAllSvc(pub Arc); diff --git a/common/protos/src/node.proto b/common/protos/src/node.proto index cc3ed1be..1d7fe716 100644 --- a/common/protos/src/node.proto +++ b/common/protos/src/node.proto @@ -12,13 +12,19 @@ message PingResponse { bytes body = 2; } -message MakeBucketOptions { - bool force_create = 1; +message ListBucketRequest { + string options = 1; +} + +message ListBucketResponse { + bool success = 1; + repeated string bucket_infos = 2; + optional string error_info = 3; } message MakeBucketRequest { string name = 1; - MakeBucketOptions options = 2; + string options = 2; } message MakeBucketResponse { @@ -26,6 +32,26 @@ message MakeBucketResponse { optional string error_info = 2; } +message GetBucketInfoRequest { + string bucket = 1; + string options = 2; +} + +message GetBucketInfoResponse { + bool success = 1; + string bucket_info = 2; + optional string error_info = 3; +} + +message DeleteBucketRequest { + string bucket = 1; +} + +message DeleteBucketResponse { + bool success = 1; + optional string error_info = 2; +} + message ReadAllRequest { string disk = 1; // indicate which one in the disks string volume = 2; @@ -258,7 +284,10 @@ message DeleteVolumeResponse { service NodeService { /* -------------------------------meta service-------------------------- */ rpc Ping(PingRequest) returns (PingResponse) {}; + rpc ListBucket(ListBucketRequest) returns (ListBucketResponse) {}; rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {}; + rpc GetBucketInfo(GetBucketInfoRequest) returns (GetBucketInfoResponse) {}; + rpc DeleteBucket(DeleteBucketRequest) returns (DeleteBucketResponse) {}; /* -------------------------------disk service-------------------------- */ diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 4274c297..8d80c6dc 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,10 +1,7 @@ use async_trait::async_trait; use futures::future::join_all; -use protos::proto_gen::node_service::MakeBucketRequest; -use protos::{ - node_service_time_out_client, proto_gen::node_service::MakeBucketOptions as proto_MakeBucketOptions, - DEFAULT_GRPC_SERVER_MESSAGE_LEN, -}; +use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest}; +use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN}; use regex::Regex; use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; use tonic::transport::{Channel, Endpoint}; @@ -407,10 +404,27 @@ impl PeerS3Client for RemotePeerS3Client { fn get_pools(&self) -> Option> { self.pools.clone() } - async fn list_bucket(&self, _opts: &BucketOptions) -> Result> { - unimplemented!() + async fn list_bucket(&self, opts: &BucketOptions) -> Result> { + let options = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client( + self.channel.clone(), + Duration::new(30, 0), // TODO: use config setting + DEFAULT_GRPC_SERVER_MESSAGE_LEN, + // grpc_enable_gzip, + false, // TODO: use config setting + ); + let request = Request::new(ListBucketRequest { options }); + let response = client.list_bucket(request).await?.into_inner(); + let bucket_infos = response + .bucket_infos + .into_iter() + .filter_map(|json_str| serde_json::from_str::(&json_str).ok()) + .collect(); + + Ok(bucket_infos) } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { + let options = serde_json::to_string(opts)?; let mut client = node_service_time_out_client( self.channel.clone(), Duration::new(30, 0), // TODO: use config setting @@ -420,9 +434,7 @@ impl PeerS3Client for RemotePeerS3Client { ); let request = Request::new(MakeBucketRequest { name: bucket.to_string(), - options: Some(proto_MakeBucketOptions { - force_create: opts.force_create, - }), + options, }); let _response = client.make_bucket(request).await?.into_inner(); @@ -430,12 +442,39 @@ impl PeerS3Client for RemotePeerS3Client { Ok(()) } - async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result { - unimplemented!() + async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { + let options = serde_json::to_string(opts)?; + let mut client = node_service_time_out_client( + self.channel.clone(), + Duration::new(30, 0), // TODO: use config setting + DEFAULT_GRPC_SERVER_MESSAGE_LEN, + // grpc_enable_gzip, + false, // TODO: use config setting + ); + let request = Request::new(GetBucketInfoRequest { + bucket: bucket.to_string(), + options, + }); + let response = client.get_bucket_info(request).await?.into_inner(); + let bucket_info = serde_json::from_str::(&response.bucket_info)?; + + Ok(bucket_info) } - async fn delete_bucket(&self, _bucket: &str) -> Result<()> { - unimplemented!() + async fn delete_bucket(&self, bucket: &str) -> Result<()> { + let mut client = node_service_time_out_client( + self.channel.clone(), + Duration::new(30, 0), // TODO: use config setting + DEFAULT_GRPC_SERVER_MESSAGE_LEN, + // grpc_enable_gzip, + false, // TODO: use config setting + ); + let request = Request::new(DeleteBucketRequest { + bucket: bucket.to_string(), + }); + let _response = client.delete_bucket(request).await?.into_inner(); + + Ok(()) } } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index f0f0e673..09993833 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -243,7 +243,7 @@ pub enum BitrotAlgorithm { BLAKE2b512, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct MakeBucketOptions { pub force_create: bool, } @@ -378,9 +378,10 @@ pub struct ObjectOptions { // } // } +#[derive(Debug, Default, Serialize, Deserialize)] pub struct BucketOptions {} -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BucketInfo { pub name: String, pub created: Option, diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 7fa7d5de..544f5810 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -2,7 +2,7 @@ use ecstore::{ disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions}, erasure::{ReadAt, Write}, peer::{LocalPeerS3Client, PeerS3Client}, - store_api::{FileInfo, MakeBucketOptions}, + store_api::{BucketOptions, FileInfo, MakeBucketOptions}, }; use tokio::fs; use tonic::{Request, Response, Status}; @@ -12,7 +12,8 @@ use protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{ node_service_server::{NodeService as Node, NodeServiceServer as NodeServer}, - DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse, ListDirRequest, ListDirResponse, + DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse, + GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest, @@ -79,18 +80,55 @@ impl Node for NodeService { })) } + async fn list_bucket(&self, request: Request) -> Result, Status> { + debug!("list bucket"); + + let request = request.into_inner(); + let options = match serde_json::from_str::(&request.options) { + Ok(options) => options, + Err(err) => { + return Ok(tonic::Response::new(ListBucketResponse { + success: false, + bucket_infos: Vec::new(), + error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())), + })) + } + }; + match self.local_peer.list_bucket(&options).await { + Ok(bucket_infos) => { + let bucket_infos = bucket_infos + .into_iter() + .filter_map(|bucket_info| serde_json::to_string(&bucket_info).ok()) + .collect(); + Ok(tonic::Response::new(ListBucketResponse { + success: true, + bucket_infos, + error_info: None, + })) + } + + Err(err) => Ok(tonic::Response::new(ListBucketResponse { + success: false, + bucket_infos: Vec::new(), + error_info: Some(format!("make failed: {}", err.to_string())), + })), + } + } + async fn make_bucket(&self, request: Request) -> Result, Status> { debug!("make bucket"); - let make_bucket_request = request.into_inner(); - let options = if let Some(opts) = make_bucket_request.options { - MakeBucketOptions { - force_create: opts.force_create, + let request = request.into_inner(); + let options = match serde_json::from_str::(&request.options) { + Ok(options) => options, + Err(err) => { + return Ok(tonic::Response::new(MakeBucketResponse { + success: false, + error_info: Some(format!("decode MakeBucketOptions failed: {}", err.to_string())), + })) } - } else { - MakeBucketOptions::default() }; - match self.local_peer.make_bucket(&make_bucket_request.name, &options).await { + match self.local_peer.make_bucket(&request.name, &options).await { Ok(_) => Ok(tonic::Response::new(MakeBucketResponse { success: true, error_info: None, @@ -102,6 +140,63 @@ impl Node for NodeService { } } + async fn get_bucket_info(&self, request: Request) -> Result, Status> { + debug!("get bucket info"); + + let request = request.into_inner(); + let options = match serde_json::from_str::(&request.options) { + Ok(options) => options, + Err(err) => { + return Ok(tonic::Response::new(GetBucketInfoResponse { + success: false, + bucket_info: String::new(), + error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())), + })) + } + }; + match self.local_peer.get_bucket_info(&request.bucket, &options).await { + Ok(bucket_info) => { + let bucket_info = match serde_json::to_string(&bucket_info) { + Ok(bucket_info) => bucket_info, + Err(err) => { + return Ok(tonic::Response::new(GetBucketInfoResponse { + success: false, + bucket_info: String::new(), + error_info: Some(format!("encode BucketInfo failed: {}", err.to_string())), + })); + } + }; + Ok(tonic::Response::new(GetBucketInfoResponse { + success: true, + bucket_info, + error_info: None, + })) + } + + Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse { + success: false, + bucket_info: String::new(), + error_info: Some(format!("make failed: {}", err.to_string())), + })), + } + } + + async fn delete_bucket(&self, request: Request) -> Result, Status> { + debug!("make bucket"); + + let request = request.into_inner(); + match self.local_peer.delete_bucket(&request.bucket).await { + Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse { + success: true, + error_info: None, + })), + Err(err) => Ok(tonic::Response::new(DeleteBucketResponse { + success: false, + error_info: Some(format!("make failed: {}", err.to_string())), + })), + } + } + async fn read_all(&self, request: Request) -> Result, Status> { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await {