From b142563127ce1fe0e4b84ade403155c4a6928229 Mon Sep 17 00:00:00 2001
From: weisd
Date: Mon, 5 Jan 2026 21:52:04 +0800
Subject: [PATCH] fix rpc client (#1393)

---
 crates/ecstore/src/admin_server_info.rs  |   6 +-
 crates/ecstore/src/disk/disk_store.rs    |   4 +-
 crates/ecstore/src/rpc/peer_s3_client.rs |  14 +-
 crates/ecstore/src/rpc/remote_disk.rs    |   6 +-
 crates/lock/src/client/remote.rs         | 403 -----------------------
 rustfs/src/init.rs                       |   4 +-
 rustfs/src/server/cert.rs                |   4 +-
 rustfs/src/storage/tonic_service.rs      |   4 +-
 8 files changed, 19 insertions(+), 426 deletions(-)
 delete mode 100644 crates/lock/src/client/remote.rs

diff --git a/crates/ecstore/src/admin_server_info.rs b/crates/ecstore/src/admin_server_info.rs
index 13187790..54f9b981 100644
--- a/crates/ecstore/src/admin_server_info.rs
+++ b/crates/ecstore/src/admin_server_info.rs
@@ -14,7 +14,7 @@
 use crate::data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, load_data_usage_from_backend};
 use crate::error::{Error, Result};
-use crate::rpc::node_service_time_out_client_no_auth;
+use crate::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
 use crate::{
     disk::endpoint::Endpoint,
     global::{GLOBAL_BOOT_TIME, GLOBAL_Endpoints},
@@ -101,9 +101,9 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
     let decoded_payload = flatbuffers::root::(finished_data);
     assert!(decoded_payload.is_ok());

-    let mut client = node_service_time_out_client_no_auth(&addr)
+    let mut client = node_service_time_out_client(&addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
         .await
-        .map_err(|err| Error::other(err.to_string()))?;
+        .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;

     let request = Request::new(PingRequest {
         version: 1,
diff --git a/crates/ecstore/src/disk/disk_store.rs b/crates/ecstore/src/disk/disk_store.rs
index 0f8fbf4d..baab323f 100644
--- a/crates/ecstore/src/disk/disk_store.rs
+++ b/crates/ecstore/src/disk/disk_store.rs
@@ -384,7 +384,7 @@ impl LocalDiskWrapper {
         let stored_disk_id = self.disk.get_disk_id().await?;

         if stored_disk_id != want_id {
-            return Err(Error::other(format!("Disk ID mismatch wanted {:?}, got {:?}", want_id, stored_disk_id)));
+            return Err(Error::other(format!("Disk ID mismatch wanted {want_id:?}, got {stored_disk_id:?}")));
         }

         Ok(())
@@ -468,7 +468,7 @@ impl LocalDiskWrapper {
                 // Timeout occurred, mark disk as potentially faulty and decrement waiting counter
                 self.health.decrement_waiting();
                 warn!("disk operation timeout after {:?}", timeout_duration);
-                Err(DiskError::other(format!("disk operation timeout after {:?}", timeout_duration)))
+                Err(DiskError::other(format!("disk operation timeout after {timeout_duration:?}")))
             }
         }
     }
diff --git a/crates/ecstore/src/rpc/peer_s3_client.rs b/crates/ecstore/src/rpc/peer_s3_client.rs
index 90d684eb..16522548 100644
--- a/crates/ecstore/src/rpc/peer_s3_client.rs
+++ b/crates/ecstore/src/rpc/peer_s3_client.rs
@@ -18,9 +18,7 @@
 use crate::disk::error::{Error, Result};
 use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs};
 use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration};
 use crate::global::GLOBAL_LOCAL_DISK_MAP;
-use crate::rpc::client::{
-    TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth,
-};
+use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
 use crate::store::all_local_disk;
 use crate::store_utils::is_reserved_or_invalid_bucket;
 use crate::{
@@ -675,7 +673,7 @@ impl RemotePeerS3Client {
     async fn perform_connectivity_check(addr: &str) -> Result<()> {
         use tokio::time::timeout;

-        let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?;
+        let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {e}")))?;

         let Some(host) = url.host_str() else {
             return Err(Error::other("No host in URL".to_string()));
@@ -686,7 +684,7 @@ impl RemotePeerS3Client {
         // Try to establish TCP connection
         match timeout(CHECK_TIMEOUT_DURATION, TcpStream::connect((host, port))).await {
             Ok(Ok(_)) => Ok(()),
-            _ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))),
+            _ => Err(Error::other(format!("Cannot connect to {host}:{port}"))),
         }
     }

@@ -725,7 +723,7 @@ impl RemotePeerS3Client {
             // Timeout occurred, mark peer as potentially faulty
             self.health.decrement_waiting();
             warn!("Remote peer operation timeout after {:?}", timeout_duration);
-            Err(Error::other(format!("Remote peer operation timeout after {:?}", timeout_duration)))
+            Err(Error::other(format!("Remote peer operation timeout after {timeout_duration:?}")))
         }
     }
 }
