support remote peer sys

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu
2024-08-29 19:29:13 +08:00
parent a8e546c9af
commit 29f4eaf94f
5 changed files with 380 additions and 33 deletions

View File

@@ -17,18 +17,28 @@ pub struct PingResponse {
pub body: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
-#[derive(Clone, Copy, PartialEq, ::prost::Message)]
-pub struct MakeBucketOptions {
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ListBucketRequest {
#[prost(string, tag = "1")]
pub options: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListBucketResponse {
#[prost(bool, tag = "1")]
-pub force_create: bool,
+pub success: bool,
#[prost(string, repeated, tag = "2")]
pub bucket_infos: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag = "3")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MakeBucketRequest {
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
-#[prost(message, optional, tag = "2")]
-pub options: ::core::option::Option<MakeBucketOptions>,
+#[prost(string, tag = "2")]
+pub options: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -40,6 +50,38 @@ pub struct MakeBucketResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetBucketInfoRequest {
#[prost(string, tag = "1")]
pub bucket: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub options: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetBucketInfoResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(string, tag = "2")]
pub bucket_info: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteBucketRequest {
#[prost(string, tag = "1")]
pub bucket: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DeleteBucketResponse {
#[prost(bool, tag = "1")]
pub success: bool,
#[prost(string, optional, tag = "2")]
pub error_info: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadAllRequest {
/// indicates which one of the disks to use
#[prost(string, tag = "1")]
@@ -503,6 +545,21 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "Ping"));
self.inner.unary(req, path, codec).await
}
pub async fn list_bucket(
&mut self,
request: impl tonic::IntoRequest<super::ListBucketRequest>,
) -> std::result::Result<tonic::Response<super::ListBucketResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListBucket");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "ListBucket"));
self.inner.unary(req, path, codec).await
}
pub async fn make_bucket(
&mut self,
request: impl tonic::IntoRequest<super::MakeBucketRequest>,
@@ -518,6 +575,36 @@ pub mod node_service_client {
.insert(GrpcMethod::new("node_service.NodeService", "MakeBucket"));
self.inner.unary(req, path, codec).await
}
pub async fn get_bucket_info(
&mut self,
request: impl tonic::IntoRequest<super::GetBucketInfoRequest>,
) -> std::result::Result<tonic::Response<super::GetBucketInfoResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/GetBucketInfo");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "GetBucketInfo"));
self.inner.unary(req, path, codec).await
}
pub async fn delete_bucket(
&mut self,
request: impl tonic::IntoRequest<super::DeleteBucketRequest>,
) -> std::result::Result<tonic::Response<super::DeleteBucketResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteBucket");
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("node_service.NodeService", "DeleteBucket"));
self.inner.unary(req, path, codec).await
}
pub async fn read_all(
&mut self,
request: impl tonic::IntoRequest<super::ReadAllRequest>,
@@ -803,10 +890,22 @@ pub mod node_service_server {
&self,
request: tonic::Request<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status>;
async fn list_bucket(
&self,
request: tonic::Request<super::ListBucketRequest>,
) -> std::result::Result<tonic::Response<super::ListBucketResponse>, tonic::Status>;
async fn make_bucket(
&self,
request: tonic::Request<super::MakeBucketRequest>,
) -> std::result::Result<tonic::Response<super::MakeBucketResponse>, tonic::Status>;
async fn get_bucket_info(
&self,
request: tonic::Request<super::GetBucketInfoRequest>,
) -> std::result::Result<tonic::Response<super::GetBucketInfoResponse>, tonic::Status>;
async fn delete_bucket(
&self,
request: tonic::Request<super::DeleteBucketRequest>,
) -> std::result::Result<tonic::Response<super::DeleteBucketResponse>, tonic::Status>;
async fn read_all(
&self,
request: tonic::Request<super::ReadAllRequest>,
@@ -979,6 +1078,34 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/ListBucket" => {
#[allow(non_camel_case_types)]
struct ListBucketSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::ListBucketRequest> for ListBucketSvc<T> {
type Response = super::ListBucketResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::ListBucketRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::list_bucket(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ListBucketSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/MakeBucket" => {
#[allow(non_camel_case_types)]
struct MakeBucketSvc<T: NodeService>(pub Arc<T>);
@@ -1007,6 +1134,62 @@ pub mod node_service_server {
};
Box::pin(fut)
}
"/node_service.NodeService/GetBucketInfo" => {
#[allow(non_camel_case_types)]
struct GetBucketInfoSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::GetBucketInfoRequest> for GetBucketInfoSvc<T> {
type Response = super::GetBucketInfoResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::GetBucketInfoRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::get_bucket_info(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetBucketInfoSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/DeleteBucket" => {
#[allow(non_camel_case_types)]
struct DeleteBucketSvc<T: NodeService>(pub Arc<T>);
impl<T: NodeService> tonic::server::UnaryService<super::DeleteBucketRequest> for DeleteBucketSvc<T> {
type Response = super::DeleteBucketResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(&mut self, request: tonic::Request<super::DeleteBucketRequest>) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move { <T as NodeService>::delete_bucket(&inner, request).await };
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = DeleteBucketSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(accept_compression_encodings, send_compression_encodings)
.apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/node_service.NodeService/ReadAll" => {
#[allow(non_camel_case_types)]
struct ReadAllSvc<T: NodeService>(pub Arc<T>);

View File

@@ -12,13 +12,19 @@ message PingResponse {
bytes body = 2;
}
-message MakeBucketOptions {
-bool force_create = 1;
+message ListBucketRequest {
+string options = 1;
}
message ListBucketResponse {
bool success = 1;
repeated string bucket_infos = 2;
optional string error_info = 3;
}
message MakeBucketRequest {
string name = 1;
-MakeBucketOptions options = 2;
+string options = 2;
}
message MakeBucketResponse {
@@ -26,6 +32,26 @@ message MakeBucketResponse {
optional string error_info = 2;
}
message GetBucketInfoRequest {
string bucket = 1;
string options = 2;
}
message GetBucketInfoResponse {
bool success = 1;
string bucket_info = 2;
optional string error_info = 3;
}
message DeleteBucketRequest {
string bucket = 1;
}
message DeleteBucketResponse {
bool success = 1;
optional string error_info = 2;
}
message ReadAllRequest {
string disk = 1; // indicates which one of the disks to use
string volume = 2;
@@ -258,7 +284,10 @@ message DeleteVolumeResponse {
service NodeService {
/* -------------------------------meta service-------------------------- */
rpc Ping(PingRequest) returns (PingResponse) {};
rpc ListBucket(ListBucketRequest) returns (ListBucketResponse) {};
rpc MakeBucket(MakeBucketRequest) returns (MakeBucketResponse) {};
rpc GetBucketInfo(GetBucketInfoRequest) returns (GetBucketInfoResponse) {};
rpc DeleteBucket(DeleteBucketRequest) returns (DeleteBucketResponse) {};
/* -------------------------------disk service-------------------------- */

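A note on the wire convention these RPCs introduce: bucket options and bucket metadata travel as JSON strings inside the protobuf messages rather than as structured fields. Below is a minimal, hypothetical sketch of calling the new ListBucket RPC through the generated tonic client; the direct connect() call, the error handling, and the BucketOptions import path are illustrative assumptions (the commit itself goes through node_service_time_out_client), not part of this change.

// Hypothetical usage sketch, not part of this commit.
use ecstore::store_api::BucketOptions; // serde type added later in this commit (assumed path)
use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, ListBucketRequest};

async fn list_remote_buckets(addr: String) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    // Plain connect() for brevity; requires tonic's transport feature.
    let mut client = NodeServiceClient::connect(addr).await?;
    // BucketOptions is carried as a JSON string in the `options` field.
    let options = serde_json::to_string(&BucketOptions::default())?;
    let response = client.list_bucket(ListBucketRequest { options }).await?.into_inner();
    if !response.success {
        return Err(response.error_info.unwrap_or_default().into());
    }
    // Each entry is a JSON-encoded BucketInfo string, decoded by the caller with serde_json.
    Ok(response.bucket_infos)
}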
View File

@@ -1,10 +1,7 @@
use async_trait::async_trait;
use futures::future::join_all;
-use protos::proto_gen::node_service::MakeBucketRequest;
-use protos::{
-node_service_time_out_client, proto_gen::node_service::MakeBucketOptions as proto_MakeBucketOptions,
-DEFAULT_GRPC_SERVER_MESSAGE_LEN,
-};
+use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest};
+use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tonic::transport::{Channel, Endpoint};
@@ -407,10 +404,27 @@ impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}
-async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
-unimplemented!()
+async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
let bucket_infos = response
.bucket_infos
.into_iter()
.filter_map(|json_str| serde_json::from_str::<BucketInfo>(&json_str).ok())
.collect();
Ok(bucket_infos)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
@@ -420,9 +434,7 @@ impl PeerS3Client for RemotePeerS3Client {
);
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
-options: Some(proto_MakeBucketOptions {
-force_create: opts.force_create,
-}),
+options,
});
let _response = client.make_bucket(request).await?.into_inner();
@@ -430,12 +442,39 @@ impl PeerS3Client for RemotePeerS3Client {
Ok(())
}
-async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
-unimplemented!()
+async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo> {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,
});
let response = client.get_bucket_info(request).await?.into_inner();
let bucket_info = serde_json::from_str::<BucketInfo>(&response.bucket_info)?;
Ok(bucket_info)
}
-async fn delete_bucket(&self, _bucket: &str) -> Result<()> {
-unimplemented!()
+async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(
self.channel.clone(),
Duration::new(30, 0), // TODO: use config setting
DEFAULT_GRPC_SERVER_MESSAGE_LEN,
// grpc_enable_gzip,
false, // TODO: use config setting
);
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),
});
let _response = client.delete_bucket(request).await?.into_inner();
Ok(())
}
}
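
This file already imports futures::future::join_all; a hedged sketch of how a caller holding trait objects might fan one of the new calls out to every peer follows. The Arc<dyn PeerS3Client> collection and the crate's Result alias are assumptions for illustration, since the orchestration code is not part of this diff.

// Hypothetical fan-out helper, not part of this commit.
use futures::future::join_all;
use std::sync::Arc;

async fn delete_bucket_on_all_peers(
    peers: &[Arc<dyn PeerS3Client>], // assumes the trait is object-safe, as async_trait makes it
    bucket: &str,
) -> Vec<Result<()>> { // Result is the crate's alias used by PeerS3Client above
    // Issue DeleteBucket to every peer (local or remote) concurrently and collect the results.
    join_all(peers.iter().map(|peer| peer.delete_bucket(bucket))).await
}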

View File

@@ -243,7 +243,7 @@ pub enum BitrotAlgorithm {
BLAKE2b512,
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MakeBucketOptions {
pub force_create: bool,
}
@@ -378,9 +378,10 @@ pub struct ObjectOptions {
// }
// }
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BucketOptions {}
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketInfo {
pub name: String,
pub created: Option<OffsetDateTime>,

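A caveat on these derives, offered as a hedged note: (de)serializing BucketInfo with serde_json only compiles if OffsetDateTime itself has serde support, which for the time crate means enabling its serde feature or annotating the field with one of the time::serde formats; whether that is already configured is not visible in this diff. The round-trip the remote peer client and node service now rely on is simply:

// Sketch of the JSON round-trip this commit assumes (BucketInfo: Serialize + Deserialize).
fn bucket_info_round_trip(info: &BucketInfo) -> serde_json::Result<BucketInfo> {
    let encoded = serde_json::to_string(info)?; // node service encodes each bucket this way
    serde_json::from_str(&encoded)              // RemotePeerS3Client decodes it on the other side
}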
View File

@@ -2,7 +2,7 @@ use ecstore::{
disk::{DeleteOptions, DiskStore, ReadMultipleReq, ReadOptions, WalkDirOptions},
erasure::{ReadAt, Write},
peer::{LocalPeerS3Client, PeerS3Client},
-store_api::{FileInfo, MakeBucketOptions},
+store_api::{BucketOptions, FileInfo, MakeBucketOptions},
};
use tokio::fs;
use tonic::{Request, Response, Status};
@@ -12,7 +12,8 @@ use protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{
node_service_server::{NodeService as Node, NodeServiceServer as NodeServer},
-DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse, ListDirRequest, ListDirResponse,
+DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVolumeRequest, DeleteVolumeResponse,
+GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse,
ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse,
MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest,
ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest,
@@ -79,18 +80,55 @@ impl Node for NodeService {
}))
}
async fn list_bucket(&self, request: Request<ListBucketRequest>) -> Result<Response<ListBucketResponse>, Status> {
debug!("list bucket");
let request = request.into_inner();
let options = match serde_json::from_str::<BucketOptions>(&request.options) {
Ok(options) => options,
Err(err) => {
return Ok(tonic::Response::new(ListBucketResponse {
success: false,
bucket_infos: Vec::new(),
error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())),
}))
}
};
match self.local_peer.list_bucket(&options).await {
Ok(bucket_infos) => {
let bucket_infos = bucket_infos
.into_iter()
.filter_map(|bucket_info| serde_json::to_string(&bucket_info).ok())
.collect();
Ok(tonic::Response::new(ListBucketResponse {
success: true,
bucket_infos,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(ListBucketResponse {
success: false,
bucket_infos: Vec::new(),
error_info: Some(format!("make failed: {}", err.to_string())),
})),
}
}
async fn make_bucket(&self, request: Request<MakeBucketRequest>) -> Result<Response<MakeBucketResponse>, Status> {
debug!("make bucket");
-let make_bucket_request = request.into_inner();
-let options = if let Some(opts) = make_bucket_request.options {
-MakeBucketOptions {
-force_create: opts.force_create,
+let request = request.into_inner();
+let options = match serde_json::from_str::<MakeBucketOptions>(&request.options) {
+Ok(options) => options,
+Err(err) => {
+return Ok(tonic::Response::new(MakeBucketResponse {
+success: false,
+error_info: Some(format!("decode MakeBucketOptions failed: {}", err)),
+}))
+}
}
-} else {
-MakeBucketOptions::default()
};
-match self.local_peer.make_bucket(&make_bucket_request.name, &options).await {
+match self.local_peer.make_bucket(&request.name, &options).await {
Ok(_) => Ok(tonic::Response::new(MakeBucketResponse {
success: true,
error_info: None,
@@ -102,6 +140,63 @@ impl Node for NodeService {
}
}
async fn get_bucket_info(&self, request: Request<GetBucketInfoRequest>) -> Result<Response<GetBucketInfoResponse>, Status> {
debug!("get bucket info");
let request = request.into_inner();
let options = match serde_json::from_str::<BucketOptions>(&request.options) {
Ok(options) => options,
Err(err) => {
return Ok(tonic::Response::new(GetBucketInfoResponse {
success: false,
bucket_info: String::new(),
error_info: Some(format!("decode BucketOptions failed: {}", err.to_string())),
}))
}
};
match self.local_peer.get_bucket_info(&request.bucket, &options).await {
Ok(bucket_info) => {
let bucket_info = match serde_json::to_string(&bucket_info) {
Ok(bucket_info) => bucket_info,
Err(err) => {
return Ok(tonic::Response::new(GetBucketInfoResponse {
success: false,
bucket_info: String::new(),
error_info: Some(format!("encode BucketInfo failed: {}", err.to_string())),
}));
}
};
Ok(tonic::Response::new(GetBucketInfoResponse {
success: true,
bucket_info,
error_info: None,
}))
}
Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
success: false,
bucket_info: String::new(),
error_info: Some(format!("make failed: {}", err.to_string())),
})),
}
}
async fn delete_bucket(&self, request: Request<DeleteBucketRequest>) -> Result<Response<DeleteBucketResponse>, Status> {
debug!("make bucket");
let request = request.into_inner();
match self.local_peer.delete_bucket(&request.bucket).await {
Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse {
success: true,
error_info: None,
})),
Err(err) => Ok(tonic::Response::new(DeleteBucketResponse {
success: false,
error_info: Some(format!("make failed: {}", err.to_string())),
})),
}
}
async fn read_all(&self, request: Request<ReadAllRequest>) -> Result<Response<ReadAllResponse>, Status> {
let request = request.into_inner();
if let Some(disk) = self.find_disk(&request.disk).await {