diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index a5f5f3a3..730a6172 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -23,7 +23,7 @@ use rustfs_common::{ use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; use rustfs_utils::os::get_drive_stats; use serde::{Deserialize, Serialize}; -use tracing::info; +use tracing::{debug, info}; use crate::{ admin_server_info::get_local_server_property, @@ -44,7 +44,7 @@ pub struct CollectMetricsOpts { pub struct MetricType(u32); impl MetricType { - // 定义一些常量 + // Define some constants pub const NONE: MetricType = MetricType(0); pub const SCANNER: MetricType = MetricType(1 << 0); pub const DISK: MetricType = MetricType(1 << 1); @@ -70,8 +70,18 @@ impl MetricType { } } +/// Collect local metrics based on the specified types and options. +/// +/// # Arguments +/// +/// * `types` - A `MetricType` specifying which types of metrics to collect. +/// * `opts` - A reference to `CollectMetricsOpts` containing additional options for metric collection. +/// +/// # Returns +/// * A `RealtimeMetrics` struct containing the collected metrics. +/// pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) -> RealtimeMetrics { - info!("collect_local_metrics"); + debug!("collect_local_metrics"); let mut real_time_metrics = RealtimeMetrics::default(); if types.0 == MetricType::NONE.0 { info!("types is None, return"); @@ -93,13 +103,13 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) } if types.contains(&MetricType::DISK) { - info!("start get disk metrics"); + debug!("start get disk metrics"); let mut aggr = DiskMetric { collected_at: Utc::now(), ..Default::default() }; for (name, disk) in collect_local_disks_metrics(&opts.disks).await.into_iter() { - info!("got disk metric, name: {name}, metric: {disk:?}"); + debug!("got disk metric, name: {name}, metric: {disk:?}"); real_time_metrics.by_disk.insert(name, disk.clone()); aggr.merge(&disk); } @@ -107,7 +117,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) } if types.contains(&MetricType::SCANNER) { - info!("start get scanner metrics"); + debug!("start get scanner metrics"); let metrics = globalMetrics.report().await; real_time_metrics.aggregated.scanner = Some(metrics); } diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 1074fd09..430c6759 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -323,7 +323,7 @@ impl Operation for ServiceHandle { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle ServiceHandle"); - return Err(s3_error!(NotImplemented)); + Err(s3_error!(NotImplemented)) } } @@ -367,7 +367,7 @@ impl Operation for InspectDataHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle InspectDataHandler"); - return Err(s3_error!(NotImplemented)); + Err(s3_error!(NotImplemented)) } } @@ -489,7 +489,7 @@ impl Operation for DataUsageInfoHandler { let mut info_for_refresh = info.clone(); let store_for_refresh = store.clone(); - tokio::spawn(async move { + spawn(async move { if let Err(e) = collect_realtime_data_usage(&mut info_for_refresh, store_for_refresh.clone()).await { warn!("Background data usage refresh failed: {}", e); return; @@ -641,10 +641,7 @@ impl Operation for MetricsHandler { let mp = extract_metrics_init_params(&req.uri); info!("mp: {:?}", mp); - let tick = match parse_duration(&mp.tick) { - Ok(i) => i, - Err(_) => std_Duration::from_secs(1), - }; + let tick = parse_duration(&mp.tick).unwrap_or_else(|_| std_Duration::from_secs(3)); let mut n = mp.n; if n == 0 { @@ -657,28 +654,18 @@ impl Operation for MetricsHandler { MetricType::ALL }; - let disks = mp.disks.split(",").map(String::from).collect::>(); - let by_disk = mp.by_disk == "true"; - let mut disk_map = HashSet::new(); - if !disks.is_empty() && !disks[0].is_empty() { - for d in disks.iter() { - if !d.is_empty() { - disk_map.insert(d.to_string()); - } - } + fn parse_comma_separated(s: &str) -> HashSet { + s.split(',').filter(|part| !part.is_empty()).map(String::from).collect() } + let disks = parse_comma_separated(&mp.disks); + let by_disk = mp.by_disk == "true"; + let disk_map = disks; + let job_id = mp.by_job_id; - let hosts = mp.hosts.split(",").map(String::from).collect::>(); + let hosts = parse_comma_separated(&mp.hosts); let by_host = mp.by_host == "true"; - let mut host_map = HashSet::new(); - if !hosts.is_empty() && !hosts[0].is_empty() { - for d in hosts.iter() { - if !d.is_empty() { - host_map.insert(d.to_string()); - } - } - } + let host_map = hosts; let d_id = mp.by_dep_id; let mut interval = interval(tick); @@ -694,7 +681,7 @@ impl Operation for MetricsHandler { inner: ReceiverStream::new(rx), }); let body = Body::from(in_stream); - tokio::spawn(async move { + spawn(async move { while n > 0 { info!("loop, n: {n}"); let mut m = RealtimeMetrics::default(); @@ -932,7 +919,7 @@ impl Operation for BackgroundHealStatusHandler { async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { warn!("handle BackgroundHealStatusHandler"); - return Err(s3_error!(NotImplemented)); + Err(s3_error!(NotImplemented)) } } @@ -967,7 +954,7 @@ impl Operation for GetReplicationMetricsHandler { } //return Err(s3_error!(InvalidArgument, "Invalid bucket name")); //Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header)) - return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string())))); + Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string())))) } } @@ -994,7 +981,7 @@ impl Operation for SetRemoteTargetHandler { }; store - .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default()) + .get_bucket_info(bucket, &BucketOptions::default()) .await .map_err(ApiError::from)?; @@ -1008,7 +995,7 @@ impl Operation for SetRemoteTargetHandler { }; let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(|e| { - tracing::error!("Failed to parse BucketTarget from body: {}", e); + error!("Failed to parse BucketTarget from body: {}", e); ApiError::other(e) })?; @@ -1120,10 +1107,7 @@ impl Operation for ListRemoteTargetHandler { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string())); }; - if let Err(err) = store - .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default()) - .await - { + if let Err(err) = store.get_bucket_info(bucket, &BucketOptions::default()).await { error!("Error fetching bucket info: {:?}", err); return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string())))); } @@ -1152,7 +1136,7 @@ impl Operation for ListRemoteTargetHandler { let mut header = HeaderMap::new(); header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - return Ok(S3Response::with_headers((StatusCode::OK, Body::from(json_targets)), header)); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(json_targets)), header)) } } @@ -1177,10 +1161,7 @@ impl Operation for RemoveRemoteTargetHandler { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string())); }; - if let Err(err) = store - .get_bucket_info(bucket, &rustfs_ecstore::store_api::BucketOptions::default()) - .await - { + if let Err(err) = store.get_bucket_info(bucket, &BucketOptions::default()).await { error!("Error fetching bucket info: {:?}", err); return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string())))); } @@ -1209,7 +1190,7 @@ impl Operation for RemoveRemoteTargetHandler { S3Error::with_message(S3ErrorCode::InternalError, format!("Failed to update bucket targets: {e}")) })?; - return Ok(S3Response::new((StatusCode::NO_CONTENT, Body::from("".to_string())))); + Ok(S3Response::new((StatusCode::NO_CONTENT, Body::from("".to_string())))) } } @@ -1219,9 +1200,7 @@ async fn collect_realtime_data_usage( store: Arc, ) -> Result<(), Box> { // Get bucket list and collect basic statistics - let buckets = store - .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) - .await?; + let buckets = store.list_bucket(&BucketOptions::default()).await?; info.buckets_count = buckets.len() as u64; info.last_update = Some(std::time::SystemTime::now()); diff --git a/rustfs/src/admin/handlers/bucket_meta.rs b/rustfs/src/admin/handlers/bucket_meta.rs index f8c4bfd7..1989cf9d 100644 --- a/rustfs/src/admin/handlers/bucket_meta.rs +++ b/rustfs/src/admin/handlers/bucket_meta.rs @@ -137,16 +137,15 @@ impl Operation for ExportBucketMetadata { let conf_path = path_join_buf(&[bucket.name.as_str(), conf]); match conf { BUCKET_POLICY_CONFIG => { - let config: rustfs_policy::policy::BucketPolicy = - match metadata_sys::get_bucket_policy(&bucket.name).await { - Ok((res, _)) => res, - Err(e) => { - if e == StorageError::ConfigNotFound { - continue; - } - return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + let config: BucketPolicy = match metadata_sys::get_bucket_policy(&bucket.name).await { + Ok((res, _)) => res, + Err(e) => { + if e == StorageError::ConfigNotFound { + continue; } - }; + return Err(s3_error!(InternalError, "get bucket metadata failed: {e}")); + } + }; let config_json = serde_json::to_vec(&config).map_err(|e| s3_error!(InternalError, "serialize config failed: {e}"))?; zip_writer diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index 0501a3fc..8fb4f67f 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -98,17 +98,11 @@ async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> { } if let Err(e) = retry_metadata(queue_dir).await { - match e.kind() { - ErrorKind::NotFound => { - return Err(s3_error!(InvalidArgument, "queue_dir does not exist")); - } - ErrorKind::PermissionDenied => { - return Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied")); - } - _ => { - return Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e)); - } - } + return match e.kind() { + ErrorKind::NotFound => Err(s3_error!(InvalidArgument, "queue_dir does not exist")), + ErrorKind::PermissionDenied => Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied")), + _ => Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e)), + }; } } diff --git a/rustfs/src/admin/handlers/kms.rs b/rustfs/src/admin/handlers/kms.rs index 5065734d..dbe74dbb 100644 --- a/rustfs/src/admin/handlers/kms.rs +++ b/rustfs/src/admin/handlers/kms.rs @@ -153,7 +153,7 @@ impl Operation for CreateKeyHandler { let tags = request.tags.unwrap_or_default(); let key_name = tags.get("name").cloned(); - let kms_request = rustfs_kms::types::CreateKeyRequest { + let kms_request = CreateKeyRequest { key_name, key_usage: request.key_usage.unwrap_or(KeyUsage::EncryptDecrypt), description: request.description, @@ -216,7 +216,7 @@ impl Operation for DescribeKeyHandler { return Err(s3_error!(InternalError, "KMS service not initialized")); }; - let request = rustfs_kms::types::DescribeKeyRequest { key_id: key_id.clone() }; + let request = DescribeKeyRequest { key_id: key_id.clone() }; match service.describe_key(request).await { Ok(response) => { @@ -270,7 +270,7 @@ impl Operation for ListKeysHandler { return Err(s3_error!(InternalError, "KMS service not initialized")); }; - let request = rustfs_kms::types::ListKeysRequest { + let request = ListKeysRequest { limit: Some(limit), marker, status_filter: None, @@ -336,7 +336,7 @@ impl Operation for GenerateDataKeyHandler { return Err(s3_error!(InternalError, "KMS service not initialized")); }; - let kms_request = rustfs_kms::GenerateDataKeyRequest { + let kms_request = GenerateDataKeyRequest { key_id: request.key_id, key_spec: request.key_spec, encryption_context: request.encryption_context.unwrap_or_default(), diff --git a/rustfs/src/admin/handlers/kms_dynamic.rs b/rustfs/src/admin/handlers/kms_dynamic.rs index 57b4e595..5285e7d8 100644 --- a/rustfs/src/admin/handlers/kms_dynamic.rs +++ b/rustfs/src/admin/handlers/kms_dynamic.rs @@ -72,14 +72,11 @@ impl Operation for ConfigureKmsHandler { info!("Configuring KMS with request: {:?}", configure_request); - let service_manager = match get_global_kms_service_manager() { - Some(manager) => manager, - None => { - warn!("KMS service manager not initialized, initializing now as fallback"); - // Initialize the service manager as a fallback - rustfs_kms::init_global_kms_service_manager() - } - }; + let service_manager = get_global_kms_service_manager().unwrap_or_else(|| { + warn!("KMS service manager not initialized, initializing now as fallback"); + // Initialize the service manager as a fallback + rustfs_kms::init_global_kms_service_manager() + }); // Convert request to KmsConfig let kms_config = configure_request.to_kms_config(); @@ -162,14 +159,11 @@ impl Operation for StartKmsHandler { info!("Starting KMS service with force: {:?}", start_request.force); - let service_manager = match get_global_kms_service_manager() { - Some(manager) => manager, - None => { - warn!("KMS service manager not initialized, initializing now as fallback"); - // Initialize the service manager as a fallback - rustfs_kms::init_global_kms_service_manager() - } - }; + let service_manager = get_global_kms_service_manager().unwrap_or_else(|| { + warn!("KMS service manager not initialized, initializing now as fallback"); + // Initialize the service manager as a fallback + rustfs_kms::init_global_kms_service_manager() + }); // Check if already running and force flag let current_status = service_manager.get_status().await; @@ -280,14 +274,11 @@ impl Operation for StopKmsHandler { info!("Stopping KMS service"); - let service_manager = match get_global_kms_service_manager() { - Some(manager) => manager, - None => { - warn!("KMS service manager not initialized, initializing now as fallback"); - // Initialize the service manager as a fallback - rustfs_kms::init_global_kms_service_manager() - } - }; + let service_manager = get_global_kms_service_manager().unwrap_or_else(|| { + warn!("KMS service manager not initialized, initializing now as fallback"); + // Initialize the service manager as a fallback + rustfs_kms::init_global_kms_service_manager() + }); let (success, message, status) = match service_manager.stop().await { Ok(()) => { @@ -348,14 +339,11 @@ impl Operation for GetKmsStatusHandler { info!("Getting KMS service status"); - let service_manager = match get_global_kms_service_manager() { - Some(manager) => manager, - None => { - warn!("KMS service manager not initialized, initializing now as fallback"); - // Initialize the service manager as a fallback - rustfs_kms::init_global_kms_service_manager() - } - }; + let service_manager = get_global_kms_service_manager().unwrap_or_else(|| { + warn!("KMS service manager not initialized, initializing now as fallback"); + // Initialize the service manager as a fallback + rustfs_kms::init_global_kms_service_manager() + }); let status = service_manager.get_status().await; let config = service_manager.get_config().await; @@ -443,14 +431,11 @@ impl Operation for ReconfigureKmsHandler { info!("Reconfiguring KMS with request: {:?}", configure_request); - let service_manager = match get_global_kms_service_manager() { - Some(manager) => manager, - None => { - warn!("KMS service manager not initialized, initializing now as fallback"); - // Initialize the service manager as a fallback - rustfs_kms::init_global_kms_service_manager() - } - }; + let service_manager = get_global_kms_service_manager().unwrap_or_else(|| { + warn!("KMS service manager not initialized, initializing now as fallback"); + // Initialize the service manager as a fallback + rustfs_kms::init_global_kms_service_manager() + }); // Convert request to KmsConfig let kms_config = configure_request.to_kms_config(); diff --git a/rustfs/src/admin/handlers/sts.rs b/rustfs/src/admin/handlers/sts.rs index 90a3a050..385d6482 100644 --- a/rustfs/src/admin/handlers/sts.rs +++ b/rustfs/src/admin/handlers/sts.rs @@ -103,10 +103,10 @@ impl Operation for AssumeRoleHandle { claims.insert( "exp".to_string(), - serde_json::Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)), + Value::Number(serde_json::Number::from(OffsetDateTime::now_utc().unix_timestamp() + exp as i64)), ); - claims.insert("parent".to_string(), serde_json::Value::String(cred.access_key.clone())); + claims.insert("parent".to_string(), Value::String(cred.access_key.clone())); // warn!("AssumeRole get cred {:?}", &user); // warn!("AssumeRole get body {:?}", &body); @@ -176,7 +176,7 @@ pub fn populate_session_policy(claims: &mut HashMap, policy: &str claims.insert( SESSION_POLICY_NAME.to_string(), - serde_json::Value::String(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&policy_buf)), + Value::String(base64_simd::URL_SAFE_NO_PAD.encode_to_string(&policy_buf)), ); } diff --git a/rustfs/src/admin/handlers/tier.rs b/rustfs/src/admin/handlers/tier.rs index 00fd61e6..dad51399 100644 --- a/rustfs/src/admin/handlers/tier.rs +++ b/rustfs/src/admin/handlers/tier.rs @@ -139,35 +139,35 @@ impl Operation for AddTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); if let Err(err) = tier_config_mgr.add(args, force).await { - if err.code == ERR_TIER_ALREADY_EXISTS.code { - return Err(S3Error::with_message( + return if err.code == ERR_TIER_ALREADY_EXISTS.code { + Err(S3Error::with_message( S3ErrorCode::Custom("TierNameAlreadyExist".into()), "tier name already exists!", - )); + )) } else if err.code == ERR_TIER_NAME_NOT_UPPERCASE.code { - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierNameNotUppercase".into()), "tier name not uppercase!", - )); + )) } else if err.code == ERR_TIER_BACKEND_IN_USE.code { - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierNameBackendInUse!".into()), "tier name backend in use!", - )); + )) } else if err.code == ERR_TIER_CONNECT_ERR.code { - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierConnectError".into()), "tier connect error!", - )); + )) } else if err.code == ERR_TIER_INVALID_CREDENTIALS.code { - return Err(S3Error::with_message(S3ErrorCode::Custom(err.code.clone().into()), err.message.clone())); + Err(S3Error::with_message(S3ErrorCode::Custom(err.code.clone().into()), err.message.clone())) } else { warn!("tier_config_mgr add failed, e: {:?}", err); - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierAddFailed".into()), format!("tier add failed. {err}"), - )); - } + )) + }; } if let Err(e) = tier_config_mgr.save().await { warn!("tier_config_mgr save failed, e: {:?}", e); @@ -223,20 +223,20 @@ impl Operation for EditTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); if let Err(err) = tier_config_mgr.edit(&tier_name, creds).await { - if err.code == ERR_TIER_NOT_FOUND.code { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!")); + return if err.code == ERR_TIER_NOT_FOUND.code { + Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found!")) } else if err.code == ERR_TIER_MISSING_CREDENTIALS.code { - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierMissingCredentials".into()), "tier missing credentials!", - )); + )) } else { warn!("tier_config_mgr edit failed, e: {:?}", err); - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierEditFailed".into()), format!("tier edit failed. {err}"), - )); - } + )) + }; } if let Err(e) = tier_config_mgr.save().await { warn!("tier_config_mgr save failed, e: {:?}", e); @@ -329,17 +329,17 @@ impl Operation for RemoveTier { let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await; //tier_config_mgr.reload(api); if let Err(err) = tier_config_mgr.remove(&tier_name, force).await { - if err.code == ERR_TIER_NOT_FOUND.code { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found.")); + return if err.code == ERR_TIER_NOT_FOUND.code { + Err(S3Error::with_message(S3ErrorCode::Custom("TierNotFound".into()), "tier not found.")) } else if err.code == ERR_TIER_BACKEND_NOT_EMPTY.code { - return Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used.")); + Err(S3Error::with_message(S3ErrorCode::Custom("TierNameBackendInUse".into()), "tier is used.")) } else { warn!("tier_config_mgr remove failed, e: {:?}", err); - return Err(S3Error::with_message( + Err(S3Error::with_message( S3ErrorCode::Custom("TierRemoveFailed".into()), format!("tier remove failed. {err}"), - )); - } + )) + }; } if let Err(e) = tier_config_mgr.save().await { diff --git a/rustfs/src/admin/handlers/trace.rs b/rustfs/src/admin/handlers/trace.rs index 57b818ca..8b1e0b84 100644 --- a/rustfs/src/admin/handlers/trace.rs +++ b/rustfs/src/admin/handlers/trace.rs @@ -43,10 +43,10 @@ impl Operation for Trace { let _trace_opts = extract_trace_options(&req.uri)?; // let (tx, rx) = mpsc::channel(10000); - let _perrs = match GLOBAL_Endpoints.get() { + let _peers = match GLOBAL_Endpoints.get() { Some(ep) => PeerRestClient::new_clients(ep.clone()).await, None => (Vec::new(), Vec::new()), }; - return Err(s3_error!(NotImplemented)); + Err(s3_error!(NotImplemented)) } } diff --git a/rustfs/src/admin/handlers/user.rs b/rustfs/src/admin/handlers/user.rs index e2a09eab..986c01cf 100644 --- a/rustfs/src/admin/handlers/user.rs +++ b/rustfs/src/admin/handlers/user.rs @@ -28,7 +28,6 @@ use rustfs_madmin::{ user::{ImportIAMResult, SRSessionPolicy, SRSvcAccCreate}, }; use rustfs_policy::policy::action::{Action, AdminAction}; - use rustfs_utils::path::path_join_buf; use s3s::{ Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, diff --git a/rustfs/src/auth.rs b/rustfs/src/auth.rs index 61a4c666..17dcd357 100644 --- a/rustfs/src/auth.rs +++ b/rustfs/src/auth.rs @@ -598,7 +598,7 @@ mod tests { // The struct should be created successfully // We can't easily test internal state without exposing it, // but we can test it doesn't panic on creation - assert_eq!(std::mem::size_of_val(&iam_auth), std::mem::size_of::()); + assert_eq!(size_of_val(&iam_auth), size_of::()); } #[tokio::test] diff --git a/rustfs/src/storage/tonic_service.rs b/rustfs/src/storage/tonic_service.rs index a6f3f8c1..7a9a9b7d 100644 --- a/rustfs/src/storage/tonic_service.rs +++ b/rustfs/src/storage/tonic_service.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, io::Cursor, pin::Pin, sync::Arc}; - -// use common::error::Error as EcsError; +use bytes::Bytes; use futures::Stream; use futures_util::future::join_all; +use rmp_serde::{Deserializer, Serializer}; +use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::HealOpts}; use rustfs_ecstore::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys}, @@ -30,11 +30,6 @@ use rustfs_ecstore::{ store::{all_local_disk_path, find_local_disk}, store_api::{BucketOptions, DeleteBucketOptions, MakeBucketOptions, StorageAPI}, }; - -use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::HealOpts}; - -use bytes::Bytes; -use rmp_serde::{Deserializer, Serializer}; use rustfs_filemeta::{FileInfo, MetacacheReader}; use rustfs_iam::{get_global_iam_sys, store::UserType}; use rustfs_lock::{LockClient, LockRequest}; @@ -47,13 +42,14 @@ use rustfs_protos::{ proto_gen::node_service::{node_service_server::NodeService as Node, *}, }; use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, io::Cursor, pin::Pin, sync::Arc}; use tokio::spawn; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; use tracing::{debug, error, info, warn}; -type ResponseStream = Pin> + Send>>; +type ResponseStream = Pin> + Send>>; // fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { // let mut err: &(dyn Error + 'static) = err_status; @@ -123,7 +119,7 @@ impl Node for NodeService { let finished_data = fbb.finished_data(); - Ok(tonic::Response::new(PingResponse { + Ok(Response::new(PingResponse { version: 1, body: Bytes::copy_from_slice(finished_data), })) @@ -135,7 +131,7 @@ impl Node for NodeService { let options = match serde_json::from_str::(&request.options) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(HealBucketResponse { + return Ok(Response::new(HealBucketResponse { success: false, error: Some(DiskError::other(format!("decode HealOpts failed: {err}")).into()), })); @@ -143,12 +139,12 @@ impl Node for NodeService { }; match self.local_peer.heal_bucket(&request.bucket, &options).await { - Ok(_) => Ok(tonic::Response::new(HealBucketResponse { + Ok(_) => Ok(Response::new(HealBucketResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(HealBucketResponse { + Err(err) => Ok(Response::new(HealBucketResponse { success: false, error: Some(err.into()), })), @@ -162,7 +158,7 @@ impl Node for NodeService { let options = match serde_json::from_str::(&request.options) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(ListBucketResponse { + return Ok(Response::new(ListBucketResponse { success: false, bucket_infos: Vec::new(), error: Some(DiskError::other(format!("decode BucketOptions failed: {err}")).into()), @@ -175,14 +171,14 @@ impl Node for NodeService { .into_iter() .filter_map(|bucket_info| serde_json::to_string(&bucket_info).ok()) .collect(); - Ok(tonic::Response::new(ListBucketResponse { + Ok(Response::new(ListBucketResponse { success: true, bucket_infos, error: None, })) } - Err(err) => Ok(tonic::Response::new(ListBucketResponse { + Err(err) => Ok(Response::new(ListBucketResponse { success: false, bucket_infos: Vec::new(), error: Some(err.into()), @@ -197,18 +193,18 @@ impl Node for NodeService { let options = match serde_json::from_str::(&request.options) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(MakeBucketResponse { + return Ok(Response::new(MakeBucketResponse { success: false, error: Some(DiskError::other(format!("decode MakeBucketOptions failed: {err}")).into()), })); } }; match self.local_peer.make_bucket(&request.name, &options).await { - Ok(_) => Ok(tonic::Response::new(MakeBucketResponse { + Ok(_) => Ok(Response::new(MakeBucketResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(MakeBucketResponse { + Err(err) => Ok(Response::new(MakeBucketResponse { success: false, error: Some(err.into()), })), @@ -222,7 +218,7 @@ impl Node for NodeService { let options = match serde_json::from_str::(&request.options) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(GetBucketInfoResponse { + return Ok(Response::new(GetBucketInfoResponse { success: false, bucket_info: String::new(), error: Some(DiskError::other(format!("decode BucketOptions failed: {err}")).into()), @@ -234,7 +230,7 @@ impl Node for NodeService { let bucket_info = match serde_json::to_string(&bucket_info) { Ok(bucket_info) => bucket_info, Err(err) => { - return Ok(tonic::Response::new(GetBucketInfoResponse { + return Ok(Response::new(GetBucketInfoResponse { success: false, bucket_info: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), @@ -242,7 +238,7 @@ impl Node for NodeService { } }; - Ok(tonic::Response::new(GetBucketInfoResponse { + Ok(Response::new(GetBucketInfoResponse { success: true, bucket_info, error: None, @@ -250,7 +246,7 @@ impl Node for NodeService { } // println!("vuc") - Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse { + Err(err) => Ok(Response::new(GetBucketInfoResponse { success: false, bucket_info: String::new(), error: Some(err.into()), @@ -273,11 +269,11 @@ impl Node for NodeService { ) .await { - Ok(_) => Ok(tonic::Response::new(DeleteBucketResponse { + Ok(_) => Ok(Response::new(DeleteBucketResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(DeleteBucketResponse { + Err(err) => Ok(Response::new(DeleteBucketResponse { success: false, error: Some(err.into()), })), @@ -290,19 +286,19 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.read_all(&request.volume, &request.path).await { - Ok(data) => Ok(tonic::Response::new(ReadAllResponse { + Ok(data) => Ok(Response::new(ReadAllResponse { success: true, data, error: None, })), - Err(err) => Ok(tonic::Response::new(ReadAllResponse { + Err(err) => Ok(Response::new(ReadAllResponse { success: false, data: Bytes::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ReadAllResponse { + Ok(Response::new(ReadAllResponse { success: false, data: Bytes::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -314,17 +310,17 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.write_all(&request.volume, &request.path, request.data).await { - Ok(_) => Ok(tonic::Response::new(WriteAllResponse { + Ok(_) => Ok(Response::new(WriteAllResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(WriteAllResponse { + Err(err) => Ok(Response::new(WriteAllResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(WriteAllResponse { + Ok(Response::new(WriteAllResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -337,24 +333,24 @@ impl Node for NodeService { let options = match serde_json::from_str::(&request.options) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(DeleteResponse { + return Ok(Response::new(DeleteResponse { success: false, error: Some(DiskError::other(format!("decode DeleteOptions failed: {err}")).into()), })); } }; match disk.delete(&request.volume, &request.path, options).await { - Ok(_) => Ok(tonic::Response::new(DeleteResponse { + Ok(_) => Ok(Response::new(DeleteResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(DeleteResponse { + Err(err) => Ok(Response::new(DeleteResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(DeleteResponse { + Ok(Response::new(DeleteResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -367,7 +363,7 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(VerifyFileResponse { + return Ok(Response::new(VerifyFileResponse { success: false, check_parts_resp: "".to_string(), error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), @@ -379,27 +375,27 @@ impl Node for NodeService { let check_parts_resp = match serde_json::to_string(&check_parts_resp) { Ok(check_parts_resp) => check_parts_resp, Err(err) => { - return Ok(tonic::Response::new(VerifyFileResponse { + return Ok(Response::new(VerifyFileResponse { success: false, check_parts_resp: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })); } }; - Ok(tonic::Response::new(VerifyFileResponse { + Ok(Response::new(VerifyFileResponse { success: true, check_parts_resp, error: None, })) } - Err(err) => Ok(tonic::Response::new(VerifyFileResponse { + Err(err) => Ok(Response::new(VerifyFileResponse { success: false, check_parts_resp: "".to_string(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(VerifyFileResponse { + Ok(Response::new(VerifyFileResponse { success: false, check_parts_resp: "".to_string(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -414,28 +410,28 @@ impl Node for NodeService { let data = match rmp_serde::to_vec(&data) { Ok(data) => data, Err(err) => { - return Ok(tonic::Response::new(ReadPartsResponse { + return Ok(Response::new(ReadPartsResponse { success: false, object_part_infos: Bytes::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })); } }; - Ok(tonic::Response::new(ReadPartsResponse { + Ok(Response::new(ReadPartsResponse { success: true, object_part_infos: Bytes::copy_from_slice(&data), error: None, })) } - Err(err) => Ok(tonic::Response::new(ReadPartsResponse { + Err(err) => Ok(Response::new(ReadPartsResponse { success: false, object_part_infos: Bytes::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ReadPartsResponse { + Ok(Response::new(ReadPartsResponse { success: false, object_part_infos: Bytes::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -448,7 +444,7 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(CheckPartsResponse { + return Ok(Response::new(CheckPartsResponse { success: false, check_parts_resp: "".to_string(), error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), @@ -460,27 +456,27 @@ impl Node for NodeService { let check_parts_resp = match serde_json::to_string(&check_parts_resp) { Ok(check_parts_resp) => check_parts_resp, Err(err) => { - return Ok(tonic::Response::new(CheckPartsResponse { + return Ok(Response::new(CheckPartsResponse { success: false, check_parts_resp: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })); } }; - Ok(tonic::Response::new(CheckPartsResponse { + Ok(Response::new(CheckPartsResponse { success: true, check_parts_resp, error: None, })) } - Err(err) => Ok(tonic::Response::new(CheckPartsResponse { + Err(err) => Ok(Response::new(CheckPartsResponse { success: false, check_parts_resp: "".to_string(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(CheckPartsResponse { + Ok(Response::new(CheckPartsResponse { success: false, check_parts_resp: "".to_string(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -501,17 +497,17 @@ impl Node for NodeService { ) .await { - Ok(_) => Ok(tonic::Response::new(RenamePartResponse { + Ok(_) => Ok(Response::new(RenamePartResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(RenamePartResponse { + Err(err) => Ok(Response::new(RenamePartResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(RenamePartResponse { + Ok(Response::new(RenamePartResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -525,17 +521,17 @@ impl Node for NodeService { .rename_file(&request.src_volume, &request.src_path, &request.dst_volume, &request.dst_path) .await { - Ok(_) => Ok(tonic::Response::new(RenameFileResponse { + Ok(_) => Ok(Response::new(RenameFileResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(RenameFileResponse { + Err(err) => Ok(Response::new(RenameFileResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(RenameFileResponse { + Ok(Response::new(RenameFileResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -554,22 +550,22 @@ impl Node for NodeService { // match file_writer { // Ok(mut file_writer) => match file_writer.write(&request.data).await { - // Ok(_) => Ok(tonic::Response::new(WriteResponse { + // Ok(_) => Ok(Response::new(WriteResponse { // success: true, // error: None, // })), - // Err(err) => Ok(tonic::Response::new(WriteResponse { + // Err(err) => Ok(Response::new(WriteResponse { // success: false, // error: Some(err_to_proto_err(&err, &format!("write failed: {}", err))), // })), // }, - // Err(err) => Ok(tonic::Response::new(WriteResponse { + // Err(err) => Ok(Response::new(WriteResponse { // success: false, // error: Some(err_to_proto_err(&err, &format!("get writer failed: {}", err))), // })), // } // } else { - // Ok(tonic::Response::new(WriteResponse { + // Ok(Response::new(WriteResponse { // success: false, // error: Some(err_to_proto_err( // &EcsError::new(StorageError::InvalidArgument(Default::default(), Default::default(), Default::default())), @@ -617,7 +613,7 @@ impl Node for NodeService { // success: false, // error: Some(err_to_proto_err( // &err, - // &format!("get get file writer failed: {}", err), + // &format!("get file writer failed: {}", err), // )), // })) // .await @@ -679,7 +675,7 @@ impl Node for NodeService { // let out_stream = ReceiverStream::new(rx); - // Ok(tonic::Response::new(Box::pin(out_stream))) + // Ok(Response::new(Box::pin(out_stream))) } type ReadAtStream = ResponseStream; @@ -780,26 +776,26 @@ impl Node for NodeService { // let out_stream = ReceiverStream::new(rx); - // Ok(tonic::Response::new(Box::pin(out_stream))) + // Ok(Response::new(Box::pin(out_stream))) } async fn list_dir(&self, request: Request) -> Result, Status> { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.list_dir("", &request.volume, "", 0).await { - Ok(volumes) => Ok(tonic::Response::new(ListDirResponse { + Ok(volumes) => Ok(Response::new(ListDirResponse { success: true, volumes, error: None, })), - Err(err) => Ok(tonic::Response::new(ListDirResponse { + Err(err) => Ok(Response::new(ListDirResponse { success: false, volumes: Vec::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ListDirResponse { + Ok(Response::new(ListDirResponse { success: false, volumes: Vec::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -901,7 +897,7 @@ impl Node for NodeService { } let out_stream = ReceiverStream::new(rx); - Ok(tonic::Response::new(Box::pin(out_stream))) + Ok(Response::new(Box::pin(out_stream))) } async fn rename_data(&self, request: Request) -> Result, Status> { @@ -910,7 +906,7 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(RenameDataResponse { + return Ok(Response::new(RenameDataResponse { success: false, rename_data_resp: String::new(), error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), @@ -925,27 +921,27 @@ impl Node for NodeService { let rename_data_resp = match serde_json::to_string(&rename_data_resp) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(RenameDataResponse { + return Ok(Response::new(RenameDataResponse { success: false, rename_data_resp: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })); } }; - Ok(tonic::Response::new(RenameDataResponse { + Ok(Response::new(RenameDataResponse { success: true, rename_data_resp, error: None, })) } - Err(err) => Ok(tonic::Response::new(RenameDataResponse { + Err(err) => Ok(Response::new(RenameDataResponse { success: false, rename_data_resp: String::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(RenameDataResponse { + Ok(Response::new(RenameDataResponse { success: false, rename_data_resp: String::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -957,17 +953,17 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.make_volumes(request.volumes.iter().map(|s| &**s).collect()).await { - Ok(_) => Ok(tonic::Response::new(MakeVolumesResponse { + Ok(_) => Ok(Response::new(MakeVolumesResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(MakeVolumesResponse { + Err(err) => Ok(Response::new(MakeVolumesResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(MakeVolumesResponse { + Ok(Response::new(MakeVolumesResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -978,17 +974,17 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.make_volume(&request.volume).await { - Ok(_) => Ok(tonic::Response::new(MakeVolumeResponse { + Ok(_) => Ok(Response::new(MakeVolumeResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(MakeVolumeResponse { + Err(err) => Ok(Response::new(MakeVolumeResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(MakeVolumeResponse { + Ok(Response::new(MakeVolumeResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -1004,20 +1000,20 @@ impl Node for NodeService { .into_iter() .filter_map(|volume_info| serde_json::to_string(&volume_info).ok()) .collect(); - Ok(tonic::Response::new(ListVolumesResponse { + Ok(Response::new(ListVolumesResponse { success: true, volume_infos, error: None, })) } - Err(err) => Ok(tonic::Response::new(ListVolumesResponse { + Err(err) => Ok(Response::new(ListVolumesResponse { success: false, volume_infos: Vec::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ListVolumesResponse { + Ok(Response::new(ListVolumesResponse { success: false, volume_infos: Vec::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1030,25 +1026,25 @@ impl Node for NodeService { if let Some(disk) = self.find_disk(&request.disk).await { match disk.stat_volume(&request.volume).await { Ok(volume_info) => match serde_json::to_string(&volume_info) { - Ok(volume_info) => Ok(tonic::Response::new(StatVolumeResponse { + Ok(volume_info) => Ok(Response::new(StatVolumeResponse { success: true, volume_info, error: None, })), - Err(err) => Ok(tonic::Response::new(StatVolumeResponse { + Err(err) => Ok(Response::new(StatVolumeResponse { success: false, volume_info: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })), }, - Err(err) => Ok(tonic::Response::new(StatVolumeResponse { + Err(err) => Ok(Response::new(StatVolumeResponse { success: false, volume_info: String::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(StatVolumeResponse { + Ok(Response::new(StatVolumeResponse { success: false, volume_info: String::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1060,17 +1056,17 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.delete_paths(&request.volume, &request.paths).await { - Ok(_) => Ok(tonic::Response::new(DeletePathsResponse { + Ok(_) => Ok(Response::new(DeletePathsResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(DeletePathsResponse { + Err(err) => Ok(Response::new(DeletePathsResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(DeletePathsResponse { + Ok(Response::new(DeletePathsResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -1083,7 +1079,7 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(UpdateMetadataResponse { + return Ok(Response::new(UpdateMetadataResponse { success: false, error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), })); @@ -1092,7 +1088,7 @@ impl Node for NodeService { let opts = match serde_json::from_str::(&request.opts) { Ok(opts) => opts, Err(err) => { - return Ok(tonic::Response::new(UpdateMetadataResponse { + return Ok(Response::new(UpdateMetadataResponse { success: false, error: Some(DiskError::other(format!("decode UpdateMetadataOpts failed: {err}")).into()), })); @@ -1100,17 +1096,17 @@ impl Node for NodeService { }; match disk.update_metadata(&request.volume, &request.path, file_info, &opts).await { - Ok(_) => Ok(tonic::Response::new(UpdateMetadataResponse { + Ok(_) => Ok(Response::new(UpdateMetadataResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(UpdateMetadataResponse { + Err(err) => Ok(Response::new(UpdateMetadataResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(UpdateMetadataResponse { + Ok(Response::new(UpdateMetadataResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -1123,24 +1119,24 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(WriteMetadataResponse { + return Ok(Response::new(WriteMetadataResponse { success: false, error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), })); } }; match disk.write_metadata("", &request.volume, &request.path, file_info).await { - Ok(_) => Ok(tonic::Response::new(WriteMetadataResponse { + Ok(_) => Ok(Response::new(WriteMetadataResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(WriteMetadataResponse { + Err(err) => Ok(Response::new(WriteMetadataResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(WriteMetadataResponse { + Ok(Response::new(WriteMetadataResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -1153,7 +1149,7 @@ impl Node for NodeService { let opts = match serde_json::from_str::(&request.opts) { Ok(options) => options, Err(err) => { - return Ok(tonic::Response::new(ReadVersionResponse { + return Ok(Response::new(ReadVersionResponse { success: false, file_info: String::new(), error: Some(DiskError::other(format!("decode ReadOptions failed: {err}")).into()), @@ -1165,25 +1161,25 @@ impl Node for NodeService { .await { Ok(file_info) => match serde_json::to_string(&file_info) { - Ok(file_info) => Ok(tonic::Response::new(ReadVersionResponse { + Ok(file_info) => Ok(Response::new(ReadVersionResponse { success: true, file_info, error: None, })), - Err(err) => Ok(tonic::Response::new(ReadVersionResponse { + Err(err) => Ok(Response::new(ReadVersionResponse { success: false, file_info: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })), }, - Err(err) => Ok(tonic::Response::new(ReadVersionResponse { + Err(err) => Ok(Response::new(ReadVersionResponse { success: false, file_info: String::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ReadVersionResponse { + Ok(Response::new(ReadVersionResponse { success: false, file_info: String::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1196,25 +1192,25 @@ impl Node for NodeService { if let Some(disk) = self.find_disk(&request.disk).await { match disk.read_xl(&request.volume, &request.path, request.read_data).await { Ok(raw_file_info) => match serde_json::to_string(&raw_file_info) { - Ok(raw_file_info) => Ok(tonic::Response::new(ReadXlResponse { + Ok(raw_file_info) => Ok(Response::new(ReadXlResponse { success: true, raw_file_info, error: None, })), - Err(err) => Ok(tonic::Response::new(ReadXlResponse { + Err(err) => Ok(Response::new(ReadXlResponse { success: false, raw_file_info: String::new(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })), }, - Err(err) => Ok(tonic::Response::new(ReadXlResponse { + Err(err) => Ok(Response::new(ReadXlResponse { success: false, raw_file_info: String::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ReadXlResponse { + Ok(Response::new(ReadXlResponse { success: false, raw_file_info: String::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1228,7 +1224,7 @@ impl Node for NodeService { let file_info = match serde_json::from_str::(&request.file_info) { Ok(file_info) => file_info, Err(err) => { - return Ok(tonic::Response::new(DeleteVersionResponse { + return Ok(Response::new(DeleteVersionResponse { success: false, raw_file_info: "".to_string(), error: Some(DiskError::other(format!("decode FileInfo failed: {err}")).into()), @@ -1238,7 +1234,7 @@ impl Node for NodeService { let opts = match serde_json::from_str::(&request.opts) { Ok(opts) => opts, Err(err) => { - return Ok(tonic::Response::new(DeleteVersionResponse { + return Ok(Response::new(DeleteVersionResponse { success: false, raw_file_info: "".to_string(), error: Some(DiskError::other(format!("decode DeleteOptions failed: {err}")).into()), @@ -1250,25 +1246,25 @@ impl Node for NodeService { .await { Ok(raw_file_info) => match serde_json::to_string(&raw_file_info) { - Ok(raw_file_info) => Ok(tonic::Response::new(DeleteVersionResponse { + Ok(raw_file_info) => Ok(Response::new(DeleteVersionResponse { success: true, raw_file_info, error: None, })), - Err(err) => Ok(tonic::Response::new(DeleteVersionResponse { + Err(err) => Ok(Response::new(DeleteVersionResponse { success: false, raw_file_info: "".to_string(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })), }, - Err(err) => Ok(tonic::Response::new(DeleteVersionResponse { + Err(err) => Ok(Response::new(DeleteVersionResponse { success: false, raw_file_info: "".to_string(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(DeleteVersionResponse { + Ok(Response::new(DeleteVersionResponse { success: false, raw_file_info: "".to_string(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1284,7 +1280,7 @@ impl Node for NodeService { match serde_json::from_str::(version) { Ok(version) => versions.push(version), Err(err) => { - return Ok(tonic::Response::new(DeleteVersionsResponse { + return Ok(Response::new(DeleteVersionsResponse { success: false, errors: Vec::new(), error: Some(DiskError::other(format!("decode FileInfoVersions failed: {err}")).into()), @@ -1295,7 +1291,7 @@ impl Node for NodeService { let opts = match serde_json::from_str::(&request.opts) { Ok(opts) => opts, Err(err) => { - return Ok(tonic::Response::new(DeleteVersionsResponse { + return Ok(Response::new(DeleteVersionsResponse { success: false, errors: Vec::new(), error: Some(DiskError::other(format!("decode DeleteOptions failed: {err}")).into()), @@ -1313,13 +1309,13 @@ impl Node for NodeService { }) .collect(); - Ok(tonic::Response::new(DeleteVersionsResponse { + Ok(Response::new(DeleteVersionsResponse { success: true, errors, error: None, })) } else { - Ok(tonic::Response::new(DeleteVersionsResponse { + Ok(Response::new(DeleteVersionsResponse { success: false, errors: Vec::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1333,7 +1329,7 @@ impl Node for NodeService { let read_multiple_req = match serde_json::from_str::(&request.read_multiple_req) { Ok(read_multiple_req) => read_multiple_req, Err(err) => { - return Ok(tonic::Response::new(ReadMultipleResponse { + return Ok(Response::new(ReadMultipleResponse { success: false, read_multiple_resps: Vec::new(), error: Some(DiskError::other(format!("decode ReadMultipleReq failed: {err}")).into()), @@ -1347,20 +1343,20 @@ impl Node for NodeService { .filter_map(|read_multiple_resp| serde_json::to_string(&read_multiple_resp).ok()) .collect(); - Ok(tonic::Response::new(ReadMultipleResponse { + Ok(Response::new(ReadMultipleResponse { success: true, read_multiple_resps, error: None, })) } - Err(err) => Ok(tonic::Response::new(ReadMultipleResponse { + Err(err) => Ok(Response::new(ReadMultipleResponse { success: false, read_multiple_resps: Vec::new(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(ReadMultipleResponse { + Ok(Response::new(ReadMultipleResponse { success: false, read_multiple_resps: Vec::new(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1372,17 +1368,17 @@ impl Node for NodeService { let request = request.into_inner(); if let Some(disk) = self.find_disk(&request.disk).await { match disk.delete_volume(&request.volume).await { - Ok(_) => Ok(tonic::Response::new(DeleteVolumeResponse { + Ok(_) => Ok(Response::new(DeleteVolumeResponse { success: true, error: None, })), - Err(err) => Ok(tonic::Response::new(DeleteVolumeResponse { + Err(err) => Ok(Response::new(DeleteVolumeResponse { success: false, error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(DeleteVolumeResponse { + Ok(Response::new(DeleteVolumeResponse { success: false, error: Some(DiskError::other("can not find disk".to_string()).into()), })) @@ -1395,7 +1391,7 @@ impl Node for NodeService { let opts = match serde_json::from_str::(&request.opts) { Ok(opts) => opts, Err(err) => { - return Ok(tonic::Response::new(DiskInfoResponse { + return Ok(Response::new(DiskInfoResponse { success: false, disk_info: "".to_string(), error: Some(DiskError::other(format!("decode DiskInfoOptions failed: {err}")).into()), @@ -1404,25 +1400,25 @@ impl Node for NodeService { }; match disk.disk_info(&opts).await { Ok(disk_info) => match serde_json::to_string(&disk_info) { - Ok(disk_info) => Ok(tonic::Response::new(DiskInfoResponse { + Ok(disk_info) => Ok(Response::new(DiskInfoResponse { success: true, disk_info, error: None, })), - Err(err) => Ok(tonic::Response::new(DiskInfoResponse { + Err(err) => Ok(Response::new(DiskInfoResponse { success: false, disk_info: "".to_string(), error: Some(DiskError::other(format!("encode data failed: {err}")).into()), })), }, - Err(err) => Ok(tonic::Response::new(DiskInfoResponse { + Err(err) => Ok(Response::new(DiskInfoResponse { success: false, disk_info: "".to_string(), error: Some(err.into()), })), } } else { - Ok(tonic::Response::new(DiskInfoResponse { + Ok(Response::new(DiskInfoResponse { success: false, disk_info: "".to_string(), error: Some(DiskError::other("can not find disk".to_string()).into()), @@ -1436,7 +1432,7 @@ impl Node for NodeService { let args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); @@ -1444,11 +1440,11 @@ impl Node for NodeService { }; match self.lock_manager.acquire_exclusive(&args).await { - Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse { + Ok(result) => Ok(Response::new(GenerallyLockResponse { success: result.success, error_info: None, })), - Err(err) => Ok(tonic::Response::new(GenerallyLockResponse { + Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( "can not lock, resource: {0}, owner: {1}, err: {2}", @@ -1463,7 +1459,7 @@ impl Node for NodeService { let args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); @@ -1471,11 +1467,11 @@ impl Node for NodeService { }; match self.lock_manager.release(&args.lock_id).await { - Ok(_) => Ok(tonic::Response::new(GenerallyLockResponse { + Ok(_) => Ok(Response::new(GenerallyLockResponse { success: true, error_info: None, })), - Err(err) => Ok(tonic::Response::new(GenerallyLockResponse { + Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( "can not unlock, resource: {0}, owner: {1}, err: {2}", @@ -1490,7 +1486,7 @@ impl Node for NodeService { let args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); @@ -1498,11 +1494,11 @@ impl Node for NodeService { }; match self.lock_manager.acquire_shared(&args).await { - Ok(result) => Ok(tonic::Response::new(GenerallyLockResponse { + Ok(result) => Ok(Response::new(GenerallyLockResponse { success: result.success, error_info: None, })), - Err(err) => Ok(tonic::Response::new(GenerallyLockResponse { + Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( "can not rlock, resource: {0}, owner: {1}, err: {2}", @@ -1517,7 +1513,7 @@ impl Node for NodeService { let args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); @@ -1525,11 +1521,11 @@ impl Node for NodeService { }; match self.lock_manager.release(&args.lock_id).await { - Ok(_) => Ok(tonic::Response::new(GenerallyLockResponse { + Ok(_) => Ok(Response::new(GenerallyLockResponse { success: true, error_info: None, })), - Err(err) => Ok(tonic::Response::new(GenerallyLockResponse { + Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( "can not runlock, resource: {0}, owner: {1}, err: {2}", @@ -1544,7 +1540,7 @@ impl Node for NodeService { let args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); @@ -1552,11 +1548,11 @@ impl Node for NodeService { }; match self.lock_manager.release(&args.lock_id).await { - Ok(_) => Ok(tonic::Response::new(GenerallyLockResponse { + Ok(_) => Ok(Response::new(GenerallyLockResponse { success: true, error_info: None, })), - Err(err) => Ok(tonic::Response::new(GenerallyLockResponse { + Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( "can not force_unlock, resource: {0}, owner: {1}, err: {2}", @@ -1571,14 +1567,14 @@ impl Node for NodeService { let _args: LockRequest = match serde_json::from_str(&request.args) { Ok(args) => args, Err(err) => { - return Ok(tonic::Response::new(GenerallyLockResponse { + return Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!("can not decode args, err: {err}")), })); } }; - Ok(tonic::Response::new(GenerallyLockResponse { + Ok(Response::new(GenerallyLockResponse { success: true, error_info: None, })) @@ -1591,7 +1587,7 @@ impl Node for NodeService { // let request = request.into_inner(); let Some(store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(LocalStorageInfoResponse { + return Ok(Response::new(LocalStorageInfoResponse { success: false, storage_info: Bytes::new(), error_info: Some("errServerNotInitialized".to_string()), @@ -1601,14 +1597,14 @@ impl Node for NodeService { let info = store.local_storage_info().await; let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(LocalStorageInfoResponse { + return Ok(Response::new(LocalStorageInfoResponse { success: false, storage_info: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LocalStorageInfoResponse { + Ok(Response::new(LocalStorageInfoResponse { success: true, storage_info: buf.into(), error_info: None, @@ -1619,13 +1615,13 @@ impl Node for NodeService { let info = get_local_server_property().await; let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(ServerInfoResponse { + return Ok(Response::new(ServerInfoResponse { success: false, server_properties: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(ServerInfoResponse { + Ok(Response::new(ServerInfoResponse { success: true, server_properties: buf.into(), error_info: None, @@ -1636,13 +1632,13 @@ impl Node for NodeService { let info = get_cpus(); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetCpusResponse { + return Ok(Response::new(GetCpusResponse { success: false, cpus: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetCpusResponse { + Ok(Response::new(GetCpusResponse { success: true, cpus: buf.into(), error_info: None, @@ -1654,13 +1650,13 @@ impl Node for NodeService { let info = get_net_info(&addr, ""); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetNetInfoResponse { + return Ok(Response::new(GetNetInfoResponse { success: false, net_info: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetNetInfoResponse { + Ok(Response::new(GetNetInfoResponse { success: true, net_info: buf.into(), error_info: None, @@ -1671,13 +1667,13 @@ impl Node for NodeService { let partitions = get_partitions(); let mut buf = Vec::new(); if let Err(err) = partitions.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetPartitionsResponse { + return Ok(Response::new(GetPartitionsResponse { success: false, partitions: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetPartitionsResponse { + Ok(Response::new(GetPartitionsResponse { success: true, partitions: buf.into(), error_info: None, @@ -1688,13 +1684,13 @@ impl Node for NodeService { let os_info = get_os_info(); let mut buf = Vec::new(); if let Err(err) = os_info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetOsInfoResponse { + return Ok(Response::new(GetOsInfoResponse { success: false, os_info: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetOsInfoResponse { + Ok(Response::new(GetOsInfoResponse { success: true, os_info: buf.into(), error_info: None, @@ -1709,13 +1705,13 @@ impl Node for NodeService { let info = get_sys_services(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetSeLinuxInfoResponse { + return Ok(Response::new(GetSeLinuxInfoResponse { success: false, sys_services: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetSeLinuxInfoResponse { + Ok(Response::new(GetSeLinuxInfoResponse { success: true, sys_services: buf.into(), error_info: None, @@ -1727,13 +1723,13 @@ impl Node for NodeService { let info = get_sys_config(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetSysConfigResponse { + return Ok(Response::new(GetSysConfigResponse { success: false, sys_config: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetSysConfigResponse { + Ok(Response::new(GetSysConfigResponse { success: true, sys_config: buf.into(), error_info: None, @@ -1745,13 +1741,13 @@ impl Node for NodeService { let info = get_sys_errors(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetSysErrorsResponse { + return Ok(Response::new(GetSysErrorsResponse { success: false, sys_errors: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetSysErrorsResponse { + Ok(Response::new(GetSysErrorsResponse { success: true, sys_errors: buf.into(), error_info: None, @@ -1763,13 +1759,13 @@ impl Node for NodeService { let info = get_mem_info(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetMemInfoResponse { + return Ok(Response::new(GetMemInfoResponse { success: false, mem_info: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetMemInfoResponse { + Ok(Response::new(GetMemInfoResponse { success: true, mem_info: buf.into(), error_info: None, @@ -1788,13 +1784,13 @@ impl Node for NodeService { let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetMetricsResponse { + return Ok(Response::new(GetMetricsResponse { success: false, realtime_metrics: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetMetricsResponse { + Ok(Response::new(GetMetricsResponse { success: true, realtime_metrics: buf.into(), error_info: None, @@ -1806,13 +1802,13 @@ impl Node for NodeService { let info = get_proc_info(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(tonic::Response::new(GetProcInfoResponse { + return Ok(Response::new(GetProcInfoResponse { success: false, proc_info: Bytes::new(), error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(GetProcInfoResponse { + Ok(Response::new(GetProcInfoResponse { success: true, proc_info: buf.into(), error_info: None, @@ -1861,14 +1857,14 @@ impl Node for NodeService { let request = request.into_inner(); let bucket = request.bucket; if bucket.is_empty() { - return Ok(tonic::Response::new(LoadBucketMetadataResponse { + return Ok(Response::new(LoadBucketMetadataResponse { success: false, error_info: Some("bucket name is missing".to_string()), })); } let Some(store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(LoadBucketMetadataResponse { + return Ok(Response::new(LoadBucketMetadataResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -1877,17 +1873,17 @@ impl Node for NodeService { match load_bucket_metadata(store, &bucket).await { Ok(meta) => { if let Err(err) = metadata_sys::set_bucket_metadata(bucket, meta).await { - return Ok(tonic::Response::new(LoadBucketMetadataResponse { + return Ok(Response::new(LoadBucketMetadataResponse { success: false, error_info: Some(err.to_string()), })); }; - Ok(tonic::Response::new(LoadBucketMetadataResponse { + Ok(Response::new(LoadBucketMetadataResponse { success: true, error_info: None, })) } - Err(err) => Ok(tonic::Response::new(LoadBucketMetadataResponse { + Err(err) => Ok(Response::new(LoadBucketMetadataResponse { success: false, error_info: Some(err.to_string()), })), @@ -1902,7 +1898,7 @@ impl Node for NodeService { let _bucket = request.bucket; //todo - Ok(tonic::Response::new(DeleteBucketMetadataResponse { + Ok(Response::new(DeleteBucketMetadataResponse { success: true, error_info: None, })) @@ -1912,14 +1908,14 @@ impl Node for NodeService { let request = request.into_inner(); let policy = request.policy_name; if policy.is_empty() { - return Ok(tonic::Response::new(DeletePolicyResponse { + return Ok(Response::new(DeletePolicyResponse { success: false, error_info: Some("policy name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(DeletePolicyResponse { + return Ok(Response::new(DeletePolicyResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -1927,12 +1923,12 @@ impl Node for NodeService { let resp = iam_sys.delete_policy(&policy, false).await; if let Err(err) = resp { - return Ok(tonic::Response::new(DeletePolicyResponse { + return Ok(Response::new(DeletePolicyResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(DeletePolicyResponse { + Ok(Response::new(DeletePolicyResponse { success: true, error_info: None, })) @@ -1942,13 +1938,13 @@ impl Node for NodeService { let request = request.into_inner(); let policy = request.policy_name; if policy.is_empty() { - return Ok(tonic::Response::new(LoadPolicyResponse { + return Ok(Response::new(LoadPolicyResponse { success: false, error_info: Some("policy name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(LoadPolicyResponse { + return Ok(Response::new(LoadPolicyResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -1956,12 +1952,12 @@ impl Node for NodeService { let resp = iam_sys.load_policy(&policy).await; if let Err(err) = resp { - return Ok(tonic::Response::new(LoadPolicyResponse { + return Ok(Response::new(LoadPolicyResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LoadPolicyResponse { + Ok(Response::new(LoadPolicyResponse { success: true, error_info: None, })) @@ -1974,32 +1970,32 @@ impl Node for NodeService { let request = request.into_inner(); let user_or_group = request.user_or_group; if user_or_group.is_empty() { - return Ok(tonic::Response::new(LoadPolicyMappingResponse { + return Ok(Response::new(LoadPolicyMappingResponse { success: false, error_info: Some("user_or_group name is missing".to_string()), })); } let Some(user_type) = UserType::from_u64(request.user_type) else { - return Ok(tonic::Response::new(LoadPolicyMappingResponse { + return Ok(Response::new(LoadPolicyMappingResponse { success: false, error_info: Some("invalid user type".to_string()), })); }; let is_group = request.is_group; let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(LoadPolicyMappingResponse { + return Ok(Response::new(LoadPolicyMappingResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; let resp = iam_sys.load_policy_mapping(&user_or_group, user_type, is_group).await; if let Err(err) = resp { - return Ok(tonic::Response::new(LoadPolicyMappingResponse { + return Ok(Response::new(LoadPolicyMappingResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LoadPolicyMappingResponse { + Ok(Response::new(LoadPolicyMappingResponse { success: true, error_info: None, })) @@ -2009,13 +2005,13 @@ impl Node for NodeService { let request = request.into_inner(); let access_key = request.access_key; if access_key.is_empty() { - return Ok(tonic::Response::new(DeleteUserResponse { + return Ok(Response::new(DeleteUserResponse { success: false, error_info: Some("access_key name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(DeleteUserResponse { + return Ok(Response::new(DeleteUserResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2023,12 +2019,12 @@ impl Node for NodeService { let resp = iam_sys.delete_user(&access_key, false).await; if let Err(err) = resp { - return Ok(tonic::Response::new(DeleteUserResponse { + return Ok(Response::new(DeleteUserResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(DeleteUserResponse { + Ok(Response::new(DeleteUserResponse { success: true, error_info: None, })) @@ -2041,25 +2037,25 @@ impl Node for NodeService { let request = request.into_inner(); let access_key = request.access_key; if access_key.is_empty() { - return Ok(tonic::Response::new(DeleteServiceAccountResponse { + return Ok(Response::new(DeleteServiceAccountResponse { success: false, error_info: Some("access_key name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(DeleteServiceAccountResponse { + return Ok(Response::new(DeleteServiceAccountResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; let resp = iam_sys.delete_service_account(&access_key, false).await; if let Err(err) = resp { - return Ok(tonic::Response::new(DeleteServiceAccountResponse { + return Ok(Response::new(DeleteServiceAccountResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(DeleteServiceAccountResponse { + Ok(Response::new(DeleteServiceAccountResponse { success: true, error_info: None, })) @@ -2070,14 +2066,14 @@ impl Node for NodeService { let access_key = request.access_key; let temp = request.temp; if access_key.is_empty() { - return Ok(tonic::Response::new(LoadUserResponse { + return Ok(Response::new(LoadUserResponse { success: false, error_info: Some("access_key name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(LoadUserResponse { + return Ok(Response::new(LoadUserResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2087,13 +2083,13 @@ impl Node for NodeService { let resp = iam_sys.load_user(&access_key, user_type).await; if let Err(err) = resp { - return Ok(tonic::Response::new(LoadUserResponse { + return Ok(Response::new(LoadUserResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LoadUserResponse { + Ok(Response::new(LoadUserResponse { success: true, error_info: None, })) @@ -2106,14 +2102,14 @@ impl Node for NodeService { let request = request.into_inner(); let access_key = request.access_key; if access_key.is_empty() { - return Ok(tonic::Response::new(LoadServiceAccountResponse { + return Ok(Response::new(LoadServiceAccountResponse { success: false, error_info: Some("access_key name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(LoadServiceAccountResponse { + return Ok(Response::new(LoadServiceAccountResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2121,13 +2117,13 @@ impl Node for NodeService { let resp = iam_sys.load_service_account(&access_key).await; if let Err(err) = resp { - return Ok(tonic::Response::new(LoadServiceAccountResponse { + return Ok(Response::new(LoadServiceAccountResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LoadServiceAccountResponse { + Ok(Response::new(LoadServiceAccountResponse { success: true, error_info: None, })) @@ -2137,14 +2133,14 @@ impl Node for NodeService { let request = request.into_inner(); let group = request.group; if group.is_empty() { - return Ok(tonic::Response::new(LoadGroupResponse { + return Ok(Response::new(LoadGroupResponse { success: false, error_info: Some("group name is missing".to_string()), })); } let Some(iam_sys) = get_global_iam_sys() else { - return Ok(tonic::Response::new(LoadGroupResponse { + return Ok(Response::new(LoadGroupResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2152,12 +2148,12 @@ impl Node for NodeService { let resp = iam_sys.load_group(&group).await; if let Err(err) = resp { - return Ok(tonic::Response::new(LoadGroupResponse { + return Ok(Response::new(LoadGroupResponse { success: false, error_info: Some(err.to_string()), })); } - Ok(tonic::Response::new(LoadGroupResponse { + Ok(Response::new(LoadGroupResponse { success: true, error_info: None, })) @@ -2168,7 +2164,7 @@ impl Node for NodeService { _request: Request, ) -> Result, Status> { let Some(_store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(ReloadSiteReplicationConfigResponse { + return Ok(Response::new(ReloadSiteReplicationConfigResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2211,17 +2207,17 @@ impl Node for NodeService { _request: Request, ) -> Result, Status> { let Some(store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(ReloadPoolMetaResponse { + return Ok(Response::new(ReloadPoolMetaResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; match store.reload_pool_meta().await { - Ok(_) => Ok(tonic::Response::new(ReloadPoolMetaResponse { + Ok(_) => Ok(Response::new(ReloadPoolMetaResponse { success: true, error_info: None, })), - Err(err) => Ok(tonic::Response::new(ReloadPoolMetaResponse { + Err(err) => Ok(Response::new(ReloadPoolMetaResponse { success: false, error_info: Some(err.to_string()), })), @@ -2230,14 +2226,14 @@ impl Node for NodeService { async fn stop_rebalance(&self, _request: Request) -> Result, Status> { let Some(store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(StopRebalanceResponse { + return Ok(Response::new(StopRebalanceResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); }; let _ = store.stop_rebalance().await; - Ok(tonic::Response::new(StopRebalanceResponse { + Ok(Response::new(StopRebalanceResponse { success: true, error_info: None, })) @@ -2249,7 +2245,7 @@ impl Node for NodeService { request: Request, ) -> Result, Status> { let Some(store) = new_object_layer_fn() else { - return Ok(tonic::Response::new(LoadRebalanceMetaResponse { + return Ok(Response::new(LoadRebalanceMetaResponse { success: false, error_info: Some("errServerNotInitialized".to_string()), })); @@ -2269,12 +2265,12 @@ impl Node for NodeService { if start_rebalance { warn!("start rebalance"); let store = store.clone(); - tokio::spawn(async move { + spawn(async move { store.start_rebalance().await; }); } - Ok(tonic::Response::new(LoadRebalanceMetaResponse { + Ok(Response::new(LoadRebalanceMetaResponse { success: true, error_info: None, })) @@ -2292,11 +2288,12 @@ impl Node for NodeService { #[allow(unused_imports)] mod tests { use super::*; + use Request; use rustfs_protos::proto_gen::node_service::{ - BackgroundHealStatusRequest, CheckPartsRequest, DeleteBucketMetadataRequest, DeleteBucketRequest, DeletePathsRequest, - DeletePolicyRequest, DeleteRequest, DeleteServiceAccountRequest, DeleteUserRequest, DeleteVersionRequest, - DeleteVersionsRequest, DeleteVolumeRequest, DiskInfoRequest, GenerallyLockRequest, GetBucketInfoRequest, GetCpusRequest, - GetMemInfoRequest, GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest, GetSeLinuxInfoRequest, + CheckPartsRequest, DeleteBucketMetadataRequest, DeleteBucketRequest, DeletePathsRequest, DeletePolicyRequest, + DeleteRequest, DeleteServiceAccountRequest, DeleteUserRequest, DeleteVersionRequest, DeleteVersionsRequest, + DeleteVolumeRequest, DiskInfoRequest, GenerallyLockRequest, GetBucketInfoRequest, GetCpusRequest, GetMemInfoRequest, + GetNetInfoRequest, GetOsInfoRequest, GetPartitionsRequest, GetProcInfoRequest, GetSeLinuxInfoRequest, GetSysConfigRequest, GetSysErrorsRequest, HealBucketRequest, ListBucketRequest, ListDirRequest, ListVolumesRequest, LoadBucketMetadataRequest, LoadGroupRequest, LoadPolicyMappingRequest, LoadPolicyRequest, LoadRebalanceMetaRequest, LoadServiceAccountRequest, LoadUserRequest, LocalStorageInfoRequest, MakeBucketRequest, MakeVolumeRequest, @@ -2305,7 +2302,6 @@ mod tests { ServerInfoRequest, StatVolumeRequest, StopRebalanceRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, }; - use tonic::Request; fn create_test_node_service() -> NodeService { make_server() diff --git a/rustfs/src/update.rs b/rustfs/src/update.rs index f776ddd6..c480e630 100644 --- a/rustfs/src/update.rs +++ b/rustfs/src/update.rs @@ -12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::version; use serde::{Deserialize, Serialize}; use std::time::Duration; use thiserror::Error; use tracing::{debug, error, info}; -use crate::version; - /// Update check related errors #[derive(Error, Debug)] pub enum UpdateCheckError { diff --git a/rustfs/src/version.rs b/rustfs/src/version.rs index c516a556..51f9ee98 100644 --- a/rustfs/src/version.rs +++ b/rustfs/src/version.rs @@ -1,3 +1,17 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use shadow_rs::shadow; use std::process::Command;