From 04ab9d75a97014d3056d4affcb0a0987c0d29a01 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Fri, 27 Sep 2024 16:29:40 +0800 Subject: [PATCH] multiplex channel && rpc service add authentication Signed-off-by: junxiang Mu <1948535941@qq.com> --- Cargo.lock | 4 + common/common/Cargo.toml | 1 + common/common/src/globals.rs | 4 + common/lock/src/remote_client.rs | 37 +- common/protos/Cargo.toml | 1 + .../generated/flatbuffers_generated/models.rs | 207 ++- .../src/generated/proto_gen/node_service.rs | 1546 +++++++++++++---- common/protos/src/lib.rs | 54 +- e2e_test/Cargo.toml | 2 + e2e_test/src/reliant/lock.rs | 56 +- ecstore/src/disk/mod.rs | 15 +- ecstore/src/disk/remote.rs | 157 +- ecstore/src/peer.rs | 74 +- rustfs/src/grpc.rs | 25 +- rustfs/src/main.rs | 13 +- 15 files changed, 1590 insertions(+), 606 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a814da0d..db5a762a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,6 +368,7 @@ version = "0.0.1" dependencies = [ "lazy_static", "tokio", + "tonic", "tracing-error", ] @@ -435,11 +436,13 @@ version = "0.0.1" dependencies = [ "ecstore", "flatbuffers", + "lazy_static", "lock", "protos", "serde_json", "tokio", "tonic", + "tower", "url", ] @@ -1423,6 +1426,7 @@ dependencies = [ name = "protos" version = "0.0.1" dependencies = [ + "common", "flatbuffers", "prost", "prost-build", diff --git a/common/common/Cargo.toml b/common/common/Cargo.toml index 2d8a7070..0675b06b 100644 --- a/common/common/Cargo.toml +++ b/common/common/Cargo.toml @@ -6,4 +6,5 @@ edition.workspace = true [dependencies] lazy_static.workspace = true tokio.workspace = true +tonic.workspace = true tracing-error.workspace = true \ No newline at end of file diff --git a/common/common/src/globals.rs b/common/common/src/globals.rs index 0439b4d8..0bbe07c8 100644 --- a/common/common/src/globals.rs +++ b/common/common/src/globals.rs @@ -1,8 +1,12 @@ +use std::collections::HashMap; + use lazy_static::lazy_static; use tokio::sync::RwLock; +use tonic::transport::Channel; lazy_static! { pub static ref GLOBAL_Local_Node_Name: RwLock = RwLock::new("".to_string()); pub static ref GLOBAL_Rustfs_Host: RwLock = RwLock::new("".to_string()); pub static ref GLOBAL_Rustfs_Port: RwLock = RwLock::new("9000".to_string()); + pub static ref GLOBAL_Conn_Map: RwLock> = RwLock::new(HashMap::new()); } diff --git a/common/lock/src/remote_client.rs b/common/lock/src/remote_client.rs index 731cb923..be65b858 100644 --- a/common/lock/src/remote_client.rs +++ b/common/lock/src/remote_client.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use common::error::{Error, Result}; -use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest}; +use protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest}; use tonic::Request; use tracing::info; @@ -8,18 +8,13 @@ use crate::{lock_args::LockArgs, Locker}; #[derive(Debug, Clone)] pub struct RemoteClient { - url: url::Url, + addr: String, } impl RemoteClient { pub fn new(url: url::Url) -> Self { - Self { url } - } - - async fn get_client_v2(&self) -> Result> { - // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?) - let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap()); - Ok(NodeServiceClient::connect(addr).await?) + let addr = format!("{}://{}:{}", url.scheme(), url.host_str().unwrap(), url.port().unwrap()); + Self { addr } } } @@ -28,7 +23,9 @@ impl Locker for RemoteClient { async fn lock(&mut self, args: &LockArgs) -> Result { info!("remote lock"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.lock(request).await?.into_inner(); @@ -43,7 +40,9 @@ impl Locker for RemoteClient { async fn unlock(&mut self, args: &LockArgs) -> Result { info!("remote unlock"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.un_lock(request).await?.into_inner(); @@ -58,7 +57,9 @@ impl Locker for RemoteClient { async fn rlock(&mut self, args: &LockArgs) -> Result { info!("remote rlock"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.r_lock(request).await?.into_inner(); @@ -73,7 +74,9 @@ impl Locker for RemoteClient { async fn runlock(&mut self, args: &LockArgs) -> Result { info!("remote runlock"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.r_un_lock(request).await?.into_inner(); @@ -88,7 +91,9 @@ impl Locker for RemoteClient { async fn force_unlock(&mut self, args: &LockArgs) -> Result { info!("remote force_unlock"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.force_un_lock(request).await?.into_inner(); @@ -103,7 +108,9 @@ impl Locker for RemoteClient { async fn refresh(&mut self, args: &LockArgs) -> Result { info!("remote refresh"); let args = serde_json::to_string(args)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GenerallyLockRequest { args }); let response = client.refresh(request).await?.into_inner(); diff --git a/common/protos/Cargo.toml b/common/protos/Cargo.toml index a6ab9df6..4856fa06 100644 --- a/common/protos/Cargo.toml +++ b/common/protos/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true [dependencies] #async-backtrace = { workspace = true, optional = true } +common.workspace = true flatbuffers = { workspace = true } prost = { workspace = true } protobuf = { workspace = true } diff --git a/common/protos/src/generated/flatbuffers_generated/models.rs b/common/protos/src/generated/flatbuffers_generated/models.rs index e4949fdc..aa1f6ae2 100644 --- a/common/protos/src/generated/flatbuffers_generated/models.rs +++ b/common/protos/src/generated/flatbuffers_generated/models.rs @@ -1,9 +1,10 @@ // automatically generated by the FlatBuffers compiler, do not modify + // @generated -use core::cmp::Ordering; use core::mem; +use core::cmp::Ordering; extern crate flatbuffers; use self::flatbuffers::{EndianScalar, Follow}; @@ -11,114 +12,112 @@ use self::flatbuffers::{EndianScalar, Follow}; #[allow(unused_imports, dead_code)] pub mod models { - use core::cmp::Ordering; - use core::mem; + use core::mem; + use core::cmp::Ordering; - extern crate flatbuffers; - use self::flatbuffers::{EndianScalar, Follow}; + extern crate flatbuffers; + use self::flatbuffers::{EndianScalar, Follow}; - pub enum PingBodyOffset {} - #[derive(Copy, Clone, PartialEq)] +pub enum PingBodyOffset {} +#[derive(Copy, Clone, PartialEq)] - pub struct PingBody<'a> { - pub _tab: flatbuffers::Table<'a>, +pub struct PingBody<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for PingBody<'a> { + type Inner = PingBody<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { _tab: flatbuffers::Table::new(buf, loc) } + } +} + +impl<'a> PingBody<'a> { + pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; + + pub const fn get_fully_qualified_name() -> &'static str { + "models.PingBody" + } + + #[inline] + pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + PingBody { _tab: table } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args PingBodyArgs<'args> + ) -> flatbuffers::WIPOffset> { + let mut builder = PingBodyBuilder::new(_fbb); + if let Some(x) = args.payload { builder.add_payload(x); } + builder.finish() + } + + + #[inline] + pub fn payload(&self) -> Option> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { self._tab.get::>>(PingBody::VT_PAYLOAD, None)} + } +} + +impl flatbuffers::Verifiable for PingBody<'_> { + #[inline] + fn run_verifier( + v: &mut flatbuffers::Verifier, pos: usize + ) -> Result<(), flatbuffers::InvalidFlatbuffer> { + use self::flatbuffers::Verifiable; + v.visit_table(pos)? + .visit_field::>>("payload", Self::VT_PAYLOAD, false)? + .finish(); + Ok(()) + } +} +pub struct PingBodyArgs<'a> { + pub payload: Option>>, +} +impl<'a> Default for PingBodyArgs<'a> { + #[inline] + fn default() -> Self { + PingBodyArgs { + payload: None, } + } +} - impl<'a> flatbuffers::Follow<'a> for PingBody<'a> { - type Inner = PingBody<'a>; - #[inline] - unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table::new(buf, loc), - } - } +pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> { + #[inline] + pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(PingBody::VT_PAYLOAD, payload); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + PingBodyBuilder { + fbb_: _fbb, + start_: start, } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} - impl<'a> PingBody<'a> { - pub const VT_PAYLOAD: flatbuffers::VOffsetT = 4; +impl core::fmt::Debug for PingBody<'_> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut ds = f.debug_struct("PingBody"); + ds.field("payload", &self.payload()); + ds.finish() + } +} +} // pub mod models - pub const fn get_fully_qualified_name() -> &'static str { - "models.PingBody" - } - - #[inline] - pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - PingBody { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, - args: &'args PingBodyArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = PingBodyBuilder::new(_fbb); - if let Some(x) = args.payload { - builder.add_payload(x); - } - builder.finish() - } - - #[inline] - pub fn payload(&self) -> Option> { - // Safety: - // Created from valid Table for this object - // which contains a valid value in this slot - unsafe { - self._tab - .get::>>(PingBody::VT_PAYLOAD, None) - } - } - } - - impl flatbuffers::Verifiable for PingBody<'_> { - #[inline] - fn run_verifier(v: &mut flatbuffers::Verifier, pos: usize) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use self::flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::>>("payload", Self::VT_PAYLOAD, false)? - .finish(); - Ok(()) - } - } - pub struct PingBodyArgs<'a> { - pub payload: Option>>, - } - impl<'a> Default for PingBodyArgs<'a> { - #[inline] - fn default() -> Self { - PingBodyArgs { payload: None } - } - } - - pub struct PingBodyBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, - start_: flatbuffers::WIPOffset, - } - impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PingBodyBuilder<'a, 'b, A> { - #[inline] - pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(PingBody::VT_PAYLOAD, payload); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PingBodyBuilder<'a, 'b, A> { - let start = _fbb.start_table(); - PingBodyBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } - } - - impl core::fmt::Debug for PingBody<'_> { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut ds = f.debug_struct("PingBody"); - ds.field("payload", &self.payload()); - ds.finish() - } - } -} // pub mod models diff --git a/common/protos/src/generated/proto_gen/node_service.rs b/common/protos/src/generated/proto_gen/node_service.rs index 53c124e9..46cb39a2 100644 --- a/common/protos/src/generated/proto_gen/node_service.rs +++ b/common/protos/src/generated/proto_gen/node_service.rs @@ -491,8 +491,8 @@ pub struct GenerallyLockResponse { /// Generated client implementations. pub mod node_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct NodeServiceClient { inner: tonic::client::Grpc, @@ -523,15 +523,22 @@ pub mod node_service_client { let inner = tonic::client::Grpc::with_origin(inner, origin); Self { inner } } - pub fn with_interceptor(inner: T, interceptor: F) -> NodeServiceClient> + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NodeServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, - Response = http::Response<>::ResponseBody>, + Response = http::Response< + >::ResponseBody, + >, >, - >>::Error: Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { NodeServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -574,9 +581,16 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Ping"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Ping", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Ping")); @@ -585,13 +599,23 @@ pub mod node_service_client { pub async fn list_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListBucket")); @@ -600,13 +624,23 @@ pub mod node_service_client { pub async fn make_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeBucket")); @@ -615,13 +649,23 @@ pub mod node_service_client { pub async fn get_bucket_info( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/GetBucketInfo"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/GetBucketInfo", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "GetBucketInfo")); @@ -630,13 +674,23 @@ pub mod node_service_client { pub async fn delete_bucket( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteBucket"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteBucket", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteBucket")); @@ -645,13 +699,23 @@ pub mod node_service_client { pub async fn read_all( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAll"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadAll", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadAll")); @@ -660,13 +724,23 @@ pub mod node_service_client { pub async fn write_all( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteAll"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteAll", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteAll")); @@ -679,9 +753,16 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Delete"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Delete", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Delete")); @@ -690,13 +771,23 @@ pub mod node_service_client { pub async fn rename_file( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameFile"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RenameFile", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RenameFile")); @@ -709,9 +800,16 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Write"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Write", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Write")); @@ -720,13 +818,23 @@ pub mod node_service_client { pub async fn write_stream( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> std::result::Result>, tonic::Status> { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteStream"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteStream", + ); let mut req = request.into_streaming_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteStream")); @@ -736,13 +844,23 @@ pub mod node_service_client { pub async fn read_at( &mut self, request: impl tonic::IntoStreamingRequest, - ) -> std::result::Result>, tonic::Status> { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadAt"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadAt", + ); let mut req = request.into_streaming_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadAt")); @@ -751,13 +869,23 @@ pub mod node_service_client { pub async fn list_dir( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListDir"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListDir", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListDir")); @@ -766,13 +894,23 @@ pub mod node_service_client { pub async fn walk_dir( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WalkDir"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WalkDir", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WalkDir")); @@ -781,13 +919,23 @@ pub mod node_service_client { pub async fn rename_data( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RenameData"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RenameData", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RenameData")); @@ -796,13 +944,23 @@ pub mod node_service_client { pub async fn make_volumes( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolumes"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeVolumes", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeVolumes")); @@ -811,13 +969,23 @@ pub mod node_service_client { pub async fn make_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/MakeVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/MakeVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "MakeVolume")); @@ -826,13 +994,23 @@ pub mod node_service_client { pub async fn list_volumes( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ListVolumes"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ListVolumes", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ListVolumes")); @@ -841,13 +1019,23 @@ pub mod node_service_client { pub async fn stat_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/StatVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/StatVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "StatVolume")); @@ -856,13 +1044,23 @@ pub mod node_service_client { pub async fn write_metadata( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/WriteMetadata"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/WriteMetadata", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "WriteMetadata")); @@ -871,13 +1069,23 @@ pub mod node_service_client { pub async fn read_version( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadVersion"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadVersion", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadVersion")); @@ -890,9 +1098,16 @@ pub mod node_service_client { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadXL"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadXL", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadXL")); @@ -901,13 +1116,23 @@ pub mod node_service_client { pub async fn delete_versions( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVersions"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteVersions", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteVersions")); @@ -916,13 +1141,23 @@ pub mod node_service_client { pub async fn read_multiple( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ReadMultiple"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ReadMultiple", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ReadMultiple")); @@ -931,13 +1166,23 @@ pub mod node_service_client { pub async fn delete_volume( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/DeleteVolume"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/DeleteVolume", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "DeleteVolume")); @@ -946,13 +1191,23 @@ pub mod node_service_client { pub async fn lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Lock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Lock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Lock")); @@ -961,13 +1216,23 @@ pub mod node_service_client { pub async fn un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/UnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/UnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "UnLock")); @@ -976,13 +1241,23 @@ pub mod node_service_client { pub async fn r_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RLock")); @@ -991,13 +1266,23 @@ pub mod node_service_client { pub async fn r_un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/RUnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/RUnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "RUnLock")); @@ -1006,13 +1291,23 @@ pub mod node_service_client { pub async fn force_un_lock( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/ForceUnLock"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/ForceUnLock", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "ForceUnLock")); @@ -1021,13 +1316,23 @@ pub mod node_service_client { pub async fn refresh( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { self.inner .ready() .await - .map_err(|e| tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())))?; + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/node_service.NodeService/Refresh"); + let path = http::uri::PathAndQuery::from_static( + "/node_service.NodeService/Refresh", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("node_service.NodeService", "Refresh")); @@ -1050,19 +1355,31 @@ pub mod node_service_server { async fn list_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_bucket_info( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_bucket( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_all( &self, request: tonic::Request, @@ -1070,7 +1387,10 @@ pub mod node_service_server { async fn write_all( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete( &self, request: tonic::Request, @@ -1078,21 +1398,31 @@ pub mod node_service_server { async fn rename_file( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn write( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the WriteStream method. - type WriteStreamStream: tonic::codegen::tokio_stream::Stream> + type WriteStreamStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + Send + 'static; async fn write_stream( &self, request: tonic::Request>, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ReadAt method. - type ReadAtStream: tonic::codegen::tokio_stream::Stream> + type ReadAtStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + Send + 'static; /// rpc Append(AppendRequest) returns (AppendResponse) {}; @@ -1111,31 +1441,52 @@ pub mod node_service_server { async fn rename_data( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_volumes( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn make_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn list_volumes( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn stat_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn write_metadata( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_version( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_xl( &self, request: tonic::Request, @@ -1143,39 +1494,66 @@ pub mod node_service_server { async fn delete_versions( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn read_multiple( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn delete_volume( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn r_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn r_un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn force_un_lock( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn refresh( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct NodeServiceServer { @@ -1198,7 +1576,10 @@ pub mod node_service_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -1242,7 +1623,10 @@ pub mod node_service_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -1250,12 +1634,21 @@ pub mod node_service_server { "/node_service.NodeService/Ping" => { #[allow(non_camel_case_types)] struct PingSvc(pub Arc); - impl tonic::server::UnaryService for PingSvc { + impl tonic::server::UnaryService + for PingSvc { type Response = super::PingResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::ping(&inner, request).await }; + let fut = async move { + ::ping(&inner, request).await + }; Box::pin(fut) } } @@ -1268,8 +1661,14 @@ pub mod node_service_server { let method = PingSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1278,12 +1677,23 @@ pub mod node_service_server { "/node_service.NodeService/ListBucket" => { #[allow(non_camel_case_types)] struct ListBucketSvc(pub Arc); - impl tonic::server::UnaryService for ListBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListBucketSvc { type Response = super::ListBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_bucket(&inner, request).await }; + let fut = async move { + ::list_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1296,8 +1706,14 @@ pub mod node_service_server { let method = ListBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1306,12 +1722,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeBucket" => { #[allow(non_camel_case_types)] struct MakeBucketSvc(pub Arc); - impl tonic::server::UnaryService for MakeBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeBucketSvc { type Response = super::MakeBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_bucket(&inner, request).await }; + let fut = async move { + ::make_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1324,8 +1751,14 @@ pub mod node_service_server { let method = MakeBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1334,12 +1767,23 @@ pub mod node_service_server { "/node_service.NodeService/GetBucketInfo" => { #[allow(non_camel_case_types)] struct GetBucketInfoSvc(pub Arc); - impl tonic::server::UnaryService for GetBucketInfoSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for GetBucketInfoSvc { type Response = super::GetBucketInfoResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::get_bucket_info(&inner, request).await }; + let fut = async move { + ::get_bucket_info(&inner, request).await + }; Box::pin(fut) } } @@ -1352,8 +1796,14 @@ pub mod node_service_server { let method = GetBucketInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1362,12 +1812,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteBucket" => { #[allow(non_camel_case_types)] struct DeleteBucketSvc(pub Arc); - impl tonic::server::UnaryService for DeleteBucketSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteBucketSvc { type Response = super::DeleteBucketResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_bucket(&inner, request).await }; + let fut = async move { + ::delete_bucket(&inner, request).await + }; Box::pin(fut) } } @@ -1380,8 +1841,14 @@ pub mod node_service_server { let method = DeleteBucketSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1390,12 +1857,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadAll" => { #[allow(non_camel_case_types)] struct ReadAllSvc(pub Arc); - impl tonic::server::UnaryService for ReadAllSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadAllSvc { type Response = super::ReadAllResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_all(&inner, request).await }; + let fut = async move { + ::read_all(&inner, request).await + }; Box::pin(fut) } } @@ -1408,8 +1886,14 @@ pub mod node_service_server { let method = ReadAllSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1418,12 +1902,23 @@ pub mod node_service_server { "/node_service.NodeService/WriteAll" => { #[allow(non_camel_case_types)] struct WriteAllSvc(pub Arc); - impl tonic::server::UnaryService for WriteAllSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WriteAllSvc { type Response = super::WriteAllResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_all(&inner, request).await }; + let fut = async move { + ::write_all(&inner, request).await + }; Box::pin(fut) } } @@ -1436,8 +1931,14 @@ pub mod node_service_server { let method = WriteAllSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1446,12 +1947,23 @@ pub mod node_service_server { "/node_service.NodeService/Delete" => { #[allow(non_camel_case_types)] struct DeleteSvc(pub Arc); - impl tonic::server::UnaryService for DeleteSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteSvc { type Response = super::DeleteResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete(&inner, request).await }; + let fut = async move { + ::delete(&inner, request).await + }; Box::pin(fut) } } @@ -1464,8 +1976,14 @@ pub mod node_service_server { let method = DeleteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1474,12 +1992,23 @@ pub mod node_service_server { "/node_service.NodeService/RenameFile" => { #[allow(non_camel_case_types)] struct RenameFileSvc(pub Arc); - impl tonic::server::UnaryService for RenameFileSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RenameFileSvc { type Response = super::RenameFileResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::rename_file(&inner, request).await }; + let fut = async move { + ::rename_file(&inner, request).await + }; Box::pin(fut) } } @@ -1492,8 +2021,14 @@ pub mod node_service_server { let method = RenameFileSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1502,12 +2037,21 @@ pub mod node_service_server { "/node_service.NodeService/Write" => { #[allow(non_camel_case_types)] struct WriteSvc(pub Arc); - impl tonic::server::UnaryService for WriteSvc { + impl tonic::server::UnaryService + for WriteSvc { type Response = super::WriteResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write(&inner, request).await }; + let fut = async move { + ::write(&inner, request).await + }; Box::pin(fut) } } @@ -1520,8 +2064,14 @@ pub mod node_service_server { let method = WriteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1530,13 +2080,26 @@ pub mod node_service_server { "/node_service.NodeService/WriteStream" => { #[allow(non_camel_case_types)] struct WriteStreamSvc(pub Arc); - impl tonic::server::StreamingService for WriteStreamSvc { + impl< + T: NodeService, + > tonic::server::StreamingService + for WriteStreamSvc { type Response = super::WriteResponse; type ResponseStream = T::WriteStreamStream; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request>) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_stream(&inner, request).await }; + let fut = async move { + ::write_stream(&inner, request).await + }; Box::pin(fut) } } @@ -1549,8 +2112,14 @@ pub mod node_service_server { let method = WriteStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -1559,13 +2128,26 @@ pub mod node_service_server { "/node_service.NodeService/ReadAt" => { #[allow(non_camel_case_types)] struct ReadAtSvc(pub Arc); - impl tonic::server::StreamingService for ReadAtSvc { + impl< + T: NodeService, + > tonic::server::StreamingService + for ReadAtSvc { type Response = super::ReadAtResponse; type ResponseStream = T::ReadAtStream; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request>) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_at(&inner, request).await }; + let fut = async move { + ::read_at(&inner, request).await + }; Box::pin(fut) } } @@ -1578,8 +2160,14 @@ pub mod node_service_server { let method = ReadAtSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.streaming(method, req).await; Ok(res) }; @@ -1588,12 +2176,23 @@ pub mod node_service_server { "/node_service.NodeService/ListDir" => { #[allow(non_camel_case_types)] struct ListDirSvc(pub Arc); - impl tonic::server::UnaryService for ListDirSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListDirSvc { type Response = super::ListDirResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_dir(&inner, request).await }; + let fut = async move { + ::list_dir(&inner, request).await + }; Box::pin(fut) } } @@ -1606,8 +2205,14 @@ pub mod node_service_server { let method = ListDirSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1616,12 +2221,23 @@ pub mod node_service_server { "/node_service.NodeService/WalkDir" => { #[allow(non_camel_case_types)] struct WalkDirSvc(pub Arc); - impl tonic::server::UnaryService for WalkDirSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WalkDirSvc { type Response = super::WalkDirResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::walk_dir(&inner, request).await }; + let fut = async move { + ::walk_dir(&inner, request).await + }; Box::pin(fut) } } @@ -1634,8 +2250,14 @@ pub mod node_service_server { let method = WalkDirSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1644,12 +2266,23 @@ pub mod node_service_server { "/node_service.NodeService/RenameData" => { #[allow(non_camel_case_types)] struct RenameDataSvc(pub Arc); - impl tonic::server::UnaryService for RenameDataSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RenameDataSvc { type Response = super::RenameDataResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::rename_data(&inner, request).await }; + let fut = async move { + ::rename_data(&inner, request).await + }; Box::pin(fut) } } @@ -1662,8 +2295,14 @@ pub mod node_service_server { let method = RenameDataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1672,12 +2311,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeVolumes" => { #[allow(non_camel_case_types)] struct MakeVolumesSvc(pub Arc); - impl tonic::server::UnaryService for MakeVolumesSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeVolumesSvc { type Response = super::MakeVolumesResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_volumes(&inner, request).await }; + let fut = async move { + ::make_volumes(&inner, request).await + }; Box::pin(fut) } } @@ -1690,8 +2340,14 @@ pub mod node_service_server { let method = MakeVolumesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1700,12 +2356,23 @@ pub mod node_service_server { "/node_service.NodeService/MakeVolume" => { #[allow(non_camel_case_types)] struct MakeVolumeSvc(pub Arc); - impl tonic::server::UnaryService for MakeVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for MakeVolumeSvc { type Response = super::MakeVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::make_volume(&inner, request).await }; + let fut = async move { + ::make_volume(&inner, request).await + }; Box::pin(fut) } } @@ -1718,8 +2385,14 @@ pub mod node_service_server { let method = MakeVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1728,12 +2401,23 @@ pub mod node_service_server { "/node_service.NodeService/ListVolumes" => { #[allow(non_camel_case_types)] struct ListVolumesSvc(pub Arc); - impl tonic::server::UnaryService for ListVolumesSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ListVolumesSvc { type Response = super::ListVolumesResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::list_volumes(&inner, request).await }; + let fut = async move { + ::list_volumes(&inner, request).await + }; Box::pin(fut) } } @@ -1746,8 +2430,14 @@ pub mod node_service_server { let method = ListVolumesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1756,12 +2446,23 @@ pub mod node_service_server { "/node_service.NodeService/StatVolume" => { #[allow(non_camel_case_types)] struct StatVolumeSvc(pub Arc); - impl tonic::server::UnaryService for StatVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for StatVolumeSvc { type Response = super::StatVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::stat_volume(&inner, request).await }; + let fut = async move { + ::stat_volume(&inner, request).await + }; Box::pin(fut) } } @@ -1774,8 +2475,14 @@ pub mod node_service_server { let method = StatVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1784,12 +2491,23 @@ pub mod node_service_server { "/node_service.NodeService/WriteMetadata" => { #[allow(non_camel_case_types)] struct WriteMetadataSvc(pub Arc); - impl tonic::server::UnaryService for WriteMetadataSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for WriteMetadataSvc { type Response = super::WriteMetadataResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::write_metadata(&inner, request).await }; + let fut = async move { + ::write_metadata(&inner, request).await + }; Box::pin(fut) } } @@ -1802,8 +2520,14 @@ pub mod node_service_server { let method = WriteMetadataSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1812,12 +2536,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadVersion" => { #[allow(non_camel_case_types)] struct ReadVersionSvc(pub Arc); - impl tonic::server::UnaryService for ReadVersionSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadVersionSvc { type Response = super::ReadVersionResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_version(&inner, request).await }; + let fut = async move { + ::read_version(&inner, request).await + }; Box::pin(fut) } } @@ -1830,8 +2565,14 @@ pub mod node_service_server { let method = ReadVersionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1840,12 +2581,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadXL" => { #[allow(non_camel_case_types)] struct ReadXLSvc(pub Arc); - impl tonic::server::UnaryService for ReadXLSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadXLSvc { type Response = super::ReadXlResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_xl(&inner, request).await }; + let fut = async move { + ::read_xl(&inner, request).await + }; Box::pin(fut) } } @@ -1858,8 +2610,14 @@ pub mod node_service_server { let method = ReadXLSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1868,12 +2626,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteVersions" => { #[allow(non_camel_case_types)] struct DeleteVersionsSvc(pub Arc); - impl tonic::server::UnaryService for DeleteVersionsSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteVersionsSvc { type Response = super::DeleteVersionsResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_versions(&inner, request).await }; + let fut = async move { + ::delete_versions(&inner, request).await + }; Box::pin(fut) } } @@ -1886,8 +2655,14 @@ pub mod node_service_server { let method = DeleteVersionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1896,12 +2671,23 @@ pub mod node_service_server { "/node_service.NodeService/ReadMultiple" => { #[allow(non_camel_case_types)] struct ReadMultipleSvc(pub Arc); - impl tonic::server::UnaryService for ReadMultipleSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ReadMultipleSvc { type Response = super::ReadMultipleResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::read_multiple(&inner, request).await }; + let fut = async move { + ::read_multiple(&inner, request).await + }; Box::pin(fut) } } @@ -1914,8 +2700,14 @@ pub mod node_service_server { let method = ReadMultipleSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1924,12 +2716,23 @@ pub mod node_service_server { "/node_service.NodeService/DeleteVolume" => { #[allow(non_camel_case_types)] struct DeleteVolumeSvc(pub Arc); - impl tonic::server::UnaryService for DeleteVolumeSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for DeleteVolumeSvc { type Response = super::DeleteVolumeResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::delete_volume(&inner, request).await }; + let fut = async move { + ::delete_volume(&inner, request).await + }; Box::pin(fut) } } @@ -1942,8 +2745,14 @@ pub mod node_service_server { let method = DeleteVolumeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1952,12 +2761,23 @@ pub mod node_service_server { "/node_service.NodeService/Lock" => { #[allow(non_camel_case_types)] struct LockSvc(pub Arc); - impl tonic::server::UnaryService for LockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for LockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::lock(&inner, request).await }; + let fut = async move { + ::lock(&inner, request).await + }; Box::pin(fut) } } @@ -1970,8 +2790,14 @@ pub mod node_service_server { let method = LockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -1980,12 +2806,23 @@ pub mod node_service_server { "/node_service.NodeService/UnLock" => { #[allow(non_camel_case_types)] struct UnLockSvc(pub Arc); - impl tonic::server::UnaryService for UnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for UnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::un_lock(&inner, request).await }; + let fut = async move { + ::un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -1998,8 +2835,14 @@ pub mod node_service_server { let method = UnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2008,12 +2851,23 @@ pub mod node_service_server { "/node_service.NodeService/RLock" => { #[allow(non_camel_case_types)] struct RLockSvc(pub Arc); - impl tonic::server::UnaryService for RLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::r_lock(&inner, request).await }; + let fut = async move { + ::r_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2026,8 +2880,14 @@ pub mod node_service_server { let method = RLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2036,12 +2896,23 @@ pub mod node_service_server { "/node_service.NodeService/RUnLock" => { #[allow(non_camel_case_types)] struct RUnLockSvc(pub Arc); - impl tonic::server::UnaryService for RUnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RUnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::r_un_lock(&inner, request).await }; + let fut = async move { + ::r_un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2054,8 +2925,14 @@ pub mod node_service_server { let method = RUnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2064,12 +2941,23 @@ pub mod node_service_server { "/node_service.NodeService/ForceUnLock" => { #[allow(non_camel_case_types)] struct ForceUnLockSvc(pub Arc); - impl tonic::server::UnaryService for ForceUnLockSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for ForceUnLockSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::force_un_lock(&inner, request).await }; + let fut = async move { + ::force_un_lock(&inner, request).await + }; Box::pin(fut) } } @@ -2082,8 +2970,14 @@ pub mod node_service_server { let method = ForceUnLockSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -2092,12 +2986,23 @@ pub mod node_service_server { "/node_service.NodeService/Refresh" => { #[allow(non_camel_case_types)] struct RefreshSvc(pub Arc); - impl tonic::server::UnaryService for RefreshSvc { + impl< + T: NodeService, + > tonic::server::UnaryService + for RefreshSvc { type Response = super::GenerallyLockResponse; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { ::refresh(&inner, request).await }; + let fut = async move { + ::refresh(&inner, request).await + }; Box::pin(fut) } } @@ -2110,21 +3015,34 @@ pub mod node_service_server { let method = RefreshSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config(accept_compression_encodings, send_compression_encodings) - .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", tonic::Code::Unimplemented as i32) - .header(http::header::CONTENT_TYPE, tonic::metadata::GRPC_CONTENT_TYPE) - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/common/protos/src/lib.rs b/common/protos/src/lib.rs index 639f9b06..daadcb4a 100644 --- a/common/protos/src/lib.rs +++ b/common/protos/src/lib.rs @@ -1,28 +1,44 @@ mod generated; -use std::time::Duration; +use std::{error::Error, time::Duration}; +use common::globals::GLOBAL_Conn_Map; pub use generated::*; use proto_gen::node_service::node_service_client::NodeServiceClient; -use tonic::{codec::CompressionEncoding, transport::Channel}; -use tower::timeout::Timeout; +use tonic::{ + metadata::MetadataValue, + service::interceptor::InterceptedService, + transport::{Channel, Endpoint}, + Request, Status, +}; // Default 100 MB pub const DEFAULT_GRPC_SERVER_MESSAGE_LEN: usize = 100 * 1024 * 1024; -pub fn node_service_time_out_client( - channel: Channel, - time_out: Duration, - max_message_size: usize, - grpc_enable_gzip: bool, -) -> NodeServiceClient> { - let timeout_channel = Timeout::new(channel, time_out); - let client = NodeServiceClient::>::new(timeout_channel); - let client = NodeServiceClient::max_decoding_message_size(client, max_message_size); - if grpc_enable_gzip { - NodeServiceClient::max_encoding_message_size(client, max_message_size) - .accept_compressed(CompressionEncoding::Gzip) - .send_compressed(CompressionEncoding::Gzip) - } else { - NodeServiceClient::max_encoding_message_size(client, max_message_size) - } +pub async fn node_service_time_out_client( + addr: &String, +) -> Result< + NodeServiceClient< + InterceptedService) -> Result, Status> + Send + Sync + 'static>>, + >, + Box, +> { + let token: MetadataValue<_> = "rustfs rpc".parse()?; + let channel = match GLOBAL_Conn_Map.read().await.get(addr) { + Some(channel) => channel.clone(), + None => { + let connector = Endpoint::from_shared(addr.to_string())?.connect_timeout(Duration::from_secs(60)); + let channel = connector.connect().await?; + channel + } + }; + GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone()); + + // let timeout_channel = Timeout::new(channel, Duration::from_secs(60)); + Ok(NodeServiceClient::with_interceptor( + channel, + Box::new(move |mut req: Request<()>| { + req.metadata_mut().insert("authorization", token.clone()); + Ok(req) + }), + )) } diff --git a/e2e_test/Cargo.toml b/e2e_test/Cargo.toml index 114ea8c1..c9818004 100644 --- a/e2e_test/Cargo.toml +++ b/e2e_test/Cargo.toml @@ -11,9 +11,11 @@ rust-version.workspace = true [dependencies] ecstore.workspace = true flatbuffers.workspace = true +lazy_static.workspace = true lock.workspace = true protos.workspace = true serde_json.workspace = true tonic = { version = "0.12.1", features = ["gzip"] } tokio = { workspace = true } +tower.workspace = true url.workspace = true \ No newline at end of file diff --git a/e2e_test/src/reliant/lock.rs b/e2e_test/src/reliant/lock.rs index f88830f9..d14a3784 100644 --- a/e2e_test/src/reliant/lock.rs +++ b/e2e_test/src/reliant/lock.rs @@ -1,7 +1,8 @@ #![cfg(test)] -use std::{error::Error, sync::Arc, time::Duration, vec}; +use std::{collections::HashMap, error::Error, sync::Arc, time::Duration, vec}; +use lazy_static::lazy_static; use lock::{ drwmutex::Options, lock_args::LockArgs, @@ -10,11 +11,44 @@ use lock::{ }; use protos::proto_gen::node_service::{node_service_client::NodeServiceClient, GenerallyLockRequest}; use tokio::sync::RwLock; -use tonic::Request; +use tonic::{ + metadata::MetadataValue, + service::interceptor::InterceptedService, + transport::{Channel, Endpoint}, + Request, Status, +}; -async fn get_client() -> Result, Box> { - // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?) - Ok(NodeServiceClient::connect("http://localhost:9000").await?) +lazy_static! { + pub static ref GLOBAL_Conn_Map: RwLock> = RwLock::new(HashMap::new()); +} + +async fn get_client() -> Result< + NodeServiceClient< + InterceptedService) -> Result, Status> + Send + Sync + 'static>>, + >, + Box, +> { + let token: MetadataValue<_> = "rustfs rpc".parse()?; + let channel = match GLOBAL_Conn_Map.read().await.get("local") { + Some(channel) => channel.clone(), + None => { + println!("get channel start"); + let connector = Endpoint::from_static("http://localhost:9000").connect_timeout(Duration::from_secs(60)); + let channel = connector.connect().await?; + // let channel = Channel::from_static("http://localhost:9000").connect().await?; + channel + } + }; + GLOBAL_Conn_Map.write().await.insert("local".to_owned(), channel.clone()); + + // let timeout_channel = Timeout::new(channel, Duration::from_secs(60)); + Ok(NodeServiceClient::with_interceptor( + channel, + Box::new(move |mut req: Request<()>| { + req.metadata_mut().insert("authorization", token.clone()); + Ok(req) + }), + )) } #[tokio::test] @@ -29,12 +63,22 @@ async fn test_lock_unlock_rpc() -> Result<(), Box> { let args = serde_json::to_string(&args)?; let mut client = get_client().await?; - let request = Request::new(GenerallyLockRequest { args }); + println!("got client"); + let request = Request::new(GenerallyLockRequest { args: args.clone() }); + println!("start request"); let response = client.lock(request).await?.into_inner(); + println!("request ended"); if let Some(error_info) = response.error_info { assert!(false, "can not get lock: {}", error_info); } + + let request = Request::new(GenerallyLockRequest { args }); + let response = client.un_lock(request).await?.into_inner(); + if let Some(error_info) = response.error_info { + assert!(false, "can not get un_lock: {}", error_info); + } + Ok(()) } diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 78f07805..0cd267a0 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -32,7 +32,7 @@ use tokio::{ sync::mpsc::{self, Sender}, }; use tokio_stream::wrappers::ReceiverStream; -use tonic::{transport::Channel, Streaming}; +use tonic::{service::interceptor::InterceptedService, transport::Channel, Request, Status, Streaming}; use tracing::info; use uuid::Uuid; @@ -365,7 +365,9 @@ impl RemoteFileWriter { volume: String, path: String, is_append: bool, - mut client: NodeServiceClient, + mut client: NodeServiceClient< + InterceptedService) -> Result, Status> + Send + Sync + 'static>>, + >, ) -> Result { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); @@ -480,7 +482,14 @@ pub struct RemoteFileReader { } impl RemoteFileReader { - pub async fn new(root: PathBuf, volume: String, path: String, mut client: NodeServiceClient) -> Result { + pub async fn new( + root: PathBuf, + volume: String, + path: String, + mut client: NodeServiceClient< + InterceptedService) -> Result, Status> + Send + Sync + 'static>>, + >, + ) -> Result { let (tx, rx) = mpsc::channel(128); let in_stream = ReceiverStream::new(rx); diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index 8516e31f..233e724f 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::path::PathBuf; use bytes::Bytes; use common::error::{Error, Result}; @@ -6,19 +6,12 @@ use futures::lock::Mutex; use protos::{ node_service_time_out_client, proto_gen::node_service::{ - node_service_client::NodeServiceClient, DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest, - ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, - ReadXlRequest, RenameDataRequest, RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, - WriteMetadataRequest, + DeleteRequest, DeleteVersionsRequest, DeleteVolumeRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, + MakeVolumesRequest, ReadAllRequest, ReadMultipleRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, + RenameFileRequst, StatVolumeRequest, WalkDirRequest, WriteAllRequest, WriteMetadataRequest, }, - DEFAULT_GRPC_SERVER_MESSAGE_LEN, }; -use tokio::sync::RwLock; -use tonic::{ - transport::{Channel, Endpoint as tonic_Endpoint}, - Request, -}; -use tower::timeout::Timeout; +use tonic::Request; use tracing::info; use uuid::Uuid; @@ -36,7 +29,7 @@ use super::{ #[derive(Debug)] pub struct RemoteDisk { pub id: Mutex>, - pub channel: Arc>>, + pub addr: String, pub url: url::Url, pub root: PathBuf, } @@ -45,52 +38,14 @@ impl RemoteDisk { pub async fn new(ep: &Endpoint, _opt: &DiskOption) -> Result { // let root = fs::canonicalize(ep.url.path()).await?; let root = PathBuf::from(ep.url.path()); - + let addr = format!("{}://{}:{}", ep.url.scheme(), ep.url.host_str().unwrap(), ep.url.port().unwrap()); Ok(Self { - channel: Arc::new(RwLock::new(None)), + id: Mutex::new(None), + addr, url: ep.url.clone(), root, - id: Mutex::new(None), }) } - - #[allow(dead_code)] - async fn get_client(&self) -> Result>> { - let channel_clone = self.channel.clone(); - let channel = { - let read_lock = channel_clone.read().await; - - if let Some(ref channel) = *read_lock { - channel.clone() - } else { - let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap()); - info!("disk url: {}", addr); - let connector = tonic_Endpoint::from_shared(addr.clone())?; - - let new_channel = connector.connect().await.map_err(|_err| DiskError::DiskNotFound)?; - - info!("get channel success"); - - *self.channel.write().await = Some(new_channel.clone()); - - new_channel - } - }; - - Ok(node_service_time_out_client( - channel, - Duration::new(30, 0), // TODO: use config setting - DEFAULT_GRPC_SERVER_MESSAGE_LEN, - // grpc_enable_gzip, - false, // TODO: use config setting - )) - } - - async fn get_client_v2(&self) -> Result> { - // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?) - let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap()); - Ok(NodeServiceClient::connect(addr).await?) - } } // TODO: all api need to handle errors @@ -118,7 +73,9 @@ impl DiskAPI for RemoteDisk { async fn read_all(&self, volume: &str, path: &str) -> Result { info!("read_all"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ReadAllRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -138,7 +95,9 @@ impl DiskAPI for RemoteDisk { async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()> { info!("write_all"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(WriteAllRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -158,7 +117,9 @@ impl DiskAPI for RemoteDisk { async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { info!("delete"); let options = serde_json::to_string(&opt)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(DeleteRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -177,7 +138,9 @@ impl DiskAPI for RemoteDisk { async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { info!("rename_file"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(RenameFileRequst { disk: self.root.to_string_lossy().to_string(), src_volume: src_volume.to_string(), @@ -203,7 +166,9 @@ impl DiskAPI for RemoteDisk { volume.to_string(), path.to_string(), false, - self.get_client_v2().await?, + node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?, ) .await?, )) @@ -212,21 +177,39 @@ impl DiskAPI for RemoteDisk { async fn append_file(&self, volume: &str, path: &str) -> Result { info!("append_file"); Ok(FileWriter::Remote( - RemoteFileWriter::new(self.root.clone(), volume.to_string(), path.to_string(), true, self.get_client_v2().await?) - .await?, + RemoteFileWriter::new( + self.root.clone(), + volume.to_string(), + path.to_string(), + true, + node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?, + ) + .await?, )) } async fn read_file(&self, volume: &str, path: &str) -> Result { info!("read_file"); Ok(FileReader::Remote( - RemoteFileReader::new(self.root.clone(), volume.to_string(), path.to_string(), self.get_client_v2().await?).await?, + RemoteFileReader::new( + self.root.clone(), + volume.to_string(), + path.to_string(), + node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?, + ) + .await?, )) } async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result> { info!("list_dir"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ListDirRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -244,7 +227,9 @@ impl DiskAPI for RemoteDisk { async fn walk_dir(&self, opts: WalkDirOptions) -> Result> { info!("walk_dir"); let walk_dir_options = serde_json::to_string(&opts)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(WalkDirRequest { disk: self.root.to_string_lossy().to_string(), walk_dir_options, @@ -275,7 +260,9 @@ impl DiskAPI for RemoteDisk { ) -> Result { info!("rename_data"); let file_info = serde_json::to_string(&fi)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(RenameDataRequest { disk: self.root.to_string_lossy().to_string(), src_volume: src_volume.to_string(), @@ -298,7 +285,9 @@ impl DiskAPI for RemoteDisk { async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> { info!("make_volumes"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(MakeVolumesRequest { disk: self.root.to_string_lossy().to_string(), volumes: volumes.iter().map(|s| (*s).to_string()).collect(), @@ -315,7 +304,9 @@ impl DiskAPI for RemoteDisk { async fn make_volume(&self, volume: &str) -> Result<()> { info!("make_volume"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(MakeVolumeRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -332,7 +323,9 @@ impl DiskAPI for RemoteDisk { async fn list_volumes(&self) -> Result> { info!("list_volumes"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ListVolumesRequest { disk: self.root.to_string_lossy().to_string(), }); @@ -354,7 +347,9 @@ impl DiskAPI for RemoteDisk { async fn stat_volume(&self, volume: &str) -> Result { info!("stat_volume"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(StatVolumeRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -374,7 +369,9 @@ impl DiskAPI for RemoteDisk { async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { info!("write_metadata"); let file_info = serde_json::to_string(&fi)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(WriteMetadataRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -401,7 +398,9 @@ impl DiskAPI for RemoteDisk { ) -> Result { info!("read_version"); let opts = serde_json::to_string(opts)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ReadVersionRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -423,7 +422,9 @@ impl DiskAPI for RemoteDisk { async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { info!("read_xl"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ReadXlRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -454,7 +455,9 @@ impl DiskAPI for RemoteDisk { for file_info_versions in versions.iter() { versions_str.push(serde_json::to_string(file_info_versions)?); } - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(DeleteVersionsRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), @@ -487,7 +490,9 @@ impl DiskAPI for RemoteDisk { async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { info!("read_multiple"); let read_multiple_req = serde_json::to_string(&req)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ReadMultipleRequest { disk: self.root.to_string_lossy().to_string(), read_multiple_req, @@ -510,7 +515,9 @@ impl DiskAPI for RemoteDisk { async fn delete_volume(&self, volume: &str) -> Result<()> { info!("delete_volume"); - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(DeleteVolumeRequest { disk: self.root.to_string_lossy().to_string(), volume: volume.to_string(), diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 644cc27d..4bff296e 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,16 +1,12 @@ use async_trait::async_trait; use common::error::{Error, Result}; use futures::future::join_all; -use protos::proto_gen::node_service::node_service_client::NodeServiceClient; +use protos::node_service_time_out_client; use protos::proto_gen::node_service::{DeleteBucketRequest, GetBucketInfoRequest, ListBucketRequest, MakeBucketRequest}; -use protos::{node_service_time_out_client, DEFAULT_GRPC_SERVER_MESSAGE_LEN}; use regex::Regex; -use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; -use tokio::sync::RwLock; -use tonic::transport::{Channel, Endpoint}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; use tonic::Request; -use tower::timeout::Timeout; -use tracing::{info, warn}; +use tracing::warn; use crate::store::all_local_disk; use crate::{ @@ -391,55 +387,13 @@ impl PeerS3Client for LocalPeerS3Client { pub struct RemotePeerS3Client { pub node: Option, pub pools: Option>, - connector: Endpoint, - channel: Arc>>, + addr: String, } impl RemotePeerS3Client { fn new(node: Option, pools: Option>) -> Self { - let connector = - Endpoint::from_shared(format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default())).unwrap(); - Self { - node, - pools, - connector, - channel: Arc::new(RwLock::new(None)), - } - } - - #[allow(dead_code)] - async fn get_client(&self) -> Result>> { - let channel_clone = self.channel.clone(); - let channel = { - let read_lock = channel_clone.read().await; - - if let Some(ref channel) = *read_lock { - channel.clone() - } else { - let new_channel = self.connector.connect().await?; - - info!("get channel success"); - - *self.channel.write().await = Some(new_channel.clone()); - - new_channel - } - }; - - Ok(node_service_time_out_client( - channel, - Duration::new(30, 0), // TODO: use config setting - DEFAULT_GRPC_SERVER_MESSAGE_LEN, - // grpc_enable_gzip, - false, // TODO: use config setting - )) - } - - async fn get_client_v2(&self) -> Result> { - // Ok(NodeServiceClient::connect("http://220.181.1.138:9000").await?) - // let addr = format!("{}://{}:{}", self.url.scheme(), self.url.host_str().unwrap(), self.url.port().unwrap()); - let addr = format!("{}", self.node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default()); - Ok(NodeServiceClient::connect(addr).await?) + let addr = format!("{}", node.as_ref().map(|v| { v.url.to_string() }).unwrap_or_default()); + Self { node, pools, addr } } } @@ -450,7 +404,9 @@ impl PeerS3Client for RemotePeerS3Client { } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { let options = serde_json::to_string(opts)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(ListBucketRequest { options }); let response = client.list_bucket(request).await?.into_inner(); let bucket_infos = response @@ -463,7 +419,9 @@ impl PeerS3Client for RemotePeerS3Client { } async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { let options = serde_json::to_string(opts)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(MakeBucketRequest { name: bucket.to_string(), options, @@ -479,7 +437,9 @@ impl PeerS3Client for RemotePeerS3Client { } async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result { let options = serde_json::to_string(opts)?; - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(GetBucketInfoRequest { bucket: bucket.to_string(), options, @@ -491,7 +451,9 @@ impl PeerS3Client for RemotePeerS3Client { } async fn delete_bucket(&self, bucket: &str) -> Result<()> { - let mut client = self.get_client_v2().await?; + let mut client = node_service_time_out_client(&self.addr) + .await + .map_err(|err| Error::from_string(format!("can not get client, err: {}", err.to_string())))?; let request = Request::new(DeleteBucketRequest { bucket: bucket.to_string(), }); diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 3b3d8c7d..b826c56b 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -17,16 +17,15 @@ use tracing::{debug, error, info}; use protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{ - node_service_server::{NodeService as Node, NodeServiceServer as NodeServer}, - DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, DeleteVersionsRequest, DeleteVersionsResponse, - DeleteVolumeRequest, DeleteVolumeResponse, GenerallyLockRequest, GenerallyLockResponse, GetBucketInfoRequest, - GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, ListDirRequest, ListDirResponse, ListVolumesRequest, - ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, - MakeVolumesResponse, PingRequest, PingResponse, ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, - ReadMultipleRequest, ReadMultipleResponse, ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, - RenameDataRequest, RenameDataResponse, RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse, - WalkDirRequest, WalkDirResponse, WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, - WriteRequest, WriteResponse, + node_service_server::NodeService as Node, DeleteBucketRequest, DeleteBucketResponse, DeleteRequest, DeleteResponse, + DeleteVersionsRequest, DeleteVersionsResponse, DeleteVolumeRequest, DeleteVolumeResponse, GenerallyLockRequest, + GenerallyLockResponse, GetBucketInfoRequest, GetBucketInfoResponse, ListBucketRequest, ListBucketResponse, + ListDirRequest, ListDirResponse, ListVolumesRequest, ListVolumesResponse, MakeBucketRequest, MakeBucketResponse, + MakeVolumeRequest, MakeVolumeResponse, MakeVolumesRequest, MakeVolumesResponse, PingRequest, PingResponse, + ReadAllRequest, ReadAllResponse, ReadAtRequest, ReadAtResponse, ReadMultipleRequest, ReadMultipleResponse, + ReadVersionRequest, ReadVersionResponse, ReadXlRequest, ReadXlResponse, RenameDataRequest, RenameDataResponse, + RenameFileRequst, RenameFileResponse, StatVolumeRequest, StatVolumeResponse, WalkDirRequest, WalkDirResponse, + WriteAllRequest, WriteAllResponse, WriteMetadataRequest, WriteMetadataResponse, WriteRequest, WriteResponse, }, }; @@ -56,13 +55,13 @@ fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { } #[derive(Debug)] -struct NodeService { +pub struct NodeService { local_peer: LocalPeerS3Client, } -pub fn make_server() -> NodeServer { +pub fn make_server() -> NodeService { let local_peer = LocalPeerS3Client::new(None, None); - NodeServer::new(NodeService { local_peer }) + NodeService { local_peer } } impl NodeService { diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index c4bb74cb..500c5fd7 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -15,10 +15,12 @@ use hyper_util::{ server::conn::auto::Builder as ConnBuilder, service::TowerToHyperService, }; +use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use s3s::{auth::SimpleAuth, service::S3ServiceBuilder}; use service::hybrid; use std::{io::IsTerminal, net::SocketAddr, str::FromStr}; use tokio::net::TcpListener; +use tonic::{metadata::MetadataValue, Request, Status}; use tracing::{debug, info, warn}; use tracing_error::ErrorLayer; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt}; @@ -39,6 +41,15 @@ fn setup_tracing() { subscriber.try_init().expect("failed to set global default subscriber"); } +fn check_auth(req: Request<()>) -> Result, Status> { + let token: MetadataValue<_> = "rustfs rpc".parse().unwrap(); + + match req.metadata().get("authorization") { + Some(t) if token == t => Ok(req), + _ => Err(Status::unauthenticated("No valid auth token")), + } +} + fn main() -> Result<()> { //解析获得到的参数 let opt = config::Opt::parse(); @@ -119,7 +130,7 @@ async fn run(opt: config::Opt) -> Result<()> { b.build() }; - let rpc_service = make_server(); + let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); tokio::spawn(async move { let hyper_service = service.into_shared();