From 1f62c35846d8e79f8bc3d89a8d1f230358b97f0e Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 6 Jun 2025 15:30:27 +0800 Subject: [PATCH] cargo fmt --- common/common/src/lib.rs | 2 +- crypto/src/encdec/decrypt.rs | 2 +- ecstore/src/bucket/metadata.rs | 2 +- ecstore/src/bucket/metadata_sys.rs | 3 +- ecstore/src/bucket/mod.rs | 2 +- ecstore/src/bucket/replication/datatypes.rs | 2 +- ecstore/src/bucket/replication/mod.rs | 2 +- ecstore/src/cmd/bucket_replication.rs | 1 - ecstore/src/cmd/bucket_targets.rs | 1 - ecstore/src/cmd/mod.rs | 2 +- ecstore/src/heal/data_scanner.rs | 39 +++++++++++++-------- ecstore/src/lib.rs | 2 +- ecstore/src/store.rs | 2 +- ecstore/src/store_api.rs | 10 +++--- rustfs/src/admin/handlers.rs | 30 ++++++++-------- rustfs/src/admin/mod.rs | 32 ++++++++--------- rustfs/src/grpc.rs | 3 +- rustfs/src/main.rs | 2 +- 18 files changed, 70 insertions(+), 69 deletions(-) diff --git a/common/common/src/lib.rs b/common/common/src/lib.rs index e6a3849f..d17a8aa5 100644 --- a/common/common/src/lib.rs +++ b/common/common/src/lib.rs @@ -1,7 +1,7 @@ +pub mod bucket_stats; pub mod error; pub mod globals; pub mod last_minute; -pub mod bucket_stats; // is ',' pub static DEFAULT_DELIMITER: u8 = 44; diff --git a/crypto/src/encdec/decrypt.rs b/crypto/src/encdec/decrypt.rs index f339f219..8c46ab46 100644 --- a/crypto/src/encdec/decrypt.rs +++ b/crypto/src/encdec/decrypt.rs @@ -106,7 +106,7 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result, crate::Erro // .map_err(|_| DecryptError::DecryptionError)? // } // 0x03 => { - + // let cipher = Aes256Gcm::new(Key::from_slice(&key)); // let nonce = Nonce::from_slice(nonce); // cipher diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index f7d628fd..8f934def 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -346,7 +346,7 @@ impl BucketMetadata { } //let temp = self.bucket_targets_config_json.clone(); if !self.bucket_targets_config_json.is_empty() { - let arr:Vec = serde_json::from_slice(&self.bucket_targets_config_json)?; + let arr: Vec = serde_json::from_slice(&self.bucket_targets_config_json)?; self.bucket_target_config = Some(BucketTargets { targets: arr }); } else { self.bucket_target_config = Some(BucketTargets::default()) diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 9024ea2f..9a11a840 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -558,7 +558,7 @@ impl BucketMetadataSys { return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound)); } res - }, + } Err(err) => { warn!("get_replication_config err {:?}", &err); return if config::error::is_err_config_not_found(&err) { @@ -593,7 +593,6 @@ impl BucketMetadataSys { } }; - println!("573"); if let Some(config) = &bm.bucket_target_config { diff --git a/ecstore/src/bucket/mod.rs b/ecstore/src/bucket/mod.rs index 35cd030a..a4e79c93 100644 --- a/ecstore/src/bucket/mod.rs +++ b/ecstore/src/bucket/mod.rs @@ -4,9 +4,9 @@ pub mod metadata_sys; pub mod object_lock; pub mod policy_sys; mod quota; +pub mod replication; pub mod tagging; pub mod target; pub mod utils; pub mod versioning; -pub mod replication; pub mod versioning_sys; diff --git a/ecstore/src/bucket/replication/datatypes.rs b/ecstore/src/bucket/replication/datatypes.rs index f2d78055..26c7c005 100644 --- a/ecstore/src/bucket/replication/datatypes.rs +++ b/ecstore/src/bucket/replication/datatypes.rs @@ -24,4 +24,4 @@ impl StatusType { pub fn is_empty(&self) -> bool { matches!(self, StatusType::Pending) // Adjust this as needed } -} \ No newline at end of file +} diff --git a/ecstore/src/bucket/replication/mod.rs b/ecstore/src/bucket/replication/mod.rs index 58fd615b..0d8681dc 100644 --- a/ecstore/src/bucket/replication/mod.rs +++ b/ecstore/src/bucket/replication/mod.rs @@ -1 +1 @@ -pub mod datatypes; \ No newline at end of file +pub mod datatypes; diff --git a/ecstore/src/cmd/bucket_replication.rs b/ecstore/src/cmd/bucket_replication.rs index 199b2791..179af69d 100644 --- a/ecstore/src/cmd/bucket_replication.rs +++ b/ecstore/src/cmd/bucket_replication.rs @@ -6,7 +6,6 @@ use crate::bucket::versioning_sys::BucketVersioningSys; use crate::new_object_layer_fn; use crate::peer::RemotePeerS3Client; use crate::store; -use crate::store_api; use crate::store_api::ObjectIO; use crate::store_api::ObjectInfo; use crate::store_api::ObjectOptions; diff --git a/ecstore/src/cmd/bucket_targets.rs b/ecstore/src/cmd/bucket_targets.rs index 3cfba660..e7d76715 100644 --- a/ecstore/src/cmd/bucket_targets.rs +++ b/ecstore/src/cmd/bucket_targets.rs @@ -13,7 +13,6 @@ use crate::{ //use tokio::sync::RwLock; use aws_sdk_s3::Client as S3Client; use chrono::Utc; -use futures::future::err; use lazy_static::lazy_static; use std::sync::Arc; use std::{ diff --git a/ecstore/src/cmd/mod.rs b/ecstore/src/cmd/mod.rs index dadec5ab..da274f39 100644 --- a/ecstore/src/cmd/mod.rs +++ b/ecstore/src/cmd/mod.rs @@ -1,2 +1,2 @@ +pub mod bucket_replication; pub mod bucket_targets; -pub mod bucket_replication; \ No newline at end of file diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 1db082bb..b0297996 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -18,10 +18,12 @@ use super::{ data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash}, heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}, }; -use crate::{ - bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys}, cmd::bucket_replication::{ReplicationStatusType}, heal::data_usage::DATA_USAGE_ROOT -}; use crate::cmd::bucket_replication::queue_replication_heal; +use crate::{ + bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys}, + cmd::bucket_replication::ReplicationStatusType, + heal::data_usage::DATA_USAGE_ROOT, +}; use crate::{ cache_value::metacache_set::{list_path_raw, ListPathRawOptions}, config::{ @@ -553,7 +555,13 @@ impl ScannerItem { pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, usize) { let done = ScannerMetrics::time(ScannerMetric::Ilm); //todo: lifecycle - info!("apply_actions {} {} {:?} {:?}", oi.bucket.clone(), oi.name.clone(), oi.version_id.clone(), oi.user_defined.clone()); + info!( + "apply_actions {} {} {:?} {:?}", + oi.bucket.clone(), + oi.name.clone(), + oi.version_id.clone(), + oi.user_defined.clone() + ); // Create a mutable clone if you need to modify fields let mut oi = oi.clone(); @@ -561,7 +569,7 @@ impl ScannerItem { oi.user_defined .as_ref() .and_then(|map| map.get("x-amz-bucket-replication-status")) - .unwrap_or(&"PENDING".to_string()) + .unwrap_or(&"PENDING".to_string()), ); info!("apply status is: {:?}", oi.replication_status); self.heal_replication(&oi, _size_s).await; @@ -572,11 +580,15 @@ impl ScannerItem { pub async fn heal_replication(&mut self, oi: &ObjectInfo, size_s: &mut SizeSummary) { if oi.version_id.is_none() { - error!("heal_replication: no version_id or replication config {} {} {}", oi.bucket, oi.name, oi.version_id.is_none()); + error!( + "heal_replication: no version_id or replication config {} {} {}", + oi.bucket, + oi.name, + oi.version_id.is_none() + ); return; } - //let config = s3s::dto::ReplicationConfiguration{ role: todo!(), rules: todo!() }; // Use the provided variable instead of borrowing self mutably. let replication = match metadata_sys::get_replication_config(&oi.bucket).await { @@ -597,16 +609,17 @@ impl ScannerItem { //if oi.delete_marker || !oi.version_purge_status.is_empty() { if oi.delete_marker { - error!("heal_replication: delete marker or version purge status {} {} {:?} {} {:?}", oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status); + error!( + "heal_replication: delete marker or version purge status {} {} {:?} {} {:?}", + oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status + ); return; } - if oi.replication_status == ReplicationStatusType::Completed { return; } - info!("replication status is: {:?} and user define {:?}", oi.replication_status, oi.user_defined); let roi = queue_replication_heal(&oi.bucket, oi, &replication, 3).await; @@ -617,10 +630,7 @@ impl ScannerItem { } for (arn, tgt_status) in &roi.unwrap().target_statuses { - let tgt_size_s = size_s - .repl_target_stats - .entry(arn.clone()) - .or_default(); + let tgt_size_s = size_s.repl_target_stats.entry(arn.clone()).or_default(); match tgt_status { ReplicationStatusType::Pending => { @@ -650,7 +660,6 @@ impl ScannerItem { size_s.replica_size += oi.size; } } - } #[derive(Debug, Default)] diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 246b18ac..c1cb7b68 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -3,6 +3,7 @@ pub mod bitrot; pub mod bucket; pub mod cache_value; mod chunk_stream; +pub mod cmd; pub mod config; pub mod disk; pub mod disks_layout; @@ -32,7 +33,6 @@ pub mod store_list_objects; mod store_utils; pub mod utils; pub mod xhttp; -pub mod cmd; pub use global::new_object_layer_fn; pub use global::set_global_endpoints; diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 51d04667..608f06be 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1374,7 +1374,7 @@ impl StorageAPI for ECStore { info.versionning = sys.versioning(); info.object_locking = sys.object_locking(); } - + Ok(info) } #[tracing::instrument(skip(self))] diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 6de6b7f1..6543e6dc 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -686,12 +686,11 @@ pub struct ObjectInfo { pub inlined: bool, pub metadata_only: bool, pub version_only: bool, - pub replication_status_internal:String, - pub replication_status:ReplicationStatusType, + pub replication_status_internal: String, + pub replication_status: ReplicationStatusType, pub version_purge_status_internal: String, pub version_purge_status: VersionPurgeStatusType, - pub checksum:Vec, - + pub checksum: Vec, } impl Clone for ObjectInfo { @@ -724,10 +723,9 @@ impl Clone for ObjectInfo { replication_status: self.replication_status.clone(), version_purge_status_internal: self.version_purge_status_internal.clone(), version_purge_status: self.version_purge_status.clone(), - checksum:Default::default(), + checksum: Default::default(), } } - } impl ObjectInfo { diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 2318acf9..6be56f11 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -2,15 +2,15 @@ use super::router::Operation; use crate::auth::check_key_valid; use crate::auth::get_condition_values; use crate::auth::get_session_token; +//use ecstore::error::Error as ec_Error; +use crate::storage::error::to_s3_error; use bytes::Bytes; use common::error::Error as ec_Error; use ecstore::admin_server_info::get_server_info; -use ecstore::bucket::versioning_sys::BucketVersioningSys; use ecstore::bucket::metadata_sys::{self, get_replication_config}; -use ecstore::bucket::target::{BucketTarget}; +use ecstore::bucket::target::BucketTarget; +use ecstore::bucket::versioning_sys::BucketVersioningSys; use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys}; -//use ecstore::error::Error as ec_Error; -use crate::storage::error::to_s3_error; use ecstore::global::GLOBAL_ALlHealState; use ecstore::heal::data_usage::load_data_usage_from_backend; use ecstore::heal::heal_commands::HealOpts; @@ -21,7 +21,7 @@ use ecstore::peer::is_reserved_or_invalid_bucket; use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free}; use ecstore::store::is_valid_object_prefix; use ecstore::store_api::BucketOptions; -use ecstore::store_api::{StorageAPI}; +use ecstore::store_api::StorageAPI; use ecstore::utils::path::path_join; use futures::{Stream, StreamExt}; use http::{HeaderMap, Uri}; @@ -32,12 +32,12 @@ use iam::store::MappedPolicy; use madmin::metrics::RealtimeMetrics; use madmin::utils::parse_duration; use matchit::Params; +use percent_encoding::{percent_encode, AsciiSet, CONTROLS}; use policy::policy::action::Action; use policy::policy::action::S3Action; use policy::policy::default::DEFAULT_POLICIES; use policy::policy::Args; use policy::policy::BucketPolicy; -use percent_encoding::{percent_encode, AsciiSet, CONTROLS}; use s3s::header::CONTENT_TYPE; use s3s::stream::{ByteStream, DynByteStream}; use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result}; @@ -773,7 +773,7 @@ fn extract_query_params(uri: &Uri) -> HashMap { #[allow(dead_code)] fn is_local_host(_host: String) -> bool { - return false; + false } //awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin "http://:9000/rustfs/admin/v3/replicationmetrics?bucket=1" @@ -804,7 +804,7 @@ impl Operation for SetRemoteTargetHandler { error!("credentials null"); return Err(s3_error!(InvalidRequest, "get cred failed")); }; - let _is_owner = true; // 先按true处理,后期根据请求决定 + let _is_owner = true; // 先按 true 处理,后期根据请求决定 let body = _req.input.store_all_unlimited().await.unwrap(); //println!("body: {}", std::str::from_utf8(&body.clone()).unwrap()); @@ -846,7 +846,7 @@ impl Operation for SetRemoteTargetHandler { } if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() { - let (arn, exist) = sys.get_remote_arn(&bucket, Some(&remote_target), "").await; + let (arn, exist) = sys.get_remote_arn(bucket, Some(&remote_target), "").await; info!("exist: {} {}", exist, arn.clone().unwrap_or_default()); if exist && arn.is_some() { let jsonarn = serde_json::to_string(&arn).expect("failed to serialize"); @@ -858,7 +858,7 @@ impl Operation for SetRemoteTargetHandler { Ok(_) => { { //todo 各种持久化的工作 - let targets = sys.list_targets(Some(&bucket), None).await; + let targets = sys.list_targets(Some(bucket), None).await; info!("targets is {}", targets.len()); match serde_json::to_vec(&targets) { Ok(json) => { @@ -950,7 +950,7 @@ impl Operation for ListRemoteTargetHandler { } if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() { - let targets = sys.list_targets(Some(&bucket), None).await; + let targets = sys.list_targets(Some(bucket), None).await; error!("target sys len {}", targets.len()); if targets.is_empty() { return Ok(S3Response::new(( @@ -994,11 +994,11 @@ impl Operation for RemoveRemoteTargetHandler { if let Some(bucket) = querys.get("bucket") { if bucket.is_empty() { error!("bucket parameter is empty"); - return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string())))); + return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string())))); } - let _arn = bucket_targets::ARN::parse(&arnstr); + let _arn = bucket_targets::ARN::parse(arnstr); - match get_replication_config(&bucket).await { + match get_replication_config(bucket).await { Ok((conf, _ts)) => { for ru in conf.rules { let encoded = percent_encode(ru.destination.bucket.as_bytes(), &COLON); @@ -1017,7 +1017,7 @@ impl Operation for RemoveRemoteTargetHandler { } //percent_decode_str(&arnstr); let decoded_str = decode(arnstr).unwrap(); - error!("need delete target is {}" , decoded_str); + error!("need delete target is {}", decoded_str); bucket_targets::remove_bucket_target(bucket, arnstr).await; } } diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index bbb1ad9b..24cc6178 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -11,7 +11,7 @@ use handlers::{ sts, user, }; -use handlers::{ListRemoteTargetHandler, SetRemoteTargetHandler, RemoveRemoteTargetHandler, GetReplicationMetricsHandler}; +use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler}; use hyper::Method; use router::{AdminOperation, S3Router}; use rpc::regist_rpc_route; @@ -231,51 +231,49 @@ fn register_user_route(r: &mut S3Router) -> Result<()> { )?; r.insert( - Method::GET, - format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), + Method::GET, + format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), AdminOperation(&ListRemoteTargetHandler {}), )?; r.insert( - Method::GET, - format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(), AdminOperation(&ListRemoteTargetHandler {}), )?; r.insert( - Method::GET, + Method::GET, format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/replicationmetrics").as_str(), AdminOperation(&GetReplicationMetricsHandler {}), )?; r.insert( - Method::GET, + Method::GET, format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(), AdminOperation(&GetReplicationMetricsHandler {}), )?; - r.insert( - Method::PUT, - format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/set-remote-target").as_str(), + Method::PUT, + format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/set-remote-target").as_str(), AdminOperation(&SetRemoteTargetHandler {}), )?; r.insert( - Method::PUT, - format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(), + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(), AdminOperation(&SetRemoteTargetHandler {}), )?; - r.insert( - Method::DELETE, - format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), + Method::DELETE, + format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), AdminOperation(&RemoveRemoteTargetHandler {}), )?; r.insert( - Method::DELETE, - format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(), AdminOperation(&RemoveRemoteTargetHandler {}), )?; diff --git a/rustfs/src/grpc.rs b/rustfs/src/grpc.rs index 88396ca3..e426c876 100644 --- a/rustfs/src/grpc.rs +++ b/rustfs/src/grpc.rs @@ -249,8 +249,7 @@ impl Node for NodeService { })) } - // println!("vuc") - + // println!("vuc") Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse { success: false, bucket_info: String::new(), diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 4bd279ee..efe99d55 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -17,12 +17,12 @@ use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, Shutdo use bytes::Bytes; use chrono::Datelike; use clap::Parser; -use ecstore::cmd::bucket_replication::init_bucket_replication_pool; use common::{ error::{Error, Result}, globals::set_global_addr, }; use ecstore::bucket::metadata_sys::init_bucket_metadata_sys; +use ecstore::cmd::bucket_replication::init_bucket_replication_pool; use ecstore::config as ecconfig; use ecstore::config::GLOBAL_ConfigSys; use ecstore::heal::background_heal_ops::init_auto_heal;