diff --git a/Cargo.lock b/Cargo.lock index 8b1df0b9..e1b3a4cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -52,17 +52,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "ahash" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" -dependencies = [ - "getrandom 0.2.16", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.12" @@ -301,7 +290,7 @@ version = "54.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12fcdb3f1d03f69d3ec26ac67645a8fe3f878d77b5ebb0b15d64a116c212985" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-buffer", "arrow-data", "arrow-schema", @@ -446,7 +435,7 @@ version = "54.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69efcd706420e52cd44f5c4358d279801993846d1c2a8e52111853d61d55a619" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -819,7 +808,7 @@ dependencies = [ "http 0.2.12", "http 1.3.1", "http-body 0.4.6", - "lru 0.12.5", + "lru", "percent-encoding", "regex-lite", "sha2 0.10.9", @@ -2381,7 +2370,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.11", + "parking_lot_core", ] [[package]] @@ -2395,7 +2384,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.11", + "parking_lot_core", ] [[package]] @@ -2442,7 +2431,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "parking_lot 0.12.4", + "parking_lot", "parquet", "rand 0.8.5", "regex", @@ -2472,7 +2461,7 @@ dependencies = [ "futures", "itertools 0.14.0", "log", - "parking_lot 0.12.4", + "parking_lot", ] [[package]] @@ -2503,7 +2492,7 @@ version = "46.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f53d7ec508e1b3f68bd301cee3f649834fad51eff9240d898a4b2614cfd0a7a" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "arrow-ipc", "base64 0.22.1", @@ -2584,7 +2573,7 @@ dependencies = [ "futures", "log", "object_store", - "parking_lot 0.12.4", + "parking_lot", "rand 0.8.5", "tempfile", "url", @@ -2659,7 +2648,7 @@ version = "46.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adfc2d074d5ee4d9354fdcc9283d5b2b9037849237ddecb8942a29144b77ca05" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-doc", @@ -2680,7 +2669,7 @@ version = "46.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cbceba0f98d921309a9121b702bcd49289d383684cccabf9a92cda1602f3bbb" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr-common", @@ -2720,7 +2709,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-plan", - "parking_lot 0.12.4", + "parking_lot", "paste", ] @@ -2787,7 +2776,7 @@ version = "46.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1447c2c6bc8674a16be4786b4abf528c302803fafa186aa6275692570e64d85" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr", @@ -2809,7 +2798,7 @@ version = "46.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f8c25dcd069073a75b3d2840a79d0f81e64bdd2c05f2d3d18939afb36a7dcb" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "datafusion-common", "datafusion-expr-common", @@ -2842,7 +2831,7 @@ version = "46.0.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "88cc160df00e413e370b3b259c8ea7bfbebc134d32de16325950e9e923846b7f" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow", "arrow-ord", "arrow-schema", @@ -2861,7 +2850,7 @@ dependencies = [ "indexmap 2.9.0", "itertools 0.14.0", "log", - "parking_lot 0.12.4", + "parking_lot", "pin-project-lite", "tokio", ] @@ -3412,7 +3401,7 @@ dependencies = [ "futures-util", "generational-box", "once_cell", - "parking_lot 0.12.4", + "parking_lot", "rustc-hash 1.1.0", "tracing", "warnings", @@ -3646,7 +3635,6 @@ dependencies = [ "policy", "protos", "rand 0.9.1", - "reed-solomon-erasure", "reed-solomon-simd", "regex", "reqwest", @@ -4244,7 +4232,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b" dependencies = [ - "parking_lot 0.12.4", + "parking_lot", "tracing", ] @@ -4612,9 +4600,6 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -dependencies = [ - "ahash 0.7.8", -] [[package]] name = "hashbrown" @@ -4622,7 +4607,7 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "ahash 0.8.12", + "ahash", "allocator-api2", ] @@ -5723,15 +5708,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86" -[[package]] -name = "lru" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" -dependencies = [ - "hashbrown 0.12.3", -] - [[package]] name = "lru" version = "0.12.5" @@ -6611,7 +6587,7 @@ dependencies = [ "futures", "humantime", "itertools 0.13.0", - "parking_lot 0.12.4", + "parking_lot", "percent-encoding", "snafu", "tokio", @@ -6821,17 +6797,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.4" @@ -6839,21 +6804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" dependencies = [ "lock_api", - "parking_lot_core 0.9.11", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -6875,7 +6826,7 @@ version = "54.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfb15796ac6f56b429fd99e33ba133783ad75b27c36b4b5ce06f1f82cc97754e" dependencies = [ - "ahash 0.8.12", + "ahash", "arrow-array", "arrow-buffer", "arrow-cast", @@ -7554,7 +7505,7 @@ dependencies = [ "derive_builder", "futures", "lazy_static", - "parking_lot 
0.12.4", + "parking_lot", "s3s", "snafu", "tokio", @@ -7840,15 +7791,6 @@ dependencies = [ "syn 2.0.103", ] -[[package]] -name = "redox_syscall" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.3.5" @@ -7878,21 +7820,6 @@ dependencies = [ "thiserror 2.0.12", ] -[[package]] -name = "reed-solomon-erasure" -version = "6.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" -dependencies = [ - "cc", - "libc", - "libm", - "lru 0.7.8", - "parking_lot 0.11.2", - "smallvec", - "spin", -] - [[package]] name = "reed-solomon-simd" version = "3.0.1" @@ -9485,7 +9412,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f" dependencies = [ "new_debug_unreachable", - "parking_lot 0.12.4", + "parking_lot", "phf_shared 0.11.3", "precomputed-hash", "serde", @@ -9732,7 +9659,7 @@ dependencies = [ "ndk-sys", "objc", "once_cell", - "parking_lot 0.12.4", + "parking_lot", "raw-window-handle 0.5.2", "raw-window-handle 0.6.2", "scopeguard", @@ -10001,7 +9928,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot 0.12.4", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/Cargo.toml b/Cargo.toml index 79e32b82..562e71fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -167,7 +167,7 @@ flate2 = "1.1.1" zstd = "0.13.3" lz4 = "1.28.1" rdkafka = { version = "0.37.0", features = ["tokio"] } -reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } + reed-solomon-simd = { version = "3.0.0" } regex = { version = "1.11.1" } reqwest = { version = "0.12.20", default-features = false, features = [ diff --git a/appauth/src/token.rs b/appauth/src/token.rs index 57d30f41..85c5b2b2 100644 --- a/appauth/src/token.rs +++ b/appauth/src/token.rs @@ -1,8 +1,8 @@ use rsa::Pkcs1v15Encrypt; use rsa::{ + RsaPrivateKey, RsaPublicKey, pkcs8::{DecodePrivateKey, DecodePublicKey}, rand_core::OsRng, - RsaPrivateKey, RsaPublicKey, }; use serde::{Deserialize, Serialize}; use std::io::{Error, Result}; @@ -58,8 +58,8 @@ static TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhk mod tests { use super::*; use rsa::{ - pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding}, RsaPrivateKey, + pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding}, }; use std::time::{SystemTime, UNIX_EPOCH}; #[test] diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 225d0bd0..8331150a 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -5,7 +5,7 @@ use rustfs_notify::factory::{ NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; use rustfs_notify::store::DEFAULT_LIMIT; -use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError}; +use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; use rustfs_notify::{initialize, notification_system}; use std::time::Duration; use tracing::info; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 28d4b88a..93a40c3d 100644 --- a/crates/notify/examples/full_demo_one.rs +++ 
b/crates/notify/examples/full_demo_one.rs @@ -6,7 +6,7 @@ use rustfs_notify::factory::{ NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; use rustfs_notify::store::DEFAULT_LIMIT; -use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError}; +use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; use rustfs_notify::{initialize, notification_system}; use std::time::Duration; use tracing::info; diff --git a/crates/notify/examples/webhook.rs b/crates/notify/examples/webhook.rs index 362fe3f4..0357b6cf 100644 --- a/crates/notify/examples/webhook.rs +++ b/crates/notify/examples/webhook.rs @@ -1,9 +1,9 @@ use axum::routing::get; use axum::{ + Router, extract::Json, http::{HeaderMap, Response, StatusCode}, routing::post, - Router, }; use serde_json::Value; use std::time::{SystemTime, UNIX_EPOCH}; diff --git a/crates/notify/src/arn.rs b/crates/notify/src/arn.rs index 4fb85be6..9be689b8 100644 --- a/crates/notify/src/arn.rs +++ b/crates/notify/src/arn.rs @@ -41,7 +41,7 @@ impl TargetID { ARN { target_id: self.clone(), region: region.to_string(), - service: DEFAULT_ARN_SERVICE.to_string(), // Default Service + service: DEFAULT_ARN_SERVICE.to_string(), // Default Service partition: DEFAULT_ARN_PARTITION.to_string(), // Default partition } } @@ -112,7 +112,7 @@ impl ARN { ARN { target_id, region, - service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs + service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs partition: DEFAULT_ARN_PARTITION.to_string(), // Default is rustfs partition } } @@ -121,16 +121,10 @@ impl ARN { /// Returns the ARN string in the format "{ARN_PREFIX}:{region}:{target_id}" #[allow(clippy::inherent_to_string)] pub fn to_arn_string(&self) -> String { - if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() - { + if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() { return String::new(); } - format!( - "{}:{}:{}", - ARN_PREFIX, - self.region, - self.target_id.to_id_string() - ) + format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string()) } /// Parsing ARN from string @@ -162,8 +156,7 @@ impl ARN { impl fmt::Display for ARN { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() - { + if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() { // Returns an empty string if all parts are empty return Ok(()); } diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index b40d8550..b21ed419 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -1,7 +1,7 @@ use crate::store::DEFAULT_LIMIT; use crate::{ error::TargetError, - target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target}, + target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs}, }; use async_trait::async_trait; use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS}; diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 34bb9be6..b6d24752 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -1,15 +1,15 @@ use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{ - error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, - StoreError, Target, + 
Event, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, + rules::BucketNotificationConfig, stream, }; use ecstore::config::{Config, KVS}; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, RwLock, Semaphore}; +use tokio::sync::{RwLock, Semaphore, mpsc}; use tracing::{debug, error, info, warn}; /// Notify the system of monitoring indicators diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs index 9d0c5436..74435dee 100644 --- a/crates/notify/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -26,7 +26,7 @@ pub use rules::BucketNotificationConfig; use std::io::IsTerminal; pub use target::Target; -use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter}; +use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt}; /// Initialize the tracing log system /// diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index a3827f68..db6959d3 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -1,5 +1,5 @@ use crate::arn::TargetID; -use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName}; +use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target}; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tracing::{debug, error, info, instrument, warn}; diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 08ff8ed8..2f47326f 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -1,11 +1,11 @@ use super::rules_map::RulesMap; // Keep for existing structure if any, or remove if not used use super::xml_config::ParseConfigError as BucketNotificationConfigError; +use crate::EventName; use crate::arn::TargetID; +use crate::rules::NotificationConfiguration; use crate::rules::pattern_rules; use crate::rules::target_id_set; -use crate::rules::NotificationConfiguration; -use crate::EventName; use std::collections::HashMap; use std::io::Read; // Assuming this is the XML config structure diff --git a/crates/notify/src/rules/pattern.rs b/crates/notify/src/rules/pattern.rs index d7031550..2e49d56a 100644 --- a/crates/notify/src/rules/pattern.rs +++ b/crates/notify/src/rules/pattern.rs @@ -78,10 +78,7 @@ mod tests { assert_eq!(new_pattern(Some(""), Some("b")), "*b"); assert_eq!(new_pattern(None, None), ""); assert_eq!(new_pattern(Some("prefix"), Some("suffix")), "prefix*suffix"); - assert_eq!( - new_pattern(Some("prefix/"), Some("/suffix")), - "prefix/*suffix" - ); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix + assert_eq!(new_pattern(Some("prefix/"), Some("/suffix")), "prefix/*suffix"); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix } #[test] diff --git a/crates/notify/src/rules/pattern_rules.rs b/crates/notify/src/rules/pattern_rules.rs index da562af6..b720e0f7 100644 --- a/crates/notify/src/rules/pattern_rules.rs +++ b/crates/notify/src/rules/pattern_rules.rs @@ -23,9 +23,7 @@ impl PatternRules { /// Checks if there are any rules that match the given object name. pub fn match_simple(&self, object_name: &str) -> bool { - self.rules - .keys() - .any(|p| pattern::match_simple(p, object_name)) + self.rules.keys().any(|p| pattern::match_simple(p, object_name)) } /// Returns all TargetIDs that match the object name. 
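The `match_simple` calls above do plain `'*'`-wildcard matching of object names against rule patterns. A minimal sketch of the assumed semantics (`'*'` matches any substring, every other character is literal) — an illustration only, not the elided `pattern` module:

```rust
// Illustrative '*'-only wildcard matcher (assumed semantics of
// pattern::match_simple; not the actual implementation).
fn match_simple(pattern: &str, name: &str) -> bool {
    fn go(p: &[u8], n: &[u8]) -> bool {
        match p.split_first() {
            None => n.is_empty(),
            // '*' consumes zero bytes, or one byte and stays active.
            Some((&b'*', rest)) => go(rest, n) || (!n.is_empty() && go(p, &n[1..])),
            Some((c, rest)) => n
                .split_first()
                .is_some_and(|(d, n_rest)| d == c && go(rest, n_rest)),
        }
    }
    go(pattern.as_bytes(), name.as_bytes())
}

// match_simple("images/*", "images/cat.png") == true
// match_simple("*.png",    "images/cat.png") == true
```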
@@ -61,8 +59,7 @@ impl PatternRules { for (pattern, self_targets) in &self.rules { match other.rules.get(pattern) { Some(other_targets) => { - let diff_targets: TargetIdSet = - self_targets.difference(other_targets).cloned().collect(); + let diff_targets: TargetIdSet = self_targets.difference(other_targets).cloned().collect(); if !diff_targets.is_empty() { result_rules.insert(pattern.clone(), diff_targets); } @@ -73,8 +70,6 @@ impl PatternRules { } } } - PatternRules { - rules: result_rules, - } + PatternRules { rules: result_rules } } } diff --git a/crates/notify/src/rules/xml_config.rs b/crates/notify/src/rules/xml_config.rs index b1f6f471..ea995ca9 100644 --- a/crates/notify/src/rules/xml_config.rs +++ b/crates/notify/src/rules/xml_config.rs @@ -1,5 +1,5 @@ use super::pattern; -use crate::arn::{ArnError, TargetIDError, ARN}; +use crate::arn::{ARN, ArnError, TargetIDError}; use crate::event::EventName; use serde::{Deserialize, Serialize}; use std::collections::HashSet; diff --git a/crates/notify/src/store.rs b/crates/notify/src/store.rs index 1e7bc554..f3816cc7 100644 --- a/crates/notify/src/store.rs +++ b/crates/notify/src/store.rs @@ -1,5 +1,5 @@ use crate::error::StoreError; -use serde::{de::DeserializeOwned, Serialize}; +use serde::{Serialize, de::DeserializeOwned}; use snap::raw::{Decoder, Encoder}; use std::sync::{Arc, RwLock}; use std::{ @@ -195,11 +195,7 @@ impl QueueStore { /// Reads a file for the given key fn read_file(&self, key: &Key) -> Result, StoreError> { let path = self.file_path(key); - debug!( - "Reading file for key: {},path: {}", - key.to_string(), - path.display() - ); + debug!("Reading file for key: {},path: {}", key.to_string(), path.display()); let data = std::fs::read(&path).map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { StoreError::NotFound @@ -240,13 +236,11 @@ impl QueueStore { }; std::fs::write(&path, &data).map_err(StoreError::Io)?; - let modified = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64; - let mut entries = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let modified = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64; + let mut entries = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; entries.insert(key.to_string(), modified); debug!("Wrote event to store: {}", key.to_string()); Ok(()) @@ -265,18 +259,16 @@ where let entries = std::fs::read_dir(&self.directory).map_err(StoreError::Io)?; // Get the write lock to update the internal state - let mut entries_map = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let mut entries_map = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; for entry in entries { let entry = entry.map_err(StoreError::Io)?; let metadata = entry.metadata().map_err(StoreError::Io)?; if metadata.is_file() { let modified = metadata.modified().map_err(StoreError::Io)?; - let unix_nano = modified - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64; + let unix_nano = modified.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64; let file_name = entry.file_name().to_string_lossy().to_string(); entries_map.insert(file_name, unix_nano); @@ -290,9 +282,10 @@ where fn put(&self, item: T) -> Result { // Check storage limits { - 
let entries = self.entries.read().map_err(|_| { - StoreError::Internal("Failed to acquire read lock on entries".to_string()) - })?; + let entries = self + .entries + .read() + .map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?; if entries.len() as u64 >= self.entry_limit { return Err(StoreError::LimitExceeded); @@ -307,8 +300,7 @@ where compress: true, }; - let data = - serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; + let data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; self.write_file(&key, &data)?; Ok(key) @@ -317,9 +309,10 @@ where fn put_multiple(&self, items: Vec) -> Result { // Check storage limits { - let entries = self.entries.read().map_err(|_| { - StoreError::Internal("Failed to acquire read lock on entries".to_string()) - })?; + let entries = self + .entries + .read() + .map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?; if entries.len() as u64 >= self.entry_limit { return Err(StoreError::LimitExceeded); @@ -327,9 +320,7 @@ where } if items.is_empty() { // Or return an error, or a special key? - return Err(StoreError::Internal( - "Cannot put_multiple with empty items list".to_string(), - )); + return Err(StoreError::Internal("Cannot put_multiple with empty items list".to_string())); } let uuid = Uuid::new_v4(); let key = Key { @@ -348,8 +339,7 @@ where for item in items { // If items are Vec, and Event is large, this could be inefficient. // The current get_multiple deserializes one by one. - let item_data = - serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; + let item_data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; buffer.extend_from_slice(&item_data); // If using JSON array: buffer = serde_json::to_vec(&items)? 
} @@ -374,9 +364,7 @@ where debug!("Reading items from store for key: {}", key.to_string()); let data = self.read_file(key)?; if data.is_empty() { - return Err(StoreError::Deserialization( - "Cannot deserialize empty data".to_string(), - )); + return Err(StoreError::Deserialization("Cannot deserialize empty data".to_string())); } let mut items = Vec::with_capacity(key.item_count); @@ -395,10 +383,7 @@ where match deserializer.next() { Some(Ok(item)) => items.push(item), Some(Err(e)) => { - return Err(StoreError::Deserialization(format!( - "Failed to deserialize item in batch: {}", - e - ))); + return Err(StoreError::Deserialization(format!("Failed to deserialize item in batch: {}", e))); } None => { // Reached end of stream sooner than item_count @@ -435,7 +420,10 @@ where std::fs::remove_file(&path).map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { // If file not found, still try to remove from entries map in case of inconsistency - warn!("File not found for key {} during del, but proceeding to remove from entries map.", key.to_string()); + warn!( + "File not found for key {} during del, but proceeding to remove from entries map.", + key.to_string() + ); StoreError::NotFound } else { StoreError::Io(e) @@ -443,17 +431,15 @@ where })?; // Get the write lock to update the internal state - let mut entries = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let mut entries = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; if entries.remove(&key.to_string()).is_none() { // Key was not in the map, could be an inconsistency or already deleted. // This is not necessarily an error if the file deletion succeeded or was NotFound. 
- debug!( - "Key {} not found in entries map during del, might have been already removed.", - key - ); + debug!("Key {} not found in entries map during del, might have been already removed.", key); } debug!("Deleted event from store: {}", key.to_string()); Ok(()) @@ -492,7 +478,6 @@ where } fn boxed_clone(&self) -> Box + Send + Sync> { - Box::new(self.clone()) - as Box + Send + Sync> + Box::new(self.clone()) as Box + Send + Sync> } } diff --git a/crates/notify/src/stream.rs b/crates/notify/src/stream.rs index 02cdc2e9..f02001ce 100644 --- a/crates/notify/src/stream.rs +++ b/crates/notify/src/stream.rs @@ -1,13 +1,13 @@ use crate::{ - error::TargetError, integration::NotificationMetrics, + Event, StoreError, + error::TargetError, + integration::NotificationMetrics, store::{Key, Store}, target::Target, - Event, - StoreError, }; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::{Semaphore, mpsc}; use tokio::time::sleep; use tracing::{debug, error, info, warn}; diff --git a/crates/notify/src/target/mqtt.rs b/crates/notify/src/target/mqtt.rs index 82ce8f73..319cd5c8 100644 --- a/crates/notify/src/target/mqtt.rs +++ b/crates/notify/src/target/mqtt.rs @@ -1,22 +1,22 @@ use crate::store::{Key, STORE_EXTENSION}; use crate::target::ChannelTargetType; use crate::{ - arn::TargetID, error::TargetError, + StoreError, Target, + arn::TargetID, + error::TargetError, event::{Event, EventLog}, store::Store, - StoreError, - Target, }; use async_trait::async_trait; -use rumqttc::{mqttbytes::Error as MqttBytesError, ConnectionError}; use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS}; +use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError}; use std::sync::Arc; use std::{ path::PathBuf, sync::atomic::{AtomicBool, Ordering}, time::Duration, }; -use tokio::sync::{mpsc, Mutex, OnceCell}; +use tokio::sync::{Mutex, OnceCell, mpsc}; use tracing::{debug, error, info, instrument, trace, warn}; use url::Url; use urlencoding; @@ -58,24 +58,19 @@ impl MQTTArgs { match self.broker.scheme() { "ws" | "wss" | "tcp" | "ssl" | "tls" | "tcps" | "mqtt" | "mqtts" => {} _ => { - return Err(TargetError::Configuration( - "unknown protocol in broker address".to_string(), - )); + return Err(TargetError::Configuration("unknown protocol in broker address".to_string())); } } if !self.queue_dir.is_empty() { let path = std::path::Path::new(&self.queue_dir); if !path.is_absolute() { - return Err(TargetError::Configuration( - "mqtt queueDir path should be absolute".to_string(), - )); + return Err(TargetError::Configuration("mqtt queueDir path should be absolute".to_string())); } if self.qos == QoS::AtMostOnce { return Err(TargetError::Configuration( - "QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set" - .to_string(), + "QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set".to_string(), )); } } @@ -107,21 +102,12 @@ impl MQTTTarget { let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string()); let queue_store = if !args.queue_dir.is_empty() { let base_path = PathBuf::from(&args.queue_dir); - let unique_dir_name = format!( - "rustfs-{}-{}-{}", - ChannelTargetType::Mqtt.as_str(), - target_id.name, - target_id.id - ) - .replace(":", "_"); + let unique_dir_name = + format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_"); // Ensure the directory name is valid for filesystem let specific_queue_path = base_path.join(unique_dir_name); 
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target"); - let store = crate::store::QueueStore::::new( - specific_queue_path, - args.queue_limit, - STORE_EXTENSION, - ); + let store = crate::store::QueueStore::::new(specific_queue_path, args.queue_limit, STORE_EXTENSION); if let Err(e) = store.open() { error!( target_id = %target_id, @@ -130,10 +116,7 @@ impl MQTTTarget { ); return Err(TargetError::Storage(format!("{}", e))); } - Some(Box::new(store) - as Box< - dyn Store + Send + Sync, - >) + Some(Box::new(store) as Box + Send + Sync>) } else { None }; @@ -175,18 +158,13 @@ impl MQTTTarget { debug!(target_id = %target_id_clone, "Initializing MQTT background task."); let host = args_clone.broker.host_str().unwrap_or("localhost"); let port = args_clone.broker.port().unwrap_or(1883); - let mut mqtt_options = MqttOptions::new( - format!("rustfs_notify_{}", uuid::Uuid::new_v4()), - host, - port, - ); + let mut mqtt_options = MqttOptions::new(format!("rustfs_notify_{}", uuid::Uuid::new_v4()), host, port); mqtt_options .set_keep_alive(args_clone.keep_alive) .set_max_packet_size(100 * 1024 * 1024, 100 * 1024 * 1024); // 100MB if !args_clone.username.is_empty() { - mqtt_options - .set_credentials(args_clone.username.clone(), args_clone.password.clone()); + mqtt_options.set_credentials(args_clone.username.clone(), args_clone.password.clone()); } let (new_client, eventloop) = AsyncClient::new(mqtt_options, 10); @@ -206,12 +184,8 @@ impl MQTTTarget { *client_arc.lock().await = Some(new_client.clone()); info!(target_id = %target_id_clone, "Spawning MQTT event loop task."); - let task_handle = tokio::spawn(run_mqtt_event_loop( - eventloop, - connected_arc.clone(), - target_id_clone.clone(), - cancel_rx, - )); + let task_handle = + tokio::spawn(run_mqtt_event_loop(eventloop, connected_arc.clone(), target_id_clone.clone(), cancel_rx)); Ok(task_handle) }) .await @@ -266,17 +240,13 @@ impl MQTTTarget { records: vec![event.clone()], }; - let data = serde_json::to_vec(&log) - .map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?; + let data = + serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?; // Vec Convert to String, only for printing logs - let data_string = String::from_utf8(data.clone()).map_err(|e| { - TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)) - })?; - debug!( - "Sending event to mqtt target: {}, event log: {}", - self.id, data_string - ); + let data_string = String::from_utf8(data.clone()) + .map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?; + debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string); client .publish(&self.args.topic, self.args.qos, false, data) @@ -474,9 +444,7 @@ impl Target for MQTTTarget { if let Some(handle) = self.bg_task_manager.init_cell.get() { if handle.is_finished() { error!(target_id = %self.id, "MQTT background task has finished, possibly due to an error. 
Target is not active."); - return Err(TargetError::Network( - "MQTT background task terminated".to_string(), - )); + return Err(TargetError::Network("MQTT background task terminated".to_string())); } } debug!(target_id = %self.id, "MQTT client not yet initialized or task not running/connected."); @@ -507,10 +475,7 @@ impl Target for MQTTTarget { } Err(e) => { error!(target_id = %self.id, error = %e, "Failed to save event to store"); - return Err(TargetError::Storage(format!( - "Failed to save event to store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to save event to store: {}", e))); } } } else { @@ -581,10 +546,7 @@ impl Target for MQTTTarget { error = %e, "Failed to get event from store" ); - return Err(TargetError::Storage(format!( - "Failed to get event from store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to get event from store: {}", e))); } }; @@ -608,10 +570,7 @@ impl Target for MQTTTarget { } Err(e) => { error!(target_id = %self.id, error = %e, "Failed to delete event from store after send."); - return Err(TargetError::Storage(format!( - "Failed to delete event from store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to delete event from store: {}", e))); } } diff --git a/crates/notify/src/target/webhook.rs b/crates/notify/src/target/webhook.rs index 9413067d..29c5f7cb 100644 --- a/crates/notify/src/target/webhook.rs +++ b/crates/notify/src/target/webhook.rs @@ -1,19 +1,19 @@ use crate::store::STORE_EXTENSION; use crate::target::ChannelTargetType; use crate::{ - arn::TargetID, error::TargetError, + StoreError, Target, + arn::TargetID, + error::TargetError, event::{Event, EventLog}, store::{Key, Store}, - StoreError, - Target, }; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; use std::{ path::PathBuf, sync::{ - atomic::{AtomicBool, Ordering}, Arc, + atomic::{AtomicBool, Ordering}, }, time::Duration, }; diff --git a/crates/obs/src/metrics/audit.rs b/crates/obs/src/metrics/audit.rs index b365607c..5bc3ee39 100644 --- a/crates/obs/src/metrics/audit.rs +++ b/crates/obs/src/metrics/audit.rs @@ -1,7 +1,7 @@ /// audit related metric descriptors /// /// This module contains the metric descriptors for the audit subsystem. -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; const TARGET_ID: &str = "target_id"; diff --git a/crates/obs/src/metrics/bucket.rs b/crates/obs/src/metrics/bucket.rs index 4f63e5ea..d008e89b 100644 --- a/crates/obs/src/metrics/bucket.rs +++ b/crates/obs/src/metrics/bucket.rs @@ -1,5 +1,5 @@ /// bucket level s3 metric descriptor -use crate::metrics::{new_counter_md, new_gauge_md, new_histogram_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems}; lazy_static::lazy_static! 
{ pub static ref BUCKET_API_TRAFFIC_SENT_BYTES_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/bucket_replication.rs b/crates/obs/src/metrics/bucket_replication.rs index 8af8e420..53872ebc 100644 --- a/crates/obs/src/metrics/bucket_replication.rs +++ b/crates/obs/src/metrics/bucket_replication.rs @@ -1,5 +1,5 @@ /// Bucket copy metric descriptor -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; /// Bucket level replication metric descriptor pub const BUCKET_L: &str = "bucket"; diff --git a/crates/obs/src/metrics/cluster_config.rs b/crates/obs/src/metrics/cluster_config.rs index d5e099cc..b9bc8ea7 100644 --- a/crates/obs/src/metrics/cluster_config.rs +++ b/crates/obs/src/metrics/cluster_config.rs @@ -1,5 +1,5 @@ /// Metric descriptors related to cluster configuration -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref CONFIG_RRS_PARITY_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/cluster_erasure_set.rs b/crates/obs/src/metrics/cluster_erasure_set.rs index a3ad799d..9f129cf7 100644 --- a/crates/obs/src/metrics/cluster_erasure_set.rs +++ b/crates/obs/src/metrics/cluster_erasure_set.rs @@ -1,5 +1,5 @@ /// Erasure code set related metric descriptors -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; /// The label for the pool ID pub const POOL_ID_L: &str = "pool_id"; diff --git a/crates/obs/src/metrics/cluster_health.rs b/crates/obs/src/metrics/cluster_health.rs index dfe9b280..96417601 100644 --- a/crates/obs/src/metrics/cluster_health.rs +++ b/crates/obs/src/metrics/cluster_health.rs @@ -1,5 +1,5 @@ /// Cluster health-related metric descriptors -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref HEALTH_DRIVES_OFFLINE_COUNT_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/cluster_iam.rs b/crates/obs/src/metrics/cluster_iam.rs index f2a9d915..29a15cf8 100644 --- a/crates/obs/src/metrics/cluster_iam.rs +++ b/crates/obs/src/metrics/cluster_iam.rs @@ -1,5 +1,5 @@ /// IAM related metric descriptors -use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems}; lazy_static::lazy_static! { pub static ref LAST_SYNC_DURATION_MILLIS_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/cluster_notification.rs b/crates/obs/src/metrics/cluster_notification.rs index 1a276d0b..9db517c1 100644 --- a/crates/obs/src/metrics/cluster_notification.rs +++ b/crates/obs/src/metrics/cluster_notification.rs @@ -1,5 +1,5 @@ /// Notify the relevant metric descriptor -use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems}; lazy_static::lazy_static! 
{ pub static ref NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/cluster_usage.rs b/crates/obs/src/metrics/cluster_usage.rs index 5f63bf5c..351e23d5 100644 --- a/crates/obs/src/metrics/cluster_usage.rs +++ b/crates/obs/src/metrics/cluster_usage.rs @@ -1,5 +1,5 @@ /// Descriptors of metrics related to cluster object and bucket usage -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; /// Bucket labels pub const BUCKET_LABEL: &str = "bucket"; diff --git a/crates/obs/src/metrics/ilm.rs b/crates/obs/src/metrics/ilm.rs index 8e2277c0..d9a5b9a9 100644 --- a/crates/obs/src/metrics/ilm.rs +++ b/crates/obs/src/metrics/ilm.rs @@ -1,5 +1,5 @@ /// ILM-related metric descriptors -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref ILM_EXPIRY_PENDING_TASKS_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/logger_webhook.rs b/crates/obs/src/metrics/logger_webhook.rs index 6ac238ed..985642a6 100644 --- a/crates/obs/src/metrics/logger_webhook.rs +++ b/crates/obs/src/metrics/logger_webhook.rs @@ -1,5 +1,5 @@ /// A descriptor for metrics related to webhook logs -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; /// Define label constants for webhook metrics /// name label diff --git a/crates/obs/src/metrics/mod.rs b/crates/obs/src/metrics/mod.rs index c0052769..150b3daf 100644 --- a/crates/obs/src/metrics/mod.rs +++ b/crates/obs/src/metrics/mod.rs @@ -23,6 +23,6 @@ pub use entry::descriptor::MetricDescriptor; pub use entry::metric_name::MetricName; pub use entry::metric_type::MetricType; pub use entry::namespace::MetricNamespace; -pub use entry::subsystem::subsystems; pub use entry::subsystem::MetricSubsystem; +pub use entry::subsystem::subsystems; pub use entry::{new_counter_md, new_gauge_md, new_histogram_md}; diff --git a/crates/obs/src/metrics/replication.rs b/crates/obs/src/metrics/replication.rs index 08195bc0..c688ff56 100644 --- a/crates/obs/src/metrics/replication.rs +++ b/crates/obs/src/metrics/replication.rs @@ -1,5 +1,5 @@ /// Copy the relevant metric descriptor -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/request.rs b/crates/obs/src/metrics/request.rs index c508db64..b96e66df 100644 --- a/crates/obs/src/metrics/request.rs +++ b/crates/obs/src/metrics/request.rs @@ -1,4 +1,4 @@ -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName, MetricSubsystem}; +use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems}; lazy_static::lazy_static! 
{ pub static ref API_REJECTED_AUTH_TOTAL_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/scanner.rs b/crates/obs/src/metrics/scanner.rs index 91f247e7..e9136903 100644 --- a/crates/obs/src/metrics/scanner.rs +++ b/crates/obs/src/metrics/scanner.rs @@ -1,5 +1,5 @@ /// Scanner-related metric descriptors -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref SCANNER_BUCKET_SCANS_FINISHED_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/system_cpu.rs b/crates/obs/src/metrics/system_cpu.rs index 37f42aad..b75b4552 100644 --- a/crates/obs/src/metrics/system_cpu.rs +++ b/crates/obs/src/metrics/system_cpu.rs @@ -1,5 +1,5 @@ /// CPU system-related metric descriptors -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref SYS_CPU_AVG_IDLE_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/system_drive.rs b/crates/obs/src/metrics/system_drive.rs index 181b1b5b..09eaa7c6 100644 --- a/crates/obs/src/metrics/system_drive.rs +++ b/crates/obs/src/metrics/system_drive.rs @@ -1,5 +1,5 @@ /// Drive-related metric descriptors -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; /// drive related labels pub const DRIVE_LABEL: &str = "drive"; diff --git a/crates/obs/src/metrics/system_memory.rs b/crates/obs/src/metrics/system_memory.rs index 40f1b38a..4e062a95 100644 --- a/crates/obs/src/metrics/system_memory.rs +++ b/crates/obs/src/metrics/system_memory.rs @@ -1,5 +1,5 @@ /// Memory-related metric descriptors -use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref MEM_TOTAL_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/system_network.rs b/crates/obs/src/metrics/system_network.rs index 9d2631ce..b3657e72 100644 --- a/crates/obs/src/metrics/system_network.rs +++ b/crates/obs/src/metrics/system_network.rs @@ -1,5 +1,5 @@ /// Network-related metric descriptors -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; lazy_static::lazy_static! { pub static ref INTERNODE_ERRORS_TOTAL_MD: MetricDescriptor = diff --git a/crates/obs/src/metrics/system_process.rs b/crates/obs/src/metrics/system_process.rs index 00483b50..f021aabe 100644 --- a/crates/obs/src/metrics/system_process.rs +++ b/crates/obs/src/metrics/system_process.rs @@ -1,5 +1,5 @@ /// process related metric descriptors -use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName}; +use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems}; lazy_static::lazy_static! 
{ pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor = diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index b2b0495b..b9ed9e9b 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -1,19 +1,19 @@ use crate::OtelConfig; -use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode}; +use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style}; use nu_ansi_term::Color; use opentelemetry::trace::TracerProvider; -use opentelemetry::{global, KeyValue}; +use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::{ + Resource, metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, - Resource, }; use opentelemetry_semantic_conventions::{ - attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, SCHEMA_URL, + attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, }; use rustfs_config::{ APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, @@ -28,7 +28,7 @@ use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::time::LocalTime; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt}; /// A guard object that manages the lifecycle of OpenTelemetry components. /// diff --git a/crates/utils/src/sys/mod.rs b/crates/utils/src/sys/mod.rs index 5b5cd9b3..93d3cdad 100644 --- a/crates/utils/src/sys/mod.rs +++ b/crates/utils/src/sys/mod.rs @@ -1,4 +1,4 @@ mod user_agent; -pub use user_agent::get_user_agent; pub use user_agent::ServiceType; +pub use user_agent::get_user_agent; diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index 214ac302..48e86358 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -11,9 +11,7 @@ rust-version.workspace = true workspace = true [features] -default = ["reed-solomon-simd"] -reed-solomon-simd = [] -reed-solomon-erasure = [] +default = [] [dependencies] rustfs-config = { workspace = true, features = ["constants"] } @@ -40,7 +38,6 @@ http.workspace = true highway = { workspace = true } url.workspace = true uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] } -reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } reed-solomon-simd = { version = "3.0.0" } transform-stream = "0.3.1" lazy_static.workspace = true diff --git a/ecstore/README_cn.md b/ecstore/README_cn.md index a6a0a0bd..27ca55b4 100644 --- a/ecstore/README_cn.md +++ b/ecstore/README_cn.md @@ -1,38 +1,15 @@ # ECStore - Erasure Coding Storage -ECStore provides erasure coding functionality for the RustFS project, supporting multiple Reed-Solomon implementations for optimal performance and compatibility. +ECStore provides erasure coding functionality for the RustFS project, using high-performance Reed-Solomon SIMD implementation for optimal performance. 
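For orientation with the README rewrite here (the body of its `### Usage Example` section is elided from the hunk), a minimal sketch of the `reed_solomon_simd` 3.x one-shot API — the shard counts and the 64-byte shard size are illustrative, and this crate requires even shard sizes:

```rust
// Hedged sketch of the reed-solomon-simd convenience API:
// 3 data shards + 2 recovery shards, 64 bytes each (illustrative).
fn demo() -> Result<(), reed_solomon_simd::Error> {
    let original: Vec<Vec<u8>> = (0..3u8).map(|i| vec![i; 64]).collect();

    // encode(original_count, recovery_count, shards) -> recovery shards.
    let recovery = reed_solomon_simd::encode(3, 2, &original)?;

    // Suppose shards 0 and 2 were lost: decode takes the surviving
    // original and recovery shards, each tagged with its index, and
    // returns only the restored originals.
    let restored = reed_solomon_simd::decode(
        3,
        2,
        [(1, &original[1])],
        [(0, &recovery[0]), (1, &recovery[1])],
    )?;
    assert_eq!(restored[&0], original[0]);
    assert_eq!(restored[&2], original[2]);
    Ok(())
}
```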
-## Reed-Solomon Implementations +## Reed-Solomon Implementation -### Available Backends +### SIMD Backend (Only) -#### `reed-solomon-erasure` (Default) -- **Stability**: Mature and well-tested implementation -- **Performance**: Good performance with SIMD acceleration when available -- **Compatibility**: Works with any shard size -- **Memory**: Efficient memory usage -- **Use case**: Recommended for production use - -#### `reed-solomon-simd` (Optional) -- **Performance**: Optimized SIMD implementation for maximum speed -- **Limitations**: Has restrictions on shard sizes (must be >= 64 bytes typically) -- **Memory**: May use more memory for small shards -- **Use case**: Best for large data blocks where performance is critical - -### Feature Flags - -Configure the Reed-Solomon implementation using Cargo features: - -```toml -# Use default implementation (reed-solomon-erasure) -ecstore = "0.0.1" - -# Use SIMD implementation for maximum performance -ecstore = { version = "0.0.1", features = ["reed-solomon-simd"], default-features = false } - -# Use traditional implementation explicitly -ecstore = { version = "0.0.1", features = ["reed-solomon-erasure"], default-features = false } -``` +- **Performance**: Uses SIMD optimization for high-performance encoding/decoding +- **Compatibility**: Works with any shard size through SIMD implementation +- **Reliability**: High-performance SIMD implementation for large data processing +- **Use case**: Optimized for maximum performance in large data processing scenarios ### Usage Example @@ -68,42 +45,52 @@ assert_eq!(&recovered, data); ## Performance Considerations -### When to use `reed-solomon-simd` -- Large block sizes (>= 1KB recommended) -- High-throughput scenarios -- CPU-intensive workloads where encoding/decoding is the bottleneck - -### When to use `reed-solomon-erasure` -- Small block sizes -- Memory-constrained environments -- General-purpose usage -- Production deployments requiring maximum stability +### SIMD Implementation Benefits +- **High Throughput**: Optimized for large block sizes (>= 1KB recommended) +- **CPU Optimization**: Leverages modern CPU SIMD instructions +- **Scalability**: Excellent performance for high-throughput scenarios ### Implementation Details -#### `reed-solomon-erasure` -- **Instance Reuse**: The encoder instance is cached and reused across multiple operations -- **Thread Safety**: Thread-safe with interior mutability -- **Memory Efficiency**: Lower memory footprint for small data - #### `reed-solomon-simd` -- **Instance Creation**: New encoder/decoder instances are created for each operation -- **API Design**: The SIMD implementation's API is designed for single-use instances -- **Performance Trade-off**: While instances are created per operation, the SIMD optimizations provide significant performance benefits for large data blocks -- **Optimization**: Future versions may implement instance pooling if the underlying API supports reuse +- **Instance Caching**: Encoder/decoder instances are cached and reused for optimal performance +- **Thread Safety**: Thread-safe with RwLock-based caching +- **SIMD Optimization**: Leverages CPU SIMD instructions for maximum performance +- **Reset Capability**: Cached instances are reset for different parameters, avoiding unnecessary allocations ### Performance Tips 1. **Batch Operations**: When possible, batch multiple small operations into larger blocks -2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for SIMD implementations +2. 
**Block Size Optimization**: Use block sizes that are multiples of 64 bytes for optimal SIMD performance 3. **Memory Allocation**: Pre-allocate buffers when processing multiple blocks -4. **Feature Selection**: Choose the appropriate feature based on your data size and performance requirements +4. **Cache Warming**: Initial operations may be slower due to cache setup, subsequent operations benefit from caching ## Cross-Platform Compatibility -Both implementations support: -- x86_64 with SIMD acceleration -- aarch64 (ARM64) with optimizations +The SIMD implementation supports: +- x86_64 with advanced SIMD instructions (AVX2, SSE) +- aarch64 (ARM64) with NEON SIMD optimizations - Other architectures with fallback implementations -The `reed-solomon-erasure` implementation provides better cross-platform compatibility and is recommended for most use cases. \ No newline at end of file +The implementation automatically selects the best available SIMD instructions for the target platform, providing optimal performance across different architectures. + +## Testing and Benchmarking + +Run performance benchmarks: +```bash +# Run erasure coding benchmarks +cargo bench --bench erasure_benchmark + +# Run comparison benchmarks +cargo bench --bench comparison_benchmark + +# Generate benchmark reports +./run_benchmarks.sh +``` + +## Error Handling + +All operations return `Result` types with comprehensive error information: +- Encoding errors: Invalid parameters, insufficient memory +- Decoding errors: Too many missing shards, corrupted data +- Configuration errors: Invalid shard counts, unsupported parameters \ No newline at end of file diff --git a/ecstore/benches/comparison_benchmark.rs b/ecstore/benches/comparison_benchmark.rs index 42147266..201c8a1b 100644 --- a/ecstore/benches/comparison_benchmark.rs +++ b/ecstore/benches/comparison_benchmark.rs @@ -12,7 +12,7 @@ //! cargo bench --bench comparison_benchmark --features reed-solomon-simd //! //! # 测试强制 erasure-only 模式 -//! cargo bench --bench comparison_benchmark --features reed-solomon-erasure +//! cargo bench --bench comparison_benchmark //! //! # 生成对比报告 //! cargo bench --bench comparison_benchmark -- --save-baseline erasure diff --git a/ecstore/benches/erasure_benchmark.rs b/ecstore/benches/erasure_benchmark.rs index a2d0fcba..6ffaa6f6 100644 --- a/ecstore/benches/erasure_benchmark.rs +++ b/ecstore/benches/erasure_benchmark.rs @@ -1,7 +1,7 @@ //! Reed-Solomon erasure coding performance benchmarks. //! //! This benchmark compares the performance of different Reed-Solomon implementations: -//! - Default (Pure erasure): Stable reed-solomon-erasure implementation +//! - SIMD mode: High-performance reed-solomon-simd implementation //! - `reed-solomon-simd` feature: SIMD mode with optimized performance //! //! 
## Running Benchmarks @@ -235,7 +235,7 @@ fn bench_decode_performance(c: &mut Criterion) { group.finish(); // 如果使用混合模式(默认),测试SIMD解码性能 - #[cfg(not(feature = "reed-solomon-erasure"))] + { let shard_size = calc_shard_size(config.data_size, config.data_shards); if shard_size >= 512 { diff --git a/ecstore/run_benchmarks.sh b/ecstore/run_benchmarks.sh index f4b091be..ddf58fb9 100755 --- a/ecstore/run_benchmarks.sh +++ b/ecstore/run_benchmarks.sh @@ -1,54 +1,48 @@ #!/bin/bash -# Reed-Solomon 实现性能比较脚本 -# -# 这个脚本将运行不同的基准测试来比较SIMD模式和纯Erasure模式的性能 -# -# 使用方法: -# ./run_benchmarks.sh [quick|full|comparison] -# -# quick - 快速测试主要场景 -# full - 完整基准测试套件 -# comparison - 专门对比两种实现模式 +# Reed-Solomon SIMD 性能基准测试脚本 +# 使用高性能 SIMD 实现进行纠删码性能测试 set -e -# 颜色输出 +# ANSI 颜色码 RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' BLUE='\033[0;34m' +PURPLE='\033[0;35m' NC='\033[0m' # No Color -# 输出带颜色的信息 +# 打印带颜色的消息 print_info() { - echo -e "${BLUE}[INFO]${NC} $1" + echo -e "${BLUE}ℹ️ $1${NC}" } print_success() { - echo -e "${GREEN}[SUCCESS]${NC} $1" + echo -e "${GREEN}✅ $1${NC}" } print_warning() { - echo -e "${YELLOW}[WARNING]${NC} $1" + echo -e "${YELLOW}⚠️ $1${NC}" } print_error() { - echo -e "${RED}[ERROR]${NC} $1" + echo -e "${RED}❌ $1${NC}" } -# 检查是否安装了必要工具 +# 检查系统要求 check_requirements() { print_info "检查系统要求..." + # 检查 Rust if ! command -v cargo &> /dev/null; then - print_error "cargo 未安装,请先安装 Rust 工具链" + print_error "Cargo 未找到,请确保已安装 Rust" exit 1 fi - # 检查是否安装了 criterion - if ! grep -q "criterion" Cargo.toml; then - print_error "Cargo.toml 中未找到 criterion 依赖" + # 检查 criterion + if ! cargo --list | grep -q "bench"; then + print_error "未找到基准测试支持,请确保使用的是支持基准测试的 Rust 版本" exit 1 fi @@ -62,28 +56,15 @@ cleanup() { print_success "清理完成" } -# 运行纯 Erasure 模式基准测试 -run_erasure_benchmark() { - print_info "🏛️ 开始运行纯 Erasure 模式基准测试..." - echo "================================================" - - cargo bench --bench comparison_benchmark \ - --features reed-solomon-erasure \ - -- --save-baseline erasure_baseline - - print_success "纯 Erasure 模式基准测试完成" -} - -# 运行SIMD模式基准测试 +# 运行 SIMD 模式基准测试 run_simd_benchmark() { - print_info "🎯 开始运行SIMD模式基准测试..." + print_info "🎯 开始运行 SIMD 模式基准测试..." echo "================================================" cargo bench --bench comparison_benchmark \ - --features reed-solomon-simd \ -- --save-baseline simd_baseline - print_success "SIMD模式基准测试完成" + print_success "SIMD 模式基准测试完成" } # 运行完整的基准测试套件 @@ -91,33 +72,42 @@ run_full_benchmark() { print_info "🚀 开始运行完整基准测试套件..." echo "================================================" - # 运行详细的基准测试(使用默认纯Erasure模式) + # 运行详细的基准测试 cargo bench --bench erasure_benchmark print_success "完整基准测试套件完成" } -# 运行性能对比测试 -run_comparison_benchmark() { - print_info "📊 开始运行性能对比测试..." +# 运行性能测试 +run_performance_test() { + print_info "📊 开始运行性能测试..." echo "================================================" - print_info "步骤 1: 测试纯 Erasure 模式..." + print_info "步骤 1: 运行编码基准测试..." cargo bench --bench comparison_benchmark \ - --features reed-solomon-erasure \ - -- --save-baseline erasure_baseline + -- encode --save-baseline encode_baseline - print_info "步骤 2: 测试SIMD模式并与 Erasure 模式对比..." + print_info "步骤 2: 运行解码基准测试..." cargo bench --bench comparison_benchmark \ - --features reed-solomon-simd \ - -- --baseline erasure_baseline + -- decode --save-baseline decode_baseline - print_success "性能对比测试完成" + print_success "性能测试完成" +} + +# 运行大数据集测试 +run_large_data_test() { + print_info "🗂️ 开始运行大数据集测试..." 
diff --git a/ecstore/run_benchmarks.sh b/ecstore/run_benchmarks.sh
index f4b091be..ddf58fb9 100755
--- a/ecstore/run_benchmarks.sh
+++ b/ecstore/run_benchmarks.sh
@@ -1,54 +1,48 @@
 #!/bin/bash

-# Reed-Solomon implementation performance comparison script
-#
-# This script runs different benchmarks to compare SIMD mode and pure erasure mode performance
-#
-# Usage:
-#   ./run_benchmarks.sh [quick|full|comparison]
-#
-#   quick      - quickly test the main scenarios
-#   full       - full benchmark suite
-#   comparison - dedicated comparison of the two implementation modes
+# Reed-Solomon SIMD performance benchmark script
+# Benchmarks erasure coding using the high-performance SIMD implementation

 set -e

-# Colored output
+# ANSI color codes
 RED='\033[0;31m'
 GREEN='\033[0;32m'
 YELLOW='\033[1;33m'
 BLUE='\033[0;34m'
+PURPLE='\033[0;35m'
 NC='\033[0m' # No Color

-# Print messages with color
+# Print colored messages
 print_info() {
-    echo -e "${BLUE}[INFO]${NC} $1"
+    echo -e "${BLUE}ℹ️  $1${NC}"
 }

 print_success() {
-    echo -e "${GREEN}[SUCCESS]${NC} $1"
+    echo -e "${GREEN}✅ $1${NC}"
 }

 print_warning() {
-    echo -e "${YELLOW}[WARNING]${NC} $1"
+    echo -e "${YELLOW}⚠️  $1${NC}"
 }

 print_error() {
-    echo -e "${RED}[ERROR]${NC} $1"
+    echo -e "${RED}❌ $1${NC}"
 }

-# Check that the required tools are installed
+# Check system requirements
 check_requirements() {
     print_info "Checking system requirements..."

+    # Check Rust
     if ! command -v cargo &> /dev/null; then
-        print_error "cargo is not installed; please install the Rust toolchain first"
+        print_error "Cargo not found; please make sure Rust is installed"
         exit 1
     fi

-    # Check that criterion is present
-    if ! grep -q "criterion" Cargo.toml; then
-        print_error "criterion dependency not found in Cargo.toml"
+    # Check benchmark (criterion) support
+    if ! cargo --list | grep -q "bench"; then
+        print_error "Benchmark support not found; please make sure your Rust toolchain supports benchmarks"
         exit 1
     fi

@@ -62,28 +56,15 @@ cleanup() {
     print_success "Cleanup complete"
 }

-# Run the pure erasure mode benchmark
-run_erasure_benchmark() {
-    print_info "🏛️ Starting the pure erasure mode benchmark..."
-    echo "================================================"
-
-    cargo bench --bench comparison_benchmark \
-        --features reed-solomon-erasure \
-        -- --save-baseline erasure_baseline
-
-    print_success "Pure erasure mode benchmark complete"
-}
-
-# Run the SIMD mode benchmark
+# Run the SIMD mode benchmark
 run_simd_benchmark() {
-    print_info "🎯 Starting SIMD mode benchmark..."
+    print_info "🎯 Starting the SIMD mode benchmark..."
     echo "================================================"

     cargo bench --bench comparison_benchmark \
-        --features reed-solomon-simd \
         -- --save-baseline simd_baseline

-    print_success "SIMD mode benchmark complete"
+    print_success "SIMD mode benchmark complete"
 }

 # Run the full benchmark suite
@@ -91,33 +72,42 @@ run_full_benchmark() {
     print_info "🚀 Starting the full benchmark suite..."
     echo "================================================"

-    # Run the detailed benchmarks (using the default pure erasure mode)
+    # Run the detailed benchmarks
     cargo bench --bench erasure_benchmark

     print_success "Full benchmark suite complete"
 }

-# Run the performance comparison test
-run_comparison_benchmark() {
-    print_info "📊 Starting the performance comparison test..."
+# Run the performance test
+run_performance_test() {
+    print_info "📊 Starting the performance test..."
     echo "================================================"

-    print_info "Step 1: testing pure erasure mode..."
+    print_info "Step 1: running the encode benchmark..."
     cargo bench --bench comparison_benchmark \
-        --features reed-solomon-erasure \
-        -- --save-baseline erasure_baseline
+        -- encode --save-baseline encode_baseline

-    print_info "Step 2: testing SIMD mode and comparing against the erasure baseline..."
+    print_info "Step 2: running the decode benchmark..."
     cargo bench --bench comparison_benchmark \
-        --features reed-solomon-simd \
-        -- --baseline erasure_baseline
+        -- decode --save-baseline decode_baseline

-    print_success "Performance comparison test complete"
+    print_success "Performance test complete"
+}
+
+# Run the large dataset test
+run_large_data_test() {
+    print_info "🗂️ Starting the large dataset test..."
+    echo "================================================"
+
+    cargo bench --bench erasure_benchmark \
+        -- large_data --save-baseline large_data_baseline
+
+    print_success "Large dataset test complete"
 }

 # Generate the comparison report
 generate_comparison_report() {
-    print_info "📊 Generating the performance comparison report..."
+    print_info "📊 Generating the performance report..."

     if [ -d "target/criterion" ]; then
         print_info "Benchmark results have been saved to the target/criterion/ directory"
@@ -138,49 +128,48 @@ generate_comparison_report() {
 run_quick_test() {
     print_info "🏃 Running the quick performance test..."

-    print_info "Testing pure erasure mode..."
+    print_info "Testing SIMD encode performance..."
     cargo bench --bench comparison_benchmark \
-        --features reed-solomon-erasure \
-        -- encode_comparison --quick
+        -- encode --quick

-    print_info "Testing SIMD mode..."
+    print_info "Testing SIMD decode performance..."
     cargo bench --bench comparison_benchmark \
-        --features reed-solomon-simd \
-        -- encode_comparison --quick
+        -- decode --quick

     print_success "Quick test complete"
 }

 # Show help
 show_help() {
-    echo "Reed-Solomon performance benchmark script"
+    echo "Reed-Solomon SIMD performance benchmark script"
     echo ""
     echo "Implementation modes:"
-    echo "  🏛️ Pure erasure mode (default) - stable, compatible reed-solomon-erasure implementation"
-    echo "  🎯 SIMD mode - high-performance SIMD-optimized implementation"
+    echo "  🎯 SIMD mode - high-performance SIMD-optimized reed-solomon-simd implementation"
     echo ""
     echo "Usage:"
     echo "  $0 [command]"
     echo ""
     echo "Commands:"
     echo "  quick        Run the quick performance test"
-    echo "  full         Run the full benchmark suite (default erasure mode)"
-    echo "  comparison   Run the detailed implementation comparison test"
-    echo "  erasure      Test only the pure erasure mode"
-    echo "  simd         Test only SIMD mode"
+    echo "  full         Run the full benchmark suite"
+    echo "  performance  Run the detailed performance test"
+    echo "  simd         Run the SIMD mode test"
+    echo "  large        Run the large dataset test"
     echo "  clean        Clean up test results"
     echo "  help         Show this help message"
     echo ""
     echo "Examples:"
-    echo "  $0 quick       # quick test of both modes"
-    echo "  $0 comparison  # detailed comparison test"
-    echo "  $0 full        # full test suite (default erasure mode)"
-    echo "  $0 simd        # test only SIMD mode"
-    echo "  $0 erasure     # test only pure erasure mode"
+    echo "  $0 quick        # quick performance test"
+    echo "  $0 performance  # detailed performance test"
+    echo "  $0 full         # full test suite"
+    echo "  $0 simd         # SIMD mode test"
+    echo "  $0 large        # large dataset test"
     echo ""
-    echo "Mode notes:"
-    echo "  Erasure mode: uses the reed-solomon-erasure implementation; stable and reliable"
-    echo "  SIMD mode: uses the reed-solomon-simd implementation; optimized for performance"
+    echo "Implementation features:"
+    echo "  - Uses the high-performance reed-solomon-simd SIMD implementation"
+    echo "  - Encoder/decoder instance caching"
+    echo "  - Optimized memory management and thread safety"
+    echo "  - Cross-platform SIMD instruction support"
 }

 # Show test configuration info
@@ -196,22 +185,22 @@ show_test_info() {
     echo ""
     if [ -f "/proc/cpuinfo" ]; then
         echo "  - CPU model: $(grep 'model name' /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)"
         if grep -q "avx2" /proc/cpuinfo; then
-            echo "  - SIMD support: AVX2 ✅ (SIMD mode will use SIMD optimizations)"
+            echo "  - SIMD support: AVX2 ✅ (advanced SIMD optimizations will be used)"
         elif grep -q "sse4" /proc/cpuinfo; then
-            echo "  - SIMD support: SSE4 ✅ (SIMD mode will use SIMD optimizations)"
+            echo "  - SIMD support: SSE4 ✅ (SIMD optimizations will be used)"
         else
-            echo "  - SIMD support: no advanced SIMD features detected"
+            echo "  - SIMD support: baseline SIMD features"
         fi
     fi
-    echo "  - Default mode: pure erasure mode (stable and reliable)"
-    echo "  - High-performance mode: SIMD mode (performance-optimized)"
+    echo "  - Implementation: reed-solomon-simd (high-performance SIMD optimizations)"
+    echo "  - Features: instance caching, thread safety, cross-platform SIMD"
     echo ""
 }

 # Main function
 main() {
-    print_info "🧪 Reed-Solomon implementation performance benchmarks"
+    print_info "🧪 Reed-Solomon SIMD implementation performance benchmarks"
     echo "================================================"

     check_requirements
@@ -227,14 +216,9 @@ main() {
             run_full_benchmark
             generate_comparison_report
             ;;
-        "comparison")
+        "performance")
             cleanup
-            run_comparison_benchmark
-            generate_comparison_report
-            ;;
-        "erasure")
-            cleanup
-            run_erasure_benchmark
+            run_performance_test
             generate_comparison_report
             ;;
         "simd")
             cleanup
             run_simd_benchmark
             generate_comparison_report
             ;;
+        "large")
+            cleanup
+            run_large_data_test
+            generate_comparison_report
+            ;;
         "clean")
             cleanup
             ;;
@@ -257,10 +246,7 @@ main() {
     esac

     print_success "✨ Benchmark run complete!"
- print_info "💡 提示: 推荐使用默认的纯Erasure模式,对于高性能需求可考虑SIMD模式" } -# 如果直接运行此脚本,调用主函数 -if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then - main "$@" -fi \ No newline at end of file +# 启动脚本 +main "$@" \ No newline at end of file diff --git a/ecstore/src/cmd/bucket_replication.rs b/ecstore/src/cmd/bucket_replication.rs index c81e9896..dc788adc 100644 --- a/ecstore/src/cmd/bucket_replication.rs +++ b/ecstore/src/cmd/bucket_replication.rs @@ -1,6 +1,7 @@ #![allow(unused_variables)] #![allow(dead_code)] // use error::Error; +use crate::StorageAPI; use crate::bucket::metadata_sys::get_replication_config; use crate::bucket::versioning_sys::BucketVersioningSys; use crate::error::Error; @@ -11,26 +12,25 @@ use crate::store_api::ObjectIO; use crate::store_api::ObjectInfo; use crate::store_api::ObjectOptions; use crate::store_api::ObjectToDelete; -use crate::StorageAPI; +use aws_sdk_s3::Client as S3Client; +use aws_sdk_s3::Config; use aws_sdk_s3::config::BehaviorVersion; use aws_sdk_s3::config::Credentials; use aws_sdk_s3::config::Region; -use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; use bytes::Bytes; use chrono::DateTime; use chrono::Duration; use chrono::Utc; -use futures::stream::FuturesUnordered; use futures::StreamExt; +use futures::stream::FuturesUnordered; use http::HeaderMap; use http::Method; use lazy_static::lazy_static; // use std::time::SystemTime; use once_cell::sync::Lazy; use regex::Regex; -use rustfs_rsc::provider::StaticProvider; use rustfs_rsc::Minio; +use rustfs_rsc::provider::StaticProvider; use s3s::dto::DeleteMarkerReplicationStatus; use s3s::dto::DeleteReplicationStatus; use s3s::dto::ExistingObjectReplicationStatus; @@ -43,14 +43,14 @@ use std::collections::HashSet; use std::fmt; use std::iter::Iterator; use std::str::FromStr; +use std::sync::Arc; use std::sync::atomic::AtomicI32; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::vec; use time::OffsetDateTime; -use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::sync::RwLock; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task; use tracing::{debug, error, info, warn}; use uuid::Uuid; diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index 796f1d38..7ba9e512 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -1,4 +1,4 @@ -use super::{storageclass, Config, GLOBAL_StorageClass}; +use super::{Config, GLOBAL_StorageClass, storageclass}; use crate::disk::RUSTFS_META_BUCKET; use crate::error::{Error, Result}; use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}; diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs index 5b06ea5d..24e8eb40 100644 --- a/ecstore/src/config/mod.rs +++ b/ecstore/src/config/mod.rs @@ -5,7 +5,7 @@ pub mod storageclass; use crate::error::Result; use crate::store::ECStore; -use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS}; +use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::collections::HashMap; diff --git a/ecstore/src/erasure_coding/erasure.rs b/ecstore/src/erasure_coding/erasure.rs index 2f673d68..89de1224 100644 --- a/ecstore/src/erasure_coding/erasure.rs +++ b/ecstore/src/erasure_coding/erasure.rs @@ -1,27 +1,15 @@ -//! Erasure coding implementation supporting multiple Reed-Solomon backends. +//! Erasure coding implementation using Reed-Solomon SIMD backend. //! -//! 
diff --git a/ecstore/src/erasure_coding/erasure.rs b/ecstore/src/erasure_coding/erasure.rs
index 2f673d68..89de1224 100644
--- a/ecstore/src/erasure_coding/erasure.rs
+++ b/ecstore/src/erasure_coding/erasure.rs
@@ -1,27 +1,15 @@
-//! Erasure coding implementation supporting multiple Reed-Solomon backends.
+//! Erasure coding implementation using the Reed-Solomon SIMD backend.
 //!
-//! This module provides erasure coding functionality with support for two different
-//! Reed-Solomon implementations:
+//! This module provides erasure coding functionality built on a high-performance
+//! SIMD Reed-Solomon implementation:
 //!
-//! ## Reed-Solomon Implementations
+//! ## Reed-Solomon Implementation
 //!
-//! ### Pure Erasure Mode (Default)
-//! - **Stability**: Pure erasure implementation, mature and well-tested
-//! - **Performance**: Good performance with consistent behavior
-//! - **Compatibility**: Works with any shard size
-//! - **Use case**: Default behavior, recommended for most production use cases
-//!
-//! ### SIMD Mode (`reed-solomon-simd` feature)
+//! ### SIMD Mode (Only)
 //! - **Performance**: Uses SIMD optimization for high-performance encoding/decoding
 //! - **Compatibility**: Works with any shard size through the SIMD implementation
-//! - **Reliability**: High-performance SIMD implementation for large data processing
-//! - **Use case**: Use when maximum performance is needed for large data processing
-//!
-//! ## Feature Flags
-//!
-//! - Default: Use pure reed-solomon-erasure implementation (stable and reliable)
-//! - `reed-solomon-simd`: Use SIMD mode for optimal performance
-//! - `reed-solomon-erasure`: Explicitly enable pure erasure mode (same as default)
+//! - **Reliability**: Robust SIMD implementation suited to large data processing
+//! - **Use case**: Optimized for maximum performance in large data processing scenarios
 //!
 //! ## Example
 //!
 //! ```
@@ -35,8 +23,6 @@
 //! ```

 use bytes::{Bytes, BytesMut};
-use reed_solomon_erasure::galois_8::ReedSolomon as ReedSolomonErasure;
-#[cfg(feature = "reed-solomon-simd")]
 use reed_solomon_simd;
 use smallvec::SmallVec;
 use std::io;
@@ -44,38 +30,23 @@ use tokio::io::AsyncRead;
 use tracing::warn;
 use uuid::Uuid;

-/// Reed-Solomon encoder variants supporting different implementations.
-#[allow(clippy::large_enum_variant)]
-pub enum ReedSolomonEncoder {
-    /// SIMD mode: high-performance SIMD implementation (when the reed-solomon-simd feature is enabled)
-    #[cfg(feature = "reed-solomon-simd")]
-    SIMD {
-        data_shards: usize,
-        parity_shards: usize,
-        // Use RwLock for thread safety so the type is Send + Sync
-        encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
-        decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
-    },
-    /// Pure erasure mode: default, and when the reed-solomon-erasure feature is specified
-    Erasure(Box<ReedSolomonErasure>),
+/// Reed-Solomon encoder using the SIMD implementation.
+pub struct ReedSolomonEncoder {
+    data_shards: usize,
+    parity_shards: usize,
+    // Use RwLock for thread safety so the type is Send + Sync
+    encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
+    decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
 }

 impl Clone for ReedSolomonEncoder {
     fn clone(&self) -> Self {
-        match self {
-            #[cfg(feature = "reed-solomon-simd")]
-            ReedSolomonEncoder::SIMD {
-                data_shards,
-                parity_shards,
-                ..
-            } => ReedSolomonEncoder::SIMD {
-                data_shards: *data_shards,
-                parity_shards: *parity_shards,
-                // Create empty caches for the new instance; caches are not shared
-                encoder_cache: std::sync::RwLock::new(None),
-                decoder_cache: std::sync::RwLock::new(None),
-            },
-            ReedSolomonEncoder::Erasure(encoder) => ReedSolomonEncoder::Erasure(encoder.clone()),
+        Self {
+            data_shards: self.data_shards,
+            parity_shards: self.parity_shards,
+            // Create empty caches for the new instance; caches are not shared
+            encoder_cache: std::sync::RwLock::new(None),
+            decoder_cache: std::sync::RwLock::new(None),
         }
     }
 }
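Note the design choice in `clone`: caches are deliberately not shared, so each clone starts cold but never contends on another instance's locks. A hypothetical usage sketch (assuming the type is `Send + 'static`, which the `RwLock`-based design is intended to guarantee):

```rust
// Illustrative sketch only: fan work out across threads, one clone each.
fn spawn_workers(encoder: &ReedSolomonEncoder) {
    for _ in 0..4 {
        // Each clone owns fresh, empty encoder/decoder caches, so workers
        // warm up independently and never block each other on a shared lock.
        let worker = encoder.clone();
        std::thread::spawn(move || {
            // ... call worker.encode(..) / worker.reconstruct(..) here
            let _ = &worker;
        });
    }
}
```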
@@ -83,81 +54,50 @@ impl ReedSolomonEncoder {
     /// Create a new Reed-Solomon encoder with specified data and parity shards.
     pub fn new(data_shards: usize, parity_shards: usize) -> io::Result<Self> {
-        #[cfg(feature = "reed-solomon-simd")]
-        {
-            // SIMD mode when the reed-solomon-simd feature is enabled
-            Ok(ReedSolomonEncoder::SIMD {
-                data_shards,
-                parity_shards,
-                encoder_cache: std::sync::RwLock::new(None),
-                decoder_cache: std::sync::RwLock::new(None),
-            })
-        }
-
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        {
-            // Pure erasure mode when the reed-solomon-simd feature is not enabled (default or reed-solomon-erasure)
-            let encoder = ReedSolomonErasure::new(data_shards, parity_shards)
-                .map_err(|e| io::Error::other(format!("Failed to create erasure encoder: {:?}", e)))?;
-            Ok(ReedSolomonEncoder::Erasure(Box::new(encoder)))
-        }
+        Ok(ReedSolomonEncoder {
+            data_shards,
+            parity_shards,
+            encoder_cache: std::sync::RwLock::new(None),
+            decoder_cache: std::sync::RwLock::new(None),
+        })
     }

     /// Encode data shards with parity.
     pub fn encode(&self, shards: SmallVec<[&mut [u8]; 16]>) -> io::Result<()> {
-        match self {
-            #[cfg(feature = "reed-solomon-simd")]
-            ReedSolomonEncoder::SIMD {
-                data_shards,
-                parity_shards,
-                encoder_cache,
-                ..
-            } => {
-                let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
-                if shards_vec.is_empty() {
-                    return Ok(());
-                }
+        let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
+        if shards_vec.is_empty() {
+            return Ok(());
+        }

-                // Encode using SIMD
-                let simd_result = self.encode_with_simd(*data_shards, *parity_shards, encoder_cache, &mut shards_vec);
+        // Encode using SIMD
+        let simd_result = self.encode_with_simd(&mut shards_vec);

-                match simd_result {
-                    Ok(()) => Ok(()),
-                    Err(simd_error) => {
-                        warn!("SIMD encoding failed: {}", simd_error);
-                        Err(simd_error)
-                    }
-                }
+        match simd_result {
+            Ok(()) => Ok(()),
+            Err(simd_error) => {
+                warn!("SIMD encoding failed: {}", simd_error);
+                Err(simd_error)
             }
-            ReedSolomonEncoder::Erasure(encoder) => encoder
-                .encode(shards)
-                .map_err(|e| io::Error::other(format!("Erasure encode error: {:?}", e))),
         }
     }
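Both `encode_with_simd` (below) and `reconstruct_with_simd` follow the same take-reset-or-create discipline on their `RwLock`-guarded caches. Abstracted out of this codebase, the pattern looks roughly like this sketch (not actual crate code):

```rust
use std::sync::RwLock;

// Take the cached instance if it can be reset to the new parameters;
// otherwise (first use, or reset failed) build a fresh one. The caller
// is expected to put the instance back into the cache when finished.
fn take_or_create<T>(
    cache: &RwLock<Option<T>>,
    try_reset: impl FnOnce(&mut T) -> bool,
    create: impl FnOnce() -> T,
) -> T {
    let mut slot = cache.write().expect("cache lock poisoned");
    if let Some(mut cached) = slot.take() {
        if try_reset(&mut cached) {
            return cached;
        }
    }
    create()
}
```

Returning the instance to the cache only after `result` is dropped matters: dropping the result is what resets the SIMD encoder/decoder so it can be reused.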
-    #[cfg(feature = "reed-solomon-simd")]
-    fn encode_with_simd(
-        &self,
-        data_shards: usize,
-        parity_shards: usize,
-        encoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
-        shards_vec: &mut [&mut [u8]],
-    ) -> io::Result<()> {
+    fn encode_with_simd(&self, shards_vec: &mut [&mut [u8]]) -> io::Result<()> {
         let shard_len = shards_vec[0].len();

         // Get or create the encoder
         let mut encoder = {
-            let mut cache_guard = encoder_cache
+            let mut cache_guard = self
+                .encoder_cache
                 .write()
                 .map_err(|_| io::Error::other("Failed to acquire encoder cache lock"))?;

             match cache_guard.take() {
                 Some(mut cached_encoder) => {
                     // Reset the existing encoder via reset() to fit the new parameters
-                    if let Err(e) = cached_encoder.reset(data_shards, parity_shards, shard_len) {
+                    if let Err(e) = cached_encoder.reset(self.data_shards, self.parity_shards, shard_len) {
                         warn!("Failed to reset SIMD encoder: {:?}, creating new one", e);
                         // If reset fails, create a new encoder
-                        reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
+                        reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
                             .map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
                     } else {
                         cached_encoder
@@ -165,14 +105,14 @@
                 }
                 None => {
                     // First use: create a new encoder
-                    reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
+                    reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
                         .map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
                 }
             }
         };

         // Add the original shards
-        for (i, shard) in shards_vec.iter().enumerate().take(data_shards) {
+        for (i, shard) in shards_vec.iter().enumerate().take(self.data_shards) {
             encoder
                 .add_original_shard(shard)
                 .map_err(|e| io::Error::other(format!("Failed to add shard {}: {:?}", i, e)))?;
@@ -185,15 +125,16 @@
         // Copy the recovery shards into the output buffers
         for (i, recovery_shard) in result.recovery_iter().enumerate() {
-            if i + data_shards < shards_vec.len() {
-                shards_vec[i + data_shards].copy_from_slice(recovery_shard);
+            if i + self.data_shards < shards_vec.len() {
+                shards_vec[i + self.data_shards].copy_from_slice(recovery_shard);
             }
         }

         // Return the encoder to the cache (after result is dropped the encoder resets automatically and can be reused)
         drop(result); // Explicitly drop result to make sure the encoder is reset

-        *encoder_cache
+        *self
+            .encoder_cache
             .write()
             .map_err(|_| io::Error::other("Failed to return encoder to cache"))? = Some(encoder);
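For orientation, this is what the underlying `reed-solomon-simd` encoder API (the same `new` / `add_original_shard` / `encode` / `recovery_iter` calls used above) looks like when driven directly; shard counts and sizes here are illustrative:

```rust
use reed_solomon_simd::ReedSolomonEncoder;

fn main() -> Result<(), reed_solomon_simd::Error> {
    // 3 original shards + 2 recovery shards, 64 bytes per shard.
    let original = [[1u8; 64], [2u8; 64], [3u8; 64]];
    let mut encoder = ReedSolomonEncoder::new(3, 2, 64)?;
    for shard in &original {
        encoder.add_original_shard(shard)?;
    }
    // `encode` yields the recovery (parity) shards.
    let result = encoder.encode()?;
    let recovery: Vec<Vec<u8>> = result.recovery_iter().map(|s| s.to_vec()).collect();
    assert_eq!(recovery.len(), 2);
    Ok(())
}
```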
@@ -202,39 +143,19 @@
     /// Reconstruct missing shards.
     pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
-        match self {
-            #[cfg(feature = "reed-solomon-simd")]
-            ReedSolomonEncoder::SIMD {
-                data_shards,
-                parity_shards,
-                decoder_cache,
-                ..
-            } => {
-                // Reconstruct using SIMD
-                let simd_result = self.reconstruct_with_simd(*data_shards, *parity_shards, decoder_cache, shards);
+        // Reconstruct using SIMD
+        let simd_result = self.reconstruct_with_simd(shards);

-                match simd_result {
-                    Ok(()) => Ok(()),
-                    Err(simd_error) => {
-                        warn!("SIMD reconstruction failed: {}", simd_error);
-                        Err(simd_error)
-                    }
-                }
+        match simd_result {
+            Ok(()) => Ok(()),
+            Err(simd_error) => {
+                warn!("SIMD reconstruction failed: {}", simd_error);
+                Err(simd_error)
             }
-            ReedSolomonEncoder::Erasure(encoder) => encoder
-                .reconstruct(shards)
-                .map_err(|e| io::Error::other(format!("Erasure reconstruct error: {:?}", e))),
         }
     }

-    #[cfg(feature = "reed-solomon-simd")]
-    fn reconstruct_with_simd(
-        &self,
-        data_shards: usize,
-        parity_shards: usize,
-        decoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
-        shards: &mut [Option<Vec<u8>>],
-    ) -> io::Result<()> {
+    fn reconstruct_with_simd(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
         // Find a valid shard to determine length
         let shard_len = shards
             .iter()
@@ -243,17 +164,18 @@
         // Get or create the decoder
         let mut decoder = {
-            let mut cache_guard = decoder_cache
+            let mut cache_guard = self
+                .decoder_cache
                 .write()
                 .map_err(|_| io::Error::other("Failed to acquire decoder cache lock"))?;

             match cache_guard.take() {
                 Some(mut cached_decoder) => {
                     // Reset the existing decoder via reset()
-                    if let Err(e) = cached_decoder.reset(data_shards, parity_shards, shard_len) {
+                    if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) {
                         warn!("Failed to reset SIMD decoder: {:?}, creating new one", e);
                         // If reset fails, create a new decoder
-                        reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
+                        reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
                             .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
                     } else {
                         cached_decoder
@@ -261,7 +183,7 @@
                 }
                 None => {
                     // First use: create a new decoder
-                    reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
+                    reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
                         .map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
                 }
             }
@@ -270,12 +192,12 @@
         // Add available shards (both data and parity)
         for (i, shard_opt) in shards.iter().enumerate() {
             if let Some(shard) = shard_opt {
-                if i < data_shards {
+                if i < self.data_shards {
                     decoder
                         .add_original_shard(i, shard)
                         .map_err(|e| io::Error::other(format!("Failed to add original shard for reconstruction: {:?}", e)))?;
                 } else {
-                    let recovery_idx = i - data_shards;
+                    let recovery_idx = i - self.data_shards;
                     decoder
                         .add_recovery_shard(recovery_idx, shard)
                         .map_err(|e| io::Error::other(format!("Failed to add recovery shard for reconstruction: {:?}", e)))?;
@@ -289,7 +211,7 @@
         // Fill in missing data shards from reconstruction result
         for (i, shard_opt) in shards.iter_mut().enumerate() {
-            if shard_opt.is_none() && i < data_shards {
+            if shard_opt.is_none() && i < self.data_shards {
                 for (restored_index, restored_data) in result.restored_original_iter() {
                     if restored_index == i {
                         *shard_opt = Some(restored_data.to_vec());
@@ -302,7 +224,8 @@
         // Return the decoder to the cache (after result is dropped the decoder resets automatically and can be reused)
         drop(result); // Explicitly drop result to make sure the decoder is reset

-        *decoder_cache
+        *self
+            .decoder_cache
             .write()
             .map_err(|_| io::Error::other("Failed to return decoder to cache"))? = Some(decoder);
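And the matching direct use of the `reed-solomon-simd` decoder side (`add_original_shard(index, ..)` / `add_recovery_shard(index, ..)` / `decode` / `restored_original_iter`, exactly the calls made above); again a self-contained sketch with illustrative parameters:

```rust
use reed_solomon_simd::{ReedSolomonDecoder, ReedSolomonEncoder};

fn main() -> Result<(), reed_solomon_simd::Error> {
    let original = [[1u8; 64], [2u8; 64], [3u8; 64]];
    let mut encoder = ReedSolomonEncoder::new(3, 2, 64)?;
    for shard in &original {
        encoder.add_original_shard(shard)?;
    }
    let encoded = encoder.encode()?;
    let recovery: Vec<Vec<u8>> = encoded.recovery_iter().map(|s| s.to_vec()).collect();

    // Pretend original shards 1 and 2 were lost: feed shard 0 plus both
    // recovery shards, then ask the decoder for the missing indices.
    let mut decoder = ReedSolomonDecoder::new(3, 2, 64)?;
    decoder.add_original_shard(0, &original[0])?;
    decoder.add_recovery_shard(0, &recovery[0])?;
    decoder.add_recovery_shard(1, &recovery[1])?;
    let restored = decoder.decode()?;
    for (index, shard) in restored.restored_original_iter() {
        assert_eq!(shard, &original[index][..]); // indices 1 and 2 come back here
    }
    Ok(())
}
```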
@@ -592,19 +515,10 @@ fn test_encode_decode_roundtrip() {
         let data_shards = 4;
         let parity_shards = 2;
-
-        // Use different block sizes based on feature
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        let block_size = 8; // Pure erasure mode (default)
-        #[cfg(feature = "reed-solomon-simd")]
-        let block_size = 1024; // SIMD mode - SIMD with fallback
-
+        let block_size = 1024; // SIMD mode
         let erasure = Erasure::new(data_shards, parity_shards, block_size);

-        // Use different test data based on feature
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        let test_data = b"hello world".to_vec(); // Small data for erasure (default)
-        #[cfg(feature = "reed-solomon-simd")]
+        // Use sufficiently large test data for SIMD optimization
         let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for SIMD

         let data = &test_data;
@@ -632,13 +546,7 @@
     fn test_encode_decode_large_1m() {
         let data_shards = 4;
         let parity_shards = 2;
-
-        // Use different block sizes based on feature
-        #[cfg(feature = "reed-solomon-simd")]
         let block_size = 512 * 3; // SIMD mode
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        let block_size = 8192; // Pure erasure mode (default)
-
         let erasure = Erasure::new(data_shards, parity_shards, block_size);

         // Generate 1MB test data
@@ -704,16 +612,10 @@

         let data_shards = 4;
         let parity_shards = 2;
-
-        // Use different block sizes based on feature
-        #[cfg(feature = "reed-solomon-simd")]
         let block_size = 1024; // SIMD mode
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        let block_size = 8; // Pure erasure mode (default)
-
         let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));

-        // Use test data suitable for both modes
+        // Use test data suitable for SIMD mode
        let data = b"Async error test data with sufficient length to meet requirements for proper testing and validation.".repeat(20); // ~2KB

@@ -747,13 +649,7 @@

         let data_shards = 4;
         let parity_shards = 2;
-
-        // Use different block sizes based on feature
-        #[cfg(feature = "reed-solomon-simd")]
         let block_size = 1024; // SIMD mode
-        #[cfg(not(feature = "reed-solomon-simd"))]
-        let block_size = 8; // Pure erasure mode (default)
-
         let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));

         // Use test data that fits in exactly one block to avoid multi-block complexity
         let data =
             b"Channel async callback test data with sufficient length to ensure proper operation and validation requirements."
                 .repeat(8); // ~1KB

-        // let data = b"callback".to_vec(); // 8 bytes to fit exactly in one 8-byte block
-
         let data_clone = data.clone(); // Clone for later comparison
         let mut reader = Cursor::new(data);
         let (tx, mut rx) = mpsc::channel::<Bytes>(8);
@@ -801,8 +695,7 @@
         assert_eq!(&recovered, &data_clone);
     }

-    // Tests specifically for SIMD mode
-    #[cfg(feature = "reed-solomon-simd")]
+    // SIMD mode specific tests
     mod simd_tests {
         use super::*;

@@ -1171,47 +1064,4 @@
         assert_eq!(&recovered, &data_clone);
         }
     }
-
-    // Comparative tests between different implementations
-    #[cfg(not(feature = "reed-solomon-simd"))]
-    mod comparative_tests {
-        use super::*;
-
-        #[test]
-        fn test_implementation_consistency() {
-            let data_shards = 4;
-            let parity_shards = 2;
-            let block_size = 2048; // Large enough for SIMD requirements
-
-            // Create test data that ensures each shard is >= 512 bytes (SIMD minimum)
-            let test_data = b"This is test data for comparing reed-solomon-simd and reed-solomon-erasure implementations to ensure they produce consistent results when given the same input parameters and data. This data needs to be sufficiently large to meet SIMD requirements.";
-            let data = test_data.repeat(50); // Create much larger data: ~13KB total, ~3.25KB per shard
-
-            // Test with the erasure implementation (default)
-            let erasure_erasure = Erasure::new(data_shards, parity_shards, block_size);
-            let erasure_shards = erasure_erasure.encode_data(&data).unwrap();
-
-            // Test data integrity with erasure
-            let mut erasure_shards_opt: Vec<Option<Vec<u8>>> = erasure_shards.iter().map(|shard| Some(shard.to_vec())).collect();
-
-            // Lose some shards
-            erasure_shards_opt[1] = None; // Data shard
-            erasure_shards_opt[4] = None; // Parity shard
-
-            erasure_erasure.decode_data(&mut erasure_shards_opt).unwrap();
-
-            let mut erasure_recovered = Vec::new();
-            for shard in erasure_shards_opt.iter().take(data_shards) {
-                erasure_recovered.extend_from_slice(shard.as_ref().unwrap());
-            }
-            erasure_recovered.truncate(data.len());
-
-            // Verify erasure implementation works correctly
-            assert_eq!(&erasure_recovered, &data, "Erasure implementation failed to recover data correctly");
-
-            println!("✅ Both implementations are available and working correctly");
-            println!("✅ Default (reed-solomon-erasure): Data recovery successful");
-            println!("✅ SIMD tests are available as separate test suite");
-        }
-    }
 }
diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs
index a235feef..7378b42f 100644
--- a/ecstore/src/set_disk.rs
+++ b/ecstore/src/set_disk.rs
@@ -1,8 +1,8 @@
 use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
-use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
+use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
 use crate::disk::{
-    self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT,
-    CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
+    self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
+    conv_part_err_to_int, has_part_err,
 };
 use crate::erasure_coding;
 use
crate::erasure_coding::bitrot_verify; @@ -12,24 +12,24 @@ use crate::heal::data_usage_cache::DataUsageCache; use crate::heal::heal_ops::{HealEntryFn, HealSequence}; use crate::store_api::ObjectToDelete; use crate::{ - cache_value::metacache_set::{list_path_raw, ListPathRawOptions}, - config::{storageclass, GLOBAL_StorageClass}, + cache_value::metacache_set::{ListPathRawOptions, list_path_raw}, + config::{GLOBAL_StorageClass, storageclass}, disk::{ - endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, - DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp, - ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, + CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, + RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions, + UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, }, - error::{to_object_err, StorageError}, + error::{StorageError, to_object_err}, global::{ - get_global_deployment_id, is_dist_erasure, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP, - GLOBAL_LOCAL_DISK_SET_DRIVES, + GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id, + is_dist_erasure, }, heal::{ data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT}, data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo}, heal_commands::{ - HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, - DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN, + DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, + HEAL_NORMAL_SCAN, HealOpts, HealScanMode, HealingTracker, }, heal_ops::BG_HEALING_UUID, }, @@ -42,7 +42,7 @@ use crate::{ }; use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation}; use crate::{ - heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING}, + heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig}, store_api::ListObjectVersionsInfo, }; use bytes::Bytes; @@ -51,22 +51,22 @@ use chrono::Utc; use futures::future::join_all; use glob::Pattern; use http::HeaderMap; -use lock::{namespace_lock::NsLockMap, LockApi}; +use lock::{LockApi, namespace_lock::NsLockMap}; use madmin::heal_commands::{HealDriveInfo, HealResultItem}; use md5::{Digest as Md5Digest, Md5}; -use rand::{seq::SliceRandom, Rng}; +use rand::{Rng, seq::SliceRandom}; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::{ - file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, - MetaCacheEntry, MetadataResolutionParams, - ObjectPartInfo, - RawFileInfo, + FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo, + RawFileInfo, file_info_from_raw, + headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, + merge_file_meta_versions, }; use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader}; use rustfs_utils::{ - crypto::{base64_decode, base64_encode, hex}, - path::{encode_dir_object, has_suffix, path_join_buf, SLASH_SEPARATOR}, HashAlgorithm, + crypto::{base64_decode, base64_encode, hex}, + path::{SLASH_SEPARATOR, encode_dir_object, 
has_suffix, path_join_buf}, }; use sha2::Sha256; use std::hash::Hash; @@ -82,7 +82,7 @@ use std::{ use time::OffsetDateTime; use tokio::{ io::AsyncWrite, - sync::{broadcast, RwLock}, + sync::{RwLock, broadcast}, }; use tokio::{ select, @@ -5837,9 +5837,9 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String { #[cfg(test)] mod tests { use super::*; - use crate::disk::error::DiskError; use crate::disk::CHECK_PART_UNKNOWN; use crate::disk::CHECK_PART_VOLUME_NOT_FOUND; + use crate::disk::error::DiskError; use crate::store_api::CompletePart; use rustfs_filemeta::ErasureInfo; use std::collections::HashMap; diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 246ff0e5..122fe4fe 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -8,10 +8,10 @@ use crate::{disk::DiskStore, heal::heal_commands::HealOpts}; use http::{HeaderMap, HeaderValue}; use madmin::heal_commands::HealResultItem; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; -use rustfs_filemeta::{headers::AMZ_OBJECT_TAGGING, FileInfo, MetaCacheEntriesSorted, ObjectPartInfo}; +use rustfs_filemeta::{FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, headers::AMZ_OBJECT_TAGGING}; use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader}; -use rustfs_utils::path::decode_dir_object; use rustfs_utils::CompressionAlgorithm; +use rustfs_utils::path::decode_dir_object; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index 100c8667..4aed6cab 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -1,24 +1,24 @@ +use crate::StorageAPI; use crate::bucket::metadata_sys::get_versioning_config; use crate::bucket::versioning::VersioningApi; -use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions}; +use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw}; use crate::disk::error::DiskError; use crate::disk::{DiskInfo, DiskStore}; use crate::error::{ - is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, Error, Result, StorageError, + Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, }; use crate::set_disk::SetDisks; use crate::store::check_list_objs_args; use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions}; use crate::store_utils::is_reserved_or_invalid_bucket; -use crate::StorageAPI; use crate::{store::ECStore, store_api::ListObjectsV2Info}; use futures::future::join_all; use rand::seq::SliceRandom; use rustfs_filemeta::{ - merge_file_meta_versions, FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, - MetadataResolutionParams, + FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams, + merge_file_meta_versions, }; -use rustfs_utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR}; +use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast::{self, Receiver as B_Receiver}; diff --git a/iam/src/manager.rs b/iam/src/manager.rs index 473ab877..9551f455 100644 --- a/iam/src/manager.rs +++ b/iam/src/manager.rs @@ -1,21 +1,21 @@ -use crate::error::{is_err_config_not_found, Error, Result}; +use crate::error::{Error, Result, is_err_config_not_found}; use crate::{ 
cache::{Cache, CacheEntity}, - error::{is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user, Error as IamError}, - store::{object::IAM_CONFIG_PREFIX, GroupInfo, MappedPolicy, Store, UserType}, + error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user}, + store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX}, sys::{ - UpdateServiceAccountOpts, MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, - STATUS_ENABLED, + MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED, + UpdateServiceAccountOpts, }, }; use ecstore::global::get_global_action_cred; use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc}; use policy::{ arn::ARN, - auth::{self, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign, Credentials, UserIdentity}, + auth::{self, Credentials, UserIdentity, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign}, format::Format, policy::{ - default::DEFAULT_POLICIES, iam_policy_claim_name_sa, Policy, PolicyDoc, EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, + EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, default::DEFAULT_POLICIES, iam_policy_claim_name_sa, }, }; use rustfs_utils::crypto::base64_encode; @@ -25,8 +25,8 @@ use serde_json::Value; use std::{ collections::{HashMap, HashSet}, sync::{ - atomic::{AtomicBool, AtomicI64, Ordering}, Arc, + atomic::{AtomicBool, AtomicI64, Ordering}, }, time::Duration, }; diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs index d24c9641..5f471f96 100644 --- a/rustfs/src/admin/rpc.rs +++ b/rustfs/src/admin/rpc.rs @@ -10,12 +10,12 @@ use http::StatusCode; use hyper::Method; use matchit::Params; use rustfs_utils::net::bytes_stream; -use s3s::dto::StreamingBlob; -use s3s::s3_error; use s3s::Body; use s3s::S3Request; use s3s::S3Response; use s3s::S3Result; +use s3s::dto::StreamingBlob; +use s3s::s3_error; use serde_urlencoded::from_bytes; use tokio::io::AsyncWriteExt; use tokio_util::io::ReaderStream; diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index b0695233..ae915ff2 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -12,9 +12,9 @@ mod service; mod storage; use crate::auth::IAMAuth; -use crate::console::{init_console_cfg, CONSOLE_CONFIG}; +use crate::console::{CONSOLE_CONFIG, init_console_cfg}; // Ensure the correct path for parse_license is imported -use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT}; +use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown}; use bytes::Bytes; use chrono::Datelike; use clap::Parser; @@ -22,6 +22,7 @@ use common::{ // error::{Error, Result}, globals::set_global_addr, }; +use ecstore::StorageAPI; use ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use ecstore::cmd::bucket_replication::init_bucket_replication_pool; use ecstore::config as ecconfig; @@ -29,12 +30,11 @@ use ecstore::config::GLOBAL_ConfigSys; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::rpc::make_server; use ecstore::store_api::BucketOptions; -use ecstore::StorageAPI; use ecstore::{ endpoints::EndpointServerPools, heal::data_scanner::init_data_scanner, set_global_endpoints, - store::{init_local_disks, ECStore}, + store::{ECStore, init_local_disks}, update_erasure_type, }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; @@ -49,7 +49,7 
@@ use iam::init_iam_sys; use license::init_license; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_obs::{init_obs, set_global_guard, SystemObserver}; +use rustfs_obs::{SystemObserver, init_obs, set_global_guard}; use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; @@ -60,13 +60,13 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; -use tokio::signal::unix::{signal, SignalKind}; +use tokio::signal::unix::{SignalKind, signal}; use tokio_rustls::TlsAcceptor; -use tonic::{metadata::MetadataValue, Request, Status}; +use tonic::{Request, Status, metadata::MetadataValue}; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; +use tracing::{Span, instrument}; use tracing::{debug, error, info, warn}; -use tracing::{instrument, Span}; const MI_B: usize = 1024 * 1024; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 56bc8cf1..178b689d 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -16,8 +16,8 @@ use bytes::Bytes; use chrono::DateTime; use chrono::Utc; use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder; -use datafusion::arrow::json::writer::JsonArray; use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; +use datafusion::arrow::json::writer::JsonArray; use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG; use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG; use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG; @@ -32,13 +32,13 @@ use ecstore::bucket::tagging::decode_tags; use ecstore::bucket::tagging::encode_tags; use ecstore::bucket::utils::serialize; use ecstore::bucket::versioning_sys::BucketVersioningSys; +use ecstore::cmd::bucket_replication::ReplicationStatusType; +use ecstore::cmd::bucket_replication::ReplicationType; use ecstore::cmd::bucket_replication::get_must_replicate_options; use ecstore::cmd::bucket_replication::must_replicate; use ecstore::cmd::bucket_replication::schedule_replication; -use ecstore::cmd::bucket_replication::ReplicationStatusType; -use ecstore::cmd::bucket_replication::ReplicationType; -use ecstore::compress::is_compressible; use ecstore::compress::MIN_COMPRESSIBLE_SIZE; +use ecstore::compress::is_compressible; use ecstore::error::StorageError; use ecstore::new_object_layer_fn; use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE; @@ -58,11 +58,11 @@ use futures::StreamExt; use http::HeaderMap; use lazy_static::lazy_static; use policy::auth; -use policy::policy::action::Action; -use policy::policy::action::S3Action; use policy::policy::BucketPolicy; use policy::policy::BucketPolicyArgs; use policy::policy::Validator; +use policy::policy::action::Action; +use policy::policy::action::S3Action; use query::instance::make_rustfsms; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING}; @@ -70,23 +70,23 @@ use rustfs_rio::CompressReader; use rustfs_rio::HashReader; use rustfs_rio::Reader; use rustfs_rio::WarpReader; -use rustfs_utils::path::path_join_buf; use rustfs_utils::CompressionAlgorithm; +use rustfs_utils::path::path_join_buf; use rustfs_zip::CompressionFormat; -use s3s::dto::*; -use s3s::s3_error; +use s3s::S3; use s3s::S3Error; use s3s::S3ErrorCode; use s3s::S3Result; -use s3s::S3; +use s3s::dto::*; +use s3s::s3_error; use 
s3s::{S3Request, S3Response}; use std::collections::HashMap; use std::fmt::Debug; use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_tar::Archive;