@@ -823,9 +821,7 @@ impl PeerS3Client for RemotePeerS3Client {
         self.execute_with_timeout(
             || async {
                 let options = serde_json::to_string(opts)?;
-                let mut client = node_service_time_out_client_no_auth(&self.addr)
-                    .await
-                    .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
+                let mut client = self.get_client().await?;
                 let request = Request::new(GetBucketInfoRequest {
                     bucket: bucket.to_string(),
                     options,
diff --git a/crates/ecstore/src/rpc/remote_disk.rs b/crates/ecstore/src/rpc/remote_disk.rs
index 78f39f55..f4f03468 100644
--- a/crates/ecstore/src/rpc/remote_disk.rs
+++ b/crates/ecstore/src/rpc/remote_disk.rs
@@ -206,7 +206,7 @@ impl RemoteDisk {
     /// Perform basic connectivity check for remote disk
     async fn perform_connectivity_check(addr: &str) -> Result<()> {
-        let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?;
+        let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {e}")))?;

         let Some(host) = url.host_str() else {
             return Err(Error::other("No host in URL".to_string()));
@@ -220,7 +220,7 @@ impl RemoteDisk {
                 drop(stream);
                 Ok(())
             }
-            _ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))),
+            _ => Err(Error::other(format!("Cannot connect to {host}:{port}"))),
         }
     }

@@ -260,7 +260,7 @@ impl RemoteDisk {
             // Timeout occurred, mark disk as potentially faulty
             self.health.decrement_waiting();
             warn!("Remote disk operation timeout after {:?}", timeout_duration);
-            Err(Error::other(format!("Remote disk operation timeout after {:?}", timeout_duration)))
+            Err(Error::other(format!("Remote disk operation timeout after {timeout_duration:?}")))
         }
     }
 }
diff --git a/crates/lock/src/client/remote.rs b/crates/lock/src/client/remote.rs
deleted file mode 100644
index e69a82f4..00000000
--- a/crates/lock/src/client/remote.rs
+++ /dev/null
@@ -1,403 +0,0 @@
-// Copyright 2024 RustFS Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use async_trait::async_trait;
-use rustfs_protos::{
-    node_service_time_out_client, node_service_time_out_client_no_auth,
-    proto_gen::node_service::{GenerallyLockRequest, PingRequest},
-};
-use std::collections::HashMap;
-use std::sync::Arc;
-use tokio::sync::RwLock;
-use tonic::Request;
-use tracing::info;
-
-use crate::{
-    error::{LockError, Result},
-    types::{LockId, LockInfo, LockRequest, LockResponse, LockStats},
-};
-
-use super::LockClient;
-
-/// Remote lock client implementation
-#[derive(Debug)]
-pub struct RemoteClient {
-    addr: String,
-    // Track active locks with their original owner information
-    active_locks: Arc<RwLock<HashMap<LockId, String>>>, // lock_id -> owner
-}
-
-impl Clone for RemoteClient {
-    fn clone(&self) -> Self {
-        Self {
-            addr: self.addr.clone(),
-            active_locks: self.active_locks.clone(),
-        }
-    }
-}
-
-impl RemoteClient {
-    pub fn new(endpoint: String) -> Self {
-        Self {
-            addr: endpoint,
-            active_locks: Arc::new(RwLock::new(HashMap::new())),
-        }
-    }
-
-    pub fn from_url(url: url::Url) -> Self {
-        Self {
-            addr: url.to_string(),
-            active_locks: Arc::new(RwLock::new(HashMap::new())),
-        }
-    }
-
-    /// Create a minimal LockRequest for unlock operations
-    fn create_unlock_request(&self, lock_id: &LockId, owner: &str) -> LockRequest {
-        LockRequest {
-            lock_id: lock_id.clone(),
-            resource: lock_id.resource.clone(),
-            lock_type: crate::types::LockType::Exclusive, // Type doesn't matter for unlock
-            owner: owner.to_string(),
-            acquire_timeout: std::time::Duration::from_secs(30),
-            ttl: std::time::Duration::from_secs(300),
-            metadata: crate::types::LockMetadata::default(),
-            priority: crate::types::LockPriority::Normal,
-            deadlock_detection: false,
-        }
-    }
-}
-
-#[async_trait]
-impl LockClient for RemoteClient {
-    async fn acquire_exclusive(&self, request: &LockRequest) -> Result<LockResponse> {
-        info!("remote acquire_exclusive for {}", request.resource);
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-        let req = Request::new(GenerallyLockRequest {
-            args: serde_json::to_string(&request)
-                .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-        });
-        let resp = client
-            .lock(req)
-            .await
-            .map_err(|e| LockError::internal(e.to_string()))?
-            .into_inner();
-
-        // Check for explicit error first
-        if let Some(error_info) = resp.error_info {
-            return Err(LockError::internal(error_info));
-        }
-
-        // Check if the lock acquisition was successful
-        if resp.success {
-            // Save the lock information for later release
-            let mut locks = self.active_locks.write().await;
-            locks.insert(request.lock_id.clone(), request.owner.clone());
-
-            Ok(LockResponse::success(
-                LockInfo {
-                    id: request.lock_id.clone(),
-                    resource: request.resource.clone(),
-                    lock_type: request.lock_type,
-                    status: crate::types::LockStatus::Acquired,
-                    owner: request.owner.clone(),
-                    acquired_at: std::time::SystemTime::now(),
-                    expires_at: std::time::SystemTime::now() + request.ttl,
-                    last_refreshed: std::time::SystemTime::now(),
-                    metadata: request.metadata.clone(),
-                    priority: request.priority,
-                    wait_start_time: None,
-                },
-                std::time::Duration::ZERO,
-            ))
-        } else {
-            // Lock acquisition failed
-            Ok(LockResponse::failure(
-                "Lock acquisition failed on remote server".to_string(),
-                std::time::Duration::ZERO,
-            ))
-        }
-    }
-
-    async fn acquire_shared(&self, request: &LockRequest) -> Result<LockResponse> {
-        info!("remote acquire_shared for {}", request.resource);
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-        let req = Request::new(GenerallyLockRequest {
-            args: serde_json::to_string(&request)
-                .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-        });
-        let resp = client
-            .r_lock(req)
-            .await
-            .map_err(|e| LockError::internal(e.to_string()))?
-            .into_inner();
-
-        // Check for explicit error first
-        if let Some(error_info) = resp.error_info {
-            return Err(LockError::internal(error_info));
-        }
-
-        // Check if the lock acquisition was successful
-        if resp.success {
-            // Save the lock information for later release
-            let mut locks = self.active_locks.write().await;
-            locks.insert(request.lock_id.clone(), request.owner.clone());
-
-            Ok(LockResponse::success(
-                LockInfo {
-                    id: request.lock_id.clone(),
-                    resource: request.resource.clone(),
-                    lock_type: request.lock_type,
-                    status: crate::types::LockStatus::Acquired,
-                    owner: request.owner.clone(),
-                    acquired_at: std::time::SystemTime::now(),
-                    expires_at: std::time::SystemTime::now() + request.ttl,
-                    last_refreshed: std::time::SystemTime::now(),
-                    metadata: request.metadata.clone(),
-                    priority: request.priority,
-                    wait_start_time: None,
-                },
-                std::time::Duration::ZERO,
-            ))
-        } else {
-            // Lock acquisition failed
-            Ok(LockResponse::failure(
-                "Shared lock acquisition failed on remote server".to_string(),
-                std::time::Duration::ZERO,
-            ))
-        }
-    }
-
-    async fn release(&self, lock_id: &LockId) -> Result<bool> {
-        info!("remote release for {}", lock_id);
-
-        // Get the original owner for this lock
-        let owner = {
-            let locks = self.active_locks.read().await;
-            locks.get(lock_id).cloned().unwrap_or_else(|| "remote".to_string())
-        };
-
-        let unlock_request = self.create_unlock_request(lock_id, &owner);
-
-        let request_string = serde_json::to_string(&unlock_request)
-            .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?;
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-
-        // Try UnLock first (for exclusive locks)
-        let req = Request::new(GenerallyLockRequest {
-            args: request_string.clone(),
-        });
-        let resp = client.un_lock(req).await;
-
-        let success = if resp.is_err() {
-            // If that fails, try RUnLock (for shared locks)
-            let req = Request::new(GenerallyLockRequest { args: request_string });
-            let resp = client
-                .r_un_lock(req)
-                .await
-                .map_err(|e| LockError::internal(e.to_string()))?
-                .into_inner();
-            if let Some(error_info) = resp.error_info {
-                return Err(LockError::internal(error_info));
-            }
-            resp.success
-        } else {
-            let resp = resp.map_err(|e| LockError::internal(e.to_string()))?.into_inner();
-
-            if let Some(error_info) = resp.error_info {
-                return Err(LockError::internal(error_info));
-            }
-            resp.success
-        };
-
-        // Remove the lock from our tracking if successful
-        if success {
-            let mut locks = self.active_locks.write().await;
-            locks.remove(lock_id);
-        }
-
-        Ok(success)
-    }
-
-    async fn refresh(&self, lock_id: &LockId) -> Result<bool> {
-        info!("remote refresh for {}", lock_id);
-        let refresh_request = self.create_unlock_request(lock_id, "remote");
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-        let req = Request::new(GenerallyLockRequest {
-            args: serde_json::to_string(&refresh_request)
-                .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-        });
-        let resp = client
-            .refresh(req)
-            .await
-            .map_err(|e| LockError::internal(e.to_string()))?
-            .into_inner();
-        if let Some(error_info) = resp.error_info {
-            return Err(LockError::internal(error_info));
-        }
-        Ok(resp.success)
-    }
-
-    async fn force_release(&self, lock_id: &LockId) -> Result<bool> {
-        info!("remote force_release for {}", lock_id);
-        let force_request = self.create_unlock_request(lock_id, "remote");
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-        let req = Request::new(GenerallyLockRequest {
-            args: serde_json::to_string(&force_request)
-                .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-        });
-        let resp = client
-            .force_un_lock(req)
-            .await
-            .map_err(|e| LockError::internal(e.to_string()))?
-            .into_inner();
-        if let Some(error_info) = resp.error_info {
-            return Err(LockError::internal(error_info));
-        }
-        Ok(resp.success)
-    }
-
-    async fn check_status(&self, lock_id: &LockId) -> Result<Option<LockInfo>> {
-        info!("remote check_status for {}", lock_id);
-
-        // Since there's no direct status query in the gRPC service,
-        // we attempt a non-blocking lock acquisition to check if the resource is available
-        let status_request = self.create_unlock_request(lock_id, "remote");
-        let mut client = node_service_time_out_client_no_auth(&self.addr)
-            .await
-            .map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
-
-        // Try to acquire a very short-lived lock to test availability
-        let req = Request::new(GenerallyLockRequest {
-            args: serde_json::to_string(&status_request)
-                .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-        });
-
-        // Try exclusive lock first with very short timeout
-        let resp = client.lock(req).await;
-
-        match resp {
-            Ok(response) => {
-                let resp = response.into_inner();
-                if resp.success {
-                    // If we successfully acquired the lock, the resource was free
-                    // Immediately release it
-                    let release_req = Request::new(GenerallyLockRequest {
-                        args: serde_json::to_string(&status_request)
-                            .map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
-                    });
-                    let _ = client.un_lock(release_req).await; // Best effort release
-
-                    // Return None since no one was holding the lock
-                    Ok(None)
-                } else {
-                    // Lock acquisition failed, meaning someone is holding it
-                    // We can't determine the exact details remotely, so return a generic status
-                    Ok(Some(LockInfo {
-                        id: lock_id.clone(),
-                        resource: lock_id.as_str().to_string(),
-                        lock_type: crate::types::LockType::Exclusive, // We can't know the exact type
-                        status: crate::types::LockStatus::Acquired,
-                        owner: "unknown".to_string(), // Remote client can't determine owner
-                        acquired_at: std::time::SystemTime::now(),
-                        expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
-                        last_refreshed: std::time::SystemTime::now(),
-                        metadata: crate::types::LockMetadata::default(),
-                        priority: crate::types::LockPriority::Normal,
-                        wait_start_time: None,
-                    }))
-                }
-            }
-            Err(_) => {
-                // Communication error or lock is held
-                Ok(Some(LockInfo {
-                    id: lock_id.clone(),
-                    resource: lock_id.as_str().to_string(),
-                    lock_type: crate::types::LockType::Exclusive,
-                    status: crate::types::LockStatus::Acquired,
-                    owner: "unknown".to_string(),
-                    acquired_at: std::time::SystemTime::now(),
-                    expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
-                    last_refreshed: std::time::SystemTime::now(),
-                    metadata: crate::types::LockMetadata::default(),
-                    priority: crate::types::LockPriority::Normal,
-                    wait_start_time: None,
-                }))
-            }
-        }
-    }
-
-    async fn get_stats(&self) -> Result<LockStats> {
-        info!("remote get_stats from {}", self.addr);
-
-        // Since there's no direct statistics endpoint in the gRPC service,
-        // we return basic stats indicating this is a remote client
-        let stats = LockStats {
-            last_updated: std::time::SystemTime::now(),
-            ..Default::default()
-        };
-
-        // We could potentially enhance this by:
-        // 1. Keeping local counters of operations performed
-        // 2. Adding a stats gRPC method to the service
-        // 3. Querying server health endpoints
-
-        // For now, return minimal stats indicating remote connectivity
-        Ok(stats)
-    }
-
-    async fn close(&self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn is_online(&self) -> bool {
-        // Use Ping interface to test if remote service is online
-        let mut client = match self.get_client().await {
-            Ok(client) => client,
-            Err(_) => {
-                info!("remote client {} connection failed", self.addr);
-                return false;
-            }
-        };
-
-        let ping_req = Request::new(PingRequest {
-            version: 1,
-            body: bytes::Bytes::new(),
-        });
-
-        match client.ping(ping_req).await {
-            Ok(_) => {
-                info!("remote client {} is online", self.addr);
-                true
-            }
-            Err(_) => {
-                info!("remote client {} ping failed", self.addr);
-                false
-            }
-        }
-    }
-
-    async fn is_local(&self) -> bool {
-        false
-    }
-}
diff --git a/rustfs/src/init.rs b/rustfs/src/init.rs
index b24088a0..66a016b9 100644
--- a/rustfs/src/init.rs
+++ b/rustfs/src/init.rs
@@ -336,7 +336,7 @@ pub async fn init_ftp_system(
     let ftps_address_str = rustfs_utils::get_env_str(rustfs_config::ENV_FTPS_ADDRESS, rustfs_config::DEFAULT_FTPS_ADDRESS);
     let addr: SocketAddr = ftps_address_str
         .parse()
-        .map_err(|e| format!("Invalid FTPS address '{}': {}", ftps_address_str, e))?;
+        .map_err(|e| format!("Invalid FTPS address '{ftps_address_str}': {e}"))?;

     // Get FTPS configuration from environment variables
     let cert_file = rustfs_utils::get_env_opt_str(rustfs_config::ENV_FTPS_CERTS_FILE);
@@ -402,7 +402,7 @@ pub async fn init_sftp_system(
     let sftp_address_str = rustfs_utils::get_env_str(rustfs_config::ENV_SFTP_ADDRESS, rustfs_config::DEFAULT_SFTP_ADDRESS);
     let addr: SocketAddr = sftp_address_str
         .parse()
-        .map_err(|e| format!("Invalid SFTP address '{}': {}", sftp_address_str, e))?;
+        .map_err(|e| format!("Invalid SFTP address '{sftp_address_str}': {e}"))?;

     // Get SFTP configuration from environment variables
     let host_key = rustfs_utils::get_env_opt_str(rustfs_config::ENV_SFTP_HOST_KEY);
diff --git a/rustfs/src/server/cert.rs b/rustfs/src/server/cert.rs
index 18f8656a..68b9bc14 100644
--- a/rustfs/src/server/cert.rs
+++ b/rustfs/src/server/cert.rs
@@ -26,7 +26,7 @@ pub enum RustFSError {
 impl std::fmt::Display for RustFSError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            RustFSError::Cert(msg) => write!(f, "Certificate error: {}", msg),
+            RustFSError::Cert(msg) => write!(f, "Certificate error: {msg}"),
         }
     }
 }
@@ -78,7 +78,7 @@ fn parse_pem_private_key(pem: &[u8]) -> Result, RustFSError>
 async fn read_file(path: &PathBuf, desc: &str) -> Result<Vec<u8>, RustFSError> {
     tokio::fs::read(path)
         .await
-        .map_err(|e| RustFSError::Cert(format!("read {} {:?}: {e}", desc, path)))
+        .map_err(|e| RustFSError::Cert(format!("read {desc} {path:?}: {e}")))
 }

 /// Initialize TLS material for both server and outbound client connections.
diff --git a/rustfs/src/storage/tonic_service.rs b/rustfs/src/storage/tonic_service.rs
index 1f173da5..4787aeb5 100644
--- a/rustfs/src/storage/tonic_service.rs
+++ b/rustfs/src/storage/tonic_service.rs
@@ -1784,7 +1784,7 @@ impl Node for NodeService {
                 return Ok(Response::new(GetMetricsResponse {
                     success: false,
                     realtime_metrics: Bytes::new(),
-                    error_info: Some(format!("Invalid metric_type: {}", err)),
+                    error_info: Some(format!("Invalid metric_type: {err}")),
                 }));
             }
         };
@@ -1798,7 +1798,7 @@ impl Node for NodeService {
                 return Ok(Response::new(GetMetricsResponse {
                     success: false,
                     realtime_metrics: Bytes::new(),
-                    error_info: Some(format!("Invalid opts: {}", err)),
+                    error_info: Some(format!("Invalid opts: {err}")),
                 }));
             }
         };