fix rpc client (#1393)

weisd
2026-01-05 21:52:04 +08:00
committed by GitHub
parent 5660208e89
commit b142563127
8 changed files with 19 additions and 426 deletions
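
The hunks below make two kinds of changes: call sites that built an unauthenticated client via `node_service_time_out_client_no_auth(&addr)` now go through `node_service_time_out_client` with a signature interceptor (`TonicInterceptor::Signature(gen_tonic_signature_interceptor())`), and positional `format!` arguments become inline captures. As a rough, self-contained sketch of the interceptor-wrapped, timeout-bounded client pattern using only public tonic APIs — the helper names, header, and timeout values here are illustrative assumptions, not the crate's actual implementation:

use std::time::Duration;

use tonic::{
    Request, Status,
    metadata::MetadataValue,
    service::{Interceptor, interceptor::InterceptedService},
    transport::{Channel, Endpoint},
};

// Illustrative stand-in for a signature interceptor: it only stamps a static
// header so the sketch stays self-contained; a real interceptor would sign
// each outgoing request.
fn demo_signature_interceptor() -> impl Interceptor {
    |mut req: Request<()>| -> Result<Request<()>, Status> {
        req.metadata_mut()
            .insert("x-demo-signature", MetadataValue::from_static("demo"));
        Ok(req)
    }
}

// Rough shape of a "time_out_client" helper: connect with explicit timeouts,
// then wrap the channel so every call passes through the interceptor. A
// tonic-generated client (such as the node service client in this diff) would
// be built on top of the returned service, e.g. via `with_interceptor`.
async fn signed_service(addr: &str) -> Result<InterceptedService<Channel, impl Interceptor>, tonic::transport::Error> {
    let channel = Endpoint::from_shared(addr.to_string())?
        .connect_timeout(Duration::from_secs(5))
        .timeout(Duration::from_secs(30))
        .connect()
        .await?;
    Ok(InterceptedService::new(channel, demo_signature_interceptor()))
}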

View File

@@ -14,7 +14,7 @@
use crate::data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, load_data_usage_from_backend};
use crate::error::{Error, Result};
-use crate::rpc::node_service_time_out_client_no_auth;
+use crate::rpc::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use crate::{
disk::endpoint::Endpoint,
global::{GLOBAL_BOOT_TIME, GLOBAL_Endpoints},
@@ -101,9 +101,9 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> {
let decoded_payload = flatbuffers::root::<PingBody>(finished_data);
assert!(decoded_payload.is_ok());
-let mut client = node_service_time_out_client_no_auth(&addr)
+let mut client = node_service_time_out_client(&addr, TonicInterceptor::Signature(gen_tonic_signature_interceptor()))
.await
-.map_err(|err| Error::other(err.to_string()))?;
+.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
let request = Request::new(PingRequest {
version: 1,

View File

@@ -384,7 +384,7 @@ impl LocalDiskWrapper {
let stored_disk_id = self.disk.get_disk_id().await?;
if stored_disk_id != want_id {
-return Err(Error::other(format!("Disk ID mismatch wanted {:?}, got {:?}", want_id, stored_disk_id)));
+return Err(Error::other(format!("Disk ID mismatch wanted {want_id:?}, got {stored_disk_id:?}")));
}
Ok(())
@@ -468,7 +468,7 @@ impl LocalDiskWrapper {
// Timeout occurred, mark disk as potentially faulty and decrement waiting counter
self.health.decrement_waiting();
warn!("disk operation timeout after {:?}", timeout_duration);
-Err(DiskError::other(format!("disk operation timeout after {:?}", timeout_duration)))
+Err(DiskError::other(format!("disk operation timeout after {timeout_duration:?}")))
}
}
}

View File

@@ -18,9 +18,7 @@ use crate::disk::error::{Error, Result};
use crate::disk::error_reduce::{BUCKET_OP_IGNORED_ERRS, is_all_buckets_not_found, reduce_write_quorum_errs};
use crate::disk::{DiskAPI, DiskStore, disk_store::get_max_timeout_duration};
use crate::global::GLOBAL_LOCAL_DISK_MAP;
-use crate::rpc::client::{
-    TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client, node_service_time_out_client_no_auth,
-};
+use crate::rpc::client::{TonicInterceptor, gen_tonic_signature_interceptor, node_service_time_out_client};
use crate::store::all_local_disk;
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::{
@@ -675,7 +673,7 @@ impl RemotePeerS3Client {
async fn perform_connectivity_check(addr: &str) -> Result<()> {
use tokio::time::timeout;
-let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?;
+let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {e}")))?;
let Some(host) = url.host_str() else {
return Err(Error::other("No host in URL".to_string()));
@@ -686,7 +684,7 @@ impl RemotePeerS3Client {
// Try to establish TCP connection
match timeout(CHECK_TIMEOUT_DURATION, TcpStream::connect((host, port))).await {
Ok(Ok(_)) => Ok(()),
-_ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))),
+_ => Err(Error::other(format!("Cannot connect to {host}:{port}"))),
}
}
@@ -725,7 +723,7 @@ impl RemotePeerS3Client {
// Timeout occurred, mark peer as potentially faulty
self.health.decrement_waiting();
warn!("Remote peer operation timeout after {:?}", timeout_duration);
-Err(Error::other(format!("Remote peer operation timeout after {:?}", timeout_duration)))
+Err(Error::other(format!("Remote peer operation timeout after {timeout_duration:?}")))
}
}
}
@@ -823,9 +821,7 @@ impl PeerS3Client for RemotePeerS3Client {
self.execute_with_timeout(
|| async {
let options = serde_json::to_string(opts)?;
-let mut client = node_service_time_out_client_no_auth(&self.addr)
-    .await
-    .map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
+let mut client = self.get_client().await?;
let request = Request::new(GetBucketInfoRequest {
bucket: bucket.to_string(),
options,

View File

@@ -206,7 +206,7 @@ impl RemoteDisk {
/// Perform basic connectivity check for remote disk
async fn perform_connectivity_check(addr: &str) -> Result<()> {
-let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {}", e)))?;
+let url = url::Url::parse(addr).map_err(|e| Error::other(format!("Invalid URL: {e}")))?;
let Some(host) = url.host_str() else {
return Err(Error::other("No host in URL".to_string()));
@@ -220,7 +220,7 @@ impl RemoteDisk {
drop(stream);
Ok(())
}
-_ => Err(Error::other(format!("Cannot connect to {}:{}", host, port))),
+_ => Err(Error::other(format!("Cannot connect to {host}:{port}"))),
}
}
@@ -260,7 +260,7 @@ impl RemoteDisk {
// Timeout occurred, mark disk as potentially faulty
self.health.decrement_waiting();
warn!("Remote disk operation timeout after {:?}", timeout_duration);
-Err(Error::other(format!("Remote disk operation timeout after {:?}", timeout_duration)))
+Err(Error::other(format!("Remote disk operation timeout after {timeout_duration:?}")))
}
}
}

View File

@@ -1,403 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use rustfs_protos::{
node_service_time_out_client, node_service_time_out_client_no_auth,
proto_gen::node_service::{GenerallyLockRequest, PingRequest},
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tonic::Request;
use tracing::info;
use crate::{
error::{LockError, Result},
types::{LockId, LockInfo, LockRequest, LockResponse, LockStats},
};
use super::LockClient;
/// Remote lock client implementation
#[derive(Debug)]
pub struct RemoteClient {
addr: String,
// Track active locks with their original owner information
active_locks: Arc<RwLock<HashMap<LockId, String>>>, // lock_id -> owner
}
impl Clone for RemoteClient {
fn clone(&self) -> Self {
Self {
addr: self.addr.clone(),
active_locks: self.active_locks.clone(),
}
}
}
impl RemoteClient {
pub fn new(endpoint: String) -> Self {
Self {
addr: endpoint,
active_locks: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn from_url(url: url::Url) -> Self {
Self {
addr: url.to_string(),
active_locks: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Create a minimal LockRequest for unlock operations
fn create_unlock_request(&self, lock_id: &LockId, owner: &str) -> LockRequest {
LockRequest {
lock_id: lock_id.clone(),
resource: lock_id.resource.clone(),
lock_type: crate::types::LockType::Exclusive, // Type doesn't matter for unlock
owner: owner.to_string(),
acquire_timeout: std::time::Duration::from_secs(30),
ttl: std::time::Duration::from_secs(300),
metadata: crate::types::LockMetadata::default(),
priority: crate::types::LockPriority::Normal,
deadlock_detection: false,
}
}
}
#[async_trait]
impl LockClient for RemoteClient {
async fn acquire_exclusive(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_exclusive for {}", request.resource);
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
// Check for explicit error first
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
// Check if the lock acquisition was successful
if resp.success {
// Save the lock information for later release
let mut locks = self.active_locks.write().await;
locks.insert(request.lock_id.clone(), request.owner.clone());
Ok(LockResponse::success(
LockInfo {
id: request.lock_id.clone(),
resource: request.resource.clone(),
lock_type: request.lock_type,
status: crate::types::LockStatus::Acquired,
owner: request.owner.clone(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + request.ttl,
last_refreshed: std::time::SystemTime::now(),
metadata: request.metadata.clone(),
priority: request.priority,
wait_start_time: None,
},
std::time::Duration::ZERO,
))
} else {
// Lock acquisition failed
Ok(LockResponse::failure(
"Lock acquisition failed on remote server".to_string(),
std::time::Duration::ZERO,
))
}
}
async fn acquire_shared(&self, request: &LockRequest) -> Result<LockResponse> {
info!("remote acquire_shared for {}", request.resource);
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.r_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
// Check for explicit error first
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
// Check if the lock acquisition was successful
if resp.success {
// Save the lock information for later release
let mut locks = self.active_locks.write().await;
locks.insert(request.lock_id.clone(), request.owner.clone());
Ok(LockResponse::success(
LockInfo {
id: request.lock_id.clone(),
resource: request.resource.clone(),
lock_type: request.lock_type,
status: crate::types::LockStatus::Acquired,
owner: request.owner.clone(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + request.ttl,
last_refreshed: std::time::SystemTime::now(),
metadata: request.metadata.clone(),
priority: request.priority,
wait_start_time: None,
},
std::time::Duration::ZERO,
))
} else {
// Lock acquisition failed
Ok(LockResponse::failure(
"Shared lock acquisition failed on remote server".to_string(),
std::time::Duration::ZERO,
))
}
}
async fn release(&self, lock_id: &LockId) -> Result<bool> {
info!("remote release for {}", lock_id);
// Get the original owner for this lock
let owner = {
let locks = self.active_locks.read().await;
locks.get(lock_id).cloned().unwrap_or_else(|| "remote".to_string())
};
let unlock_request = self.create_unlock_request(lock_id, &owner);
let request_string = serde_json::to_string(&unlock_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?;
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
// Try UnLock first (for exclusive locks)
let req = Request::new(GenerallyLockRequest {
args: request_string.clone(),
});
let resp = client.un_lock(req).await;
let success = if resp.is_err() {
// If that fails, try RUnLock (for shared locks)
let req = Request::new(GenerallyLockRequest { args: request_string });
let resp = client
.r_un_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
resp.success
} else {
let resp = resp.map_err(|e| LockError::internal(e.to_string()))?.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
resp.success
};
// Remove the lock from our tracking if successful
if success {
let mut locks = self.active_locks.write().await;
locks.remove(lock_id);
}
Ok(success)
}
async fn refresh(&self, lock_id: &LockId) -> Result<bool> {
info!("remote refresh for {}", lock_id);
let refresh_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&refresh_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.refresh(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
Ok(resp.success)
}
async fn force_release(&self, lock_id: &LockId) -> Result<bool> {
info!("remote force_release for {}", lock_id);
let force_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&force_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let resp = client
.force_un_lock(req)
.await
.map_err(|e| LockError::internal(e.to_string()))?
.into_inner();
if let Some(error_info) = resp.error_info {
return Err(LockError::internal(error_info));
}
Ok(resp.success)
}
async fn check_status(&self, lock_id: &LockId) -> Result<Option<LockInfo>> {
info!("remote check_status for {}", lock_id);
// Since there's no direct status query in the gRPC service,
// we attempt a non-blocking lock acquisition to check if the resource is available
let status_request = self.create_unlock_request(lock_id, "remote");
let mut client = node_service_time_out_client_no_auth(&self.addr)
.await
.map_err(|err| LockError::internal(format!("can not get client, err: {err}")))?;
// Try to acquire a very short-lived lock to test availability
let req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&status_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
// Try exclusive lock first with very short timeout
let resp = client.lock(req).await;
match resp {
Ok(response) => {
let resp = response.into_inner();
if resp.success {
// If we successfully acquired the lock, the resource was free
// Immediately release it
let release_req = Request::new(GenerallyLockRequest {
args: serde_json::to_string(&status_request)
.map_err(|e| LockError::internal(format!("Failed to serialize request: {e}")))?,
});
let _ = client.un_lock(release_req).await; // Best effort release
// Return None since no one was holding the lock
Ok(None)
} else {
// Lock acquisition failed, meaning someone is holding it
// We can't determine the exact details remotely, so return a generic status
Ok(Some(LockInfo {
id: lock_id.clone(),
resource: lock_id.as_str().to_string(),
lock_type: crate::types::LockType::Exclusive, // We can't know the exact type
status: crate::types::LockStatus::Acquired,
owner: "unknown".to_string(), // Remote client can't determine owner
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
last_refreshed: std::time::SystemTime::now(),
metadata: crate::types::LockMetadata::default(),
priority: crate::types::LockPriority::Normal,
wait_start_time: None,
}))
}
}
Err(_) => {
// Communication error or lock is held
Ok(Some(LockInfo {
id: lock_id.clone(),
resource: lock_id.as_str().to_string(),
lock_type: crate::types::LockType::Exclusive,
status: crate::types::LockStatus::Acquired,
owner: "unknown".to_string(),
acquired_at: std::time::SystemTime::now(),
expires_at: std::time::SystemTime::now() + std::time::Duration::from_secs(3600),
last_refreshed: std::time::SystemTime::now(),
metadata: crate::types::LockMetadata::default(),
priority: crate::types::LockPriority::Normal,
wait_start_time: None,
}))
}
}
}
async fn get_stats(&self) -> Result<LockStats> {
info!("remote get_stats from {}", self.addr);
// Since there's no direct statistics endpoint in the gRPC service,
// we return basic stats indicating this is a remote client
let stats = LockStats {
last_updated: std::time::SystemTime::now(),
..Default::default()
};
// We could potentially enhance this by:
// 1. Keeping local counters of operations performed
// 2. Adding a stats gRPC method to the service
// 3. Querying server health endpoints
// For now, return minimal stats indicating remote connectivity
Ok(stats)
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn is_online(&self) -> bool {
// Use Ping interface to test if remote service is online
let mut client = match self.get_client().await {
Ok(client) => client,
Err(_) => {
info!("remote client {} connection failed", self.addr);
return false;
}
};
let ping_req = Request::new(PingRequest {
version: 1,
body: bytes::Bytes::new(),
});
match client.ping(ping_req).await {
Ok(_) => {
info!("remote client {} is online", self.addr);
true
}
Err(_) => {
info!("remote client {} ping failed", self.addr);
false
}
}
}
async fn is_local(&self) -> bool {
false
}
}

View File

@@ -336,7 +336,7 @@ pub async fn init_ftp_system(
let ftps_address_str = rustfs_utils::get_env_str(rustfs_config::ENV_FTPS_ADDRESS, rustfs_config::DEFAULT_FTPS_ADDRESS);
let addr: SocketAddr = ftps_address_str
.parse()
-.map_err(|e| format!("Invalid FTPS address '{}': {}", ftps_address_str, e))?;
+.map_err(|e| format!("Invalid FTPS address '{ftps_address_str}': {e}"))?;
// Get FTPS configuration from environment variables
let cert_file = rustfs_utils::get_env_opt_str(rustfs_config::ENV_FTPS_CERTS_FILE);
@@ -402,7 +402,7 @@ pub async fn init_sftp_system(
let sftp_address_str = rustfs_utils::get_env_str(rustfs_config::ENV_SFTP_ADDRESS, rustfs_config::DEFAULT_SFTP_ADDRESS);
let addr: SocketAddr = sftp_address_str
.parse()
-.map_err(|e| format!("Invalid SFTP address '{}': {}", sftp_address_str, e))?;
+.map_err(|e| format!("Invalid SFTP address '{sftp_address_str}': {e}"))?;
// Get SFTP configuration from environment variables
let host_key = rustfs_utils::get_env_opt_str(rustfs_config::ENV_SFTP_HOST_KEY);

View File

@@ -26,7 +26,7 @@ pub enum RustFSError {
impl std::fmt::Display for RustFSError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
-RustFSError::Cert(msg) => write!(f, "Certificate error: {}", msg),
+RustFSError::Cert(msg) => write!(f, "Certificate error: {msg}"),
}
}
}
@@ -78,7 +78,7 @@ fn parse_pem_private_key(pem: &[u8]) -> Result<PrivateKeyDer<'static>, RustFSErr
async fn read_file(path: &PathBuf, desc: &str) -> Result<Vec<u8>, RustFSError> {
tokio::fs::read(path)
.await
-.map_err(|e| RustFSError::Cert(format!("read {} {:?}: {e}", desc, path)))
+.map_err(|e| RustFSError::Cert(format!("read {desc} {path:?}: {e}")))
}
/// Initialize TLS material for both server and outbound client connections.

View File

@@ -1784,7 +1784,7 @@ impl Node for NodeService {
return Ok(Response::new(GetMetricsResponse {
success: false,
realtime_metrics: Bytes::new(),
-error_info: Some(format!("Invalid metric_type: {}", err)),
+error_info: Some(format!("Invalid metric_type: {err}")),
}));
}
};
@@ -1798,7 +1798,7 @@ impl Node for NodeService {
return Ok(Response::new(GetMetricsResponse {
success: false,
realtime_metrics: Bytes::new(),
-error_info: Some(format!("Invalid opts: {}", err)),
+error_info: Some(format!("Invalid opts: {err}")),
}));
}
};