Merge pull request #499 from rustfs/feat/migrate-to-simd-only

Feat/migrate to simd only
weisd
2025-06-23 10:32:32 +08:00
committed by GitHub
60 changed files with 631 additions and 1035 deletions

Cargo.lock (generated)
View File

@@ -52,17 +52,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [
"getrandom 0.2.16",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.12"
@@ -301,7 +290,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12fcdb3f1d03f69d3ec26ac67645a8fe3f878d77b5ebb0b15d64a116c212985"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-buffer",
"arrow-data",
"arrow-schema",
@@ -446,7 +435,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69efcd706420e52cd44f5c4358d279801993846d1c2a8e52111853d61d55a619"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-data",
@@ -819,7 +808,7 @@ dependencies = [
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
"lru 0.12.5",
"lru",
"percent-encoding",
"regex-lite",
"sha2 0.10.9",
@@ -2381,7 +2370,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.11",
"parking_lot_core",
]
[[package]]
@@ -2395,7 +2384,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.11",
"parking_lot_core",
]
[[package]]
@@ -2442,7 +2431,7 @@ dependencies = [
"itertools 0.14.0",
"log",
"object_store",
"parking_lot 0.12.4",
"parking_lot",
"parquet",
"rand 0.8.5",
"regex",
@@ -2472,7 +2461,7 @@ dependencies = [
"futures",
"itertools 0.14.0",
"log",
"parking_lot 0.12.4",
"parking_lot",
]
[[package]]
@@ -2503,7 +2492,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f53d7ec508e1b3f68bd301cee3f649834fad51eff9240d898a4b2614cfd0a7a"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ipc",
"base64 0.22.1",
@@ -2584,7 +2573,7 @@ dependencies = [
"futures",
"log",
"object_store",
"parking_lot 0.12.4",
"parking_lot",
"rand 0.8.5",
"tempfile",
"url",
@@ -2659,7 +2648,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfc2d074d5ee4d9354fdcc9283d5b2b9037849237ddecb8942a29144b77ca05"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-doc",
@@ -2680,7 +2669,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cbceba0f98d921309a9121b702bcd49289d383684cccabf9a92cda1602f3bbb"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2720,7 +2709,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-plan",
"parking_lot 0.12.4",
"parking_lot",
"paste",
]
@@ -2787,7 +2776,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1447c2c6bc8674a16be4786b4abf528c302803fafa186aa6275692570e64d85"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr",
@@ -2809,7 +2798,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f8c25dcd069073a75b3d2840a79d0f81e64bdd2c05f2d3d18939afb36a7dcb"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2842,7 +2831,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88cc160df00e413e370b3b259c8ea7bfbebc134d32de16325950e9e923846b7f"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ord",
"arrow-schema",
@@ -2861,7 +2850,7 @@ dependencies = [
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"parking_lot 0.12.4",
"parking_lot",
"pin-project-lite",
"tokio",
]
@@ -3412,7 +3401,7 @@ dependencies = [
"futures-util",
"generational-box",
"once_cell",
"parking_lot 0.12.4",
"parking_lot",
"rustc-hash 1.1.0",
"tracing",
"warnings",
@@ -3646,7 +3635,6 @@ dependencies = [
"policy",
"protos",
"rand 0.9.1",
"reed-solomon-erasure",
"reed-solomon-simd",
"regex",
"reqwest",
@@ -4244,7 +4232,7 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b"
dependencies = [
"parking_lot 0.12.4",
"parking_lot",
"tracing",
]
@@ -4612,9 +4600,6 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
@@ -4622,7 +4607,7 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
"ahash",
"allocator-api2",
]
@@ -5723,15 +5708,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
[[package]]
name = "lru"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.12.5"
@@ -6611,7 +6587,7 @@ dependencies = [
"futures",
"humantime",
"itertools 0.13.0",
"parking_lot 0.12.4",
"parking_lot",
"percent-encoding",
"snafu",
"tokio",
@@ -6821,17 +6797,6 @@ version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -6839,21 +6804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
dependencies = [
"lock_api",
"parking_lot_core 0.9.11",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
"parking_lot_core",
]
[[package]]
@@ -6875,7 +6826,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfb15796ac6f56b429fd99e33ba133783ad75b27c36b4b5ce06f1f82cc97754e"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-cast",
@@ -7554,7 +7505,7 @@ dependencies = [
"derive_builder",
"futures",
"lazy_static",
"parking_lot 0.12.4",
"parking_lot",
"s3s",
"snafu",
"tokio",
@@ -7840,15 +7791,6 @@ dependencies = [
"syn 2.0.103",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
@@ -7878,21 +7820,6 @@ dependencies = [
"thiserror 2.0.12",
]
[[package]]
name = "reed-solomon-erasure"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
dependencies = [
"cc",
"libc",
"libm",
"lru 0.7.8",
"parking_lot 0.11.2",
"smallvec",
"spin",
]
[[package]]
name = "reed-solomon-simd"
version = "3.0.1"
@@ -9485,7 +9412,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
dependencies = [
"new_debug_unreachable",
"parking_lot 0.12.4",
"parking_lot",
"phf_shared 0.11.3",
"precomputed-hash",
"serde",
@@ -9732,7 +9659,7 @@ dependencies = [
"ndk-sys",
"objc",
"once_cell",
"parking_lot 0.12.4",
"parking_lot",
"raw-window-handle 0.5.2",
"raw-window-handle 0.6.2",
"scopeguard",
@@ -10001,7 +9928,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.4",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",

View File

@@ -167,7 +167,7 @@ flate2 = "1.1.1"
zstd = "0.13.3"
lz4 = "1.28.1"
rdkafka = { version = "0.37.0", features = ["tokio"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-simd = { version = "3.0.0" }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.20", default-features = false, features = [

View File

@@ -1,8 +1,8 @@
use rsa::Pkcs1v15Encrypt;
use rsa::{
RsaPrivateKey, RsaPublicKey,
pkcs8::{DecodePrivateKey, DecodePublicKey},
rand_core::OsRng,
RsaPrivateKey, RsaPublicKey,
};
use serde::{Deserialize, Serialize};
use std::io::{Error, Result};
@@ -58,8 +58,8 @@ static TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhk
mod tests {
use super::*;
use rsa::{
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
RsaPrivateKey,
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
};
use std::time::{SystemTime, UNIX_EPOCH};
#[test]

View File

@@ -5,7 +5,7 @@ use rustfs_notify::factory::{
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{initialize, notification_system};
use std::time::Duration;
use tracing::info;

View File

@@ -6,7 +6,7 @@ use rustfs_notify::factory::{
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{initialize, notification_system};
use std::time::Duration;
use tracing::info;

View File

@@ -1,9 +1,9 @@
use axum::routing::get;
use axum::{
Router,
extract::Json,
http::{HeaderMap, Response, StatusCode},
routing::post,
Router,
};
use serde_json::Value;
use std::time::{SystemTime, UNIX_EPOCH};

View File

@@ -41,7 +41,7 @@ impl TargetID {
ARN {
target_id: self.clone(),
region: region.to_string(),
service: DEFAULT_ARN_SERVICE.to_string(), // Default Service
partition: DEFAULT_ARN_PARTITION.to_string(), // Default partition
}
}
@@ -112,7 +112,7 @@ impl ARN {
ARN {
target_id,
region,
service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs
partition: DEFAULT_ARN_PARTITION.to_string(), // Default is rustfs partition
}
}
@@ -121,16 +121,10 @@ impl ARN {
/// Returns the ARN string in the format "{ARN_PREFIX}:{region}:{target_id}"
#[allow(clippy::inherent_to_string)]
pub fn to_arn_string(&self) -> String {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
{
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
return String::new();
}
format!(
"{}:{}:{}",
ARN_PREFIX,
self.region,
self.target_id.to_id_string()
)
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
}
/// Parsing ARN from string
@@ -162,8 +156,7 @@ impl ARN {
impl fmt::Display for ARN {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
{
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
// Returns an empty string if all parts are empty
return Ok(());
}

View File

@@ -445,12 +445,8 @@ impl Event {
};
let mut resp_elements = args.resp_elements.clone();
resp_elements
.entry("x-amz-request-id".to_string())
.or_insert_with(|| "".to_string());
resp_elements
.entry("x-amz-id-2".to_string())
.or_insert_with(|| "".to_string());
resp_elements.entry("x-amz-request-id".to_string()).or_default();
resp_elements.entry("x-amz-id-2".to_string()).or_default();
// ... Filling of other response elements
// URL encoding of object keys

View File

@@ -1,7 +1,7 @@
use crate::store::DEFAULT_LIMIT;
use crate::{
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target},
target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs},
};
use async_trait::async_trait;
use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS};

View File

@@ -54,7 +54,7 @@ impl Notifier {
// Create an event and send it
let event = Event::new(args.clone());
notification_sys
.send_event(&args.bucket_name, &args.event_name.as_str(), &args.object.name.clone(), event)
.send_event(&args.bucket_name, args.event_name.as_str(), args.object.name.as_str(), event)
.await;
}
}

View File

@@ -1,15 +1,15 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event,
StoreError, Target,
Event, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry,
rules::BucketNotificationConfig, stream,
};
use ecstore::config::{Config, KVS};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock, Semaphore};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, error, info, warn};
/// Notify the system of monitoring indicators
@@ -189,10 +189,7 @@ impl NotificationSystem {
info!("Attempting to remove target: {}", target_id);
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized")));
};
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())
@@ -243,10 +240,7 @@ impl NotificationSystem {
info!("Setting config for target {} of type {}", target_name, target_type);
// 1. Get the storage handle
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized")));
};
// 2. Read the latest configuration from storage
@@ -306,10 +300,7 @@ impl NotificationSystem {
pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> {
info!("Removing config for target {} of type {}", target_name, target_type);
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized")));
};
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())

View File

@@ -26,7 +26,7 @@ pub use rules::BucketNotificationConfig;
use std::io::IsTerminal;
pub use target::Target;
use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
/// Initialize the tracing log system
///

View File

@@ -1,5 +1,5 @@
use crate::arn::TargetID;
use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName};
use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};

View File

@@ -1,11 +1,11 @@
use super::rules_map::RulesMap;
// Keep for existing structure if any, or remove if not used
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::EventName;
use crate::arn::TargetID;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use crate::rules::NotificationConfiguration;
use crate::EventName;
use std::collections::HashMap;
use std::io::Read;
// Assuming this is the XML config structure

View File

@@ -78,10 +78,7 @@ mod tests {
assert_eq!(new_pattern(Some(""), Some("b")), "*b");
assert_eq!(new_pattern(None, None), "");
assert_eq!(new_pattern(Some("prefix"), Some("suffix")), "prefix*suffix");
assert_eq!(
new_pattern(Some("prefix/"), Some("/suffix")),
"prefix/*suffix"
); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
assert_eq!(new_pattern(Some("prefix/"), Some("/suffix")), "prefix/*suffix"); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
}
#[test]

View File

@@ -23,9 +23,7 @@ impl PatternRules {
/// Checks if there are any rules that match the given object name.
pub fn match_simple(&self, object_name: &str) -> bool {
self.rules
.keys()
.any(|p| pattern::match_simple(p, object_name))
self.rules.keys().any(|p| pattern::match_simple(p, object_name))
}
/// Returns all TargetIDs that match the object name.
@@ -61,8 +59,7 @@ impl PatternRules {
for (pattern, self_targets) in &self.rules {
match other.rules.get(pattern) {
Some(other_targets) => {
let diff_targets: TargetIdSet =
self_targets.difference(other_targets).cloned().collect();
let diff_targets: TargetIdSet = self_targets.difference(other_targets).cloned().collect();
if !diff_targets.is_empty() {
result_rules.insert(pattern.clone(), diff_targets);
}
@@ -73,8 +70,6 @@ impl PatternRules {
}
}
}
PatternRules {
rules: result_rules,
}
PatternRules { rules: result_rules }
}
}

View File

@@ -1,5 +1,5 @@
use super::pattern;
use crate::arn::{ArnError, TargetIDError, ARN};
use crate::arn::{ARN, ArnError, TargetIDError};
use crate::event::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

View File

@@ -1,5 +1,5 @@
use crate::error::StoreError;
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use snap::raw::{Decoder, Encoder};
use std::sync::{Arc, RwLock};
use std::{
@@ -195,11 +195,7 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
/// Reads a file for the given key
fn read_file(&self, key: &Key) -> Result<Vec<u8>, StoreError> {
let path = self.file_path(key);
debug!(
"Reading file for key: {},path: {}",
key.to_string(),
path.display()
);
debug!("Reading file for key: {},path: {}", key.to_string(), path.display());
let data = std::fs::read(&path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
StoreError::NotFound
@@ -240,13 +236,11 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
};
std::fs::write(&path, &data).map_err(StoreError::Io)?;
let modified = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64;
let mut entries = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let modified = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
let mut entries = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
entries.insert(key.to_string(), modified);
debug!("Wrote event to store: {}", key.to_string());
Ok(())
@@ -265,18 +259,16 @@ where
let entries = std::fs::read_dir(&self.directory).map_err(StoreError::Io)?;
// Get the write lock to update the internal state
let mut entries_map = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let mut entries_map = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
for entry in entries {
let entry = entry.map_err(StoreError::Io)?;
let metadata = entry.metadata().map_err(StoreError::Io)?;
if metadata.is_file() {
let modified = metadata.modified().map_err(StoreError::Io)?;
let unix_nano = modified
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64;
let unix_nano = modified.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
let file_name = entry.file_name().to_string_lossy().to_string();
entries_map.insert(file_name, unix_nano);
@@ -290,9 +282,10 @@ where
fn put(&self, item: T) -> Result<Self::Key, Self::Error> {
// Check storage limits
{
let entries = self.entries.read().map_err(|_| {
StoreError::Internal("Failed to acquire read lock on entries".to_string())
})?;
let entries = self
.entries
.read()
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
if entries.len() as u64 >= self.entry_limit {
return Err(StoreError::LimitExceeded);
@@ -307,8 +300,7 @@ where
compress: true,
};
let data =
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
let data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
self.write_file(&key, &data)?;
Ok(key)
@@ -317,9 +309,10 @@ where
fn put_multiple(&self, items: Vec<T>) -> Result<Self::Key, Self::Error> {
// Check storage limits
{
let entries = self.entries.read().map_err(|_| {
StoreError::Internal("Failed to acquire read lock on entries".to_string())
})?;
let entries = self
.entries
.read()
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
if entries.len() as u64 >= self.entry_limit {
return Err(StoreError::LimitExceeded);
@@ -327,9 +320,7 @@ where
}
if items.is_empty() {
// Or return an error, or a special key?
return Err(StoreError::Internal(
"Cannot put_multiple with empty items list".to_string(),
));
return Err(StoreError::Internal("Cannot put_multiple with empty items list".to_string()));
}
let uuid = Uuid::new_v4();
let key = Key {
@@ -348,8 +339,7 @@ where
for item in items {
// If items are Vec<Event>, and Event is large, this could be inefficient.
// The current get_multiple deserializes one by one.
let item_data =
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
let item_data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
buffer.extend_from_slice(&item_data);
// If using JSON array: buffer = serde_json::to_vec(&items)?
}
@@ -374,9 +364,7 @@ where
debug!("Reading items from store for key: {}", key.to_string());
let data = self.read_file(key)?;
if data.is_empty() {
return Err(StoreError::Deserialization(
"Cannot deserialize empty data".to_string(),
));
return Err(StoreError::Deserialization("Cannot deserialize empty data".to_string()));
}
let mut items = Vec::with_capacity(key.item_count);
@@ -395,10 +383,7 @@ where
match deserializer.next() {
Some(Ok(item)) => items.push(item),
Some(Err(e)) => {
return Err(StoreError::Deserialization(format!(
"Failed to deserialize item in batch: {}",
e
)));
return Err(StoreError::Deserialization(format!("Failed to deserialize item in batch: {}", e)));
}
None => {
// Reached end of stream sooner than item_count
@@ -435,7 +420,10 @@ where
std::fs::remove_file(&path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
// If file not found, still try to remove from entries map in case of inconsistency
warn!("File not found for key {} during del, but proceeding to remove from entries map.", key.to_string());
warn!(
"File not found for key {} during del, but proceeding to remove from entries map.",
key.to_string()
);
StoreError::NotFound
} else {
StoreError::Io(e)
@@ -443,17 +431,15 @@ where
})?;
// Get the write lock to update the internal state
let mut entries = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let mut entries = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
if entries.remove(&key.to_string()).is_none() {
// Key was not in the map, could be an inconsistency or already deleted.
// This is not necessarily an error if the file deletion succeeded or was NotFound.
debug!(
"Key {} not found in entries map during del, might have been already removed.",
key
);
debug!("Key {} not found in entries map during del, might have been already removed.", key);
}
debug!("Deleted event from store: {}", key.to_string());
Ok(())
@@ -492,7 +478,6 @@ where
}
fn boxed_clone(&self) -> Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync> {
Box::new(self.clone())
as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
Box::new(self.clone()) as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
}
}

View File

@@ -1,13 +1,13 @@
use crate::{
error::TargetError, integration::NotificationMetrics,
Event, StoreError,
error::TargetError,
integration::NotificationMetrics,
store::{Key, Store},
target::Target,
Event,
StoreError,
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{Semaphore, mpsc};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

View File

@@ -1,22 +1,22 @@
use crate::store::{Key, STORE_EXTENSION};
use crate::target::ChannelTargetType;
use crate::{
arn::TargetID, error::TargetError,
StoreError, Target,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::Store,
StoreError,
Target,
};
use async_trait::async_trait;
use rumqttc::{mqttbytes::Error as MqttBytesError, ConnectionError};
use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS};
use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError};
use std::sync::Arc;
use std::{
path::PathBuf,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::sync::{mpsc, Mutex, OnceCell};
use tokio::sync::{Mutex, OnceCell, mpsc};
use tracing::{debug, error, info, instrument, trace, warn};
use url::Url;
use urlencoding;
@@ -58,24 +58,19 @@ impl MQTTArgs {
match self.broker.scheme() {
"ws" | "wss" | "tcp" | "ssl" | "tls" | "tcps" | "mqtt" | "mqtts" => {}
_ => {
return Err(TargetError::Configuration(
"unknown protocol in broker address".to_string(),
));
return Err(TargetError::Configuration("unknown protocol in broker address".to_string()));
}
}
if !self.queue_dir.is_empty() {
let path = std::path::Path::new(&self.queue_dir);
if !path.is_absolute() {
return Err(TargetError::Configuration(
"mqtt queueDir path should be absolute".to_string(),
));
return Err(TargetError::Configuration("mqtt queueDir path should be absolute".to_string()));
}
if self.qos == QoS::AtMostOnce {
return Err(TargetError::Configuration(
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set"
.to_string(),
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set".to_string(),
));
}
}
@@ -107,21 +102,12 @@ impl MQTTTarget {
let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string());
let queue_store = if !args.queue_dir.is_empty() {
let base_path = PathBuf::from(&args.queue_dir);
let unique_dir_name = format!(
"rustfs-{}-{}-{}",
ChannelTargetType::Mqtt.as_str(),
target_id.name,
target_id.id
)
.replace(":", "_");
let unique_dir_name =
format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_");
// Ensure the directory name is valid for filesystem
let specific_queue_path = base_path.join(unique_dir_name);
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target");
let store = crate::store::QueueStore::<Event>::new(
specific_queue_path,
args.queue_limit,
STORE_EXTENSION,
);
let store = crate::store::QueueStore::<Event>::new(specific_queue_path, args.queue_limit, STORE_EXTENSION);
if let Err(e) = store.open() {
error!(
target_id = %target_id,
@@ -130,10 +116,7 @@ impl MQTTTarget {
);
return Err(TargetError::Storage(format!("{}", e)));
}
Some(Box::new(store)
as Box<
dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync,
>)
Some(Box::new(store) as Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>)
} else {
None
};
@@ -175,18 +158,13 @@ impl MQTTTarget {
debug!(target_id = %target_id_clone, "Initializing MQTT background task.");
let host = args_clone.broker.host_str().unwrap_or("localhost");
let port = args_clone.broker.port().unwrap_or(1883);
let mut mqtt_options = MqttOptions::new(
format!("rustfs_notify_{}", uuid::Uuid::new_v4()),
host,
port,
);
let mut mqtt_options = MqttOptions::new(format!("rustfs_notify_{}", uuid::Uuid::new_v4()), host, port);
mqtt_options
.set_keep_alive(args_clone.keep_alive)
.set_max_packet_size(100 * 1024 * 1024, 100 * 1024 * 1024); // 100MB
if !args_clone.username.is_empty() {
mqtt_options
.set_credentials(args_clone.username.clone(), args_clone.password.clone());
mqtt_options.set_credentials(args_clone.username.clone(), args_clone.password.clone());
}
let (new_client, eventloop) = AsyncClient::new(mqtt_options, 10);
@@ -206,12 +184,8 @@ impl MQTTTarget {
*client_arc.lock().await = Some(new_client.clone());
info!(target_id = %target_id_clone, "Spawning MQTT event loop task.");
let task_handle = tokio::spawn(run_mqtt_event_loop(
eventloop,
connected_arc.clone(),
target_id_clone.clone(),
cancel_rx,
));
let task_handle =
tokio::spawn(run_mqtt_event_loop(eventloop, connected_arc.clone(), target_id_clone.clone(), cancel_rx));
Ok(task_handle)
})
.await
@@ -266,17 +240,13 @@ impl MQTTTarget {
records: vec![event.clone()],
};
let data = serde_json::to_vec(&log)
.map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
let data =
serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
// Vec<u8> Convert to String, only for printing logs
let data_string = String::from_utf8(data.clone()).map_err(|e| {
TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e))
})?;
debug!(
"Sending event to mqtt target: {}, event log: {}",
self.id, data_string
);
let data_string = String::from_utf8(data.clone())
.map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?;
debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string);
client
.publish(&self.args.topic, self.args.qos, false, data)
@@ -474,9 +444,7 @@ impl Target for MQTTTarget {
if let Some(handle) = self.bg_task_manager.init_cell.get() {
if handle.is_finished() {
error!(target_id = %self.id, "MQTT background task has finished, possibly due to an error. Target is not active.");
return Err(TargetError::Network(
"MQTT background task terminated".to_string(),
));
return Err(TargetError::Network("MQTT background task terminated".to_string()));
}
}
debug!(target_id = %self.id, "MQTT client not yet initialized or task not running/connected.");
@@ -507,10 +475,7 @@ impl Target for MQTTTarget {
}
Err(e) => {
error!(target_id = %self.id, error = %e, "Failed to save event to store");
return Err(TargetError::Storage(format!(
"Failed to save event to store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to save event to store: {}", e)));
}
}
} else {
@@ -581,10 +546,7 @@ impl Target for MQTTTarget {
error = %e,
"Failed to get event from store"
);
return Err(TargetError::Storage(format!(
"Failed to get event from store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to get event from store: {}", e)));
}
};
@@ -608,10 +570,7 @@ impl Target for MQTTTarget {
}
Err(e) => {
error!(target_id = %self.id, error = %e, "Failed to delete event from store after send.");
return Err(TargetError::Storage(format!(
"Failed to delete event from store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to delete event from store: {}", e)));
}
}

View File

@@ -1,19 +1,19 @@
use crate::store::STORE_EXTENSION;
use crate::target::ChannelTargetType;
use crate::{
arn::TargetID, error::TargetError,
StoreError, Target,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::{Key, Store},
StoreError,
Target,
};
use async_trait::async_trait;
use reqwest::{Client, StatusCode, Url};
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};

View File

@@ -1,7 +1,7 @@
/// audit related metric descriptors
///
/// This module contains the metric descriptors for the audit subsystem.
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
const TARGET_ID: &str = "target_id";

View File

@@ -1,5 +1,5 @@
/// bucket level s3 metric descriptor
use crate::metrics::{new_counter_md, new_gauge_md, new_histogram_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
lazy_static::lazy_static! {
pub static ref BUCKET_API_TRAFFIC_SENT_BYTES_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Bucket copy metric descriptor
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// Bucket level replication metric descriptor
pub const BUCKET_L: &str = "bucket";

View File

@@ -1,5 +1,5 @@
/// Metric descriptors related to cluster configuration
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref CONFIG_RRS_PARITY_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Erasure code set related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// The label for the pool ID
pub const POOL_ID_L: &str = "pool_id";

View File

@@ -1,5 +1,5 @@
/// Cluster health-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref HEALTH_DRIVES_OFFLINE_COUNT_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// IAM related metric descriptors
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
lazy_static::lazy_static! {
pub static ref LAST_SYNC_DURATION_MILLIS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Notify the relevant metric descriptor
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
lazy_static::lazy_static! {
pub static ref NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Descriptors of metrics related to cluster object and bucket usage
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// Bucket labels
pub const BUCKET_LABEL: &str = "bucket";

View File

@@ -1,5 +1,5 @@
/// ILM-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref ILM_EXPIRY_PENDING_TASKS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// A descriptor for metrics related to webhook logs
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// Define label constants for webhook metrics
/// name label

View File

@@ -23,6 +23,6 @@ pub use entry::descriptor::MetricDescriptor;
pub use entry::metric_name::MetricName;
pub use entry::metric_type::MetricType;
pub use entry::namespace::MetricNamespace;
pub use entry::subsystem::subsystems;
pub use entry::subsystem::MetricSubsystem;
pub use entry::subsystem::subsystems;
pub use entry::{new_counter_md, new_gauge_md, new_histogram_md};

View File

@@ -1,5 +1,5 @@
/// Copy the relevant metric descriptor
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: MetricDescriptor =

View File

@@ -1,4 +1,4 @@
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName, MetricSubsystem};
use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref API_REJECTED_AUTH_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Scanner-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref SCANNER_BUCKET_SCANS_FINISHED_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// CPU system-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref SYS_CPU_AVG_IDLE_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Drive-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// drive related labels
pub const DRIVE_LABEL: &str = "drive";

View File

@@ -1,5 +1,5 @@
/// Memory-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref MEM_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Network-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref INTERNODE_ERRORS_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// process related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor =

View File

@@ -1,19 +1,19 @@
use crate::OtelConfig;
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
Resource,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
@@ -28,7 +28,7 @@ use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::fmt::time::LocalTime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
/// A guard object that manages the lifecycle of OpenTelemetry components.
///

View File

@@ -1,4 +1,4 @@
mod user_agent;
pub use user_agent::get_user_agent;
pub use user_agent::ServiceType;
pub use user_agent::get_user_agent;

View File

@@ -100,7 +100,7 @@ impl UserAgent {
fn get_macos_platform(_sys: &System) -> String {
let binding = System::os_version().unwrap_or("14.5.0".to_string());
let version = binding.split('.').collect::<Vec<&str>>();
let major = version.get(0).unwrap_or(&"14").to_string();
let major = version.first().unwrap_or(&"14").to_string();
let minor = version.get(1).unwrap_or(&"5").to_string();
let patch = version.get(2).unwrap_or(&"0").to_string();

View File

@@ -11,9 +11,7 @@ rust-version.workspace = true
workspace = true
[features]
default = ["reed-solomon-simd"]
reed-solomon-simd = []
reed-solomon-erasure = []
default = []
[dependencies]
rustfs-config = { workspace = true, features = ["constants"] }
@@ -40,7 +38,6 @@ http.workspace = true
highway = { workspace = true }
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-simd = { version = "3.0.0" }
transform-stream = "0.3.1"
lazy_static.workspace = true

View File

@@ -1,38 +1,15 @@
# ECStore - Erasure Coding Storage
ECStore provides erasure coding functionality for the RustFS project, supporting multiple Reed-Solomon implementations for optimal performance and compatibility.
ECStore provides erasure coding functionality for the RustFS project, using a high-performance Reed-Solomon SIMD implementation.
## Reed-Solomon Implementations
## Reed-Solomon Implementation
### Available Backends
### SIMD Backend (Only)
#### `reed-solomon-erasure` (Default)
- **Stability**: Mature and well-tested implementation
- **Performance**: Good performance with SIMD acceleration when available
- **Compatibility**: Works with any shard size
- **Memory**: Efficient memory usage
- **Use case**: Recommended for production use
#### `reed-solomon-simd` (Optional)
- **Performance**: Optimized SIMD implementation for maximum speed
- **Limitations**: Has restrictions on shard sizes (must be >= 64 bytes typically)
- **Memory**: May use more memory for small shards
- **Use case**: Best for large data blocks where performance is critical
### Feature Flags
Configure the Reed-Solomon implementation using Cargo features:
```toml
# Use default implementation (reed-solomon-erasure)
ecstore = "0.0.1"
# Use SIMD implementation for maximum performance
ecstore = { version = "0.0.1", features = ["reed-solomon-simd"], default-features = false }
# Use traditional implementation explicitly
ecstore = { version = "0.0.1", features = ["reed-solomon-erasure"], default-features = false }
```
- **Performance**: Uses SIMD optimization for high-performance encoding/decoding
- **Compatibility**: Works with any shard size through the SIMD implementation
- **Reliability**: A single, high-performance SIMD code path for large data processing
- **Use case**: Optimized for maximum performance in large data processing scenarios
### Usage Example
@@ -68,42 +45,52 @@ assert_eq!(&recovered, data);
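The example body is elided in this diff view. As a stand-in, here is a minimal round-trip sketch consistent with the benchmark code later in this diff; the decode call is commented out because its exact signature does not appear in the changes shown here:

```rust
use ecstore::erasure_coding::Erasure;

fn round_trip() {
    let data = vec![42u8; 64 * 1024]; // 64 KiB payload
    // 4 data shards + 2 parity shards, sized for this payload.
    let erasure = Erasure::new(4, 2, data.len());

    // Encode the payload into 6 shards (4 data + 2 parity).
    let shards = erasure.encode_data(&data).expect("encode");

    // Simulate losing up to `parity_shards` shards, as the benchmarks do.
    let mut shards_opt: Vec<Option<Vec<u8>>> =
        shards.iter().map(|s| Some(s.to_vec())).collect();
    shards_opt[0] = None;
    shards_opt[1] = None;

    // Hypothetical decode step (signature assumed, not shown in this diff):
    // let recovered = erasure.decode_data(&mut shards_opt).expect("decode");
    // assert_eq!(&recovered[..data.len()], &data[..]);
}
```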
## Performance Considerations
### When to use `reed-solomon-simd`
- Large block sizes (>= 1KB recommended)
- High-throughput scenarios
- CPU-intensive workloads where encoding/decoding is the bottleneck
### When to use `reed-solomon-erasure`
- Small block sizes
- Memory-constrained environments
- General-purpose usage
- Production deployments requiring maximum stability
### SIMD Implementation Benefits
- **High Throughput**: Optimized for large block sizes (>= 1KB recommended)
- **CPU Optimization**: Leverages modern CPU SIMD instructions
- **Scalability**: Excellent performance for high-throughput scenarios
### Implementation Details
#### `reed-solomon-erasure`
- **Instance Reuse**: The encoder instance is cached and reused across multiple operations
- **Thread Safety**: Thread-safe with interior mutability
- **Memory Efficiency**: Lower memory footprint for small data
#### `reed-solomon-simd`
- **Instance Creation**: New encoder/decoder instances are created for each operation
- **API Design**: The SIMD implementation's API is designed for single-use instances
- **Performance Trade-off**: While instances are created per operation, the SIMD optimizations provide significant performance benefits for large data blocks
- **Optimization**: Future versions may implement instance pooling if the underlying API supports reuse
- **Instance Caching**: Encoder/decoder instances are cached and reused for optimal performance
- **Thread Safety**: Thread-safe with RwLock-based caching
- **SIMD Optimization**: Leverages CPU SIMD instructions for maximum performance
- **Reset Capability**: Cached instances are reset for different parameters, avoiding unnecessary allocations
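A sketch of this cache-and-reset pattern, assuming the `reed-solomon-simd` 3.x API (`ReedSolomonEncoder::{new, reset, add_original_shard, encode}`); a `Mutex` stands in for the lock here, and the crate's actual implementation may differ:

```rust
use reed_solomon_simd::ReedSolomonEncoder;
use std::sync::Mutex;

/// Cache one encoder and reset it when parameters change,
/// instead of constructing a new instance per operation.
struct EncoderCache {
    // ((data_shards, parity_shards, shard_bytes), encoder)
    slot: Mutex<Option<((usize, usize, usize), ReedSolomonEncoder)>>,
}

impl EncoderCache {
    fn new() -> Self {
        Self { slot: Mutex::new(None) }
    }

    fn encode(
        &self,
        params: (usize, usize, usize),
        shards: &[Vec<u8>],
    ) -> Result<Vec<Vec<u8>>, reed_solomon_simd::Error> {
        let mut slot = self.slot.lock().unwrap();
        match slot.as_mut() {
            // Cached encoder with matching parameters: reuse as-is.
            Some((p, _)) if *p == params => {}
            // Cached encoder with different parameters: reset in place,
            // avoiding a fresh allocation.
            Some((p, enc)) => {
                enc.reset(params.0, params.1, params.2)?;
                *p = params;
            }
            // Cold cache: create the encoder once.
            None => {
                *slot = Some((params, ReedSolomonEncoder::new(params.0, params.1, params.2)?));
            }
        }
        let (_, enc) = slot.as_mut().unwrap();
        for shard in shards {
            enc.add_original_shard(shard)?;
        }
        // encode() returns the parity (recovery) shards.
        Ok(enc.encode()?.recovery_iter().map(|s| s.to_vec()).collect())
    }
}
```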
### Performance Tips
1. **Batch Operations**: When possible, batch multiple small operations into larger blocks
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for SIMD implementations
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for optimal SIMD performance
3. **Memory Allocation**: Pre-allocate buffers when processing multiple blocks
4. **Feature Selection**: Choose the appropriate feature based on your data size and performance requirements
4. **Cache Warming**: Initial operations may be slower due to cache setup; subsequent operations benefit from caching
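For tips 2 and 3 together, a small illustrative helper (hypothetical, not part of the crate) that rounds a block size up so every data shard stays a 64-byte multiple:

```rust
/// Round `target` up so each of `data_shards` shards is a multiple of 64 bytes.
fn aligned_block_size(data_shards: usize, target: usize) -> usize {
    let unit = 64 * data_shards; // one 64-byte multiple per data shard
    ((target + unit - 1) / unit) * unit // round up to the next full unit
}

fn main() {
    // 1000 bytes over 4 data shards rounds up to 1024 (4 x 256-byte shards).
    assert_eq!(aligned_block_size(4, 1000), 1024);
}
```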
## Cross-Platform Compatibility
Both implementations support:
- x86_64 with SIMD acceleration
- aarch64 (ARM64) with optimizations
The SIMD implementation supports:
- x86_64 with advanced SIMD instructions (AVX2, SSE)
- aarch64 (ARM64) with NEON SIMD optimizations
- Other architectures with fallback implementations
The `reed-solomon-erasure` implementation provides better cross-platform compatibility and is recommended for most use cases.
The implementation automatically selects the best available SIMD instructions for the target platform, providing optimal performance across different architectures.
## Testing and Benchmarking
Run performance benchmarks:
```bash
# Run erasure coding benchmarks
cargo bench --bench erasure_benchmark
# Run comparison benchmarks
cargo bench --bench comparison_benchmark
# Generate benchmark reports
./run_benchmarks.sh
```
## Error Handling
All operations return `Result` types with comprehensive error information:
- Encoding errors: Invalid parameters, insufficient memory
- Decoding errors: Too many missing shards, corrupted data
- Configuration errors: Invalid shard counts, unsupported parameters
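A brief sketch of handling these `Result`s at a call site (assuming the `Erasure` API used in the benchmarks; the concrete error type is not named in this diff, so it is only formatted here):

```rust
use ecstore::erasure_coding::Erasure;

fn encode_or_log(data: &[u8]) -> bool {
    let erasure = Erasure::new(4, 2, data.len());
    match erasure.encode_data(data) {
        Ok(shards) => {
            println!("encoded into {} shards", shards.len());
            true
        }
        Err(e) => {
            // e.g. invalid parameters or insufficient memory
            eprintln!("erasure encode failed: {e}");
            false
        }
    }
}
```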

View File

@@ -1,29 +1,28 @@
//! Benchmarks dedicated to comparing the performance of Pure Erasure and Hybrid (SIMD) modes
//! Reed-Solomon SIMD performance analysis benchmarks
//!
//! This benchmark uses different feature compile configurations to directly compare the performance of the two implementations.
//! This benchmark analyzes the performance characteristics of the SIMD Reed-Solomon implementation
//! across different data sizes, shard configurations, and usage patterns.
//!
//! ## Running Comparison Tests
//! ## Running Performance Analysis
//!
//! ```bash
//! # Test the Pure Erasure implementation (default)
//! # Run all SIMD performance tests
//! cargo bench --bench comparison_benchmark
//!
//! # Test the Hybrid (SIMD) implementation
//! cargo bench --bench comparison_benchmark --features reed-solomon-simd
//! # Generate detailed performance report
//! cargo bench --bench comparison_benchmark -- --save-baseline simd_analysis
//!
//! # Test the forced erasure-only mode
//! cargo bench --bench comparison_benchmark --features reed-solomon-erasure
//!
//! # Generate a comparison report
//! cargo bench --bench comparison_benchmark -- --save-baseline erasure
//! cargo bench --bench comparison_benchmark --features reed-solomon-simd -- --save-baseline hybrid
//! # Run specific test categories
//! cargo bench --bench comparison_benchmark encode_analysis
//! cargo bench --bench comparison_benchmark decode_analysis
//! cargo bench --bench comparison_benchmark shard_analysis
//! ```
use criterion::{BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main};
use ecstore::erasure_coding::Erasure;
use std::time::Duration;
/// Benchmark data configuration
/// Performance test data configuration
struct TestData {
data: Vec<u8>,
size_name: &'static str,
@@ -36,41 +35,41 @@ impl TestData {
}
}
/// Generate test datasets of different sizes
/// Generate different sized test datasets for performance analysis
fn generate_test_datasets() -> Vec<TestData> {
vec![
TestData::new(1024, "1KB"), // Small data
TestData::new(8 * 1024, "8KB"), // Medium-small data
TestData::new(64 * 1024, "64KB"), // Medium data
TestData::new(256 * 1024, "256KB"), // Medium-large data
TestData::new(1024 * 1024, "1MB"), // Large data
TestData::new(4 * 1024 * 1024, "4MB"), // Extra large data
]
}
/// Encoding performance comparison benchmark
fn bench_encode_comparison(c: &mut Criterion) {
/// SIMD encoding performance analysis
fn bench_encode_analysis(c: &mut Criterion) {
let datasets = generate_test_datasets();
let configs = vec![
(4, 2, "4+2"), // 常用配置
(6, 3, "6+3"), // 50%冗余
(8, 4, "8+4"), // 50%冗余,更多分片
(4, 2, "4+2"), // Common configuration
(6, 3, "6+3"), // 50% redundancy
(8, 4, "8+4"), // 50% redundancy, more shards
];
for dataset in &datasets {
for (data_shards, parity_shards, config_name) in &configs {
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, get_implementation_name());
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, "simd");
let mut group = c.benchmark_group("encode_comparison");
let mut group = c.benchmark_group("encode_analysis");
group.throughput(Throughput::Bytes(dataset.data.len() as u64));
group.sample_size(20);
group.measurement_time(Duration::from_secs(10));
// Check whether an erasure instance can be created; some configurations may fail in pure SIMD mode
// Test SIMD encoding performance
match Erasure::new(*data_shards, *parity_shards, dataset.data.len()).encode_data(&dataset.data) {
Ok(_) => {
group.bench_with_input(
BenchmarkId::new("implementation", &test_name),
BenchmarkId::new("simd_encode", &test_name),
&(&dataset.data, *data_shards, *parity_shards),
|b, (data, data_shards, parity_shards)| {
let erasure = Erasure::new(*data_shards, *parity_shards, data.len());
@@ -82,7 +81,7 @@ fn bench_encode_comparison(c: &mut Criterion) {
);
}
Err(e) => {
println!("⚠️ 跳过测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping test {} - configuration not supported: {}", test_name, e);
}
}
group.finish();
@@ -90,35 +89,35 @@ fn bench_encode_comparison(c: &mut Criterion) {
}
}
/// Decoding performance comparison benchmark
fn bench_decode_comparison(c: &mut Criterion) {
/// SIMD decoding performance analysis
fn bench_decode_analysis(c: &mut Criterion) {
let datasets = generate_test_datasets();
let configs = vec![(4, 2, "4+2"), (6, 3, "6+3"), (8, 4, "8+4")];
for dataset in &datasets {
for (data_shards, parity_shards, config_name) in &configs {
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, get_implementation_name());
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, "simd");
let erasure = Erasure::new(*data_shards, *parity_shards, dataset.data.len());
// Pre-encode data - check if this configuration is supported
match erasure.encode_data(&dataset.data) {
Ok(encoded_shards) => {
let mut group = c.benchmark_group("decode_comparison");
let mut group = c.benchmark_group("decode_analysis");
group.throughput(Throughput::Bytes(dataset.data.len() as u64));
group.sample_size(20);
group.measurement_time(Duration::from_secs(10));
group.bench_with_input(
BenchmarkId::new("implementation", &test_name),
BenchmarkId::new("simd_decode", &test_name),
&(&encoded_shards, *data_shards, *parity_shards),
|b, (shards, data_shards, parity_shards)| {
let erasure = Erasure::new(*data_shards, *parity_shards, dataset.data.len());
b.iter(|| {
// Simulate maximum recoverable data loss
let mut shards_opt: Vec<Option<Vec<u8>>> =
shards.iter().map(|shard| Some(shard.to_vec())).collect();
// Lose a number of shards equal to the parity shard count
// Lose up to parity_shards number of shards
for item in shards_opt.iter_mut().take(*parity_shards) {
*item = None;
}
@@ -131,33 +130,33 @@ fn bench_decode_comparison(c: &mut Criterion) {
group.finish();
}
Err(e) => {
println!("⚠️ 跳过解码测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping decode test {} - configuration not supported: {}", test_name, e);
}
}
}
}
}
/// Shard size sensitivity test
fn bench_shard_size_sensitivity(c: &mut Criterion) {
/// Shard size sensitivity analysis for SIMD optimization
fn bench_shard_size_analysis(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
// Test different shard sizes, focusing on SIMD optimization thresholds
let shard_sizes = vec![32, 64, 128, 256, 512, 1024, 2048, 4096, 8192];
let mut group = c.benchmark_group("shard_size_sensitivity");
let mut group = c.benchmark_group("shard_size_analysis");
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
for shard_size in shard_sizes {
let total_size = shard_size * data_shards;
let data = (0..total_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let test_name = format!("{}B_shard_{}", shard_size, get_implementation_name());
let test_name = format!("{}B_shard_simd", shard_size);
group.throughput(Throughput::Bytes(total_size as u64));
// Check if this shard size is supported
let erasure = Erasure::new(data_shards, parity_shards, data.len());
match erasure.encode_data(&data) {
Ok(_) => {
@@ -170,15 +169,15 @@ fn bench_shard_size_sensitivity(c: &mut Criterion) {
});
}
Err(e) => {
println!("⚠️ 跳过分片大小测试 {} - 不支持: {}", test_name, e);
println!("⚠️ Skipping shard size test {} - not supported: {}", test_name, e);
}
}
}
group.finish();
}
/// High-load concurrency test
fn bench_concurrent_load(c: &mut Criterion) {
/// High-load concurrent performance analysis
fn bench_concurrent_analysis(c: &mut Criterion) {
use std::sync::Arc;
use std::thread;
@@ -186,14 +185,14 @@ fn bench_concurrent_load(c: &mut Criterion) {
let data = Arc::new((0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>());
let erasure = Arc::new(Erasure::new(4, 2, data_size));
let mut group = c.benchmark_group("concurrent_load");
let mut group = c.benchmark_group("concurrent_analysis");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(15));
let test_name = format!("1MB_concurrent_{}", get_implementation_name());
let test_name = "1MB_concurrent_simd";
group.bench_function(&test_name, |b| {
group.bench_function(test_name, |b| {
b.iter(|| {
let handles: Vec<_> = (0..4)
.map(|_| {
@@ -214,42 +213,44 @@ fn bench_concurrent_load(c: &mut Criterion) {
group.finish();
}
/// Error recovery capability test
fn bench_error_recovery_performance(c: &mut Criterion) {
let data_size = 256 * 1024; // 256KB
/// Error recovery performance analysis
fn bench_error_recovery_analysis(c: &mut Criterion) {
let data_size = 512 * 1024; // 512KB
let data = (0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let configs = vec![
(4, 2, 1), // Lose 1 shard
(4, 2, 2), // Lose 2 shards (maximum recoverable)
(6, 3, 2), // Lose 2 shards
(6, 3, 3), // Lose 3 shards (maximum recoverable)
(8, 4, 3), // Lose 3 shards
(8, 4, 4), // Lose 4 shards (maximum recoverable)
// Test different error recovery scenarios
let scenarios = vec![
(4, 2, 1, "single_loss"), // Lose 1 shard
(4, 2, 2, "double_loss"), // Lose 2 shards (maximum)
(6, 3, 1, "single_loss_6_3"), // Lose 1 shard with 6+3
(6, 3, 3, "triple_loss_6_3"), // Lose 3 shards (maximum)
(8, 4, 2, "double_loss_8_4"), // Lose 2 shards with 8+4
(8, 4, 4, "quad_loss_8_4"), // Lose 4 shards (maximum)
];
let mut group = c.benchmark_group("error_recovery");
let mut group = c.benchmark_group("error_recovery_analysis");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
group.measurement_time(Duration::from_secs(10));
for (data_shards, parity_shards, lost_shards) in configs {
for (data_shards, parity_shards, loss_count, scenario_name) in scenarios {
let erasure = Erasure::new(data_shards, parity_shards, data_size);
let test_name = format!("{}+{}_lost{}_{}", data_shards, parity_shards, lost_shards, get_implementation_name());
// Check whether this configuration is supported
match erasure.encode_data(&data) {
Ok(encoded_shards) => {
let test_name = format!("{}+{}_{}", data_shards, parity_shards, scenario_name);
group.bench_with_input(
BenchmarkId::new("recovery", &test_name),
&(&encoded_shards, data_shards, parity_shards, lost_shards),
|b, (shards, data_shards, parity_shards, lost_shards)| {
&(&encoded_shards, data_shards, parity_shards, loss_count),
|b, (shards, data_shards, parity_shards, loss_count)| {
let erasure = Erasure::new(*data_shards, *parity_shards, data_size);
b.iter(|| {
// Simulate a specific number of shard losses
let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|shard| Some(shard.to_vec())).collect();
// Lose the specified number of shards
for item in shards_opt.iter_mut().take(*lost_shards) {
// Lose the specified number of shards
for item in shards_opt.iter_mut().take(*loss_count) {
*item = None;
}
@@ -260,71 +261,57 @@ fn bench_error_recovery_performance(c: &mut Criterion) {
);
}
Err(e) => {
println!("⚠️ 跳过错误恢复测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping recovery test {}: {}", scenario_name, e);
}
}
}
group.finish();
}
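// A minimal recovery sketch under the 4+2 layout, assuming the same
// `Erasure::encode_data` / `decode_data` API exercised by these benchmarks:
fn recover_roundtrip(data: &[u8]) -> Vec<u8> {
    let erasure = Erasure::new(4, 2, data.len());
    let shards = erasure.encode_data(data).unwrap();
    let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|s| Some(s.to_vec())).collect();
    shards_opt[0] = None; // lose one data shard
    shards_opt[4] = None; // lose one parity shard
    erasure.decode_data(&mut shards_opt).unwrap();
    // Reassemble the original bytes from the four data shards.
    let mut recovered = Vec::new();
    for shard in shards_opt.iter().take(4) {
        recovered.extend_from_slice(shard.as_ref().unwrap());
    }
    recovered.truncate(data.len());
    recovered
}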
/// Memory efficiency test
fn bench_memory_efficiency(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
let data_size = 1024 * 1024; // 1MB
/// Memory efficiency analysis
fn bench_memory_analysis(c: &mut Criterion) {
let data_sizes = vec![64 * 1024, 256 * 1024, 1024 * 1024]; // 64KB, 256KB, 1MB
let config = (4, 2); // 4+2 configuration
let mut group = c.benchmark_group("memory_efficiency");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(10);
let mut group = c.benchmark_group("memory_analysis");
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
let test_name = format!("memory_pattern_{}", get_implementation_name());
for data_size in data_sizes {
let data = (0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let size_name = format!("{}KB", data_size / 1024);
// Test the memory impact of repeated consecutive encodes
group.bench_function(format!("{}_continuous", test_name), |b| {
let erasure = Erasure::new(data_shards, parity_shards, data_size);
b.iter(|| {
for i in 0..10 {
let data = vec![(i % 256) as u8; data_size];
let shards = erasure.encode_data(black_box(&data)).unwrap();
group.throughput(Throughput::Bytes(data_size as u64));
// Test instance reuse vs new instance creation
group.bench_with_input(BenchmarkId::new("reuse_instance", &size_name), &data, |b, data| {
let erasure = Erasure::new(config.0, config.1, data.len());
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
black_box(shards);
}
});
});
});
// Test a large number of small encoding tasks
group.bench_function(format!("{}_small_chunks", test_name), |b| {
let chunk_size = 1024; // 1KB chunks
let erasure = Erasure::new(data_shards, parity_shards, chunk_size);
b.iter(|| {
for i in 0..1024 {
let data = vec![(i % 256) as u8; chunk_size];
let shards = erasure.encode_data(black_box(&data)).unwrap();
group.bench_with_input(BenchmarkId::new("new_instance", &size_name), &data, |b, data| {
b.iter(|| {
let erasure = Erasure::new(config.0, config.1, data.len());
let shards = erasure.encode_data(black_box(data)).unwrap();
black_box(shards);
}
});
});
});
}
group.finish();
}
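// Design note (an illustration, not from the original file): reusing one `Erasure`
// instance matters because its `ReedSolomonEncoder` keeps encoder/decoder caches
// behind an RwLock, so repeated encodes skip rebuilding SIMD state, while creating
// a new instance per call pays that setup cost every time. A hedged sketch of the
// reuse pattern:
//
//     let erasure = Erasure::new(4, 2, block_size); // build once
//     for chunk in &chunks {
//         let _ = erasure.encode_data(chunk).unwrap(); // caches reused across calls
//     }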
/// Get the name of the current implementation
fn get_implementation_name() -> &'static str {
#[cfg(feature = "reed-solomon-simd")]
return "hybrid";
#[cfg(not(feature = "reed-solomon-simd"))]
return "erasure";
}
// Benchmark group configuration
criterion_group!(
benches,
bench_encode_comparison,
bench_decode_comparison,
bench_shard_size_sensitivity,
bench_concurrent_load,
bench_error_recovery_performance,
bench_memory_efficiency
bench_encode_analysis,
bench_decode_analysis,
bench_shard_size_analysis,
bench_concurrent_analysis,
bench_error_recovery_analysis,
bench_memory_analysis
);
criterion_main!(benches);

View File

@@ -1,25 +1,23 @@
//! Reed-Solomon erasure coding performance benchmarks.
//! Reed-Solomon SIMD erasure coding performance benchmarks.
//!
//! This benchmark compares the performance of different Reed-Solomon implementations:
//! - Default (Pure erasure): Stable reed-solomon-erasure implementation
//! - `reed-solomon-simd` feature: SIMD mode with optimized performance
//! This benchmark exercises the high-performance SIMD Reed-Solomon implementation.
//!
//! ## Running Benchmarks
//!
//! ```bash
//! # Run all benchmarks
//! cargo bench
//!
//! # Run a specific benchmark
//! cargo bench --bench erasure_benchmark
//!
//! # Generate an HTML report
//! cargo bench --bench erasure_benchmark -- --output-format html
//!
//! # Test encoding performance only
//! cargo bench encode
//!
//! # Test decoding performance only
//! cargo bench decode
//! ```
//!
@@ -29,24 +27,24 @@
//! - Different data sizes: 1KB, 64KB, 1MB, 16MB
//! - Different erasure coding configurations: (4,2), (6,3), (8,4)
//! - Both encoding and decoding operations
//! - Small vs large shard scenarios for SIMD optimization
//! - SIMD optimization for different shard sizes
use criterion::{BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main};
use ecstore::erasure_coding::{Erasure, calc_shard_size};
use std::time::Duration;
/// Benchmark configuration structure
#[derive(Clone, Debug)]
struct BenchConfig {
/// Number of data shards
data_shards: usize,
/// Number of parity shards
parity_shards: usize,
/// Test data size (bytes)
data_size: usize,
/// Block size (bytes)
block_size: usize,
/// Configuration name
name: String,
}
@@ -62,27 +60,27 @@ impl BenchConfig {
}
}
/// Generate test data
fn generate_test_data(size: usize) -> Vec<u8> {
(0..size).map(|i| (i % 256) as u8).collect()
}
/// Benchmark: Encoding performance
fn bench_encode_performance(c: &mut Criterion) {
let configs = vec![
// Small data tests - 1KB
BenchConfig::new(4, 2, 1024, 1024),
BenchConfig::new(6, 3, 1024, 1024),
BenchConfig::new(8, 4, 1024, 1024),
// Medium data tests - 64KB
BenchConfig::new(4, 2, 64 * 1024, 64 * 1024),
BenchConfig::new(6, 3, 64 * 1024, 64 * 1024),
BenchConfig::new(8, 4, 64 * 1024, 64 * 1024),
// Large data tests - 1MB
BenchConfig::new(4, 2, 1024 * 1024, 1024 * 1024),
BenchConfig::new(6, 3, 1024 * 1024, 1024 * 1024),
BenchConfig::new(8, 4, 1024 * 1024, 1024 * 1024),
// Extra large data tests - 16MB
BenchConfig::new(4, 2, 16 * 1024 * 1024, 16 * 1024 * 1024),
BenchConfig::new(6, 3, 16 * 1024 * 1024, 16 * 1024 * 1024),
];
@@ -90,13 +88,13 @@ fn bench_encode_performance(c: &mut Criterion) {
for config in configs {
let data = generate_test_data(config.data_size);
// Test the current default implementation (usually SIMD)
let mut group = c.benchmark_group("encode_current");
// Test SIMD encoding performance
let mut group = c.benchmark_group("encode_simd");
group.throughput(Throughput::Bytes(config.data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
group.bench_with_input(BenchmarkId::new("current_impl", &config.name), &(&data, &config), |b, (data, config)| {
group.bench_with_input(BenchmarkId::new("simd_impl", &config.name), &(&data, &config), |b, (data, config)| {
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
@@ -105,99 +103,55 @@ fn bench_encode_performance(c: &mut Criterion) {
});
group.finish();
// If the SIMD feature is enabled, benchmark the dedicated erasure implementation for comparison
#[cfg(feature = "reed-solomon-simd")]
{
use ecstore::erasure_coding::ReedSolomonEncoder;
// Test direct SIMD implementation for large shards (>= 512 bytes)
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("encode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
let mut erasure_group = c.benchmark_group("encode_erasure_only");
erasure_group.throughput(Throughput::Bytes(config.data_size as u64));
erasure_group.sample_size(10);
erasure_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(BenchmarkId::new("simd_direct", &config.name), &(&data, &config), |b, (data, config)| {
b.iter(|| {
// Direct SIMD implementation
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
match reed_solomon_simd::ReedSolomonEncoder::new(config.data_shards, config.parity_shards, per_shard_size) {
Ok(mut encoder) => {
// Create properly sized buffer and fill with data
let mut buffer = vec![0u8; per_shard_size * config.data_shards];
let copy_len = data.len().min(buffer.len());
buffer[..copy_len].copy_from_slice(&data[..copy_len]);
erasure_group.bench_with_input(
BenchmarkId::new("erasure_impl", &config.name),
&(&data, &config),
|b, (data, config)| {
let encoder = ReedSolomonEncoder::new(config.data_shards, config.parity_shards).unwrap();
b.iter(|| {
// Build the data structures needed for encoding
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
let total_size = per_shard_size * (config.data_shards + config.parity_shards);
let mut buffer = vec![0u8; total_size];
buffer[..data.len()].copy_from_slice(data);
let slices: smallvec::SmallVec<[&mut [u8]; 16]> = buffer.chunks_exact_mut(per_shard_size).collect();
encoder.encode(black_box(slices)).unwrap();
black_box(&buffer);
});
},
);
erasure_group.finish();
}
// When the SIMD feature is used, benchmark the direct SIMD implementation for comparison
#[cfg(feature = "reed-solomon-simd")]
{
// Only test SIMD on large shards; SIMD performs poorly on shards smaller than 512 bytes
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("encode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(
BenchmarkId::new("simd_impl", &config.name),
&(&data, &config),
|b, (data, config)| {
b.iter(|| {
// Use the SIMD implementation directly
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
match reed_solomon_simd::ReedSolomonEncoder::new(
config.data_shards,
config.parity_shards,
per_shard_size,
) {
Ok(mut encoder) => {
// Create a correctly sized buffer and fill it with data
let mut buffer = vec![0u8; per_shard_size * config.data_shards];
let copy_len = data.len().min(buffer.len());
buffer[..copy_len].copy_from_slice(&data[..copy_len]);
// Add data shards at the correct shard size
for chunk in buffer.chunks_exact(per_shard_size) {
encoder.add_original_shard(black_box(chunk)).unwrap();
}
let result = encoder.encode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD does not support this configuration; skip
black_box(());
}
// Add data shards with correct shard size
for chunk in buffer.chunks_exact(per_shard_size) {
encoder.add_original_shard(black_box(chunk)).unwrap();
}
});
},
);
simd_group.finish();
}
let result = encoder.encode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD doesn't support this configuration, skip
black_box(());
}
}
});
});
simd_group.finish();
}
}
}
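// A distilled sketch (not in the original file) of the direct reed-solomon-simd
// encode flow benchmarked above, with error handling reduced to unwrap; it assumes
// the shard size satisfies the library's constraints, hence the >= 512 byte guard
// used in the benchmark itself:
fn simd_encode_once(data: &[u8], data_shards: usize, parity_shards: usize) -> Vec<Vec<u8>> {
    let per_shard_size = calc_shard_size(data.len(), data_shards);
    let mut encoder =
        reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, per_shard_size).unwrap();
    // Zero-pad the input to data_shards whole shards, then feed each shard in order.
    let mut buffer = vec![0u8; per_shard_size * data_shards];
    let copy_len = data.len().min(buffer.len());
    buffer[..copy_len].copy_from_slice(&data[..copy_len]);
    for chunk in buffer.chunks_exact(per_shard_size) {
        encoder.add_original_shard(chunk).unwrap();
    }
    // encode() yields the parity (recovery) shards; collect them as owned buffers.
    let result = encoder.encode().unwrap();
    result.recovery_iter().map(|shard| shard.to_vec()).collect()
}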
/// Benchmark: Decoding performance
fn bench_decode_performance(c: &mut Criterion) {
let configs = vec![
// Medium data tests - 64KB
BenchConfig::new(4, 2, 64 * 1024, 64 * 1024),
BenchConfig::new(6, 3, 64 * 1024, 64 * 1024),
// Large data tests - 1MB
BenchConfig::new(4, 2, 1024 * 1024, 1024 * 1024),
BenchConfig::new(6, 3, 1024 * 1024, 1024 * 1024),
// Extra large data tests - 16MB
BenchConfig::new(4, 2, 16 * 1024 * 1024, 16 * 1024 * 1024),
];
@@ -205,25 +159,25 @@ fn bench_decode_performance(c: &mut Criterion) {
let data = generate_test_data(config.data_size);
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
// Pre-encode data
let encoded_shards = erasure.encode_data(&data).unwrap();
// Test the decoding performance of the current default implementation
let mut group = c.benchmark_group("decode_current");
// Test SIMD decoding performance
let mut group = c.benchmark_group("decode_simd");
group.throughput(Throughput::Bytes(config.data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
group.bench_with_input(
BenchmarkId::new("current_impl", &config.name),
BenchmarkId::new("simd_impl", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
b.iter(|| {
// Simulate data loss - lose one data shard and one parity shard
let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|shard| Some(shard.to_vec())).collect();
// Lose last data shard and first parity shard
shards_opt[config.data_shards - 1] = None;
shards_opt[config.data_shards] = None;
@@ -234,58 +188,52 @@ fn bench_decode_performance(c: &mut Criterion) {
);
group.finish();
// When hybrid mode (the default) is used, test SIMD decoding performance
#[cfg(not(feature = "reed-solomon-erasure"))]
{
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("decode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
// Test direct SIMD decoding for large shards
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("decode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(
BenchmarkId::new("simd_impl", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
b.iter(|| {
let per_shard_size = calc_shard_size(config.data_size, config.data_shards);
match reed_solomon_simd::ReedSolomonDecoder::new(
config.data_shards,
config.parity_shards,
per_shard_size,
) {
Ok(mut decoder) => {
// Add the available shards (except the lost ones)
for (i, shard) in shards.iter().enumerate() {
if i != config.data_shards - 1 && i != config.data_shards {
if i < config.data_shards {
decoder.add_original_shard(i, black_box(shard)).unwrap();
} else {
let recovery_idx = i - config.data_shards;
decoder.add_recovery_shard(recovery_idx, black_box(shard)).unwrap();
}
simd_group.bench_with_input(
BenchmarkId::new("simd_direct", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
b.iter(|| {
let per_shard_size = calc_shard_size(config.data_size, config.data_shards);
match reed_solomon_simd::ReedSolomonDecoder::new(config.data_shards, config.parity_shards, per_shard_size)
{
Ok(mut decoder) => {
// Add available shards (except lost ones)
for (i, shard) in shards.iter().enumerate() {
if i != config.data_shards - 1 && i != config.data_shards {
if i < config.data_shards {
decoder.add_original_shard(i, black_box(shard)).unwrap();
} else {
let recovery_idx = i - config.data_shards;
decoder.add_recovery_shard(recovery_idx, black_box(shard)).unwrap();
}
}
}
let result = decoder.decode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD does not support this configuration; skip
black_box(());
}
let result = decoder.decode().unwrap();
black_box(result);
}
});
},
);
simd_group.finish();
}
Err(_) => {
// SIMD doesn't support this configuration, skip
black_box(());
}
}
});
},
);
simd_group.finish();
}
}
}
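// The matching decode flow, distilled as a minimal sketch (again not from the
// original file): feed the surviving shards by index, then read back the restored
// originals. `missing` names the shard indices simulated as lost.
fn simd_decode_missing(
    shards: &[Vec<u8>],
    missing: &[usize],
    data_shards: usize,
    parity_shards: usize,
) -> Vec<(usize, Vec<u8>)> {
    let per_shard_size = shards[0].len();
    let mut decoder =
        reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, per_shard_size).unwrap();
    for (i, shard) in shards.iter().enumerate() {
        if missing.contains(&i) {
            continue; // simulate loss of this shard
        }
        if i < data_shards {
            decoder.add_original_shard(i, shard).unwrap();
        } else {
            decoder.add_recovery_shard(i - data_shards, shard).unwrap();
        }
    }
    let result = decoder.decode().unwrap();
    // Only missing *original* shards are restored; parity is not re-derived here.
    result.restored_original_iter().map(|(i, s)| (i, s.to_vec())).collect()
}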
/// Benchmark: Impact of different shard sizes on performance
fn bench_shard_size_impact(c: &mut Criterion) {
let shard_sizes = vec![64, 128, 256, 512, 1024, 2048, 4096, 8192];
let data_shards = 4;
@@ -301,8 +249,8 @@ fn bench_shard_size_impact(c: &mut Criterion) {
group.throughput(Throughput::Bytes(total_data_size as u64));
// Test the current implementation
group.bench_with_input(BenchmarkId::new("current", format!("shard_{}B", shard_size)), &data, |b, data| {
// Test SIMD implementation
group.bench_with_input(BenchmarkId::new("simd", format!("shard_{}B", shard_size)), &data, |b, data| {
let erasure = Erasure::new(data_shards, parity_shards, total_data_size);
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
@@ -313,19 +261,19 @@ fn bench_shard_size_impact(c: &mut Criterion) {
group.finish();
}
/// Benchmark: Impact of coding configurations on performance
fn bench_coding_configurations(c: &mut Criterion) {
let configs = vec![
(2, 1), // Minimal redundancy
(3, 2), // Medium redundancy
(4, 2), // Common configuration
(6, 3), // 50% redundancy
(8, 4), // 50% redundancy, more shards
(10, 5), // 50% redundancy, many shards
(12, 6), // 50% redundancy, very many shards
];
let data_size = 1024 * 1024; // 1MB test data
let data = generate_test_data(data_size);
let mut group = c.benchmark_group("coding_configurations");
@@ -347,17 +295,17 @@ fn bench_coding_configurations(c: &mut Criterion) {
group.finish();
}
/// Benchmark: Memory usage patterns
fn bench_memory_patterns(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
let block_size = 1024 * 1024; // 1MB
let block_size = 1024 * 1024; // 1MB block
let mut group = c.benchmark_group("memory_patterns");
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
// Test reusing the same Erasure instance
group.bench_function("reuse_erasure_instance", |b| {
let erasure = Erasure::new(data_shards, parity_shards, block_size);
let data = generate_test_data(block_size);
@@ -368,7 +316,7 @@ fn bench_memory_patterns(c: &mut Criterion) {
});
});
// Test creating a new Erasure instance each time
group.bench_function("new_erasure_instance", |b| {
let data = generate_test_data(block_size);
@@ -382,7 +330,7 @@ fn bench_memory_patterns(c: &mut Criterion) {
group.finish();
}
// Benchmark group configuration
criterion_group!(
benches,
bench_encode_performance,

View File

@@ -1,54 +1,48 @@
#!/bin/bash
# Reed-Solomon implementation performance comparison script
#
# This script runs different benchmarks to compare the performance of SIMD mode and pure Erasure mode
#
# Usage:
# ./run_benchmarks.sh [quick|full|comparison]
#
# quick - quick test of the main scenarios
# full - full benchmark suite
# comparison - dedicated comparison of the two implementation modes
# Reed-Solomon SIMD performance benchmark script
# Runs erasure coding performance tests with the high-performance SIMD implementation
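#
# Example invocation (the full command list lives in show_help below):
#   ./run_benchmarks.sh quick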
set -e
# Colored output
# ANSI color codes
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
PURPLE='\033[0;35m'
NC='\033[0m' # No Color
# Print colored output
# Print colored messages
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
echo -e "${BLUE} $1${NC}"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
echo -e "${GREEN}$1${NC}"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
echo -e "${RED}$1${NC}"
}
# Check whether the required tools are installed
# Check system requirements
check_requirements() {
print_info "检查系统要求..."
# 检查 Rust
if ! command -v cargo &> /dev/null; then
print_error "cargo 未安装,请先安装 Rust 工具链"
print_error "Cargo 未找到,请确保已安装 Rust"
exit 1
fi
# Check whether criterion is installed
if ! grep -q "criterion" Cargo.toml; then
print_error "criterion dependency not found in Cargo.toml"
# Check criterion
if ! cargo --list | grep -q "bench"; then
print_error "Benchmark support not found; please make sure your Rust version supports benchmarks"
exit 1
fi
@@ -62,28 +56,15 @@ cleanup() {
print_success "清理完成"
}
# Run pure Erasure mode benchmarks
run_erasure_benchmark() {
print_info "🏛️ Starting pure Erasure mode benchmarks..."
echo "================================================"
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- --save-baseline erasure_baseline
print_success "Pure Erasure mode benchmarks complete"
}
# Run SIMD mode benchmarks
run_simd_benchmark() {
print_info "🎯 Starting SIMD mode benchmarks..."
echo "================================================"
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --save-baseline simd_baseline
print_success "SIMD mode benchmarks complete"
}
# Run the full benchmark suite
@@ -91,33 +72,42 @@ run_full_benchmark() {
print_info "🚀 Starting the full benchmark suite..."
echo "================================================"
# Run detailed benchmarks (uses the default pure Erasure mode)
# Run detailed benchmarks
cargo bench --bench erasure_benchmark
print_success "Full benchmark suite complete"
}
# Run performance comparison tests
run_comparison_benchmark() {
print_info "📊 Starting performance comparison tests..."
# Run performance tests
run_performance_test() {
print_info "📊 Starting performance tests..."
echo "================================================"
print_info "步骤 1: 测试纯 Erasure 模式..."
print_info "步骤 1: 运行编码基准测试..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- --save-baseline erasure_baseline
-- encode --save-baseline encode_baseline
print_info "步骤 2: 测试SIMD模式并与 Erasure 模式对比..."
print_info "步骤 2: 运行解码基准测试..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --baseline erasure_baseline
-- decode --save-baseline decode_baseline
print_success "性能对比测试完成"
print_success "性能测试完成"
}
# Run large dataset tests
run_large_data_test() {
print_info "🗂️ Starting large dataset tests..."
echo "================================================"
cargo bench --bench erasure_benchmark \
-- large_data --save-baseline large_data_baseline
print_success "Large dataset tests complete"
}
# Generate comparison report
generate_comparison_report() {
print_info "📊 Generating performance comparison report..."
print_info "📊 Generating performance report..."
if [ -d "target/criterion" ]; then
print_info "Benchmark results have been saved to the target/criterion/ directory"
@@ -138,49 +128,48 @@ generate_comparison_report() {
run_quick_test() {
print_info "🏃 运行快速性能测试..."
print_info "测试纯 Erasure 模式..."
print_info "测试 SIMD 编码性能..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- encode_comparison --quick
-- encode --quick
print_info "测试SIMD模式..."
print_info "测试 SIMD 解码性能..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- encode_comparison --quick
-- decode --quick
print_success "快速测试完成"
}
# Show help information
show_help() {
echo "Reed-Solomon performance benchmark script"
echo "Reed-Solomon SIMD performance benchmark script"
echo ""
echo "Implementation modes:"
echo " 🏛️ Pure Erasure mode (default) - stable, compatible reed-solomon-erasure implementation"
echo " 🎯 SIMD mode - high-performance SIMD-optimized implementation"
echo " 🎯 SIMD mode - high-performance SIMD-optimized reed-solomon-simd implementation"
echo ""
echo "Usage:"
echo " $0 [command]"
echo ""
echo "Commands:"
echo " quick        Run a quick performance test"
echo " full         Run the full benchmark suite (defaults to Erasure mode)"
echo " comparison   Run a detailed comparison of the implementation modes"
echo " erasure      Test pure Erasure mode only"
echo " simd         Test SIMD mode only"
echo " full         Run the full benchmark suite"
echo " performance  Run detailed performance tests"
echo " simd         Run SIMD mode tests"
echo " large        Run large dataset tests"
echo " clean        Clean up test results"
echo " help         Show this help message"
echo ""
echo "Examples:"
echo " $0 quick # Quick test of both modes"
echo " $0 comparison # Detailed comparison test"
echo " $0 full # Full test suite (defaults to Erasure mode)"
echo " $0 simd # Test SIMD mode only"
echo " $0 erasure # Test pure Erasure mode only"
echo " $0 quick # Quick performance test"
echo " $0 performance # Detailed performance test"
echo " $0 full # Full test suite"
echo " $0 simd # SIMD mode test"
echo " $0 large # Large dataset test"
echo ""
echo "Mode notes:"
echo " Erasure mode: uses the reed-solomon-erasure implementation; stable and reliable"
echo " SIMD mode: uses the reed-solomon-simd implementation; high-performance optimizations"
echo "Implementation features:"
echo " - Uses the high-performance reed-solomon-simd SIMD implementation"
echo " - Supports encoder/decoder instance caching"
echo " - Optimized memory management and thread safety"
echo " - Cross-platform SIMD instruction support"
}
# Show test configuration info
@@ -196,22 +185,22 @@ show_test_info() {
if [ -f "/proc/cpuinfo" ]; then
echo " - CPU model: $(grep 'model name' /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)"
if grep -q "avx2" /proc/cpuinfo; then
echo " - SIMD support: AVX2 ✅ (SIMD mode will take advantage of SIMD optimizations)"
echo " - SIMD support: AVX2 ✅ (advanced SIMD optimizations will be used)"
elif grep -q "sse4" /proc/cpuinfo; then
echo " - SIMD support: SSE4 ✅ (SIMD mode will take advantage of SIMD optimizations)"
echo " - SIMD support: SSE4 ✅ (SIMD optimizations will be used)"
else
echo " - SIMD support: no advanced SIMD features detected"
echo " - SIMD support: baseline SIMD features"
fi
fi
echo " - Default mode: pure Erasure mode (stable and reliable)"
echo " - High-performance mode: SIMD mode (performance-optimized)"
echo " - Implementation: reed-solomon-simd (high-performance SIMD optimizations)"
echo " - Features: instance caching, thread safety, cross-platform SIMD"
echo ""
}
# Main function
main() {
print_info "🧪 Reed-Solomon implementation performance benchmarks"
print_info "🧪 Reed-Solomon SIMD implementation performance benchmarks"
echo "================================================"
check_requirements
@@ -227,14 +216,9 @@ main() {
run_full_benchmark
generate_comparison_report
;;
"comparison")
"performance")
cleanup
run_comparison_benchmark
generate_comparison_report
;;
"erasure")
cleanup
run_erasure_benchmark
run_performance_test
generate_comparison_report
;;
"simd")
@@ -242,6 +226,11 @@ main() {
run_simd_benchmark
generate_comparison_report
;;
"large")
cleanup
run_large_data_test
generate_comparison_report
;;
"clean")
cleanup
;;
@@ -257,10 +246,7 @@ main() {
esac
print_success "✨ 基准测试执行完成!"
print_info "💡 提示: 推荐使用默认的纯Erasure模式对于高性能需求可考虑SIMD模式"
}
# If this script is run directly, call the main function
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
# Script entry point
main "$@"

View File

@@ -1,6 +1,7 @@
#![allow(unused_variables)]
#![allow(dead_code)]
// use error::Error;
use crate::StorageAPI;
use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::error::Error;
@@ -11,26 +12,25 @@ use crate::store_api::ObjectIO;
use crate::store_api::ObjectInfo;
use crate::store_api::ObjectOptions;
use crate::store_api::ObjectToDelete;
use crate::StorageAPI;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use aws_sdk_s3::config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use http::HeaderMap;
use http::Method;
use lazy_static::lazy_static;
// use std::time::SystemTime;
use once_cell::sync::Lazy;
use regex::Regex;
use rustfs_rsc::provider::StaticProvider;
use rustfs_rsc::Minio;
use rustfs_rsc::provider::StaticProvider;
use s3s::dto::DeleteMarkerReplicationStatus;
use s3s::dto::DeleteReplicationStatus;
use s3s::dto::ExistingObjectReplicationStatus;
@@ -43,14 +43,14 @@ use std::collections::HashSet;
use std::fmt;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use time::OffsetDateTime;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

View File

@@ -1,4 +1,4 @@
use super::{storageclass, Config, GLOBAL_StorageClass};
use super::{Config, GLOBAL_StorageClass, storageclass};
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};

View File

@@ -5,7 +5,7 @@ pub mod storageclass;
use crate::error::Result;
use crate::store::ECStore;
use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS};
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View File

@@ -1,27 +1,15 @@
//! Erasure coding implementation supporting multiple Reed-Solomon backends.
//! Erasure coding implementation using Reed-Solomon SIMD backend.
//!
//! This module provides erasure coding functionality with support for two different
//! Reed-Solomon implementations:
//! This module provides erasure coding functionality backed by a high-performance
//! SIMD Reed-Solomon implementation:
//!
//! ## Reed-Solomon Implementations
//! ## Reed-Solomon Implementation
//!
//! ### Pure Erasure Mode (Default)
//! - **Stability**: Pure erasure implementation, mature and well-tested
//! - **Performance**: Good performance with consistent behavior
//! - **Compatibility**: Works with any shard size
//! - **Use case**: Default behavior, recommended for most production use cases
//!
//! ### SIMD Mode (`reed-solomon-simd` feature)
//! ### SIMD Mode (the only supported mode)
//! - **Performance**: Uses SIMD optimization for high-performance encoding/decoding
//! - **Compatibility**: Works with any shard size through SIMD implementation
//! - **Reliability**: High-performance SIMD implementation for large data processing
//! - **Use case**: Use when maximum performance is needed for large data processing
//!
//! ## Feature Flags
//!
//! - Default: Use pure reed-solomon-erasure implementation (stable and reliable)
//! - `reed-solomon-simd`: Use SIMD mode for optimal performance
//! - `reed-solomon-erasure`: Explicitly enable pure erasure mode (same as default)
//! - **Use case**: Optimized for maximum performance in large data processing scenarios
//!
//! ## Example
//!
@@ -35,8 +23,6 @@
//! ```
use bytes::{Bytes, BytesMut};
use reed_solomon_erasure::galois_8::ReedSolomon as ReedSolomonErasure;
#[cfg(feature = "reed-solomon-simd")]
use reed_solomon_simd;
use smallvec::SmallVec;
use std::io;
@@ -44,38 +30,23 @@ use tokio::io::AsyncRead;
use tracing::warn;
use uuid::Uuid;
/// Reed-Solomon encoder variants supporting different implementations.
#[allow(clippy::large_enum_variant)]
pub enum ReedSolomonEncoder {
/// SIMD mode: High-performance SIMD implementation (when reed-solomon-simd feature is enabled)
#[cfg(feature = "reed-solomon-simd")]
SIMD {
data_shards: usize,
parity_shards: usize,
// Use RwLock for thread safety so the type implements Send + Sync
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
},
/// Pure erasure mode: default and when reed-solomon-erasure feature is specified
Erasure(Box<ReedSolomonErasure>),
/// Reed-Solomon encoder using SIMD implementation.
pub struct ReedSolomonEncoder {
data_shards: usize,
parity_shards: usize,
// Use RwLock for thread safety so the type implements Send + Sync
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
}
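// Design note: each cache holds an `Option` so callers can `take()` the cached
// encoder/decoder out of the lock, `reset()` it to the current shard geometry,
// run it without holding the lock, and put it back afterwards (see
// `encode_with_simd` and `reconstruct_with_simd` below).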
impl Clone for ReedSolomonEncoder {
fn clone(&self) -> Self {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
..
} => ReedSolomonEncoder::SIMD {
data_shards: *data_shards,
parity_shards: *parity_shards,
// Create empty caches for the new instance; caches are not shared
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
},
ReedSolomonEncoder::Erasure(encoder) => ReedSolomonEncoder::Erasure(encoder.clone()),
Self {
data_shards: self.data_shards,
parity_shards: self.parity_shards,
// Create empty caches for the new instance; caches are not shared
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
}
}
}
@@ -83,81 +54,50 @@ impl Clone for ReedSolomonEncoder {
impl ReedSolomonEncoder {
/// Create a new Reed-Solomon encoder with specified data and parity shards.
pub fn new(data_shards: usize, parity_shards: usize) -> io::Result<Self> {
#[cfg(feature = "reed-solomon-simd")]
{
// SIMD mode when reed-solomon-simd feature is enabled
Ok(ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
})
}
#[cfg(not(feature = "reed-solomon-simd"))]
{
// Pure erasure mode when reed-solomon-simd feature is not enabled (default or reed-solomon-erasure)
let encoder = ReedSolomonErasure::new(data_shards, parity_shards)
.map_err(|e| io::Error::other(format!("Failed to create erasure encoder: {:?}", e)))?;
Ok(ReedSolomonEncoder::Erasure(Box::new(encoder)))
}
Ok(ReedSolomonEncoder {
data_shards,
parity_shards,
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
})
}
/// Encode data shards with parity.
pub fn encode(&self, shards: SmallVec<[&mut [u8]; 16]>) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache,
..
} => {
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
if shards_vec.is_empty() {
return Ok(());
}
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
if shards_vec.is_empty() {
return Ok(());
}
// Encode using SIMD
let simd_result = self.encode_with_simd(*data_shards, *parity_shards, encoder_cache, &mut shards_vec);
// Encode using SIMD
let simd_result = self.encode_with_simd(&mut shards_vec);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD encoding failed: {}", simd_error);
Err(simd_error)
}
}
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD encoding failed: {}", simd_error);
Err(simd_error)
}
ReedSolomonEncoder::Erasure(encoder) => encoder
.encode(shards)
.map_err(|e| io::Error::other(format!("Erasure encode error: {:?}", e))),
}
}
#[cfg(feature = "reed-solomon-simd")]
fn encode_with_simd(
&self,
data_shards: usize,
parity_shards: usize,
encoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
shards_vec: &mut [&mut [u8]],
) -> io::Result<()> {
fn encode_with_simd(&self, shards_vec: &mut [&mut [u8]]) -> io::Result<()> {
let shard_len = shards_vec[0].len();
// Get or create the encoder
let mut encoder = {
let mut cache_guard = encoder_cache
let mut cache_guard = self
.encoder_cache
.write()
.map_err(|_| io::Error::other("Failed to acquire encoder cache lock"))?;
match cache_guard.take() {
Some(mut cached_encoder) => {
// Reset the existing encoder via reset() to fit the new parameters
if let Err(e) = cached_encoder.reset(data_shards, parity_shards, shard_len) {
if let Err(e) = cached_encoder.reset(self.data_shards, self.parity_shards, shard_len) {
warn!("Failed to reset SIMD encoder: {:?}, creating new one", e);
// If reset fails, create a new encoder
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
} else {
cached_encoder
@@ -165,14 +105,14 @@ impl ReedSolomonEncoder {
}
None => {
// First use: create a new encoder
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
}
}
};
// Add the original shards
for (i, shard) in shards_vec.iter().enumerate().take(data_shards) {
for (i, shard) in shards_vec.iter().enumerate().take(self.data_shards) {
encoder
.add_original_shard(shard)
.map_err(|e| io::Error::other(format!("Failed to add shard {}: {:?}", i, e)))?;
@@ -185,15 +125,16 @@ impl ReedSolomonEncoder {
// Copy the recovery shards into the output buffers
for (i, recovery_shard) in result.recovery_iter().enumerate() {
if i + data_shards < shards_vec.len() {
shards_vec[i + data_shards].copy_from_slice(recovery_shard);
if i + self.data_shards < shards_vec.len() {
shards_vec[i + self.data_shards].copy_from_slice(recovery_shard);
}
}
// Put the encoder back into the cache; once result is dropped, the encoder resets automatically and can be reused
drop(result); // Explicitly drop result to ensure the encoder is reset
*encoder_cache
*self
.encoder_cache
.write()
.map_err(|_| io::Error::other("Failed to return encoder to cache"))? = Some(encoder);
@@ -202,39 +143,19 @@ impl ReedSolomonEncoder {
/// Reconstruct missing shards.
pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
decoder_cache,
..
} => {
// Reconstruct using SIMD
let simd_result = self.reconstruct_with_simd(*data_shards, *parity_shards, decoder_cache, shards);
// Reconstruct using SIMD
let simd_result = self.reconstruct_with_simd(shards);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD reconstruction failed: {}", simd_error);
Err(simd_error)
}
}
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD reconstruction failed: {}", simd_error);
Err(simd_error)
}
ReedSolomonEncoder::Erasure(encoder) => encoder
.reconstruct(shards)
.map_err(|e| io::Error::other(format!("Erasure reconstruct error: {:?}", e))),
}
}
#[cfg(feature = "reed-solomon-simd")]
fn reconstruct_with_simd(
&self,
data_shards: usize,
parity_shards: usize,
decoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
shards: &mut [Option<Vec<u8>>],
) -> io::Result<()> {
fn reconstruct_with_simd(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
// Find a valid shard to determine length
let shard_len = shards
.iter()
@@ -243,17 +164,18 @@ impl ReedSolomonEncoder {
// Get or create the decoder
let mut decoder = {
let mut cache_guard = decoder_cache
let mut cache_guard = self
.decoder_cache
.write()
.map_err(|_| io::Error::other("Failed to acquire decoder cache lock"))?;
match cache_guard.take() {
Some(mut cached_decoder) => {
// Reset the existing decoder via reset()
if let Err(e) = cached_decoder.reset(data_shards, parity_shards, shard_len) {
if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) {
warn!("Failed to reset SIMD decoder: {:?}, creating new one", e);
// If reset fails, create a new decoder
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
} else {
cached_decoder
@@ -261,7 +183,7 @@ impl ReedSolomonEncoder {
}
None => {
// First use: create a new decoder
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
}
}
@@ -270,12 +192,12 @@ impl ReedSolomonEncoder {
// Add available shards (both data and parity)
for (i, shard_opt) in shards.iter().enumerate() {
if let Some(shard) = shard_opt {
if i < data_shards {
if i < self.data_shards {
decoder
.add_original_shard(i, shard)
.map_err(|e| io::Error::other(format!("Failed to add original shard for reconstruction: {:?}", e)))?;
} else {
let recovery_idx = i - data_shards;
let recovery_idx = i - self.data_shards;
decoder
.add_recovery_shard(recovery_idx, shard)
.map_err(|e| io::Error::other(format!("Failed to add recovery shard for reconstruction: {:?}", e)))?;
@@ -289,7 +211,7 @@ impl ReedSolomonEncoder {
// Fill in missing data shards from reconstruction result
for (i, shard_opt) in shards.iter_mut().enumerate() {
if shard_opt.is_none() && i < data_shards {
if shard_opt.is_none() && i < self.data_shards {
for (restored_index, restored_data) in result.restored_original_iter() {
if restored_index == i {
*shard_opt = Some(restored_data.to_vec());
@@ -302,7 +224,8 @@ impl ReedSolomonEncoder {
// Put the decoder back into the cache; once result is dropped, the decoder resets automatically and can be reused
drop(result); // Explicitly drop result to ensure the decoder is reset
*decoder_cache
*self
.decoder_cache
.write()
.map_err(|_| io::Error::other("Failed to return decoder to cache"))? = Some(decoder);
@@ -592,19 +515,10 @@ mod tests {
fn test_encode_decode_roundtrip() {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode - SIMD with fallback
let block_size = 1024; // SIMD mode
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Use different test data based on feature
#[cfg(not(feature = "reed-solomon-simd"))]
let test_data = b"hello world".to_vec(); // Small data for erasure (default)
#[cfg(feature = "reed-solomon-simd")]
// Use sufficient test data for SIMD optimization
let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for SIMD
let data = &test_data;
@@ -632,13 +546,7 @@ mod tests {
fn test_encode_decode_large_1m() {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 512 * 3; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8192; // Pure erasure mode (default)
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Generate 1MB test data
@@ -704,16 +612,10 @@ mod tests {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
// Use test data suitable for both modes
// Use test data suitable for SIMD mode
let data =
b"Async error test data with sufficient length to meet requirements for proper testing and validation.".repeat(20); // ~2KB
@@ -747,13 +649,7 @@ mod tests {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
// Use test data that fits in exactly one block to avoid multi-block complexity
@@ -761,8 +657,6 @@ mod tests {
b"Channel async callback test data with sufficient length to ensure proper operation and validation requirements."
.repeat(8); // ~1KB
// let data = b"callback".to_vec(); // 8 bytes to fit exactly in one 8-byte block
let data_clone = data.clone(); // Clone for later comparison
let mut reader = Cursor::new(data);
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);
@@ -801,8 +695,7 @@ mod tests {
assert_eq!(&recovered, &data_clone);
}
// Tests specifically for SIMD mode
#[cfg(feature = "reed-solomon-simd")]
// SIMD mode specific tests
mod simd_tests {
use super::*;
@@ -1171,47 +1064,4 @@ mod tests {
assert_eq!(&recovered, &data_clone);
}
}
// Comparative tests between different implementations
#[cfg(not(feature = "reed-solomon-simd"))]
mod comparative_tests {
use super::*;
#[test]
fn test_implementation_consistency() {
let data_shards = 4;
let parity_shards = 2;
let block_size = 2048; // Large enough for SIMD requirements
// Create test data that ensures each shard is >= 512 bytes (SIMD minimum)
let test_data = b"This is test data for comparing reed-solomon-simd and reed-solomon-erasure implementations to ensure they produce consistent results when given the same input parameters and data. This data needs to be sufficiently large to meet SIMD requirements.";
let data = test_data.repeat(50); // Create much larger data: ~13KB total, ~3.25KB per shard
// Test with erasure implementation (default)
let erasure_erasure = Erasure::new(data_shards, parity_shards, block_size);
let erasure_shards = erasure_erasure.encode_data(&data).unwrap();
// Test data integrity with erasure
let mut erasure_shards_opt: Vec<Option<Vec<u8>>> = erasure_shards.iter().map(|shard| Some(shard.to_vec())).collect();
// Lose some shards
erasure_shards_opt[1] = None; // Data shard
erasure_shards_opt[4] = None; // Parity shard
erasure_erasure.decode_data(&mut erasure_shards_opt).unwrap();
let mut erasure_recovered = Vec::new();
for shard in erasure_shards_opt.iter().take(data_shards) {
erasure_recovered.extend_from_slice(shard.as_ref().unwrap());
}
erasure_recovered.truncate(data.len());
// Verify erasure implementation works correctly
assert_eq!(&erasure_recovered, &data, "Erasure implementation failed to recover data correctly");
println!("✅ Both implementations are available and working correctly");
println!("✅ Default (reed-solomon-erasure): Data recovery successful");
println!("✅ SIMD tests are available as separate test suite");
}
}
}

View File

@@ -1,8 +1,8 @@
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
use crate::disk::{
self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT,
CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
conv_part_err_to_int, has_part_err,
};
use crate::erasure_coding;
use crate::erasure_coding::bitrot_verify;
@@ -12,24 +12,24 @@ use crate::heal::data_usage_cache::DataUsageCache;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::store_api::ObjectToDelete;
use crate::{
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{storageclass, GLOBAL_StorageClass},
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
config::{GLOBAL_StorageClass, storageclass},
disk::{
endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo,
DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp,
ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions,
RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions,
UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk,
},
error::{to_object_err, StorageError},
error::{StorageError, to_object_err},
global::{
get_global_deployment_id, is_dist_erasure, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP,
GLOBAL_LOCAL_DISK_SET_DRIVES,
GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id,
is_dist_erasure,
},
heal::{
data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo},
heal_commands::{
HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT,
HEAL_NORMAL_SCAN, HealOpts, HealScanMode, HealingTracker,
},
heal_ops::BG_HEALING_UUID,
},
@@ -42,7 +42,7 @@ use crate::{
};
use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation};
use crate::{
heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig},
store_api::ListObjectVersionsInfo,
};
use bytes::Bytes;
@@ -51,22 +51,22 @@ use chrono::Utc;
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use lock::{namespace_lock::NsLockMap, LockApi};
use lock::{LockApi, namespace_lock::NsLockMap};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use md5::{Digest as Md5Digest, Md5};
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{
file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries,
MetaCacheEntry, MetadataResolutionParams,
ObjectPartInfo,
RawFileInfo,
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, file_info_from_raw,
headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS},
merge_file_meta_versions,
};
use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader};
use rustfs_utils::{
crypto::{base64_decode, base64_encode, hex},
path::{encode_dir_object, has_suffix, path_join_buf, SLASH_SEPARATOR},
HashAlgorithm,
crypto::{base64_decode, base64_encode, hex},
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
};
use sha2::Sha256;
use std::hash::Hash;
@@ -82,7 +82,7 @@ use std::{
use time::OffsetDateTime;
use tokio::{
io::AsyncWrite,
sync::{broadcast, RwLock},
sync::{RwLock, broadcast},
};
use tokio::{
select,
@@ -5837,9 +5837,9 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
#[cfg(test)]
mod tests {
use super::*;
use crate::disk::error::DiskError;
use crate::disk::CHECK_PART_UNKNOWN;
use crate::disk::CHECK_PART_VOLUME_NOT_FOUND;
use crate::disk::error::DiskError;
use crate::store_api::CompletePart;
use rustfs_filemeta::ErasureInfo;
use std::collections::HashMap;

View File

@@ -8,10 +8,10 @@ use crate::{disk::DiskStore, heal::heal_commands::HealOpts};
use http::{HeaderMap, HeaderValue};
use madmin::heal_commands::HealResultItem;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{headers::AMZ_OBJECT_TAGGING, FileInfo, MetaCacheEntriesSorted, ObjectPartInfo};
use rustfs_filemeta::{FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, headers::AMZ_OBJECT_TAGGING};
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::path::decode_dir_object;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;

View File

@@ -1,24 +1,24 @@
use crate::StorageAPI;
use crate::bucket::metadata_sys::get_versioning_config;
use crate::bucket::versioning::VersioningApi;
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw};
use crate::disk::error::DiskError;
use crate::disk::{DiskInfo, DiskStore};
use crate::error::{
is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, Error, Result, StorageError,
Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err,
};
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::StorageAPI;
use crate::{store::ECStore, store_api::ListObjectsV2Info};
use futures::future::join_all;
use rand::seq::SliceRandom;
use rustfs_filemeta::{
merge_file_meta_versions, FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry,
MetadataResolutionParams,
FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams,
merge_file_meta_versions,
};
use rustfs_utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};

View File

@@ -1,21 +1,21 @@
use crate::error::{is_err_config_not_found, Error, Result};
use crate::error::{Error, Result, is_err_config_not_found};
use crate::{
cache::{Cache, CacheEntity},
error::{is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user, Error as IamError},
store::{object::IAM_CONFIG_PREFIX, GroupInfo, MappedPolicy, Store, UserType},
error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user},
store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX},
sys::{
UpdateServiceAccountOpts, MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED,
STATUS_ENABLED,
MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED,
UpdateServiceAccountOpts,
},
};
use ecstore::global::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use policy::{
arn::ARN,
auth::{self, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign, Credentials, UserIdentity},
auth::{self, Credentials, UserIdentity, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign},
format::Format,
policy::{
default::DEFAULT_POLICIES, iam_policy_claim_name_sa, Policy, PolicyDoc, EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE,
EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, default::DEFAULT_POLICIES, iam_policy_claim_name_sa,
},
};
use rustfs_utils::crypto::base64_encode;
@@ -25,8 +25,8 @@ use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc,
atomic::{AtomicBool, AtomicI64, Ordering},
},
time::Duration,
};

View File

@@ -10,12 +10,12 @@ use http::StatusCode;
use hyper::Method;
use matchit::Params;
use rustfs_utils::net::bytes_stream;
use s3s::dto::StreamingBlob;
use s3s::s3_error;
use s3s::Body;
use s3s::S3Request;
use s3s::S3Response;
use s3s::S3Result;
use s3s::dto::StreamingBlob;
use s3s::s3_error;
use serde_urlencoded::from_bytes;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

View File

@@ -12,9 +12,9 @@ mod service;
mod storage;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown};
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
@@ -22,6 +22,7 @@ use common::{
// error::{Error, Result},
globals::set_global_addr,
};
use ecstore::StorageAPI;
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use ecstore::config as ecconfig;
@@ -29,12 +30,11 @@ use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::rpc::make_server;
use ecstore::store_api::BucketOptions;
use ecstore::StorageAPI;
use ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
set_global_endpoints,
store::{init_local_disks, ECStore},
store::{ECStore, init_local_disks},
update_erasure_type,
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
@@ -49,7 +49,7 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
@@ -60,13 +60,13 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio::signal::unix::{SignalKind, signal};
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tonic::{Request, Status, metadata::MetadataValue};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, instrument};
use tracing::{debug, error, info, warn};
use tracing::{instrument, Span};
const MI_B: usize = 1024 * 1024;
@@ -502,11 +502,9 @@ async fn run(opt: config::Opt) -> Result<()> {
});
// init store
let store = ECStore::new(server_addr.clone(), endpoint_pools.clone())
.await
.inspect_err(|err| {
error!("ECStore::new {:?}", err);
})?;
let store = ECStore::new(server_addr, endpoint_pools.clone()).await.inspect_err(|err| {
error!("ECStore::new {:?}", err);
})?;
ecconfig::init();
// config system configuration

View File

@@ -16,8 +16,8 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
@@ -32,13 +32,13 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::utils::serialize;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::compress::is_compressible;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::compress::is_compressible;
use ecstore::error::StorageError;
use ecstore::new_object_layer_fn;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -58,11 +58,11 @@ use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -70,23 +70,23 @@ use rustfs_rio::CompressReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_utils::path::path_join_buf;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;