Refactor RPC Authentication System for Improved Maintainability (#1391)

Author: weisd
Date: 2026-01-05 19:51:51 +08:00
Committed by: GitHub
Parent: 0b6f3302ce
Commit: 5660208e89
15 changed files with 697 additions and 299 deletions
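In short, call sites stop handing a bare address to the token-authenticated helper and instead choose an interceptor explicitly. The before/after pattern, as it appears throughout the hunks below:

// Before: a bearer-token interceptor was wired in implicitly
let mut client = node_service_time_out_client(&addr).await?;

// After: the caller picks the interceptor (HMAC signature, or no-op for unauthenticated probes)
let mut client =
    node_service_time_out_client(&addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor())).await?;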

View File

@@ -15,11 +15,12 @@
use async_trait::async_trait;
use rustfs_ecstore::disk::endpoint::Endpoint;
use rustfs_lock::client::{LockClient, local::LocalClient, remote::RemoteClient};
use rustfs_ecstore::rpc::RemoteClient;
use rustfs_lock::client::{LockClient, local::LocalClient};
use rustfs_lock::types::{LockInfo, LockResponse, LockStats};
use rustfs_lock::{LockId, LockMetadata, LockPriority, LockType};
use rustfs_lock::{LockRequest, NamespaceLock, NamespaceLockManager};
use rustfs_protos::{node_service_time_out_client, proto_gen::node_service::GenerallyLockRequest};
use rustfs_protos::proto_gen::node_service::GenerallyLockRequest;
use serial_test::serial;
use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
use tokio::time::sleep;
@@ -156,7 +157,7 @@ async fn test_lock_unlock_rpc() -> Result<(), Box<dyn Error>> {
};
let args = serde_json::to_string(&args)?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
println!("got client");
let request = Request::new(GenerallyLockRequest { args: args.clone() });
@@ -614,7 +615,7 @@ async fn test_rpc_read_lock() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
// First read lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });
@@ -669,7 +670,7 @@ async fn test_lock_refresh() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
// Acquire lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });
@@ -713,7 +714,7 @@ async fn test_force_unlock() -> Result<(), Box<dyn Error>> {
};
let args_str = serde_json::to_string(&args)?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client = RemoteClient::new(CLUSTER_ADDR.to_string()).get_client().await?;
// Acquire lock
let request = Request::new(GenerallyLockRequest { args: args_str.clone() });

View File

@@ -17,11 +17,11 @@ use crate::common::workspace_root;
use futures::future::join_all;
use rmp_serde::{Deserializer, Serializer};
use rustfs_ecstore::disk::{VolumeInfo, WalkDirOptions};
use rustfs_ecstore::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use rustfs_filemeta::{MetaCacheEntry, MetacacheReader, MetacacheWriter};
use rustfs_protos::proto_gen::node_service::WalkDirRequest;
use rustfs_protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{
ListVolumesRequest, LocalStorageInfoRequest, MakeVolumeRequest, PingRequest, PingResponse, ReadAllRequest,
},
@@ -53,7 +53,9 @@ async fn ping() -> Result<(), Box<dyn Error>> {
assert!(decoded_payload.is_ok());
// Create client
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
// Construct PingRequest
let request = Request::new(PingRequest {
@@ -78,7 +80,9 @@ async fn ping() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn make_volume() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let request = Request::new(MakeVolumeRequest {
disk: "data".to_string(),
volume: "dandan".to_string(),
@@ -96,7 +100,9 @@ async fn make_volume() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn list_volumes() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let request = Request::new(ListVolumesRequest {
disk: "data".to_string(),
});
@@ -126,7 +132,9 @@ async fn walk_dir() -> Result<(), Box<dyn Error>> {
let (rd, mut wr) = tokio::io::duplex(1024);
let mut buf = Vec::new();
opts.serialize(&mut Serializer::new(&mut buf))?;
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let disk_path = std::env::var_os("RUSTFS_DISK_PATH").map(PathBuf::from).unwrap_or_else(|| {
let mut path = workspace_root();
path.push("target");
@@ -179,7 +187,9 @@ async fn walk_dir() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn read_all() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let request = Request::new(ReadAllRequest {
disk: "data".to_string(),
volume: "ff".to_string(),
@@ -197,7 +207,9 @@ async fn read_all() -> Result<(), Box<dyn Error>> {
#[tokio::test]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn storage_info() -> Result<(), Box<dyn Error>> {
let mut client = node_service_time_out_client(&CLUSTER_ADDR.to_string()).await?;
let mut client =
node_service_time_out_client(&CLUSTER_ADDR.to_string(), TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
let response = client.local_storage_info(request).await?.into_inner();

View File

@@ -14,6 +14,7 @@
use crate::data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, load_data_usage_from_backend};
use crate::error::{Error, Result};
use crate::rpc::node_service_time_out_client_no_auth;
use crate::{
disk::endpoint::Endpoint,
global::{GLOBAL_BOOT_TIME, GLOBAL_Endpoints},
@@ -29,7 +30,6 @@ use rustfs_madmin::{
};
use rustfs_protos::{
models::{PingBody, PingBodyBuilder},
node_service_time_out_client,
proto_gen::node_service::{PingRequest, PingResponse},
};
use std::{
@@ -101,7 +101,7 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
let mut client = node_service_time_out_client(&addr)
let mut client = node_service_time_out_client_no_auth(&addr)
.await
.map_err(|err| Error::other(err.to_string()))?;

View File

@@ -0,0 +1,88 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::error::Error;
use http::Method;
use rustfs_common::GLOBAL_CONN_MAP;
use rustfs_protos::{create_new_channel, proto_gen::node_service::node_service_client::NodeServiceClient};
use tonic::{service::interceptor::InterceptedService, transport::Channel};
use tracing::debug;
use crate::rpc::{TONIC_RPC_PREFIX, gen_signature_headers};
/// Get a gRPC client for the NodeService with robust connection handling.
///
/// # Cluster Power-Off Recovery
/// When a node experiences abrupt power-off:
/// 1. The cached connection will fail on next use
/// 2. The connection is automatically evicted from cache
/// 3. Subsequent calls will attempt fresh connections
/// 4. If node is still down, connection will fail fast (3s timeout)
pub async fn node_service_time_out_client(
addr: &String,
interceptor: TonicInterceptor,
) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>, Box<dyn Error>> {
// Try to get cached channel
let cached_channel = { GLOBAL_CONN_MAP.read().await.get(addr).cloned() };
let channel = match cached_channel {
Some(channel) => {
debug!("Using cached gRPC channel for: {}", addr);
channel
}
None => {
// No cached connection, create new one
create_new_channel(addr).await?
}
};
Ok(NodeServiceClient::with_interceptor(channel, interceptor))
}
pub async fn node_service_time_out_client_no_auth(
addr: &String,
) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>, Box<dyn Error>> {
node_service_time_out_client(addr, TonicInterceptor::NoOp(NoOpInterceptor)).await
}
pub struct TonicSignatureInterceptor;
impl tonic::service::Interceptor for TonicSignatureInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
let headers = gen_signature_headers(TONIC_RPC_PREFIX, &Method::GET);
req.metadata_mut().as_mut().extend(headers);
Ok(req)
}
}
pub fn gen_tonic_signature_interceptor() -> TonicSignatureInterceptor {
TonicSignatureInterceptor
}
pub struct NoOpInterceptor;
impl tonic::service::Interceptor for NoOpInterceptor {
fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
Ok(req)
}
}
pub enum TonicInterceptor {
Signature(TonicSignatureInterceptor),
NoOp(NoOpInterceptor),
}
impl tonic::service::Interceptor for TonicInterceptor {
fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
match self {
TonicInterceptor::Signature(interceptor) => interceptor.call(req),
TonicInterceptor::NoOp(interceptor) => interceptor.call(req),
}
}
}
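A minimal usage sketch of the two constructors above; the peer address is illustrative. Both return the same InterceptedService-wrapped client type, so call sites stay uniform regardless of which interceptor is chosen:

// Signed client for normal peer-to-peer RPCs
let mut signed = node_service_time_out_client(
    &"http://node1:9000".to_string(),
    TonicInterceptor::Signature(gen_tonic_signature_interceptor()),
)
.await?;

// Unsigned client, e.g. for pre-boot reachability probes
let mut unsigned = node_service_time_out_client_no_auth(&"http://node1:9000".to_string()).await?;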

View File

@@ -29,6 +29,7 @@ type HmacSha256 = Hmac<Sha256>;
const SIGNATURE_HEADER: &str = "x-rustfs-signature";
const TIMESTAMP_HEADER: &str = "x-rustfs-timestamp";
const SIGNATURE_VALID_DURATION: i64 = 300; // 5 minutes
pub const TONIC_RPC_PREFIX: &str = "/node_service.NodeService";
/// Get the shared secret for HMAC signing
fn get_shared_secret() -> String {
@@ -57,13 +58,25 @@ fn generate_signature(secret: &str, url: &str, method: &Method, timestamp: i64)
/// Build headers with authentication signature
pub fn build_auth_headers(url: &str, method: &Method, headers: &mut HeaderMap) {
let auth_headers = gen_signature_headers(url, method);
headers.extend(auth_headers);
}
pub fn gen_signature_headers(url: &str, method: &Method) -> HeaderMap {
let secret = get_shared_secret();
let timestamp = OffsetDateTime::now_utc().unix_timestamp();
let signature = generate_signature(&secret, url, method, timestamp);
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).unwrap());
headers.insert(TIMESTAMP_HEADER, HeaderValue::from_str(&timestamp.to_string()).unwrap());
let mut headers = HeaderMap::new();
headers.insert(SIGNATURE_HEADER, HeaderValue::from_str(&signature).expect("Invalid header value"));
headers.insert(
TIMESTAMP_HEADER,
HeaderValue::from_str(&timestamp.to_string()).expect("Invalid header value"),
);
headers
}
/// Verify the request signature for RPC requests
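The verifier behind that doc comment is not shown in this hunk. A rough sketch of what it has to do, given the constants and helpers above (header names, the shared secret, the 5-minute validity window); the function name, error type, and comparison details here are assumptions, not the actual implementation:

fn verify_rpc_signature_sketch(url: &str, method: &Method, headers: &HeaderMap) -> std::result::Result<(), String> {
    // Both headers must be present
    let signature = headers
        .get(SIGNATURE_HEADER)
        .and_then(|v| v.to_str().ok())
        .ok_or("missing signature header")?;
    let timestamp: i64 = headers
        .get(TIMESTAMP_HEADER)
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.parse().ok())
        .ok_or("missing or invalid timestamp header")?;
    // Reject requests outside the 5-minute validity window
    let now = OffsetDateTime::now_utc().unix_timestamp();
    if (now - timestamp).abs() > SIGNATURE_VALID_DURATION {
        return Err("signature expired".into());
    }
    // Recompute the HMAC with the shared secret and compare
    let expected = generate_signature(&get_shared_secret(), url, method, timestamp);
    if expected != signature {
        return Err("signature mismatch".into());
    }
    Ok(())
}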

View File

@@ -12,12 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod client;
mod http_auth;
mod peer_rest_client;
mod peer_s3_client;
mod remote_disk;
mod remote_locker;
pub use http_auth::{build_auth_headers, verify_rpc_signature};
pub use client::{
TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth,
};
pub use http_auth::{TONIC_RPC_PREFIX, build_auth_headers, gen_signature_headers, verify_rpc_signature};
pub use peer_rest_client::PeerRestClient;
pub use peer_s3_client::{LocalPeerS3Client, PeerS3Client, RemotePeerS3Client, S3PeerSys};
pub use remote_disk::RemoteDisk;
pub use remote_locker::RemoteClient;
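Downstream crates (the integration tests above, for example) now pull the signing helpers from this module:

use rustfs_ecstore::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};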

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use crate::error::{Error, Result};
use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use crate::{
endpoints::EndpointServerPools,
global::is_dist_erasure,
@@ -25,21 +26,22 @@ use rustfs_madmin::{
metrics::RealtimeMetrics,
net::NetInfo,
};
use rustfs_protos::{
evict_failed_connection, node_service_time_out_client,
proto_gen::node_service::{
DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest,
GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest,
GetSeLinuxInfoRequest, GetSysConfigRequest, GetSysErrorsRequest, LoadBucketMetadataRequest, LoadGroupRequest,
LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest,
LoadTransitionTierConfigRequest, LoadUserRequest, LocalStorageInfoRequest, Mss, ReloadPoolMetaRequest,
ReloadSiteReplicationConfigRequest, ServerInfoRequest, SignalServiceRequest, StartProfilingRequest, StopRebalanceRequest,
},
use rustfs_protos::evict_failed_connection;
use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_protos::proto_gen::node_service::{
DeleteBucketMetadataRequest, DeletePolicyRequest, DeleteServiceAccountRequest, DeleteUserRequest, GetCpusRequest,
GetMemInfoRequest, GetMetricsRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest,
GetSeLinuxInfoRequest, GetSysConfigRequest, GetSysErrorsRequest, LoadBucketMetadataRequest, LoadGroupRequest,
LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest,
LoadTransitionTierConfigRequest, LoadUserRequest, LocalStorageInfoRequest, Mss, ReloadPoolMetaRequest,
ReloadSiteReplicationConfigRequest, ServerInfoRequest, SignalServiceRequest, StartProfilingRequest, StopRebalanceRequest,
};
use rustfs_utils::XHost;
use serde::{Deserialize, Serialize as _};
use std::{collections::HashMap, io::Cursor, time::SystemTime};
use tonic::Request;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::warn;
pub const PEER_RESTSIGNAL: &str = "signal";
@@ -83,6 +85,12 @@ impl PeerRestClient {
(remote, all)
}
pub async fn get_client(&self) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>> {
node_service_time_out_client(&self.grid_host, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))
}
/// Evict the connection to this peer from the global cache.
/// This should be called when communication with this peer fails.
pub async fn evict_connection(&self) {
@@ -101,9 +109,7 @@ impl PeerRestClient {
}
async fn local_storage_info_inner(&self) -> Result<rustfs_madmin::StorageInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LocalStorageInfoRequest { metrics: true });
let response = client.local_storage_info(request).await?.into_inner();
@@ -131,9 +137,7 @@ impl PeerRestClient {
}
async fn server_info_inner(&self) -> Result<ServerProperties> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(ServerInfoRequest { metrics: true });
let response = client.server_info(request).await?.into_inner();
@@ -152,9 +156,7 @@ impl PeerRestClient {
}
pub async fn get_cpus(&self) -> Result<Cpus> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetCpusRequest {});
let response = client.get_cpus(request).await?.into_inner();
@@ -173,9 +175,7 @@ impl PeerRestClient {
}
pub async fn get_net_info(&self) -> Result<NetInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetNetInfoRequest {});
let response = client.get_net_info(request).await?.into_inner();
@@ -194,9 +194,7 @@ impl PeerRestClient {
}
pub async fn get_partitions(&self) -> Result<Partitions> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetPartitionsRequest {});
let response = client.get_partitions(request).await?.into_inner();
@@ -215,9 +213,7 @@ impl PeerRestClient {
}
pub async fn get_os_info(&self) -> Result<OsInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetOsInfoRequest {});
let response = client.get_os_info(request).await?.into_inner();
@@ -236,9 +232,7 @@ impl PeerRestClient {
}
pub async fn get_se_linux_info(&self) -> Result<SysService> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetSeLinuxInfoRequest {});
let response = client.get_se_linux_info(request).await?.into_inner();
@@ -257,9 +251,7 @@ impl PeerRestClient {
}
pub async fn get_sys_config(&self) -> Result<SysConfig> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetSysConfigRequest {});
let response = client.get_sys_config(request).await?.into_inner();
@@ -278,9 +270,7 @@ impl PeerRestClient {
}
pub async fn get_sys_errors(&self) -> Result<SysErrors> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetSysErrorsRequest {});
let response = client.get_sys_errors(request).await?.into_inner();
@@ -299,9 +289,7 @@ impl PeerRestClient {
}
pub async fn get_mem_info(&self) -> Result<MemInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetMemInfoRequest {});
let response = client.get_mem_info(request).await?.into_inner();
@@ -320,9 +308,7 @@ impl PeerRestClient {
}
pub async fn get_metrics(&self, t: MetricType, opts: &CollectMetricsOpts) -> Result<RealtimeMetrics> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let mut buf_t = Vec::new();
t.serialize(&mut Serializer::new(&mut buf_t))?;
let mut buf_o = Vec::new();
@@ -348,9 +334,7 @@ impl PeerRestClient {
}
pub async fn get_proc_info(&self) -> Result<ProcInfo> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(GetProcInfoRequest {});
let response = client.get_proc_info(request).await?.into_inner();
@@ -369,9 +353,7 @@ impl PeerRestClient {
}
pub async fn start_profiling(&self, profiler: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(StartProfilingRequest {
profiler: profiler.to_string(),
});
@@ -403,9 +385,7 @@ impl PeerRestClient {
}
pub async fn load_bucket_metadata(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadBucketMetadataRequest {
bucket: bucket.to_string(),
});
@@ -421,9 +401,7 @@ impl PeerRestClient {
}
pub async fn delete_bucket_metadata(&self, bucket: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(DeleteBucketMetadataRequest {
bucket: bucket.to_string(),
});
@@ -439,9 +417,7 @@ impl PeerRestClient {
}
pub async fn delete_policy(&self, policy: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(DeletePolicyRequest {
policy_name: policy.to_string(),
});
@@ -457,9 +433,7 @@ impl PeerRestClient {
}
pub async fn load_policy(&self, policy: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadPolicyRequest {
policy_name: policy.to_string(),
});
@@ -475,9 +449,7 @@ impl PeerRestClient {
}
pub async fn load_policy_mapping(&self, user_or_group: &str, user_type: u64, is_group: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadPolicyMappingRequest {
user_or_group: user_or_group.to_string(),
user_type,
@@ -495,9 +467,7 @@ impl PeerRestClient {
}
pub async fn delete_user(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(DeleteUserRequest {
access_key: access_key.to_string(),
});
@@ -517,9 +487,7 @@ impl PeerRestClient {
}
pub async fn delete_service_account(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(DeleteServiceAccountRequest {
access_key: access_key.to_string(),
});
@@ -539,9 +507,7 @@ impl PeerRestClient {
}
pub async fn load_user(&self, access_key: &str, temp: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadUserRequest {
access_key: access_key.to_string(),
temp,
@@ -562,9 +528,7 @@ impl PeerRestClient {
}
pub async fn load_service_account(&self, access_key: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadServiceAccountRequest {
access_key: access_key.to_string(),
});
@@ -584,9 +548,7 @@ impl PeerRestClient {
}
pub async fn load_group(&self, group: &str) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadGroupRequest {
group: group.to_string(),
});
@@ -606,9 +568,7 @@ impl PeerRestClient {
}
pub async fn reload_site_replication_config(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(ReloadSiteReplicationConfigRequest {});
let response = client.reload_site_replication_config(request).await?.into_inner();
@@ -622,9 +582,7 @@ impl PeerRestClient {
}
pub async fn signal_service(&self, sig: u64, sub_sys: &str, dry_run: bool, _exec_at: SystemTime) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let mut vars = HashMap::new();
vars.insert(PEER_RESTSIGNAL.to_string(), sig.to_string());
vars.insert(PEER_RESTSUB_SYS.to_string(), sub_sys.to_string());
@@ -644,23 +602,17 @@ impl PeerRestClient {
}
pub async fn get_metacache_listing(&self) -> Result<()> {
let _client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let _client = self.get_client().await?;
todo!()
}
pub async fn update_metacache_listing(&self) -> Result<()> {
let _client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let _client = self.get_client().await?;
todo!()
}
pub async fn reload_pool_meta(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(ReloadPoolMetaRequest {});
let response = client.reload_pool_meta(request).await?.into_inner();
@@ -675,9 +627,7 @@ impl PeerRestClient {
}
pub async fn stop_rebalance(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(StopRebalanceRequest {});
let response = client.stop_rebalance(request).await?.into_inner();
@@ -692,9 +642,7 @@ impl PeerRestClient {
}
pub async fn load_rebalance_meta(&self, start_rebalance: bool) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadRebalanceMetaRequest { start_rebalance });
let response = client.load_rebalance_meta(request).await?.into_inner();
@@ -711,9 +659,7 @@ impl PeerRestClient {
}
pub async fn load_transition_tier_config(&self) -> Result<()> {
let mut client = node_service_time_out_client(&self.grid_host)
.await
.map_err(|err| Error::other(err.to_string()))?;
let mut client = self.get_client().await?;
let request = Request::new(LoadTransitionTierConfigRequest {});
let response = client.load_transition_tier_config(request).await?.into_inner();
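Every peer RPC in this file now funnels through get_client(), so each call reduces to the same three-line pattern (get_cpus shown, taken from the hunks above):

let mut client = self.get_client().await?;
let request = Request::new(GetCpusRequest {});
let response = client.get_cpus(request).await?.into_inner();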

View File

@@ -18,6 +18,9 @@ use crate::disk::error::{Error, Result};
use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs};
use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration};
use crate::global::GLOBAL_LOCAL_DISK_MAP;
use crate::rpc::client::{
TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth,
};
use crate::store::all_local_disk;
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::{
@@ -32,7 +35,7 @@ use async_trait::async_trait;
use futures::future::join_all;
use rustfs_common::heal_channel::{DriveState, HealItemType, HealOpts, RUSTFS_RESERVED_BUCKET};
use rustfs_madmin::heal_commands::{HealDriveInfo, HealResultItem};
use rustfs_protos::node_service_time_out_client;
use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
@@ -40,6 +43,8 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use tokio::{net::TcpStream, sync::RwLock, time};
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::{debug, info, warn};
type Client = Arc<Box<dyn PeerS3Client>>;
@@ -587,6 +592,12 @@ impl RemotePeerS3Client {
client
}
pub async fn get_client(&self) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>> {
node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))
}
pub fn get_addr(&self) -> String {
self.addr.clone()
}
@@ -730,9 +741,7 @@ impl PeerS3Client for RemotePeerS3Client {
self.execute_with_timeout(
|| async {
let options: String = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let mut client = self.get_client().await?;
let request = Request::new(HealBucketRequest {
bucket: bucket.to_string(),
options,
@@ -762,9 +771,7 @@ impl PeerS3Client for RemotePeerS3Client {
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let mut client = self.get_client().await?;
let request = Request::new(ListBucketRequest { options });
let response = client.list_bucket(request).await?.into_inner();
if !response.success {
@@ -790,9 +797,7 @@ impl PeerS3Client for RemotePeerS3Client {
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let mut client = self.get_client().await?;
let request = Request::new(MakeBucketRequest {
name: bucket.to_string(),
options,
@@ -818,7 +823,7 @@ impl PeerS3Client for RemotePeerS3Client {
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(GetBucketInfoRequest {
@@ -845,9 +850,7 @@ impl PeerS3Client for RemotePeerS3Client {
async fn delete_bucket(&self, bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let mut client = self.get_client().await?;
let request = Request::new(DeleteBucketRequest {
bucket: bucket.to_string(),

View File

@@ -21,39 +21,44 @@ use std::{
use bytes::Bytes;
use futures::lock::Mutex;
use http::{HeaderMap, HeaderValue, Method, header::CONTENT_TYPE};
use rustfs_protos::{
node_service_time_out_client,
proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
},
use rustfs_protos::proto_gen::node_service::{
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
node_service_client::NodeServiceClient,
};
use rustfs_utils::string::parse_bool_with_default;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
disk_store::{
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE, get_max_timeout_duration,
},
endpoint::Endpoint,
};
use crate::disk::{FileReader, FileWriter};
use crate::disk::{disk_store::DiskHealthTracker, error::DiskError};
use crate::{
disk::error::{Error, Result},
rpc::build_auth_headers,
};
use crate::{
disk::{
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskLocation, DiskOption, FileInfoVersions,
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
disk_store::{
CHECK_EVERY, CHECK_TIMEOUT_DURATION, ENV_RUSTFS_DRIVE_ACTIVE_MONITORING, SKIP_IF_SUCCESS_BEFORE,
get_max_timeout_duration,
},
endpoint::Endpoint,
},
rpc::client::gen_tonic_signature_interceptor,
};
use crate::{
disk::{FileReader, FileWriter},
rpc::client::{TonicInterceptor, node_service_time_out_client},
};
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
use rustfs_rio::{HttpReader, HttpWriter};
use tokio::{io::AsyncWrite, net::TcpStream, time::timeout};
use tonic::Request;
use tonic::{Request, service::interceptor::InterceptedService, transport::Channel};
use uuid::Uuid;
#[derive(Debug)]
@@ -259,6 +264,12 @@ impl RemoteDisk {
}
}
}
async fn get_client(&self) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>> {
node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))
}
}
// TODO: all api need to handle errors
@@ -343,7 +354,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(MakeVolumeRequest {
@@ -370,7 +382,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(MakeVolumesRequest {
@@ -397,7 +410,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ListVolumesRequest {
@@ -429,7 +443,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(StatVolumeRequest {
@@ -458,7 +473,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DeleteVolumeRequest {
@@ -545,7 +561,8 @@ impl DiskAPI for RemoteDisk {
let file_info = serde_json::to_string(&fi)?;
let opts = serde_json::to_string(&opts)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DeleteVersionRequest {
@@ -603,7 +620,7 @@ impl DiskAPI for RemoteDisk {
}
});
}
let mut client = match node_service_time_out_client(&self.addr).await {
let mut client = match self.get_client().await {
Ok(client) => client,
Err(err) => {
let mut errors = Vec::with_capacity(versions.len());
@@ -674,7 +691,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DeletePathsRequest {
@@ -703,7 +721,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(WriteMetadataRequest {
@@ -734,7 +753,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(UpdateMetadataRequest {
@@ -772,7 +792,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadVersionRequest {
@@ -804,7 +825,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadXlRequest {
@@ -843,7 +865,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let file_info = serde_json::to_string(&fi)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(RenameDataRequest {
@@ -878,7 +901,8 @@ impl DiskAPI for RemoteDisk {
return Err(DiskError::FaultyDisk);
}
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ListDirRequest {
@@ -1039,7 +1063,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(RenameFileRequest {
@@ -1069,7 +1094,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(RenamePartRequest {
@@ -1101,7 +1127,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(&opt)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DeleteRequest {
@@ -1131,7 +1158,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let file_info = serde_json::to_string(&fi)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(VerifyFileRequest {
@@ -1160,7 +1188,8 @@ impl DiskAPI for RemoteDisk {
async fn read_parts(&self, bucket: &str, paths: &[String]) -> Result<Vec<ObjectPartInfo>> {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadPartsRequest {
@@ -1190,7 +1219,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let file_info = serde_json::to_string(&fi)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(CheckPartsRequest {
@@ -1222,7 +1252,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let read_multiple_req = serde_json::to_string(&req)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadMultipleRequest {
@@ -1255,7 +1286,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(WriteAllRequest {
@@ -1284,7 +1316,8 @@ impl DiskAPI for RemoteDisk {
self.execute_with_timeout(
|| async {
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(ReadAllRequest {
@@ -1313,7 +1346,8 @@ impl DiskAPI for RemoteDisk {
}
let opts = serde_json::to_string(&opts)?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = self
.get_client()
.await
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(DiskInfoRequest {

View File

@@ -0,0 +1,394 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use async_trait::async_trait;
use rustfs_lock::types::{LockId, LockMetadata, LockPriority};
use rustfs_lock::{LockClient, LockError, LockInfo, LockResponse, LockStats, LockStatus, Result};
use rustfs_lock::{LockRequest, LockType};
use rustfs_protos::proto_gen::node_service::node_service_client::NodeServiceClient;
use rustfs_protos::proto_gen::node_service::{GenerallyLockRequest, PingRequest};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::Request;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Channel;
use tracing::info;
/// Remote lock client implementation
#[derive(Debug)]
pub struct RemoteClient {
addr: String,
// Track active locks with their original owner information
active_locks: Arc<RwLock<HashMap<LockId, String>>>, // lock_id -> owner
}
impl Clone for RemoteClient {
fn clone(&self) -> Self {
Self {
addr: self.addr.clone(),
active_locks: self.active_locks.clone(),
}
}
}
impl RemoteClient {
pub fn new(endpoint: String) -> Self {
Self {
addr: endpoint,
active_locks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn from_url(url: url::Url) -> Self {
Self {
addr: url.to_string(),
active_locks: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Create a minimal LockRequest for unlock operations
fn create_unlock_request(&self, lock_id: &LockId, owner: &str) -> LockRequest {
LockRequest {
lock_id: lock_id.clone(),
resource: lock_id.resource.clone(),
lock_type: LockType::Exclusive, // Type doesn't matter for unlock
owner: owner.to_string(),
acquire_timeout: std::time::Duration::from_secs(30),
ttl: std::time::Duration::from_secs(300),
metadata: LockMetadata::default(),
priority: LockPriority::Normal,
deadlock_detection: false,
}
}
pub async fn get_client(&self) -> Result<NodeServiceClient<InterceptedService<Channel, TonicInterceptor>>> {
node_service_time_out_client(&self.addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))
}
}
#[async_trait]
impl LockClient for RemoteClient {
async fn acquire_exclusive(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_exclusive for {}", request.resource);
let mut client = self.get_client().await?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
// Check for explicit error first
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
// Check if the lock acquisition was successful
if resp.success {
// Save the lock information for later release
let mut locks = self.active_locks.write().await;
locks.insert(request.lock_id.clone(), request.owner.clone());
Ok(LockResponse::success(
LockInfo {
id: request.lock_id.clone(),
resource: request.resource.clone(),
lock_type: request.lock_type,
status: LockStatus::Acquired,
owner: request.owner.clone(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + request.ttl,
last_refreshed: std::time::SystemTime::now(),
metadata: request.metadata.clone(),
priority: request.priority,
wait_start_time: None,
},
std::time::Duration::ZERO,
))
} else {
// Lock acquisition failed
Ok(LockResponse::failure(
"Lock acquisition failed on remote server".to_string(),
std::time::Duration::ZERO,
))
}
}
async fn acquire_shared(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_shared for {}", request.resource);
let mut client = self.get_client().await?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.r_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
// Check for explicit error first
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
// Check if the lock acquisition was successful
if resp.success {
// Save the lock information for later release
let mut locks = self.active_locks.write().await;
locks.insert(request.lock_id.clone(), request.owner.clone());
Ok(LockResponse::success(
LockInfo {
id: request.lock_id.clone(),
resource: request.resource.clone(),
lock_type: request.lock_type,
status: LockStatus::Acquired,
owner: request.owner.clone(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + request.ttl,
last_refreshed: std::time::SystemTime::now(),
metadata: request.metadata.clone(),
priority: request.priority,
wait_start_time: None,
},
std::time::Duration::ZERO,
))
} else {
// Lock acquisition failed
Ok(LockResponse::failure(
"Shared lock acquisition failed on remote server".to_string(),
std::time::Duration::ZERO,
))
}
}
async fn release(&self, lock_id: &LockId) -> Result<bool> {
info!("remote release for {}", lock_id);
// Get the original owner for this lock
let owner = {
let locks = self.active_locks.read().await;
locks.get(lock_id).cloned().unwrap_or_else(|| "remote".to_string())
};
let unlock_request = self.create_unlock_request(lock_id, &owner);
let request_string = serde_json::to_string(&unlock_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?;
let mut client = self.get_client().await?;
// Try UnLock first (for exclusive locks)
let req = Request::new(GenerallyLockRequest {
args: request_string.clone(),
});
let resp = client.un_lock(req).await;
let success = if resp.is_err() {
// If that fails, try RUnLock (for shared locks)
let req = Request::new(GenerallyLockRequest { args: request_string });
let resp = client
.r_un_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
resp.success
} else {
let resp = resp.map_err(|e| LockError::internal(e.to_string()))?.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
resp.success
};
// Remove the lock from our tracking if successful
if success {
let mut locks = self.active_locks.write().await;
locks.remove(lock_id);
}
Ok(success)
}
async fn refresh(&self, lock_id: &LockId) -> Result<bool> {
info!("remote refresh for {}", lock_id);
let refresh_request = self.create_unlock_request(lock_id, "remote");
let mut client = self.get_client().await?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&refresh_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.refresh(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
Ok(resp.success)
}
async fn force_release(&self, lock_id: &LockId) -> Result<bool> {
info!("remote force_release for {}", lock_id);
let force_request = self.create_unlock_request(lock_id, "remote");
let mut client = self.get_client().await?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&force_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.force_un_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
Ok(resp.success)
}
async fn check_status(&self, lock_id: &LockId) -> Result<Option<LockInfo>> {
info!("remote check_status for {}", lock_id);
// Since there's no direct status query in the gRPC service,
// we attempt a non-blocking lock acquisition to check if the resource is available
let status_request = self.create_unlock_request(lock_id, "remote");
let mut client = self.get_client().await?;
// Try to acquire a very short-lived lock to test availability
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&status_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
// Try exclusive lock first with very short timeout
let resp = client.lock(req).await;
match resp {
Ok(response) => {
let resp = response.into_inner();
if resp.success {
// If we successfully acquired the lock, the resource was free
// Immediately release it
let release_req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&status_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let _ = client.un_lock(release_req).await; // Best effort release
// Return None since no one was holding the lock
Ok(None)
} else {
// Lock acquisition failed, meaning someone is holding it
// We can't determine the exact details remotely, so return a generic status
Ok(Some(LockInfo {
id: lock_id.clone(),
resource: lock_id.as_str().to_string(),
lock_type: LockType::Exclusive, // We can't know the exact type
status: LockStatus::Acquired,
owner: "unknown".to_string(), // Remote client can't determine owner
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
last_refreshed: std::time::SystemTime::now(),
metadata: LockMetadata::default(),
priority: LockPriority::Normal,
wait_start_time: None,
}))
}
}
Err(_) => {
// Communication error or lock is held
Ok(Some(LockInfo {
id: lock_id.clone(),
resource: lock_id.as_str().to_string(),
lock_type: LockType::Exclusive,
status: LockStatus::Acquired,
owner: "unknown".to_string(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
last_refreshed: std::time::SystemTime::now(),
metadata: LockMetadata::default(),
priority: LockPriority::Normal,
wait_start_time: None,
}))
}
}
}
async fn get_stats(&self) -> Result<LockStats> {
info!("remote get_stats from {}", self.addr);
// Since there's no direct statistics endpoint in the gRPC service,
// we return basic stats indicating this is a remote client
let stats = LockStats {
last_updated: std::time::SystemTime::now(),
..Default::default()
};
// We could potentially enhance this by:
// 1. Keeping local counters of operations performed
// 2. Adding a stats gRPC method to the service
// 3. Querying server health endpoints
// For now, return minimal stats indicating remote connectivity
Ok(stats)
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn is_online(&self) -> bool {
// Use Ping interface to test if remote service is online
let mut client = match self.get_client().await {
Ok(client) => client,
Err(_) => {
info!("remote client {} connection failed", self.addr);
return false;
}
};
let ping_req = Request::new(PingRequest {
version: 1,
body: bytes::Bytes::new(),
});
match client.ping(ping_req).await {
Ok(_) => {
info!("remote client {} is online", self.addr);
true
}
Err(_) => {
info!("remote client {} ping failed", self.addr);
false
}
}
}
async fn is_local(&self) -> bool {
false
}
}
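A small usage sketch of the relocated lock client; the endpoint string is illustrative, and only the constructors and methods defined above are used:

use rustfs_ecstore::rpc::RemoteClient;
use rustfs_lock::LockClient; // trait providing is_online / acquire_* / release

let client = RemoteClient::new("http://node2:9000".to_string()); // illustrative address
if client.is_online().await {
    // all lock RPCs are signed, since client.get_client() attaches the signature interceptor
}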

View File

@@ -13,7 +13,7 @@
// limitations under the License.
pub mod local;
pub mod remote;
// pub mod remote;
use async_trait::async_trait;
use std::sync::Arc;
@@ -74,10 +74,10 @@ impl ClientFactory {
Arc::new(local::LocalClient::new())
}
/// Create remote client
pub fn create_remote(endpoint: String) -> Arc<dyn LockClient> {
Arc::new(remote::RemoteClient::new(endpoint))
}
// /// Create remote client
// pub fn create_remote(endpoint: String) -> Arc<dyn LockClient> {
// Arc::new(remote::RemoteClient::new(endpoint))
// }
}
#[cfg(test)]
@@ -85,15 +85,6 @@ mod tests {
use super::*;
use crate::types::LockType;
#[tokio::test]
async fn test_client_factory() {
let local_client = ClientFactory::create_local();
assert!(local_client.is_local().await);
let remote_client = ClientFactory::create_remote("http://localhost:8080".to_string());
assert!(!remote_client.is_local().await);
}
#[tokio::test]
async fn test_local_client_basic_operations() {
let client = ClientFactory::create_local();

View File

@@ -14,7 +14,7 @@
use async_trait::async_trait;
use rustfs_protos::{
node_service_time_out_client,
node_service_time_out_client, node_service_time_out_client_no_auth,
proto_gen::node_service::{GenerallyLockRequest, PingRequest},
};
use std::collections::HashMap;
@@ -82,7 +82,7 @@ impl RemoteClient {
impl LockClient for RemoteClient {
async fn acquire_exclusive(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_exclusive for {}", request.resource);
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
@@ -133,7 +133,7 @@ impl LockClient for RemoteClient {
async fn acquire_shared(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_shared for {}", request.resource);
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
@@ -195,7 +195,7 @@ impl LockClient for RemoteClient {
let request_string = serde_json::to_string(&unlock_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?;
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
@@ -238,7 +238,7 @@ impl LockClient for RemoteClient {
async fn refresh(&self, lock_id: &LockId) -> Result<bool> {
info!("remote refresh for {}", lock_id);
let refresh_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
@@ -259,7 +259,7 @@ impl LockClient for RemoteClient {
async fn force_release(&self, lock_id: &LockId) -> Result<bool> {
info!("remote force_release for {}", lock_id);
let force_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
@@ -283,7 +283,7 @@ impl LockClient for RemoteClient {
// Since there's no direct status query in the gRPC service,
// we attempt a non-blocking lock acquisition to check if the resource is available
let status_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client(&self.addr)
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
@@ -372,7 +372,7 @@ impl LockClient for RemoteClient {
async fn is_online(&self) -> bool {
// Use Ping interface to test if remote service is online
let mut client = match node_service_time_out_client(&self.addr).await {
let mut client = match self.get_client().await {
Ok(client) => client,
Err(_) => {
info!("remote client {} connection failed", self.addr);

View File

@@ -37,7 +37,7 @@ pub mod types;
// Re-export main types for easy access
pub use crate::{
// Client interfaces
client::{LockClient, local::LocalClient, remote::RemoteClient},
client::{LockClient, local::LocalClient},
// Error types
error::{LockError, Result},
// Fast Lock System exports

View File

@@ -20,11 +20,10 @@ use rustfs_common::{GLOBAL_CONN_MAP, GLOBAL_MTLS_IDENTITY, GLOBAL_ROOT_CERT, evi
use std::{error::Error, time::Duration};
use tonic::{
Request, Status,
metadata::MetadataValue,
service::interceptor::InterceptedService,
transport::{Certificate, Channel, ClientTlsConfig, Endpoint},
};
use tracing::{debug, error, warn};
use tracing::{debug, warn};
// Type alias for the complex client type
pub type NodeServiceClientType = NodeServiceClient<
@@ -64,7 +63,7 @@ const RUSTFS_HTTPS_PREFIX: &str = "https://";
/// - Aggressive TCP keepalive (10s)
/// - HTTP/2 PING every 5s, timeout at 3s
/// - Overall RPC timeout of 30s (reduced from 60s)
async fn create_new_channel(addr: &str) -> Result<Channel, Box<dyn Error>> {
pub async fn create_new_channel(addr: &str) -> Result<Channel, Box<dyn Error>> {
debug!("Creating new gRPC channel to: {}", addr);
let mut connector = Endpoint::from_shared(addr.to_string())?
@@ -131,90 +130,6 @@ async fn create_new_channel(addr: &str) -> Result<Channel, Box<dyn Error>> {
Ok(channel)
}
/// Get a gRPC client for the NodeService with robust connection handling.
///
/// This function implements several resilience features:
/// 1. Connection caching for performance
/// 2. Automatic eviction of stale/dead connections on error
/// 3. Optimized keepalive settings for fast dead peer detection
/// 4. Reduced timeouts to fail fast when peers are unresponsive
///
/// # Connection Lifecycle
/// - Cached connections are reused for subsequent calls
/// - On any connection error, the cached connection is evicted
/// - Fresh connections are established with aggressive keepalive settings
///
/// # Cluster Power-Off Recovery
/// When a node experiences abrupt power-off:
/// 1. The cached connection will fail on next use
/// 2. The connection is automatically evicted from cache
/// 3. Subsequent calls will attempt fresh connections
/// 4. If node is still down, connection will fail fast (3s timeout)
pub async fn node_service_time_out_client(
addr: &String,
) -> Result<
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
Box<dyn Error>,
> {
debug!("Obtaining gRPC client for NodeService at: {}", addr);
let token_str = rustfs_credentials::get_grpc_token();
let token: MetadataValue<_> = token_str.parse().map_err(|e| {
error!(
"Failed to parse gRPC auth token into MetadataValue: {:?}; env={} token_len={} token_prefix={}",
e,
rustfs_credentials::ENV_GRPC_AUTH_TOKEN,
token_str.len(),
token_str.chars().take(2).collect::<String>(),
);
e
})?;
// Try to get cached channel
let cached_channel = { GLOBAL_CONN_MAP.read().await.get(addr).cloned() };
let channel = match cached_channel {
Some(channel) => {
debug!("Using cached gRPC channel for: {}", addr);
channel
}
None => {
// No cached connection, create new one
create_new_channel(addr).await?
}
};
Ok(NodeServiceClient::with_interceptor(
channel,
Box::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
}),
))
}
/// Get a gRPC client with automatic connection eviction on failure.
///
/// This is the preferred method for cluster operations as it ensures
/// that failed connections are automatically cleaned up from the cache.
///
/// Returns the client and the address for later eviction if needed.
pub async fn node_service_client_with_eviction(
addr: &String,
) -> Result<
(
NodeServiceClient<
InterceptedService<Channel, Box<dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static>>,
>,
String,
),
Box<dyn Error>,
> {
let client = node_service_time_out_client(addr).await?;
Ok((client, addr.clone()))
}
/// Evict a connection from the cache after a failure.
/// This should be called when an RPC fails to ensure fresh connections are tried.
pub async fn evict_failed_connection(addr: &str) {

View File

@@ -21,7 +21,7 @@ use crate::server::{ReadinessGateLayer, RemoteAddr, ServiceState, ServiceStateMa
use crate::storage;
use crate::storage::tonic_service::make_server;
use bytes::Bytes;
use http::{HeaderMap, Request as HttpRequest, Response};
use http::{HeaderMap, Method, Request as HttpRequest, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
@@ -31,6 +31,7 @@ use hyper_util::{
use metrics::{counter, histogram};
use rustfs_common::GlobalReadiness;
use rustfs_config::{MI_B, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_ecstore::rpc::{TONIC_RPC_PREFIX, verify_rpc_signature};
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
@@ -42,7 +43,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tonic::{Request, Status};
use tower::ServiceBuilder;
use tower_http::add_extension::AddExtensionLayer;
use tower_http::catch_panic::CatchPanicLayer;
@@ -722,17 +723,11 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
#[allow(clippy::result_large_err)]
fn check_auth(req: Request<()>) -> std::result::Result<Request<()>, Status> {
let token_str = rustfs_credentials::get_grpc_token();
let token: MetadataValue<_> = token_str.parse().map_err(|e| {
error!("Failed to parse RUSTFS_GRPC_AUTH_TOKEN into gRPC metadata value: {}", e);
Status::internal("Invalid auth token configuration")
verify_rpc_signature(TONIC_RPC_PREFIX, &Method::GET, req.metadata().as_ref()).map_err(|e| {
error!("RPC signature verification failed: {}", e);
Status::unauthenticated("No valid auth token")
})?;
match req.metadata().get("authorization") {
Some(t) if token == t => Ok(req),
_ => Err(Status::unauthenticated("No valid auth token")),
}
Ok(req)
}
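How check_auth is attached is not part of this hunk; in a typical tonic setup it is passed as the server-side interceptor when the NodeService is built, roughly as below (the wiring is an assumption, not taken from this diff):

// check_auth now verifies the HMAC signature headers instead of a bearer token
let svc = NodeServiceServer::with_interceptor(make_server(), check_auth);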
/// Determines the listen backlog size.