cargo fmt

This commit is contained in:
houseme
2025-06-06 15:30:27 +08:00
parent 3eaa3d68c7
commit 1f62c35846
18 changed files with 70 additions and 69 deletions

View File

@@ -1,7 +1,7 @@
pub mod bucket_stats;
pub mod error;
pub mod globals;
pub mod last_minute;
pub mod bucket_stats;
// is ','
pub static DEFAULT_DELIMITER: u8 = 44;

View File

@@ -106,7 +106,7 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
// .map_err(|_| DecryptError::DecryptionError)?
// }
// 0x03 => {
// let cipher = Aes256Gcm::new(Key::from_slice(&key));
// let nonce = Nonce::from_slice(nonce);
// cipher

View File

@@ -346,7 +346,7 @@ impl BucketMetadata {
}
//let temp = self.bucket_targets_config_json.clone();
if !self.bucket_targets_config_json.is_empty() {
let arr:Vec<BucketTarget> = serde_json::from_slice(&self.bucket_targets_config_json)?;
let arr: Vec<BucketTarget> = serde_json::from_slice(&self.bucket_targets_config_json)?;
self.bucket_target_config = Some(BucketTargets { targets: arr });
} else {
self.bucket_target_config = Some(BucketTargets::default())

View File

@@ -558,7 +558,7 @@ impl BucketMetadataSys {
return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
}
res
},
}
Err(err) => {
warn!("get_replication_config err {:?}", &err);
return if config::error::is_err_config_not_found(&err) {
@@ -593,7 +593,6 @@ impl BucketMetadataSys {
}
};
println!("573");
if let Some(config) = &bm.bucket_target_config {

View File

@@ -4,9 +4,9 @@ pub mod metadata_sys;
pub mod object_lock;
pub mod policy_sys;
mod quota;
pub mod replication;
pub mod tagging;
pub mod target;
pub mod utils;
pub mod versioning;
pub mod replication;
pub mod versioning_sys;

View File

@@ -24,4 +24,4 @@ impl StatusType {
pub fn is_empty(&self) -> bool {
matches!(self, StatusType::Pending) // Adjust this as needed
}
}
}

View File

@@ -1 +1 @@
pub mod datatypes;
pub mod datatypes;

View File

@@ -6,7 +6,6 @@ use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::new_object_layer_fn;
use crate::peer::RemotePeerS3Client;
use crate::store;
use crate::store_api;
use crate::store_api::ObjectIO;
use crate::store_api::ObjectInfo;
use crate::store_api::ObjectOptions;

View File

@@ -13,7 +13,6 @@ use crate::{
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
use chrono::Utc;
use futures::future::err;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::{

View File

@@ -1,2 +1,2 @@
pub mod bucket_replication;
pub mod bucket_targets;
pub mod bucket_replication;

View File

@@ -18,10 +18,12 @@ use super::{
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
use crate::{
bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys}, cmd::bucket_replication::{ReplicationStatusType}, heal::data_usage::DATA_USAGE_ROOT
};
use crate::cmd::bucket_replication::queue_replication_heal;
use crate::{
bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys},
cmd::bucket_replication::ReplicationStatusType,
heal::data_usage::DATA_USAGE_ROOT,
};
use crate::{
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{
@@ -553,7 +555,13 @@ impl ScannerItem {
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, usize) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
//todo: lifecycle
info!("apply_actions {} {} {:?} {:?}", oi.bucket.clone(), oi.name.clone(), oi.version_id.clone(), oi.user_defined.clone());
info!(
"apply_actions {} {} {:?} {:?}",
oi.bucket.clone(),
oi.name.clone(),
oi.version_id.clone(),
oi.user_defined.clone()
);
// Create a mutable clone if you need to modify fields
let mut oi = oi.clone();
@@ -561,7 +569,7 @@ impl ScannerItem {
oi.user_defined
.as_ref()
.and_then(|map| map.get("x-amz-bucket-replication-status"))
.unwrap_or(&"PENDING".to_string())
.unwrap_or(&"PENDING".to_string()),
);
info!("apply status is: {:?}", oi.replication_status);
self.heal_replication(&oi, _size_s).await;
@@ -572,11 +580,15 @@ impl ScannerItem {
pub async fn heal_replication(&mut self, oi: &ObjectInfo, size_s: &mut SizeSummary) {
if oi.version_id.is_none() {
error!("heal_replication: no version_id or replication config {} {} {}", oi.bucket, oi.name, oi.version_id.is_none());
error!(
"heal_replication: no version_id or replication config {} {} {}",
oi.bucket,
oi.name,
oi.version_id.is_none()
);
return;
}
//let config = s3s::dto::ReplicationConfiguration{ role: todo!(), rules: todo!() };
// Use the provided variable instead of borrowing self mutably.
let replication = match metadata_sys::get_replication_config(&oi.bucket).await {
@@ -597,16 +609,17 @@ impl ScannerItem {
//if oi.delete_marker || !oi.version_purge_status.is_empty() {
if oi.delete_marker {
error!("heal_replication: delete marker or version purge status {} {} {:?} {} {:?}", oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status);
error!(
"heal_replication: delete marker or version purge status {} {} {:?} {} {:?}",
oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status
);
return;
}
if oi.replication_status == ReplicationStatusType::Completed {
return;
}
info!("replication status is: {:?} and user define {:?}", oi.replication_status, oi.user_defined);
let roi = queue_replication_heal(&oi.bucket, oi, &replication, 3).await;
@@ -617,10 +630,7 @@ impl ScannerItem {
}
for (arn, tgt_status) in &roi.unwrap().target_statuses {
let tgt_size_s = size_s
.repl_target_stats
.entry(arn.clone())
.or_default();
let tgt_size_s = size_s.repl_target_stats.entry(arn.clone()).or_default();
match tgt_status {
ReplicationStatusType::Pending => {
@@ -650,7 +660,6 @@ impl ScannerItem {
size_s.replica_size += oi.size;
}
}
}
#[derive(Debug, Default)]

View File

@@ -3,6 +3,7 @@ pub mod bitrot;
pub mod bucket;
pub mod cache_value;
mod chunk_stream;
pub mod cmd;
pub mod config;
pub mod disk;
pub mod disks_layout;
@@ -32,7 +33,6 @@ pub mod store_list_objects;
mod store_utils;
pub mod utils;
pub mod xhttp;
pub mod cmd;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;

View File

@@ -1374,7 +1374,7 @@ impl StorageAPI for ECStore {
info.versionning = sys.versioning();
info.object_locking = sys.object_locking();
}
Ok(info)
}
#[tracing::instrument(skip(self))]

View File

@@ -686,12 +686,11 @@ pub struct ObjectInfo {
pub inlined: bool,
pub metadata_only: bool,
pub version_only: bool,
pub replication_status_internal:String,
pub replication_status:ReplicationStatusType,
pub replication_status_internal: String,
pub replication_status: ReplicationStatusType,
pub version_purge_status_internal: String,
pub version_purge_status: VersionPurgeStatusType,
pub checksum:Vec<u8>,
pub checksum: Vec<u8>,
}
impl Clone for ObjectInfo {
@@ -724,10 +723,9 @@ impl Clone for ObjectInfo {
replication_status: self.replication_status.clone(),
version_purge_status_internal: self.version_purge_status_internal.clone(),
version_purge_status: self.version_purge_status.clone(),
checksum:Default::default(),
checksum: Default::default(),
}
}
}
impl ObjectInfo {

View File

@@ -2,15 +2,15 @@ use super::router::Operation;
use crate::auth::check_key_valid;
use crate::auth::get_condition_values;
use crate::auth::get_session_token;
//use ecstore::error::Error as ec_Error;
use crate::storage::error::to_s3_error;
use bytes::Bytes;
use common::error::Error as ec_Error;
use ecstore::admin_server_info::get_server_info;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::bucket::metadata_sys::{self, get_replication_config};
use ecstore::bucket::target::{BucketTarget};
use ecstore::bucket::target::BucketTarget;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
//use ecstore::error::Error as ec_Error;
use crate::storage::error::to_s3_error;
use ecstore::global::GLOBAL_ALlHealState;
use ecstore::heal::data_usage::load_data_usage_from_backend;
use ecstore::heal::heal_commands::HealOpts;
@@ -21,7 +21,7 @@ use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::{StorageAPI};
use ecstore::store_api::StorageAPI;
use ecstore::utils::path::path_join;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
@@ -32,12 +32,12 @@ use iam::store::MappedPolicy;
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::default::DEFAULT_POLICIES;
use policy::policy::Args;
use policy::policy::BucketPolicy;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
use s3s::header::CONTENT_TYPE;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result};
@@ -773,7 +773,7 @@ fn extract_query_params(uri: &Uri) -> HashMap<String, String> {
#[allow(dead_code)]
fn is_local_host(_host: String) -> bool {
return false;
false
}
//awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin "http://:9000/rustfs/admin/v3/replicationmetrics?bucket=1"
@@ -804,7 +804,7 @@ impl Operation for SetRemoteTargetHandler {
error!("credentials null");
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let _is_owner = true; // 先按true处理后期根据请求决定
let _is_owner = true; // 先按 true 处理,后期根据请求决定
let body = _req.input.store_all_unlimited().await.unwrap();
//println!("body: {}", std::str::from_utf8(&body.clone()).unwrap());
@@ -846,7 +846,7 @@ impl Operation for SetRemoteTargetHandler {
}
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let (arn, exist) = sys.get_remote_arn(&bucket, Some(&remote_target), "").await;
let (arn, exist) = sys.get_remote_arn(bucket, Some(&remote_target), "").await;
info!("exist: {} {}", exist, arn.clone().unwrap_or_default());
if exist && arn.is_some() {
let jsonarn = serde_json::to_string(&arn).expect("failed to serialize");
@@ -858,7 +858,7 @@ impl Operation for SetRemoteTargetHandler {
Ok(_) => {
{
//todo 各种持久化的工作
let targets = sys.list_targets(Some(&bucket), None).await;
let targets = sys.list_targets(Some(bucket), None).await;
info!("targets is {}", targets.len());
match serde_json::to_vec(&targets) {
Ok(json) => {
@@ -950,7 +950,7 @@ impl Operation for ListRemoteTargetHandler {
}
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let targets = sys.list_targets(Some(&bucket), None).await;
let targets = sys.list_targets(Some(bucket), None).await;
error!("target sys len {}", targets.len());
if targets.is_empty() {
return Ok(S3Response::new((
@@ -994,11 +994,11 @@ impl Operation for RemoveRemoteTargetHandler {
if let Some(bucket) = querys.get("bucket") {
if bucket.is_empty() {
error!("bucket parameter is empty");
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string()))));
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string()))));
}
let _arn = bucket_targets::ARN::parse(&arnstr);
let _arn = bucket_targets::ARN::parse(arnstr);
match get_replication_config(&bucket).await {
match get_replication_config(bucket).await {
Ok((conf, _ts)) => {
for ru in conf.rules {
let encoded = percent_encode(ru.destination.bucket.as_bytes(), &COLON);
@@ -1017,7 +1017,7 @@ impl Operation for RemoveRemoteTargetHandler {
}
//percent_decode_str(&arnstr);
let decoded_str = decode(arnstr).unwrap();
error!("need delete target is {}" , decoded_str);
error!("need delete target is {}", decoded_str);
bucket_targets::remove_bucket_target(bucket, arnstr).await;
}
}

View File

@@ -11,7 +11,7 @@ use handlers::{
sts, user,
};
use handlers::{ListRemoteTargetHandler, SetRemoteTargetHandler, RemoveRemoteTargetHandler, GetReplicationMetricsHandler};
use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler};
use hyper::Method;
use router::{AdminOperation, S3Router};
use rpc::regist_rpc_route;
@@ -231,51 +231,49 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
)?;
r.insert(
Method::GET,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
Method::GET,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
AdminOperation(&ListRemoteTargetHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
AdminOperation(&ListRemoteTargetHandler {}),
)?;
r.insert(
Method::GET,
Method::GET,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
AdminOperation(&GetReplicationMetricsHandler {}),
)?;
r.insert(
Method::GET,
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
AdminOperation(&GetReplicationMetricsHandler {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
Method::PUT,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
AdminOperation(&SetRemoteTargetHandler {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
AdminOperation(&SetRemoteTargetHandler {}),
)?;
r.insert(
Method::DELETE,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
Method::DELETE,
format!("{}{}", RUSTFS_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;

View File

@@ -249,8 +249,7 @@ impl Node for NodeService {
}))
}
// println!("vuc")
// println!("vuc")
Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
success: false,
bucket_info: String::new(),

View File

@@ -17,12 +17,12 @@ use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, Shutdo
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use common::{
error::{Error, Result},
globals::set_global_addr,
};
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;