diff --git a/crates/e2e_test/src/reliant/lock.rs b/crates/e2e_test/src/reliant/lock.rs index 126ab7b0..69227cc3 100644 --- a/crates/e2e_test/src/reliant/lock.rs +++ b/crates/e2e_test/src/reliant/lock.rs @@ -15,11 +15,12 @@ use async_trait::async_trait; use rustfs_ecstore::disk::endpoint::Endpoint; -use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient}; +use rustfs_ecstore::rpc::RemoteClient; +use rustfs_lock::client::{LockClient, local::LocalClient}; use rustfs_lock::types::{LockInfo, LockResponse, LockStats}; use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType}; use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager}; -use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest}; +use rustfs_protos::proto_gen::node_service::GenerallyLockRequest; use serial_test::serial; use std::{collections::HashMap, error::Error, sync::Arc, time::Duration}; use tokio::time::sleep; @@ -156,7 +157,7 @@ async fn test_lock_unlock_rpc() -> Result<(), Box> { }; let args = serde_json::to_string(&args)?; - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?; println!("got client"); let request = Request::new(GenerallyLockRequest { args: args.clone() }); @@ -614,7 +615,7 @@ async fn test_rpc_read_lock() -> Result<(), Box> { }; let args_str = serde_json::to_string(&args)?; - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?; // First read lock let request = Request::new(GenerallyLockRequest { args: args_str.clone() }); @@ -669,7 +670,7 @@ async fn test_lock_refresh() -> Result<(), Box> { }; let args_str = serde_json::to_string(&args)?; - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = 
RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?; // Acquire lock let request = Request::new(GenerallyLockRequest { args: args_str.clone() }); @@ -713,7 +714,7 @@ async fn test_force_unlock() -> Result<(), Box> { }; let args_str = serde_json::to_string(&args)?; - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?; // Acquire lock let request = Request::new(GenerallyLockRequest { args: args_str.clone() }); diff --git a/crates/e2e_test/src/reliant/node_interact_test.rs b/crates/e2e_test/src/reliant/node_interact_test.rs index d0000885..a25f1db6 100644 --- a/crates/e2e_test/src/reliant/node_interact_test.rs +++ b/crates/e2e_test/src/reliant/node_interact_test.rs @@ -17,11 +17,11 @@ use crate::common::workspace_root; use futures::future::join_all; use rmp_serde::{Deserializer, Serializer}; use rustfs_ecstore::disk::{VolumeInfo, WalkDirOptions}; +use rustfs_ecstore::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client}; use rustfs_filemeta::{MetaCacheEntry, MetacacheReader, MetacacheWriter}; use rustfs_protos::proto_gen::node_service::WalkDirRequest; use rustfs_protos::{ models::{PingBody, PingBodyBuilder}, - node_service_time_out_client, proto_gen::node_service::{ ListVolumesRequest, LocalStorageInfoRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest, }, @@ -53,7 +53,9 @@ async fn ping() -> Result<(), Box> { assert!(decoded_payload.is_ok()); // Create client - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; // Construct PingRequest let request = Request::new(PingRequest { @@ -78,7 +80,9 @@ async fn ping() -> Result<(), Box> { #[tokio::test] #[ignore = "requires running RustFS server at localhost:9000"] async fn 
make_volume() -> Result<(), Box> { - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; let request = Request::new(MakeVolumeRequest { disk: "data".to_string(), volume: "dandan".to_string(), @@ -96,7 +100,9 @@ async fn make_volume() -> Result<(), Box> { #[tokio::test] #[ignore = "requires running RustFS server at localhost:9000"] async fn list_volumes() -> Result<(), Box> { - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; let request = Request::new(ListVolumesRequest { disk: "data".to_string(), }); @@ -126,7 +132,9 @@ async fn walk_dir() -> Result<(), Box> { let (rd, mut wr) = tokio::io::duplex(1024); let mut buf = Vec::new(); opts.serialize(&mut Serializer::new(&mut buf))?; - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; let disk_path = std::env::var_os("RUSTFS_DISK_PATH").map(PathBuf::from).unwrap_or_else(|| { let mut path = workspace_root(); path.push("target"); @@ -179,7 +187,9 @@ async fn walk_dir() -> Result<(), Box> { #[tokio::test] #[ignore = "requires running RustFS server at localhost:9000"] async fn read_all() -> Result<(), Box> { - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; let request = Request::new(ReadAllRequest { disk: "data".to_string(), volume: "ff".to_string(), @@ -197,7 +207,9 @@ async fn read_all() -> Result<(), 
Box> { #[tokio::test] #[ignore = "requires running RustFS server at localhost:9000"] async fn storage_info() -> Result<(), Box> { - let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?; + let mut client = + node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await?; let request = Request::new(LocalStorageInfoRequest { metrics: true }); let response = client.local_storage_info(request).await?.into_inner(); diff --git a/crates/ecstore/src/admin_server_info.rs b/crates/ecstore/src/admin_server_info.rs index 324ec388..13187790 100644 --- a/crates/ecstore/src/admin_server_info.rs +++ b/crates/ecstore/src/admin_server_info.rs @@ -14,6 +14,7 @@ use crate::data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, load_data_usage_from_backend}; use crate::error::{Error, Result}; +use crate::rpc::node_service_time_out_client_no_auth; use crate::{ disk::endpoint::Endpoint, global::{GLOBAL_BOOT_TIME, GLOBAL_Endpoints}, @@ -29,7 +30,6 @@ use rustfs_madmin::{ }; use rustfs_protos::{ models::{PingBody, PingBodyBuilder}, - node_service_time_out_client, proto_gen::node_service::{PingRequest, PingResponse}, }; use std::{ @@ -101,7 +101,7 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> { let decoded_payload = flatbuffers::root::(finished_data); assert!(decoded_payload.is_ok()); - let mut client = node_service_time_out_client(&addr) + let mut client = node_service_time_out_client_no_auth(&addr) .await .map_err(|err| Error::other(err.to_string()))?; diff --git a/crates/ecstore/src/rpc/client.rs b/crates/ecstore/src/rpc/client.rs new file mode 100644 index 00000000..fe966d86 --- /dev/null +++ b/crates/ecstore/src/rpc/client.rs @@ -0,0 +1,88 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::error::Error; + +use http::Method; +use rustfs_common::GLOBAL_CONN_MAP; +use rustfs_protos::{create_new_channel, proto_gen::node_service::node_service_client::NodeServiceClient}; +use tonic::{service::interceptor::InterceptedService, transport::Channel}; +use tracing::debug; + +use crate::rpc::{TONIC_RPC_PREFIX, gen_signature_headers}; + +/// 3. Subsequent calls will attempt fresh connections +/// 4. If node is still down, connection will fail fast (3s timeout) +pub async fn node_service_time_out_client( + addr: &String, + interceptor: TonicInterceptor, +) -> Result>, Box> { + // Try to get cached channel + let cached_channel = { GLOBAL_CONN_MAP.read().await.get(addr).cloned() }; + + let channel = match cached_channel { + Some(channel) => { + debug!("Using cached gRPC channel for: {}", addr); + channel + } + None => { + // No cached connection, create new one + create_new_channel(addr).await? 
+ } + }; + + Ok(NodeServiceClient::with_interceptor(channel, interceptor)) +} + +pub async fn node_service_time_out_client_no_auth( + addr: &String, +) -> Result>, Box> { + node_service_time_out_client(addr, TonicInterceptor::NoOp(NoOpInterceptor)).await +} + +pub struct TonicSignatureInterceptor; + +impl tonic::service::Interceptor for TonicSignatureInterceptor { + fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { + let headers = gen_signature_headers(TONIC_RPC_PREFIX, &Method::GET); + req.metadata_mut().as_mut().extend(headers); + Ok(req) + } +} + +pub fn gen_tonic_signature_interceptor() -> TonicSignatureInterceptor { + TonicSignatureInterceptor +} + +pub struct NoOpInterceptor; + +impl tonic::service::Interceptor for NoOpInterceptor { + fn call(&mut self, req: tonic::Request<()>) -> Result, tonic::Status> { + Ok(req) + } +} + +pub enum TonicInterceptor { + Signature(TonicSignatureInterceptor), + NoOp(NoOpInterceptor), +} + +impl tonic::service::Interceptor for TonicInterceptor { + fn call(&mut self, req: tonic::Request<()>) -> Result, tonic::Status> { + match self { + TonicInterceptor::Signature(interceptor) => interceptor.call(req), + TonicInterceptor::NoOp(interceptor) => interceptor.call(req), + } + } +} diff --git a/crates/ecstore/src/rpc/http_auth.rs b/crates/ecstore/src/rpc/http_auth.rs index b283b3d1..525ecda2 100644 --- a/crates/ecstore/src/rpc/http_auth.rs +++ b/crates/ecstore/src/rpc/http_auth.rs @@ -29,6 +29,7 @@ type HmacSha256 = Hmac; const SIGNATURE_HEADER: &str = "x-rustfs-signature"; const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp"; const SIGNATURE_VALID_DURATION: i64 = 300; // 5 minutes +pub const TONIC_RPC_PREFIX: &str = "/node_service.NodeService"; /// Get the shared secret for HMAC signing fn get_shared_secret() -> String { @@ -57,13 +58,25 @@ fn generate_signature(secret: &str, url: &str, method: &Method, timestamp: i64) /// Build headers with authentication signature pub fn build_auth_headers(url: &str, method: 
&Method, headers: &mut HeaderMap) { + let auth_headers = gen_signature_headers(url, method); + + headers.extend(auth_headers); +} + +pub fn gen_signature_headers(url: &str, method: &Method) -> HeaderMap { let secret = get_shared_secret(); let timestamp = OffsetDateTime::now_utc().unix_timestamp(); let signature = generate_signature(&secret, url, method, timestamp); - headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap()); - headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&timestamp.to_string()).unwrap()); + let mut headers = HeaderMap::new(); + headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).expect("Invalid header value")); + headers.insert( + TIMESTAMP_HEADER, + HeaderValue::from_str(&timestamp.to_string()).expect("Invalid header value"), + ); + + headers } /// Verify the request signature for RPC requests diff --git a/crates/ecstore/src/rpc/mod.rs b/crates/ecstore/src/rpc/mod.rs index 4d140209..a5993553 100644 --- a/crates/ecstore/src/rpc/mod.rs +++ b/crates/ecstore/src/rpc/mod.rs @@ -12,12 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+mod client; mod http_auth; mod peer_rest_client; mod peer_s3_client; mod remote_disk; +mod remote_locker; -pub use http_auth::{build_auth_headers, verify_rpc_signature}; +pub use client::{ + TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth, +}; +pub use http_auth::{TONIC_RPC_PREFIX, build_auth_headers, gen_signature_headers, verify_rpc_signature}; pub use peer_rest_client::PeerRestClient; pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys}; pub use remote_disk::RemoteDisk; +pub use remote_locker::RemoteClient; diff --git a/crates/ecstore/src/rpc/peer_rest_client.rs b/crates/ecstore/src/rpc/peer_rest_client.rs index 178b6a14..fc6d5374 100644 --- a/crates/ecstore/src/rpc/peer_rest_client.rs +++ b/crates/ecstore/src/rpc/peer_rest_client.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::error::{Error, Result}; +use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client}; use crate::{ endpoints::EndpointServerPools, global::is_dist_erasure, @@ -25,21 +26,22 @@ use rustfs_madmin::{ metrics::RealtimeMetrics, net::NetInfo, }; -use rustfs_protos::{ - evict_failed_connection, node_service_time_out_client, - proto_gen::node_service::{ - DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest, - GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest, - GetSeLinuxInfoRequest, GetSysConfigRequest, GetSysErrorsRequest, LoadBucketMetadataRequest, LoadGroupRequest, - LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest, - LoadTransitionTierConfigRequest, LoadUserRequest, LocalStorageInfoRequest, Mss, ReloadPoolMetaRequest, - ReloadSiteReplicationConfigRequest, ServerInfoRequest, SignalServiceRequest, StartProfilingRequest, StopRebalanceRequest, - }, +use 
rustfs_protos::evict_failed_connection; +use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient; +use rustfs_protos::proto_gen::node_service::{ + DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest, + GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest, + GetSeLinuxInfoRequest, GetSysConfigRequest, GetSysErrorsRequest, LoadBucketMetadataRequest, LoadGroupRequest, + LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest, + LoadTransitionTierConfigRequest, LoadUserRequest, LocalStorageInfoRequest, Mss, ReloadPoolMetaRequest, + ReloadSiteReplicationConfigRequest, ServerInfoRequest, SignalServiceRequest, StartProfilingRequest, StopRebalanceRequest, }; use rustfs_utils::XHost; use serde::{Deserialize, Serialize as _}; use std::{collections::HashMap, io::Cursor, time::SystemTime}; use tonic::Request; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::Channel; use tracing::warn; pub const PEER_RESTSIGNAL: &str = "signal"; @@ -83,6 +85,12 @@ impl PeerRestClient { (remote, all) } + pub async fn get_client(&self) -> Result>> { + node_service_time_out_client(&self.grid_host, TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}"))) + } + /// Evict the connection to this peer from the global cache. /// This should be called when communication with this peer fails. 
pub async fn evict_connection(&self) { @@ -101,9 +109,7 @@ impl PeerRestClient { } async fn local_storage_info_inner(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LocalStorageInfoRequest { metrics: true }); let response = client.local_storage_info(request).await?.into_inner(); @@ -131,9 +137,7 @@ impl PeerRestClient { } async fn server_info_inner(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(ServerInfoRequest { metrics: true }); let response = client.server_info(request).await?.into_inner(); @@ -152,9 +156,7 @@ impl PeerRestClient { } pub async fn get_cpus(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetCpusRequest {}); let response = client.get_cpus(request).await?.into_inner(); @@ -173,9 +175,7 @@ impl PeerRestClient { } pub async fn get_net_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetNetInfoRequest {}); let response = client.get_net_info(request).await?.into_inner(); @@ -194,9 +194,7 @@ impl PeerRestClient { } pub async fn get_partitions(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetPartitionsRequest {}); let response = client.get_partitions(request).await?.into_inner(); @@ -215,9 +213,7 @@ impl PeerRestClient { } pub 
async fn get_os_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetOsInfoRequest {}); let response = client.get_os_info(request).await?.into_inner(); @@ -236,9 +232,7 @@ impl PeerRestClient { } pub async fn get_se_linux_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetSeLinuxInfoRequest {}); let response = client.get_se_linux_info(request).await?.into_inner(); @@ -257,9 +251,7 @@ impl PeerRestClient { } pub async fn get_sys_config(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetSysConfigRequest {}); let response = client.get_sys_config(request).await?.into_inner(); @@ -278,9 +270,7 @@ impl PeerRestClient { } pub async fn get_sys_errors(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetSysErrorsRequest {}); let response = client.get_sys_errors(request).await?.into_inner(); @@ -299,9 +289,7 @@ impl PeerRestClient { } pub async fn get_mem_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetMemInfoRequest {}); let response = client.get_mem_info(request).await?.into_inner(); @@ -320,9 +308,7 @@ impl PeerRestClient { } pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result { - let mut client = 
node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let mut buf_t = Vec::new(); t.serialize(&mut Serializer::new(&mut buf_t))?; let mut buf_o = Vec::new(); @@ -348,9 +334,7 @@ impl PeerRestClient { } pub async fn get_proc_info(&self) -> Result { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(GetProcInfoRequest {}); let response = client.get_proc_info(request).await?.into_inner(); @@ -369,9 +353,7 @@ impl PeerRestClient { } pub async fn start_profiling(&self, profiler: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(StartProfilingRequest { profiler: profiler.to_string(), }); @@ -403,9 +385,7 @@ impl PeerRestClient { } pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadBucketMetadataRequest { bucket: bucket.to_string(), }); @@ -421,9 +401,7 @@ impl PeerRestClient { } pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(DeleteBucketMetadataRequest { bucket: bucket.to_string(), }); @@ -439,9 +417,7 @@ impl PeerRestClient { } pub async fn delete_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = 
self.get_client().await?; let request = Request::new(DeletePolicyRequest { policy_name: policy.to_string(), }); @@ -457,9 +433,7 @@ impl PeerRestClient { } pub async fn load_policy(&self, policy: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadPolicyRequest { policy_name: policy.to_string(), }); @@ -475,9 +449,7 @@ impl PeerRestClient { } pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadPolicyMappingRequest { user_or_group: user_or_group.to_string(), user_type, @@ -495,9 +467,7 @@ impl PeerRestClient { } pub async fn delete_user(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(DeleteUserRequest { access_key: access_key.to_string(), }); @@ -517,9 +487,7 @@ impl PeerRestClient { } pub async fn delete_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(DeleteServiceAccountRequest { access_key: access_key.to_string(), }); @@ -539,9 +507,7 @@ impl PeerRestClient { } pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadUserRequest { access_key: 
access_key.to_string(), temp, @@ -562,9 +528,7 @@ impl PeerRestClient { } pub async fn load_service_account(&self, access_key: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadServiceAccountRequest { access_key: access_key.to_string(), }); @@ -584,9 +548,7 @@ impl PeerRestClient { } pub async fn load_group(&self, group: &str) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadGroupRequest { group: group.to_string(), }); @@ -606,9 +568,7 @@ impl PeerRestClient { } pub async fn reload_site_replication_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(ReloadSiteReplicationConfigRequest {}); let response = client.reload_site_replication_config(request).await?.into_inner(); @@ -622,9 +582,7 @@ impl PeerRestClient { } pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let mut vars = HashMap::new(); vars.insert(PEER_RESTSIGNAL.to_string(), sig.to_string()); vars.insert(PEER_RESTSUB_SYS.to_string(), sub_sys.to_string()); @@ -644,23 +602,17 @@ impl PeerRestClient { } pub async fn get_metacache_listing(&self) -> Result<()> { - let _client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let _client = self.get_client().await?; todo!() } pub async fn update_metacache_listing(&self) -> 
Result<()> { - let _client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let _client = self.get_client().await?; todo!() } pub async fn reload_pool_meta(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(ReloadPoolMetaRequest {}); let response = client.reload_pool_meta(request).await?.into_inner(); @@ -675,9 +627,7 @@ impl PeerRestClient { } pub async fn stop_rebalance(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(StopRebalanceRequest {}); let response = client.stop_rebalance(request).await?.into_inner(); @@ -692,9 +642,7 @@ impl PeerRestClient { } pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadRebalanceMetaRequest { start_rebalance }); let response = client.load_rebalance_meta(request).await?.into_inner(); @@ -711,9 +659,7 @@ impl PeerRestClient { } pub async fn load_transition_tier_config(&self) -> Result<()> { - let mut client = node_service_time_out_client(&self.grid_host) - .await - .map_err(|err| Error::other(err.to_string()))?; + let mut client = self.get_client().await?; let request = Request::new(LoadTransitionTierConfigRequest {}); let response = client.load_transition_tier_config(request).await?.into_inner(); diff --git a/crates/ecstore/src/rpc/peer_s3_client.rs b/crates/ecstore/src/rpc/peer_s3_client.rs index fcd89f96..90d684eb 100644 --- a/crates/ecstore/src/rpc/peer_s3_client.rs +++ b/crates/ecstore/src/rpc/peer_s3_client.rs @@ 
-18,6 +18,9 @@ use crate::disk::error::{Error, Result}; use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs}; use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration}; use crate::global::GLOBAL_LOCAL_DISK_MAP; +use crate::rpc::client::{ + TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth, +}; use crate::store::all_local_disk; use crate::store_utils::is_reserved_or_invalid_bucket; use crate::{ @@ -32,7 +35,7 @@ use async_trait::async_trait; use futures::future::join_all; use rustfs_common::heal_channel::{DriveState, HealItemType, HealOpts, RUSTFS_RESERVED_BUCKET}; use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem}; -use rustfs_protos::node_service_time_out_client; +use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient; use rustfs_protos::proto_gen::node_service::{ DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest, }; @@ -40,6 +43,8 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; use tokio::{net::TcpStream, sync::RwLock, time}; use tokio_util::sync::CancellationToken; use tonic::Request; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::Channel; use tracing::{debug, info, warn}; type Client = Arc>; @@ -587,6 +592,12 @@ impl RemotePeerS3Client { client } + pub async fn get_client(&self) -> Result>> { + node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}"))) + } + pub fn get_addr(&self) -> String { self.addr.clone() } @@ -730,9 +741,7 @@ impl PeerS3Client for RemotePeerS3Client { self.execute_with_timeout( || async { let options: String = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| 
Error::other(format!("can not get client, err: {err}")))?; + let mut client = self.get_client().await?; let request = Request::new(HealBucketRequest { bucket: bucket.to_string(), options, @@ -762,9 +771,7 @@ impl PeerS3Client for RemotePeerS3Client { self.execute_with_timeout( || async { let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let mut client = self.get_client().await?; let request = Request::new(ListBucketRequest { options }); let response = client.list_bucket(request).await?.into_inner(); if !response.success { @@ -790,9 +797,7 @@ impl PeerS3Client for RemotePeerS3Client { self.execute_with_timeout( || async { let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let mut client = self.get_client().await?; let request = Request::new(MakeBucketRequest { name: bucket.to_string(), options, @@ -818,7 +823,7 @@ impl PeerS3Client for RemotePeerS3Client { self.execute_with_timeout( || async { let options = serde_json::to_string(opts)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(GetBucketInfoRequest { @@ -845,9 +850,7 @@ impl PeerS3Client for RemotePeerS3Client { async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; + let mut client = self.get_client().await?; let request = Request::new(DeleteBucketRequest { bucket: bucket.to_string(), diff --git a/crates/ecstore/src/rpc/remote_disk.rs 
b/crates/ecstore/src/rpc/remote_disk.rs index 175ad3bf..78f39f55 100644 --- a/crates/ecstore/src/rpc/remote_disk.rs +++ b/crates/ecstore/src/rpc/remote_disk.rs @@ -21,39 +21,44 @@ use std::{ use bytes::Bytes; use futures::lock::Mutex; use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE}; -use rustfs_protos::{ - node_service_time_out_client, - proto_gen::node_service::{ - CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, - DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, - ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest, - StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, - }, +use rustfs_protos::proto_gen::node_service::{ + CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, + DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, + ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest, + StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, + node_service_client::NodeServiceClient, }; use rustfs_utils::string::parse_bool_with_default; use tokio::time; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; -use crate::disk::{ - CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, - ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, - disk_store::{ - CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, get_max_timeout_duration, - }, - endpoint::Endpoint, -}; -use crate::disk::{FileReader, FileWriter}; use 
crate::disk::{disk_store::DiskHealthTracker, error::DiskError}; use crate::{ disk::error::{Error, Result}, rpc::build_auth_headers, }; +use crate::{ + disk::{ + CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions, + ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, + disk_store::{ + CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, + get_max_timeout_duration, + }, + endpoint::Endpoint, + }, + rpc::client::gen_tonic_signature_interceptor, +}; +use crate::{ + disk::{FileReader, FileWriter}, + rpc::client::{TonicInterceptor, node_service_time_out_client}, +}; use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo}; use rustfs_protos::proto_gen::node_service::RenamePartRequest; use rustfs_rio::{HttpReader, HttpWriter}; use tokio::{io::AsyncWrite, net::TcpStream, time::timeout}; -use tonic::Request; +use tonic::{Request, service::interceptor::InterceptedService, transport::Channel}; use uuid::Uuid; #[derive(Debug)] @@ -259,6 +264,12 @@ impl RemoteDisk { } } } + + async fn get_client(&self) -> Result>> { + node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await + .map_err(|err| Error::other(format!("can not get client, err: {err}"))) + } } // TODO: all api need to handle errors @@ -343,7 +354,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(MakeVolumeRequest { @@ -370,7 +382,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: 
{err}")))?; let request = Request::new(MakeVolumesRequest { @@ -397,7 +410,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ListVolumesRequest { @@ -429,7 +443,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(StatVolumeRequest { @@ -458,7 +473,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(DeleteVolumeRequest { @@ -545,7 +561,8 @@ impl DiskAPI for RemoteDisk { let file_info = serde_json::to_string(&fi)?; let opts = serde_json::to_string(&opts)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(DeleteVersionRequest { @@ -603,7 +620,7 @@ impl DiskAPI for RemoteDisk { } }); } - let mut client = match node_service_time_out_client(&self.addr).await { + let mut client = match self.get_client().await { Ok(client) => client, Err(err) => { let mut errors = Vec::with_capacity(versions.len()); @@ -674,7 +691,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(DeletePathsRequest { @@ -703,7 +721,8 @@ impl 
DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(WriteMetadataRequest { @@ -734,7 +753,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(UpdateMetadataRequest { @@ -772,7 +792,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ReadVersionRequest { @@ -804,7 +825,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ReadXlRequest { @@ -843,7 +865,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(RenameDataRequest { @@ -878,7 +901,8 @@ impl DiskAPI for RemoteDisk { return Err(DiskError::FaultyDisk); } - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ListDirRequest { @@ -1039,7 +1063,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || 
async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(RenameFileRequest { @@ -1069,7 +1094,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(RenamePartRequest { @@ -1101,7 +1127,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { let options = serde_json::to_string(&opt)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(DeleteRequest { @@ -1131,7 +1158,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(VerifyFileRequest { @@ -1160,7 +1188,8 @@ impl DiskAPI for RemoteDisk { async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result> { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ReadPartsRequest { @@ -1190,7 +1219,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { let file_info = serde_json::to_string(&fi)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = 
Request::new(CheckPartsRequest { @@ -1222,7 +1252,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { let read_multiple_req = serde_json::to_string(&req)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ReadMultipleRequest { @@ -1255,7 +1286,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(WriteAllRequest { @@ -1284,7 +1316,8 @@ impl DiskAPI for RemoteDisk { self.execute_with_timeout( || async { - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(ReadAllRequest { @@ -1313,7 +1346,8 @@ impl DiskAPI for RemoteDisk { } let opts = serde_json::to_string(&opts)?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = self + .get_client() .await .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; let request = Request::new(DiskInfoRequest { diff --git a/crates/ecstore/src/rpc/remote_locker.rs b/crates/ecstore/src/rpc/remote_locker.rs new file mode 100644 index 00000000..ea202de3 --- /dev/null +++ b/crates/ecstore/src/rpc/remote_locker.rs @@ -0,0 +1,394 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client}; +use async_trait::async_trait; +use rustfs_lock::types::{LockId, LockMetadata, LockPriority}; +use rustfs_lock::{LockClient, LockError, LockInfo, LockResponse, LockStats, LockStatus, Result}; +use rustfs_lock::{LockRequest, LockType}; +use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient; +use rustfs_protos::proto_gen::node_service::{GenerallyLockRequest, PingRequest}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::Request; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::Channel; +use tracing::info; + +/// Remote lock client implementation +#[derive(Debug)] +pub struct RemoteClient { + addr: String, + // Track active locks with their original owner information + active_locks: Arc>>, // lock_id -> owner +} + +impl Clone for RemoteClient { + fn clone(&self) -> Self { + Self { + addr: self.addr.clone(), + active_locks: self.active_locks.clone(), + } + } +} + +impl RemoteClient { + pub fn new(endpoint: String) -> Self { + Self { + addr: endpoint, + active_locks: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub fn from_url(url: url::Url) -> Self { + Self { + addr: url.to_string(), + active_locks: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Create a minimal LockRequest for unlock operations + fn create_unlock_request(&self, lock_id: &LockId, owner: &str) -> LockRequest { + LockRequest { + lock_id: lock_id.clone(), + resource: 
lock_id.resource.clone(), + lock_type: LockType::Exclusive, // Type doesn't matter for unlock + owner: owner.to_string(), + acquire_timeout: std::time::Duration::from_secs(30), + ttl: std::time::Duration::from_secs(300), + metadata: LockMetadata::default(), + priority: LockPriority::Normal, + deadlock_detection: false, + } + } + + pub async fn get_client(&self) -> Result>> { + node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor())) + .await + .map_err(|err| LockError::internal(format!("can not get client, err: {err}"))) + } +} + +#[async_trait] +impl LockClient for RemoteClient { + async fn acquire_exclusive(&self, request: &LockRequest) -> Result { + info!("remote acquire_exclusive for {}", request.resource); + let mut client = self.get_client().await?; + let req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + let resp = client + .lock(req) + .await + .map_err(|e| LockError::internal(e.to_string()))? 
+ .into_inner(); + + // Check for explicit error first + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + + // Check if the lock acquisition was successful + if resp.success { + // Save the lock information for later release + let mut locks = self.active_locks.write().await; + locks.insert(request.lock_id.clone(), request.owner.clone()); + + Ok(LockResponse::success( + LockInfo { + id: request.lock_id.clone(), + resource: request.resource.clone(), + lock_type: request.lock_type, + status: LockStatus::Acquired, + owner: request.owner.clone(), + acquired_at: std::time::SystemTime::now(), + expires_at: std::time::SystemTime::now() + request.ttl, + last_refreshed: std::time::SystemTime::now(), + metadata: request.metadata.clone(), + priority: request.priority, + wait_start_time: None, + }, + std::time::Duration::ZERO, + )) + } else { + // Lock acquisition failed + Ok(LockResponse::failure( + "Lock acquisition failed on remote server".to_string(), + std::time::Duration::ZERO, + )) + } + } + + async fn acquire_shared(&self, request: &LockRequest) -> Result { + info!("remote acquire_shared for {}", request.resource); + let mut client = self.get_client().await?; + let req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + let resp = client + .r_lock(req) + .await + .map_err(|e| LockError::internal(e.to_string()))? 
+ .into_inner(); + + // Check for explicit error first + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + + // Check if the lock acquisition was successful + if resp.success { + // Save the lock information for later release + let mut locks = self.active_locks.write().await; + locks.insert(request.lock_id.clone(), request.owner.clone()); + + Ok(LockResponse::success( + LockInfo { + id: request.lock_id.clone(), + resource: request.resource.clone(), + lock_type: request.lock_type, + status: LockStatus::Acquired, + owner: request.owner.clone(), + acquired_at: std::time::SystemTime::now(), + expires_at: std::time::SystemTime::now() + request.ttl, + last_refreshed: std::time::SystemTime::now(), + metadata: request.metadata.clone(), + priority: request.priority, + wait_start_time: None, + }, + std::time::Duration::ZERO, + )) + } else { + // Lock acquisition failed + Ok(LockResponse::failure( + "Shared lock acquisition failed on remote server".to_string(), + std::time::Duration::ZERO, + )) + } + } + + async fn release(&self, lock_id: &LockId) -> Result { + info!("remote release for {}", lock_id); + + // Get the original owner for this lock + let owner = { + let locks = self.active_locks.read().await; + locks.get(lock_id).cloned().unwrap_or_else(|| "remote".to_string()) + }; + + let unlock_request = self.create_unlock_request(lock_id, &owner); + + let request_string = serde_json::to_string(&unlock_request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?; + let mut client = self.get_client().await?; + + // Try UnLock first (for exclusive locks) + let req = Request::new(GenerallyLockRequest { + args: request_string.clone(), + }); + let resp = client.un_lock(req).await; + + let success = if resp.is_err() { + // If that fails, try RUnLock (for shared locks) + let req = Request::new(GenerallyLockRequest { args: request_string }); + let resp = client + .r_un_lock(req) + .await + .map_err(|e| 
LockError::internal(e.to_string()))? + .into_inner(); + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + resp.success + } else { + let resp = resp.map_err(|e| LockError::internal(e.to_string()))?.into_inner(); + + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + resp.success + }; + + // Remove the lock from our tracking if successful + if success { + let mut locks = self.active_locks.write().await; + locks.remove(lock_id); + } + + Ok(success) + } + + async fn refresh(&self, lock_id: &LockId) -> Result { + info!("remote refresh for {}", lock_id); + let refresh_request = self.create_unlock_request(lock_id, "remote"); + let mut client = self.get_client().await?; + let req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&refresh_request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + let resp = client + .refresh(req) + .await + .map_err(|e| LockError::internal(e.to_string()))? + .into_inner(); + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + Ok(resp.success) + } + + async fn force_release(&self, lock_id: &LockId) -> Result { + info!("remote force_release for {}", lock_id); + let force_request = self.create_unlock_request(lock_id, "remote"); + let mut client = self.get_client().await?; + let req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&force_request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + let resp = client + .force_un_lock(req) + .await + .map_err(|e| LockError::internal(e.to_string()))? 
+ .into_inner(); + if let Some(error_info) = resp.error_info { + return Err(LockError::internal(error_info)); + } + Ok(resp.success) + } + + async fn check_status(&self, lock_id: &LockId) -> Result> { + info!("remote check_status for {}", lock_id); + + // Since there's no direct status query in the gRPC service, + // we attempt a non-blocking lock acquisition to check if the resource is available + let status_request = self.create_unlock_request(lock_id, "remote"); + let mut client = self.get_client().await?; + + // Try to acquire a very short-lived lock to test availability + let req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&status_request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + + // Try exclusive lock first with very short timeout + let resp = client.lock(req).await; + + match resp { + Ok(response) => { + let resp = response.into_inner(); + if resp.success { + // If we successfully acquired the lock, the resource was free + // Immediately release it + let release_req = Request::new(GenerallyLockRequest { + args: serde_json::to_string(&status_request) + .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?, + }); + let _ = client.un_lock(release_req).await; // Best effort release + + // Return None since no one was holding the lock + Ok(None) + } else { + // Lock acquisition failed, meaning someone is holding it + // We can't determine the exact details remotely, so return a generic status + Ok(Some(LockInfo { + id: lock_id.clone(), + resource: lock_id.as_str().to_string(), + lock_type: LockType::Exclusive, // We can't know the exact type + status: LockStatus::Acquired, + owner: "unknown".to_string(), // Remote client can't determine owner + acquired_at: std::time::SystemTime::now(), + expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600), + last_refreshed: std::time::SystemTime::now(), + metadata: LockMetadata::default(), + priority: 
LockPriority::Normal, + wait_start_time: None, + })) + } + } + Err(_) => { + // Communication error or lock is held + Ok(Some(LockInfo { + id: lock_id.clone(), + resource: lock_id.as_str().to_string(), + lock_type: LockType::Exclusive, + status: LockStatus::Acquired, + owner: "unknown".to_string(), + acquired_at: std::time::SystemTime::now(), + expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600), + last_refreshed: std::time::SystemTime::now(), + metadata: LockMetadata::default(), + priority: LockPriority::Normal, + wait_start_time: None, + })) + } + } + } + + async fn get_stats(&self) -> Result { + info!("remote get_stats from {}", self.addr); + + // Since there's no direct statistics endpoint in the gRPC service, + // we return basic stats indicating this is a remote client + let stats = LockStats { + last_updated: std::time::SystemTime::now(), + ..Default::default() + }; + + // We could potentially enhance this by: + // 1. Keeping local counters of operations performed + // 2. Adding a stats gRPC method to the service + // 3. 
Querying server health endpoints + + // For now, return minimal stats indicating remote connectivity + Ok(stats) + } + + async fn close(&self) -> Result<()> { + Ok(()) + } + + async fn is_online(&self) -> bool { + // Use Ping interface to test if remote service is online + let mut client = match self.get_client().await { + Ok(client) => client, + Err(_) => { + info!("remote client {} connection failed", self.addr); + return false; + } + }; + + let ping_req = Request::new(PingRequest { + version: 1, + body: bytes::Bytes::new(), + }); + + match client.ping(ping_req).await { + Ok(_) => { + info!("remote client {} is online", self.addr); + true + } + Err(_) => { + info!("remote client {} ping failed", self.addr); + false + } + } + } + + async fn is_local(&self) -> bool { + false + } +} diff --git a/crates/lock/src/client/mod.rs b/crates/lock/src/client/mod.rs index 572bd579..7ab8fec6 100644 --- a/crates/lock/src/client/mod.rs +++ b/crates/lock/src/client/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. 
pub mod local; -pub mod remote; +// pub mod remote; use async_trait::async_trait; use std::sync::Arc; @@ -74,10 +74,10 @@ impl ClientFactory { Arc::new(local::LocalClient::new()) } - /// Create remote client - pub fn create_remote(endpoint: String) -> Arc { - Arc::new(remote::RemoteClient::new(endpoint)) - } + // /// Create remote client + // pub fn create_remote(endpoint: String) -> Arc { + // Arc::new(remote::RemoteClient::new(endpoint)) + // } } #[cfg(test)] @@ -85,15 +85,6 @@ mod tests { use super::*; use crate::types::LockType; - #[tokio::test] - async fn test_client_factory() { - let local_client = ClientFactory::create_local(); - assert!(local_client.is_local().await); - - let remote_client = ClientFactory::create_remote("http://localhost:8080".to_string()); - assert!(!remote_client.is_local().await); - } - #[tokio::test] async fn test_local_client_basic_operations() { let client = ClientFactory::create_local(); diff --git a/crates/lock/src/client/remote.rs b/crates/lock/src/client/remote.rs index e370b627..e69a82f4 100644 --- a/crates/lock/src/client/remote.rs +++ b/crates/lock/src/client/remote.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use rustfs_protos::{ - node_service_time_out_client, + node_service_time_out_client, node_service_time_out_client_no_auth, proto_gen::node_service::{GenerallyLockRequest, PingRequest}, }; use std::collections::HashMap; @@ -82,7 +82,7 @@ impl RemoteClient { impl LockClient for RemoteClient { async fn acquire_exclusive(&self, request: &LockRequest) -> Result { info!("remote acquire_exclusive for {}", request.resource); - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; let req = Request::new(GenerallyLockRequest { @@ -133,7 +133,7 @@ impl LockClient for RemoteClient { async fn acquire_shared(&self, request: &LockRequest) -> Result { info!("remote 
acquire_shared for {}", request.resource); - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; let req = Request::new(GenerallyLockRequest { @@ -195,7 +195,7 @@ impl LockClient for RemoteClient { let request_string = serde_json::to_string(&unlock_request) .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?; - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; @@ -238,7 +238,7 @@ impl LockClient for RemoteClient { async fn refresh(&self, lock_id: &LockId) -> Result { info!("remote refresh for {}", lock_id); let refresh_request = self.create_unlock_request(lock_id, "remote"); - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; let req = Request::new(GenerallyLockRequest { @@ -259,7 +259,7 @@ impl LockClient for RemoteClient { async fn force_release(&self, lock_id: &LockId) -> Result { info!("remote force_release for {}", lock_id); let force_request = self.create_unlock_request(lock_id, "remote"); - let mut client = node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; let req = Request::new(GenerallyLockRequest { @@ -283,7 +283,7 @@ impl LockClient for RemoteClient { // Since there's no direct status query in the gRPC service, // we attempt a non-blocking lock acquisition to check if the resource is available let status_request = self.create_unlock_request(lock_id, "remote"); - let mut client = 
node_service_time_out_client(&self.addr) + let mut client = node_service_time_out_client_no_auth(&self.addr) .await .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?; @@ -372,7 +372,7 @@ impl LockClient for RemoteClient { async fn is_online(&self) -> bool { // Use Ping interface to test if remote service is online - let mut client = match node_service_time_out_client(&self.addr).await { + let mut client = match self.get_client().await { Ok(client) => client, Err(_) => { info!("remote client {} connection failed", self.addr); diff --git a/crates/lock/src/lib.rs b/crates/lock/src/lib.rs index 477362e2..a35b1bde 100644 --- a/crates/lock/src/lib.rs +++ b/crates/lock/src/lib.rs @@ -37,7 +37,7 @@ pub mod types; // Re-export main types for easy access pub use crate::{ // Client interfaces - client::{LockClient, local::LocalClient, remote::RemoteClient}, + client::{LockClient, local::LocalClient}, // Error types error::{LockError, Result}, // Fast Lock System exports diff --git a/crates/protos/src/lib.rs b/crates/protos/src/lib.rs index f9f4fe9c..45ddb5bb 100644 --- a/crates/protos/src/lib.rs +++ b/crates/protos/src/lib.rs @@ -20,11 +20,10 @@ use rustfs_common::{GLOBAL_CONN_MAP, GLOBAL_MTLS_IDENTITY, GLOBAL_ROOT_CERT, evi use std::{error::Error, time::Duration}; use tonic::{ Request, Status, - metadata::MetadataValue, service::interceptor::InterceptedService, transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, }; -use tracing::{debug, error, warn}; +use tracing::{debug, warn}; // Type alias for the complex client type pub type NodeServiceClientType = NodeServiceClient< @@ -64,7 +63,7 @@ const RUSTFS_HTTPS_PREFIX: &str = "https://"; /// - Aggressive TCP keepalive (10s) /// - HTTP/2 PING every 5s, timeout at 3s /// - Overall RPC timeout of 30s (reduced from 60s) -async fn create_new_channel(addr: &str) -> Result> { +pub async fn create_new_channel(addr: &str) -> Result> { debug!("Creating new gRPC channel to: {}", addr); let mut 
connector = Endpoint::from_shared(addr.to_string())? @@ -131,90 +130,6 @@ async fn create_new_channel(addr: &str) -> Result> { Ok(channel) } -/// Get a gRPC client for the NodeService with robust connection handling. -/// -/// This function implements several resilience features: -/// 1. Connection caching for performance -/// 2. Automatic eviction of stale/dead connections on error -/// 3. Optimized keepalive settings for fast dead peer detection -/// 4. Reduced timeouts to fail fast when peers are unresponsive -/// -/// # Connection Lifecycle -/// - Cached connections are reused for subsequent calls -/// - On any connection error, the cached connection is evicted -/// - Fresh connections are established with aggressive keepalive settings -/// -/// # Cluster Power-Off Recovery -/// When a node experiences abrupt power-off: -/// 1. The cached connection will fail on next use -/// 2. The connection is automatically evicted from cache -/// 3. Subsequent calls will attempt fresh connections -/// 4. If node is still down, connection will fail fast (3s timeout) -pub async fn node_service_time_out_client( - addr: &String, -) -> Result< - NodeServiceClient< - InterceptedService) -> Result, Status> + Send + Sync + 'static>>, - >, - Box, -> { - debug!("Obtaining gRPC client for NodeService at: {}", addr); - let token_str = rustfs_credentials::get_grpc_token(); - let token: MetadataValue<_> = token_str.parse().map_err(|e| { - error!( - "Failed to parse gRPC auth token into MetadataValue: {:?}; env={} token_len={} token_prefix={}", - e, - rustfs_credentials::ENV_GRPC_AUTH_TOKEN, - token_str.len(), - token_str.chars().take(2).collect::(), - ); - e - })?; - - // Try to get cached channel - let cached_channel = { GLOBAL_CONN_MAP.read().await.get(addr).cloned() }; - - let channel = match cached_channel { - Some(channel) => { - debug!("Using cached gRPC channel for: {}", addr); - channel - } - None => { - // No cached connection, create new one - create_new_channel(addr).await? 
- } - }; - - Ok(NodeServiceClient::with_interceptor( - channel, - Box::new(move |mut req: Request<()>| { - req.metadata_mut().insert("authorization", token.clone()); - Ok(req) - }), - )) -} - -/// Get a gRPC client with automatic connection eviction on failure. -/// -/// This is the preferred method for cluster operations as it ensures -/// that failed connections are automatically cleaned up from the cache. -/// -/// Returns the client and the address for later eviction if needed. -pub async fn node_service_client_with_eviction( - addr: &String, -) -> Result< - ( - NodeServiceClient< - InterceptedService) -> Result, Status> + Send + Sync + 'static>>, - >, - String, - ), - Box, -> { - let client = node_service_time_out_client(addr).await?; - Ok((client, addr.clone())) -} - /// Evict a connection from the cache after a failure. /// This should be called when an RPC fails to ensure fresh connections are tried. pub async fn evict_failed_connection(addr: &str) { diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index e255c35e..c698ce4b 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -21,7 +21,7 @@ use crate::server::{ReadinessGateLayer, RemoteAddr, ServiceState, ServiceStateMa use crate::storage; use crate::storage::tonic_service::make_server; use bytes::Bytes; -use http::{HeaderMap, Request as HttpRequest, Response}; +use http::{HeaderMap, Method, Request as HttpRequest, Response}; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, @@ -31,6 +31,7 @@ use hyper_util::{ use metrics::{counter, histogram}; use rustfs_common::GlobalReadiness; use rustfs_config::{MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +use rustfs_ecstore::rpc::{TONIC_RPC_PREFIX, verify_rpc_signature}; use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; @@ -42,7 +43,7 @@ use std::sync::Arc; use std::time::Duration; use 
tokio::net::{TcpListener, TcpStream}; use tokio_rustls::TlsAcceptor; -use tonic::{Request, Status, metadata::MetadataValue}; +use tonic::{Request, Status}; use tower::ServiceBuilder; use tower_http::add_extension::AddExtensionLayer; use tower_http::catch_panic::CatchPanicLayer; @@ -722,17 +723,11 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) { #[allow(clippy::result_large_err)] fn check_auth(req: Request<()>) -> std::result::Result, Status> { - let token_str = rustfs_credentials::get_grpc_token(); - - let token: MetadataValue<_> = token_str.parse().map_err(|e| { - error!("Failed to parse RUSTFS_GRPC_AUTH_TOKEN into gRPC metadata value: {}", e); - Status::internal("Invalid auth token configuration") + verify_rpc_signature(TONIC_RPC_PREFIX, &Method::GET, req.metadata().as_ref()).map_err(|e| { + error!("RPC signature verification failed: {}", e); + Status::unauthenticated("No valid auth token") })?; - - match req.metadata().get("authorization") { - Some(t) if token == t => Ok(req), - _ => Err(Status::unauthenticated("No valid auth token")), - } + Ok(req) } /// Determines the listen backlog size.