Merge branch 'main' of github.com:rustfs/s3-rustfs into feature/observability-metrics

# Conflicts:
#	crates/notify/src/event.rs
#	crates/notify/src/global.rs
#	crates/notify/src/integration.rs
#	crates/notify/src/notifier.rs
This commit is contained in:
houseme
2025-06-23 13:33:25 +08:00
59 changed files with 571 additions and 907 deletions

125
Cargo.lock generated
View File

@@ -52,17 +52,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [
"getrandom 0.2.16",
"once_cell",
"version_check",
]
[[package]]
name = "ahash"
version = "0.8.12"
@@ -301,7 +290,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12fcdb3f1d03f69d3ec26ac67645a8fe3f878d77b5ebb0b15d64a116c212985"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-buffer",
"arrow-data",
"arrow-schema",
@@ -446,7 +435,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69efcd706420e52cd44f5c4358d279801993846d1c2a8e52111853d61d55a619"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-data",
@@ -819,7 +808,7 @@ dependencies = [
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
"lru 0.12.5",
"lru",
"percent-encoding",
"regex-lite",
"sha2 0.10.9",
@@ -2381,7 +2370,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.11",
"parking_lot_core",
]
[[package]]
@@ -2395,7 +2384,7 @@ dependencies = [
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core 0.9.11",
"parking_lot_core",
]
[[package]]
@@ -2442,7 +2431,7 @@ dependencies = [
"itertools 0.14.0",
"log",
"object_store",
"parking_lot 0.12.4",
"parking_lot",
"parquet",
"rand 0.8.5",
"regex",
@@ -2472,7 +2461,7 @@ dependencies = [
"futures",
"itertools 0.14.0",
"log",
"parking_lot 0.12.4",
"parking_lot",
]
[[package]]
@@ -2503,7 +2492,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f53d7ec508e1b3f68bd301cee3f649834fad51eff9240d898a4b2614cfd0a7a"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ipc",
"base64 0.22.1",
@@ -2584,7 +2573,7 @@ dependencies = [
"futures",
"log",
"object_store",
"parking_lot 0.12.4",
"parking_lot",
"rand 0.8.5",
"tempfile",
"url",
@@ -2659,7 +2648,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfc2d074d5ee4d9354fdcc9283d5b2b9037849237ddecb8942a29144b77ca05"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-doc",
@@ -2680,7 +2669,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cbceba0f98d921309a9121b702bcd49289d383684cccabf9a92cda1602f3bbb"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2720,7 +2709,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-plan",
"parking_lot 0.12.4",
"parking_lot",
"paste",
]
@@ -2787,7 +2776,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1447c2c6bc8674a16be4786b4abf528c302803fafa186aa6275692570e64d85"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr",
@@ -2809,7 +2798,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f8c25dcd069073a75b3d2840a79d0f81e64bdd2c05f2d3d18939afb36a7dcb"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"datafusion-common",
"datafusion-expr-common",
@@ -2842,7 +2831,7 @@ version = "46.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88cc160df00e413e370b3b259c8ea7bfbebc134d32de16325950e9e923846b7f"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow",
"arrow-ord",
"arrow-schema",
@@ -2861,7 +2850,7 @@ dependencies = [
"indexmap 2.9.0",
"itertools 0.14.0",
"log",
"parking_lot 0.12.4",
"parking_lot",
"pin-project-lite",
"tokio",
]
@@ -3412,7 +3401,7 @@ dependencies = [
"futures-util",
"generational-box",
"once_cell",
"parking_lot 0.12.4",
"parking_lot",
"rustc-hash 1.1.0",
"tracing",
"warnings",
@@ -3646,7 +3635,6 @@ dependencies = [
"policy",
"protos",
"rand 0.9.1",
"reed-solomon-erasure",
"reed-solomon-simd",
"regex",
"reqwest",
@@ -4244,7 +4232,7 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b"
dependencies = [
"parking_lot 0.12.4",
"parking_lot",
"tracing",
]
@@ -4612,9 +4600,6 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
@@ -4622,7 +4607,7 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
"ahash",
"allocator-api2",
]
@@ -5723,15 +5708,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
[[package]]
name = "lru"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.12.5"
@@ -6611,7 +6587,7 @@ dependencies = [
"futures",
"humantime",
"itertools 0.13.0",
"parking_lot 0.12.4",
"parking_lot",
"percent-encoding",
"snafu",
"tokio",
@@ -6821,17 +6797,6 @@ version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.4"
@@ -6839,21 +6804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
dependencies = [
"lock_api",
"parking_lot_core 0.9.11",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
"parking_lot_core",
]
[[package]]
@@ -6875,7 +6826,7 @@ version = "54.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfb15796ac6f56b429fd99e33ba133783ad75b27c36b4b5ce06f1f82cc97754e"
dependencies = [
"ahash 0.8.12",
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-cast",
@@ -7554,7 +7505,7 @@ dependencies = [
"derive_builder",
"futures",
"lazy_static",
"parking_lot 0.12.4",
"parking_lot",
"s3s",
"snafu",
"tokio",
@@ -7840,15 +7791,6 @@ dependencies = [
"syn 2.0.103",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
@@ -7878,21 +7820,6 @@ dependencies = [
"thiserror 2.0.12",
]
[[package]]
name = "reed-solomon-erasure"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
dependencies = [
"cc",
"libc",
"libm",
"lru 0.7.8",
"parking_lot 0.11.2",
"smallvec",
"spin",
]
[[package]]
name = "reed-solomon-simd"
version = "3.0.1"
@@ -9486,7 +9413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
dependencies = [
"new_debug_unreachable",
"parking_lot 0.12.4",
"parking_lot",
"phf_shared 0.11.3",
"precomputed-hash",
"serde",
@@ -9733,7 +9660,7 @@ dependencies = [
"ndk-sys",
"objc",
"once_cell",
"parking_lot 0.12.4",
"parking_lot",
"raw-window-handle 0.5.2",
"raw-window-handle 0.6.2",
"scopeguard",
@@ -10002,7 +9929,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot 0.12.4",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",

View File

@@ -84,7 +84,6 @@ cfg-if = "1.0.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.40", features = ["derive", "env"] }
config = "0.15.11"
const-str = { version = "0.6.2", features = ["std", "proc"] }
crc32fast = "1.4.2"
dashmap = "6.1.0"
@@ -92,7 +91,6 @@ datafusion = "46.0.1"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
dotenvy = "0.15.7"
flatbuffers = "25.2.10"
flexi_logger = { version = "0.30.2", features = ["trc", "dont_minimize_extra_stacks"] }
form_urlencoded = "1.2.1"
@@ -168,7 +166,7 @@ flate2 = "1.1.1"
zstd = "0.13.3"
lz4 = "1.28.1"
rdkafka = { version = "0.37.0", features = ["tokio"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-simd = { version = "3.0.0" }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.20", default-features = false, features = [
@@ -201,7 +199,6 @@ shadow-rs = { version = "1.1.1", default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
sha2 = "0.10.9"
siphasher = "1.0.1"
smallvec = { version = "1.15.1", features = ["serde"] }

View File

@@ -1,8 +1,8 @@
use rsa::Pkcs1v15Encrypt;
use rsa::{
RsaPrivateKey, RsaPublicKey,
pkcs8::{DecodePrivateKey, DecodePublicKey},
rand_core::OsRng,
RsaPrivateKey, RsaPublicKey,
};
use serde::{Deserialize, Serialize};
use std::io::{Error, Result};
@@ -58,8 +58,8 @@ static TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhk
mod tests {
use super::*;
use rsa::{
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
RsaPrivateKey,
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
};
use std::time::{SystemTime, UNIX_EPOCH};
#[test]

View File

@@ -1,4 +1,3 @@
use std::sync::Arc;
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
use rustfs_notify::arn::TargetID;
use rustfs_notify::factory::{
@@ -6,8 +5,9 @@ use rustfs_notify::factory::{
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{initialize, notification_system};
use std::sync::Arc;
use std::time::Duration;
use tracing::info;

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
use std::sync::Arc;
// Using the global accessors
use rustfs_notify::arn::TargetID;
use rustfs_notify::factory::{
@@ -7,7 +7,7 @@ use rustfs_notify::factory::{
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{initialize, notification_system};
use std::time::Duration;
use tracing::info;

View File

@@ -1,9 +1,9 @@
use axum::routing::get;
use axum::{
Router,
extract::Json,
http::{HeaderMap, Response, StatusCode},
routing::post,
Router,
};
use serde_json::Value;
use std::time::{SystemTime, UNIX_EPOCH};

View File

@@ -41,7 +41,7 @@ impl TargetID {
ARN {
target_id: self.clone(),
region: region.to_string(),
service: DEFAULT_ARN_SERVICE.to_string(), // Default Service
service: DEFAULT_ARN_SERVICE.to_string(), // Default Service
partition: DEFAULT_ARN_PARTITION.to_string(), // Default partition
}
}
@@ -112,7 +112,7 @@ impl ARN {
ARN {
target_id,
region,
service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs
service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs
partition: DEFAULT_ARN_PARTITION.to_string(), // Default is rustfs partition
}
}
@@ -121,16 +121,10 @@ impl ARN {
/// Returns the ARN string in the format "{ARN_PREFIX}:{region}:{target_id}"
#[allow(clippy::inherent_to_string)]
pub fn to_arn_string(&self) -> String {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
{
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
return String::new();
}
format!(
"{}:{}:{}",
ARN_PREFIX,
self.region,
self.target_id.to_id_string()
)
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
}
/// Parsing ARN from string
@@ -162,8 +156,7 @@ impl ARN {
impl fmt::Display for ARN {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
{
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
// Returns an empty string if all parts are empty
return Ok(());
}

View File

@@ -1,6 +1,6 @@
use crate::arn::TargetID;
use std::io;
use thiserror::Error;
use crate::arn::TargetID;
/// Error types for the store
#[derive(Debug, Error)]

View File

@@ -1,7 +1,7 @@
use crate::store::DEFAULT_LIMIT;
use crate::{
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target},
target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs},
};
use async_trait::async_trait;
use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS};

View File

@@ -1,15 +1,15 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, EventName,
StoreError, Target,
Event, EventName, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry,
rules::BucketNotificationConfig, stream,
};
use ecstore::config::{Config, KVS};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, RwLock, Semaphore};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, error, info, warn};
/// Monitoring metrics for the notification system

View File

@@ -26,7 +26,7 @@ pub use rules::BucketNotificationConfig;
use std::io::IsTerminal;
pub use target::Target;
use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
/// Initialize the tracing log system
///

View File

@@ -1,5 +1,5 @@
use crate::arn::TargetID;
use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName};
use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target};
use dashmap::DashMap;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

View File

@@ -1,11 +1,11 @@
use super::rules_map::RulesMap;
// Keep for existing structure if any, or remove if not used
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::EventName;
use crate::arn::TargetID;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use crate::rules::NotificationConfiguration;
use crate::EventName;
use std::collections::HashMap;
use std::io::Read;

View File

@@ -78,10 +78,7 @@ mod tests {
assert_eq!(new_pattern(Some(""), Some("b")), "*b");
assert_eq!(new_pattern(None, None), "");
assert_eq!(new_pattern(Some("prefix"), Some("suffix")), "prefix*suffix");
assert_eq!(
new_pattern(Some("prefix/"), Some("/suffix")),
"prefix/*suffix"
); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
assert_eq!(new_pattern(Some("prefix/"), Some("/suffix")), "prefix/*suffix"); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
}
#[test]

View File

@@ -23,9 +23,7 @@ impl PatternRules {
/// Checks if there are any rules that match the given object name.
pub fn match_simple(&self, object_name: &str) -> bool {
self.rules
.keys()
.any(|p| pattern::match_simple(p, object_name))
self.rules.keys().any(|p| pattern::match_simple(p, object_name))
}
/// Returns all TargetIDs that match the object name.
@@ -61,8 +59,7 @@ impl PatternRules {
for (pattern, self_targets) in &self.rules {
match other.rules.get(pattern) {
Some(other_targets) => {
let diff_targets: TargetIdSet =
self_targets.difference(other_targets).cloned().collect();
let diff_targets: TargetIdSet = self_targets.difference(other_targets).cloned().collect();
if !diff_targets.is_empty() {
result_rules.insert(pattern.clone(), diff_targets);
}
@@ -73,8 +70,6 @@ impl PatternRules {
}
}
}
PatternRules {
rules: result_rules,
}
PatternRules { rules: result_rules }
}
}

View File

@@ -1,5 +1,5 @@
use super::pattern;
use crate::arn::{ArnError, TargetIDError, ARN};
use crate::arn::{ARN, ArnError, TargetIDError};
use crate::event::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

View File

@@ -1,5 +1,5 @@
use crate::error::StoreError;
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use snap::raw::{Decoder, Encoder};
use std::sync::{Arc, RwLock};
use std::{

View File

@@ -1,13 +1,13 @@
use crate::{
error::TargetError, integration::NotificationMetrics,
Event, StoreError,
error::TargetError,
integration::NotificationMetrics,
store::{Key, Store},
target::Target,
Event,
StoreError,
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{Semaphore, mpsc};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

View File

@@ -1,8 +1,8 @@
use std::sync::Arc;
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{Event, StoreError, TargetError};
use async_trait::async_trait;
use std::sync::Arc;
pub mod mqtt;
pub mod webhook;

View File

@@ -1,22 +1,22 @@
use crate::store::{Key, STORE_EXTENSION};
use crate::target::ChannelTargetType;
use crate::{
arn::TargetID, error::TargetError,
StoreError, Target,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::Store,
StoreError,
Target,
};
use async_trait::async_trait;
use rumqttc::{mqttbytes::Error as MqttBytesError, ConnectionError};
use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS};
use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError};
use std::sync::Arc;
use std::{
path::PathBuf,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tokio::sync::{mpsc, Mutex, OnceCell};
use tokio::sync::{Mutex, OnceCell, mpsc};
use tracing::{debug, error, info, instrument, trace, warn};
use url::Url;
use urlencoding;

View File

@@ -1,19 +1,19 @@
use crate::store::STORE_EXTENSION;
use crate::target::ChannelTargetType;
use crate::{
arn::TargetID, error::TargetError,
StoreError, Target,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::{Key, Store},
StoreError,
Target,
};
use async_trait::async_trait;
use reqwest::{Client, StatusCode, Url};
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};

View File

@@ -1,7 +1,7 @@
/// audit related metric descriptors
///
/// This module contains the metric descriptors for the audit subsystem.
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
const TARGET_ID: &str = "target_id";

View File

@@ -1,5 +1,5 @@
/// bucket level s3 metric descriptor
use crate::metrics::{new_counter_md, new_gauge_md, new_histogram_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
lazy_static::lazy_static! {
pub static ref BUCKET_API_TRAFFIC_SENT_BYTES_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Bucket replication metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// Bucket level replication metric descriptor
pub const BUCKET_L: &str = "bucket";

View File

@@ -1,5 +1,5 @@
/// Metric descriptors related to cluster configuration
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref CONFIG_RRS_PARITY_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Erasure code set related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// The label for the pool ID
pub const POOL_ID_L: &str = "pool_id";

View File

@@ -1,5 +1,5 @@
/// Cluster health-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref HEALTH_DRIVES_OFFLINE_COUNT_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// IAM related metric descriptors
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
lazy_static::lazy_static! {
pub static ref LAST_SYNC_DURATION_MILLIS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Notification-related metric descriptors
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
lazy_static::lazy_static! {
pub static ref NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Descriptors of metrics related to cluster object and bucket usage
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
/// Bucket labels
pub const BUCKET_LABEL: &str = "bucket";

View File

@@ -1,5 +1,5 @@
/// ILM-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref ILM_EXPIRY_PENDING_TASKS_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// A descriptor for metrics related to webhook logs
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// Define label constants for webhook metrics
/// name label

View File

@@ -23,6 +23,6 @@ pub use entry::descriptor::MetricDescriptor;
pub use entry::metric_name::MetricName;
pub use entry::metric_type::MetricType;
pub use entry::namespace::MetricNamespace;
pub use entry::subsystem::subsystems;
pub use entry::subsystem::MetricSubsystem;
pub use entry::subsystem::subsystems;
pub use entry::{new_counter_md, new_gauge_md, new_histogram_md};

View File

@@ -1,5 +1,5 @@
/// Replication-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: MetricDescriptor =

View File

@@ -1,4 +1,4 @@
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName, MetricSubsystem};
use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref API_REJECTED_AUTH_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Scanner-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref SCANNER_BUCKET_SCANS_FINISHED_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// CPU system-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref SYS_CPU_AVG_IDLE_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Drive-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
/// drive related labels
pub const DRIVE_LABEL: &str = "drive";

View File

@@ -1,5 +1,5 @@
/// Memory-related metric descriptors
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref MEM_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// Network-related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref INTERNODE_ERRORS_TOTAL_MD: MetricDescriptor =

View File

@@ -1,5 +1,5 @@
/// process related metric descriptors
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
lazy_static::lazy_static! {
pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor =

View File

@@ -1,19 +1,19 @@
use crate::OtelConfig;
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
Resource,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
@@ -28,7 +28,7 @@ use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::fmt::time::LocalTime;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
/// A guard object that manages the lifecycle of OpenTelemetry components.
///

View File

@@ -1,4 +1,4 @@
mod user_agent;
pub use user_agent::get_user_agent;
pub use user_agent::ServiceType;
pub use user_agent::get_user_agent;

View File

@@ -11,9 +11,7 @@ rust-version.workspace = true
workspace = true
[features]
default = ["reed-solomon-simd"]
reed-solomon-simd = []
reed-solomon-erasure = []
default = []
[dependencies]
rustfs-config = { workspace = true, features = ["constants"] }
@@ -40,7 +38,6 @@ http.workspace = true
highway = { workspace = true }
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-simd = { version = "3.0.0" }
transform-stream = "0.3.1"
lazy_static.workspace = true

View File

@@ -1,38 +1,15 @@
# ECStore - Erasure Coding Storage
ECStore provides erasure coding functionality for the RustFS project, supporting multiple Reed-Solomon implementations for optimal performance and compatibility.
ECStore provides erasure coding functionality for the RustFS project, using a high-performance Reed-Solomon SIMD implementation for optimal performance.
## Reed-Solomon Implementations
## Reed-Solomon Implementation
### Available Backends
### SIMD Backend (Only)
#### `reed-solomon-erasure` (Default)
- **Stability**: Mature and well-tested implementation
- **Performance**: Good performance with SIMD acceleration when available
- **Compatibility**: Works with any shard size
- **Memory**: Efficient memory usage
- **Use case**: Recommended for production use
#### `reed-solomon-simd` (Optional)
- **Performance**: Optimized SIMD implementation for maximum speed
- **Limitations**: Has restrictions on shard sizes (must be >= 64 bytes typically)
- **Memory**: May use more memory for small shards
- **Use case**: Best for large data blocks where performance is critical
### Feature Flags
Configure the Reed-Solomon implementation using Cargo features:
```toml
# Use default implementation (reed-solomon-erasure)
ecstore = "0.0.1"
# Use SIMD implementation for maximum performance
ecstore = { version = "0.0.1", features = ["reed-solomon-simd"], default-features = false }
# Use traditional implementation explicitly
ecstore = { version = "0.0.1", features = ["reed-solomon-erasure"], default-features = false }
```
- **Performance**: Uses SIMD optimization for high-performance encoding/decoding
- **Compatibility**: Works with any shard size through the SIMD implementation
- **Reliability**: High-performance SIMD implementation for large data processing
- **Use case**: Optimized for maximum performance in large data processing scenarios
### Usage Example
@@ -68,42 +45,52 @@ assert_eq!(&recovered, data);
## Performance Considerations
### When to use `reed-solomon-simd`
- Large block sizes (>= 1KB recommended)
- High-throughput scenarios
- CPU-intensive workloads where encoding/decoding is the bottleneck
### When to use `reed-solomon-erasure`
- Small block sizes
- Memory-constrained environments
- General-purpose usage
- Production deployments requiring maximum stability
### SIMD Implementation Benefits
- **High Throughput**: Optimized for large block sizes (>= 1KB recommended)
- **CPU Optimization**: Leverages modern CPU SIMD instructions
- **Scalability**: Excellent performance for high-throughput scenarios
### Implementation Details
#### `reed-solomon-erasure`
- **Instance Reuse**: The encoder instance is cached and reused across multiple operations
- **Thread Safety**: Thread-safe with interior mutability
- **Memory Efficiency**: Lower memory footprint for small data
#### `reed-solomon-simd`
- **Instance Creation**: New encoder/decoder instances are created for each operation
- **API Design**: The SIMD implementation's API is designed for single-use instances
- **Performance Trade-off**: While instances are created per operation, the SIMD optimizations provide significant performance benefits for large data blocks
- **Optimization**: Future versions may implement instance pooling if the underlying API supports reuse
- **Instance Caching**: Encoder/decoder instances are cached and reused for optimal performance
- **Thread Safety**: Thread-safe with RwLock-based caching
- **SIMD Optimization**: Leverages CPU SIMD instructions for maximum performance
- **Reset Capability**: Cached instances are reset for different parameters, avoiding unnecessary allocations
### Performance Tips
1. **Batch Operations**: When possible, batch multiple small operations into larger blocks
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for SIMD implementations
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for optimal SIMD performance
3. **Memory Allocation**: Pre-allocate buffers when processing multiple blocks
4. **Feature Selection**: Choose the appropriate feature based on your data size and performance requirements
4. **Cache Warming**: Initial operations may be slower due to cache setup, subsequent operations benefit from caching
## Cross-Platform Compatibility
Both implementations support:
- x86_64 with SIMD acceleration
- aarch64 (ARM64) with optimizations
The SIMD implementation supports:
- x86_64 with advanced SIMD instructions (AVX2, SSE)
- aarch64 (ARM64) with NEON SIMD optimizations
- Other architectures with fallback implementations
The `reed-solomon-erasure` implementation provides better cross-platform compatibility and is recommended for most use cases.
The implementation automatically selects the best available SIMD instructions for the target platform, providing optimal performance across different architectures.
## Testing and Benchmarking
Run performance benchmarks:
```bash
# Run erasure coding benchmarks
cargo bench --bench erasure_benchmark
# Run comparison benchmarks
cargo bench --bench comparison_benchmark
# Generate benchmark reports
./run_benchmarks.sh
```
## Error Handling
All operations return `Result` types with comprehensive error information:
- Encoding errors: Invalid parameters, insufficient memory
- Decoding errors: Too many missing shards, corrupted data
- Configuration errors: Invalid shard counts, unsupported parameters

View File

@@ -1,29 +1,28 @@
//! 专门比较 Pure Erasure 和 Hybrid (SIMD) 模式性能的基准测试
//! Reed-Solomon SIMD performance analysis benchmarks
//!
//! 这个基准测试使用不同的feature编译配置来直接对比两种实现的性能。
//! This benchmark analyzes the performance characteristics of the SIMD Reed-Solomon implementation
//! across different data sizes, shard configurations, and usage patterns.
//!
//! ## 运行比较测试
//! ## Running Performance Analysis
//!
//! ```bash
//! # 测试 Pure Erasure 实现 (默认)
//! # Run all SIMD performance tests
//! cargo bench --bench comparison_benchmark
//!
//! # 测试 Hybrid (SIMD) 实现
//! cargo bench --bench comparison_benchmark --features reed-solomon-simd
//! # Generate detailed performance report
//! cargo bench --bench comparison_benchmark -- --save-baseline simd_analysis
//!
//! # 测试强制 erasure-only 模式
//! cargo bench --bench comparison_benchmark --features reed-solomon-erasure
//!
//! # 生成对比报告
//! cargo bench --bench comparison_benchmark -- --save-baseline erasure
//! cargo bench --bench comparison_benchmark --features reed-solomon-simd -- --save-baseline hybrid
//! # Run specific test categories
//! cargo bench --bench comparison_benchmark encode_analysis
//! cargo bench --bench comparison_benchmark decode_analysis
//! cargo bench --bench comparison_benchmark shard_analysis
//! ```
use criterion::{BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main};
use ecstore::erasure_coding::Erasure;
use std::time::Duration;
/// 基准测试数据配置
/// Performance test data configuration
struct TestData {
data: Vec<u8>,
size_name: &'static str,
@@ -36,41 +35,41 @@ impl TestData {
}
}
/// 生成不同大小的测试数据集
/// Generate different sized test datasets for performance analysis
fn generate_test_datasets() -> Vec<TestData> {
vec![
TestData::new(1024, "1KB"), // 小数据
TestData::new(8 * 1024, "8KB"), // 中小数据
TestData::new(64 * 1024, "64KB"), // 中等数据
TestData::new(256 * 1024, "256KB"), // 中大数据
TestData::new(1024 * 1024, "1MB"), // 大数据
TestData::new(4 * 1024 * 1024, "4MB"), // 超大数据
TestData::new(1024, "1KB"), // Small data
TestData::new(8 * 1024, "8KB"), // Medium-small data
TestData::new(64 * 1024, "64KB"), // Medium data
TestData::new(256 * 1024, "256KB"), // Medium-large data
TestData::new(1024 * 1024, "1MB"), // Large data
TestData::new(4 * 1024 * 1024, "4MB"), // Extra large data
]
}
/// 编码性能比较基准测试
fn bench_encode_comparison(c: &mut Criterion) {
/// SIMD encoding performance analysis
fn bench_encode_analysis(c: &mut Criterion) {
let datasets = generate_test_datasets();
let configs = vec![
(4, 2, "4+2"), // 常用配置
(6, 3, "6+3"), // 50%冗余
(8, 4, "8+4"), // 50%冗余,更多分片
(4, 2, "4+2"), // Common configuration
(6, 3, "6+3"), // 50% redundancy
(8, 4, "8+4"), // 50% redundancy, more shards
];
for dataset in &datasets {
for (data_shards, parity_shards, config_name) in &configs {
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, get_implementation_name());
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, "simd");
let mut group = c.benchmark_group("encode_comparison");
let mut group = c.benchmark_group("encode_analysis");
group.throughput(Throughput::Bytes(dataset.data.len() as u64));
group.sample_size(20);
group.measurement_time(Duration::from_secs(10));
// 检查是否能够创建erasure实例某些配置在纯SIMD模式下可能失败
// Test SIMD encoding performance
match Erasure::new(*data_shards, *parity_shards, dataset.data.len()).encode_data(&dataset.data) {
Ok(_) => {
group.bench_with_input(
BenchmarkId::new("implementation", &test_name),
BenchmarkId::new("simd_encode", &test_name),
&(&dataset.data, *data_shards, *parity_shards),
|b, (data, data_shards, parity_shards)| {
let erasure = Erasure::new(*data_shards, *parity_shards, data.len());
@@ -82,7 +81,7 @@ fn bench_encode_comparison(c: &mut Criterion) {
);
}
Err(e) => {
println!("⚠️ 跳过测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping test {} - configuration not supported: {}", test_name, e);
}
}
group.finish();
@@ -90,35 +89,35 @@ fn bench_encode_comparison(c: &mut Criterion) {
}
}
/// 解码性能比较基准测试
fn bench_decode_comparison(c: &mut Criterion) {
/// SIMD decoding performance analysis
fn bench_decode_analysis(c: &mut Criterion) {
let datasets = generate_test_datasets();
let configs = vec![(4, 2, "4+2"), (6, 3, "6+3"), (8, 4, "8+4")];
for dataset in &datasets {
for (data_shards, parity_shards, config_name) in &configs {
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, get_implementation_name());
let test_name = format!("{}_{}_{}", dataset.size_name, config_name, "simd");
let erasure = Erasure::new(*data_shards, *parity_shards, dataset.data.len());
// 预先编码数据 - 检查是否支持此配置
// Pre-encode data - check if this configuration is supported
match erasure.encode_data(&dataset.data) {
Ok(encoded_shards) => {
let mut group = c.benchmark_group("decode_comparison");
let mut group = c.benchmark_group("decode_analysis");
group.throughput(Throughput::Bytes(dataset.data.len() as u64));
group.sample_size(20);
group.measurement_time(Duration::from_secs(10));
group.bench_with_input(
BenchmarkId::new("implementation", &test_name),
BenchmarkId::new("simd_decode", &test_name),
&(&encoded_shards, *data_shards, *parity_shards),
|b, (shards, data_shards, parity_shards)| {
let erasure = Erasure::new(*data_shards, *parity_shards, dataset.data.len());
b.iter(|| {
// 模拟最大可恢复的数据丢失
// Simulate maximum recoverable data loss
let mut shards_opt: Vec<Option<Vec<u8>>> =
shards.iter().map(|shard| Some(shard.to_vec())).collect();
// 丢失等于奇偶校验分片数量的分片
// Drop exactly `parity_shards` shards - the maximum loss that can still be recovered
for item in shards_opt.iter_mut().take(*parity_shards) {
*item = None;
}
@@ -131,33 +130,33 @@ fn bench_decode_comparison(c: &mut Criterion) {
group.finish();
}
Err(e) => {
println!("⚠️ 跳过解码测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping decode test {} - configuration not supported: {}", test_name, e);
}
}
}
}
}
/// 分片大小敏感性测试
fn bench_shard_size_sensitivity(c: &mut Criterion) {
/// Shard size sensitivity analysis for SIMD optimization
fn bench_shard_size_analysis(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
// 测试不同的分片大小特别关注SIMD的临界点
// Test different shard sizes, focusing on SIMD optimization thresholds
let shard_sizes = vec![32, 64, 128, 256, 512, 1024, 2048, 4096, 8192];
let mut group = c.benchmark_group("shard_size_sensitivity");
let mut group = c.benchmark_group("shard_size_analysis");
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
for shard_size in shard_sizes {
let total_size = shard_size * data_shards;
let data = (0..total_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let test_name = format!("{}B_shard_{}", shard_size, get_implementation_name());
let test_name = format!("{}B_shard_simd", shard_size);
group.throughput(Throughput::Bytes(total_size as u64));
// 检查此分片大小是否支持
// Check if this shard size is supported
let erasure = Erasure::new(data_shards, parity_shards, data.len());
match erasure.encode_data(&data) {
Ok(_) => {
@@ -170,15 +169,15 @@ fn bench_shard_size_sensitivity(c: &mut Criterion) {
});
}
Err(e) => {
println!("⚠️ 跳过分片大小测试 {} - 不支持: {}", test_name, e);
println!("⚠️ Skipping shard size test {} - not supported: {}", test_name, e);
}
}
}
group.finish();
}
/// 高负载并发测试
fn bench_concurrent_load(c: &mut Criterion) {
/// High-load concurrent performance analysis
fn bench_concurrent_analysis(c: &mut Criterion) {
use std::sync::Arc;
use std::thread;
@@ -186,14 +185,14 @@ fn bench_concurrent_load(c: &mut Criterion) {
let data = Arc::new((0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>());
let erasure = Arc::new(Erasure::new(4, 2, data_size));
let mut group = c.benchmark_group("concurrent_load");
let mut group = c.benchmark_group("concurrent_analysis");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(15));
let test_name = format!("1MB_concurrent_{}", get_implementation_name());
let test_name = "1MB_concurrent_simd";
group.bench_function(&test_name, |b| {
group.bench_function(test_name, |b| {
b.iter(|| {
let handles: Vec<_> = (0..4)
.map(|_| {
@@ -214,42 +213,44 @@ fn bench_concurrent_load(c: &mut Criterion) {
group.finish();
}
/// 错误恢复能力测试
fn bench_error_recovery_performance(c: &mut Criterion) {
let data_size = 256 * 1024; // 256KB
/// Error recovery performance analysis
fn bench_error_recovery_analysis(c: &mut Criterion) {
let data_size = 512 * 1024; // 512KB
let data = (0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let configs = vec![
(4, 2, 1), // 丢失1个分片
(4, 2, 2), // 丢失2个分片最大可恢复
(6, 3, 2), // 丢失2个分片
(6, 3, 3), // 丢失3个分片最大可恢复
(8, 4, 3), // 丢失3个分片
(8, 4, 4), // 丢失4个分片最大可恢复
// Test different error recovery scenarios
let scenarios = vec![
(4, 2, 1, "single_loss"), // Lose 1 shard
(4, 2, 2, "double_loss"), // Lose 2 shards (maximum)
(6, 3, 1, "single_loss_6_3"), // Lose 1 shard with 6+3
(6, 3, 3, "triple_loss_6_3"), // Lose 3 shards (maximum)
(8, 4, 2, "double_loss_8_4"), // Lose 2 shards with 8+4
(8, 4, 4, "quad_loss_8_4"), // Lose 4 shards (maximum)
];
let mut group = c.benchmark_group("error_recovery");
let mut group = c.benchmark_group("error_recovery_analysis");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
group.measurement_time(Duration::from_secs(10));
for (data_shards, parity_shards, lost_shards) in configs {
for (data_shards, parity_shards, loss_count, scenario_name) in scenarios {
let erasure = Erasure::new(data_shards, parity_shards, data_size);
let test_name = format!("{}+{}_lost{}_{}", data_shards, parity_shards, lost_shards, get_implementation_name());
// 检查此配置是否支持
match erasure.encode_data(&data) {
Ok(encoded_shards) => {
let test_name = format!("{}+{}_{}", data_shards, parity_shards, scenario_name);
group.bench_with_input(
BenchmarkId::new("recovery", &test_name),
&(&encoded_shards, data_shards, parity_shards, lost_shards),
|b, (shards, data_shards, parity_shards, lost_shards)| {
&(&encoded_shards, data_shards, parity_shards, loss_count),
|b, (shards, data_shards, parity_shards, loss_count)| {
let erasure = Erasure::new(*data_shards, *parity_shards, data_size);
b.iter(|| {
// Simulate specific number of shard losses
let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|shard| Some(shard.to_vec())).collect();
// 丢失指定数量的分片
for item in shards_opt.iter_mut().take(*lost_shards) {
// Lose the specified number of shards
for item in shards_opt.iter_mut().take(*loss_count) {
*item = None;
}
@@ -260,71 +261,57 @@ fn bench_error_recovery_performance(c: &mut Criterion) {
);
}
Err(e) => {
println!("⚠️ 跳过错误恢复测试 {} - 配置不支持: {}", test_name, e);
println!("⚠️ Skipping recovery test {}: {}", scenario_name, e);
}
}
}
group.finish();
}
/// 内存效率测试
fn bench_memory_efficiency(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
let data_size = 1024 * 1024; // 1MB
/// Memory efficiency analysis
fn bench_memory_analysis(c: &mut Criterion) {
let data_sizes = vec![64 * 1024, 256 * 1024, 1024 * 1024]; // 64KB, 256KB, 1MB
let config = (4, 2); // 4+2 configuration
let mut group = c.benchmark_group("memory_efficiency");
group.throughput(Throughput::Bytes(data_size as u64));
group.sample_size(10);
let mut group = c.benchmark_group("memory_analysis");
group.sample_size(15);
group.measurement_time(Duration::from_secs(8));
let test_name = format!("memory_pattern_{}", get_implementation_name());
for data_size in data_sizes {
let data = (0..data_size).map(|i| (i % 256) as u8).collect::<Vec<u8>>();
let size_name = format!("{}KB", data_size / 1024);
// 测试连续多次编码对内存的影响
group.bench_function(format!("{}_continuous", test_name), |b| {
let erasure = Erasure::new(data_shards, parity_shards, data_size);
b.iter(|| {
for i in 0..10 {
let data = vec![(i % 256) as u8; data_size];
let shards = erasure.encode_data(black_box(&data)).unwrap();
group.throughput(Throughput::Bytes(data_size as u64));
// Test instance reuse vs new instance creation
group.bench_with_input(BenchmarkId::new("reuse_instance", &size_name), &data, |b, data| {
let erasure = Erasure::new(config.0, config.1, data.len());
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
black_box(shards);
}
});
});
});
// 测试大量小编码任务
group.bench_function(format!("{}_small_chunks", test_name), |b| {
let chunk_size = 1024; // 1KB chunks
let erasure = Erasure::new(data_shards, parity_shards, chunk_size);
b.iter(|| {
for i in 0..1024 {
let data = vec![(i % 256) as u8; chunk_size];
let shards = erasure.encode_data(black_box(&data)).unwrap();
group.bench_with_input(BenchmarkId::new("new_instance", &size_name), &data, |b, data| {
b.iter(|| {
let erasure = Erasure::new(config.0, config.1, data.len());
let shards = erasure.encode_data(black_box(data)).unwrap();
black_box(shards);
}
});
});
});
}
group.finish();
}
/// Returns the label of the Reed-Solomon backend selected at compile time.
///
/// Yields `"hybrid"` when the `reed-solomon-simd` Cargo feature is enabled and
/// `"erasure"` otherwise. The label is used to tag benchmark names.
fn get_implementation_name() -> &'static str {
    if cfg!(feature = "reed-solomon-simd") {
        "hybrid"
    } else {
        "erasure"
    }
}
// Benchmark group configuration
criterion_group!(
benches,
bench_encode_comparison,
bench_decode_comparison,
bench_shard_size_sensitivity,
bench_concurrent_load,
bench_error_recovery_performance,
bench_memory_efficiency
bench_encode_analysis,
bench_decode_analysis,
bench_shard_size_analysis,
bench_concurrent_analysis,
bench_error_recovery_analysis,
bench_memory_analysis
);
criterion_main!(benches);

View File

@@ -1,25 +1,23 @@
//! Reed-Solomon erasure coding performance benchmarks.
//! Reed-Solomon SIMD erasure coding performance benchmarks.
//!
//! This benchmark compares the performance of different Reed-Solomon implementations:
//! - Default (Pure erasure): Stable reed-solomon-erasure implementation
//! - `reed-solomon-simd` feature: SIMD mode with optimized performance
//! This benchmark tests the performance of the high-performance SIMD Reed-Solomon implementation.
//!
//! ## Running Benchmarks
//!
//! ```bash
//! # 运行所有基准测试
//! # Run all benchmarks
//! cargo bench
//!
//! # 运行特定的基准测试
//! # Run specific benchmark
//! cargo bench --bench erasure_benchmark
//!
//! # 生成HTML报告
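//! # Generate HTML report (criterion writes it to target/criterion/report/index.html
//! # when its `html_reports` feature is enabled; `--output-format` only accepts
//! # `criterion` or `bencher`, not `html`)
//! cargo bench --bench erasure_benchmark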
//!
//! # 只测试编码性能
//! # Test encoding performance only
//! cargo bench encode
//!
//! # 只测试解码性能
//! # Test decoding performance only
//! cargo bench decode
//! ```
//!
@@ -29,24 +27,24 @@
//! - Different data sizes: 1KB, 64KB, 1MB, 16MB
//! - Different erasure coding configurations: (4,2), (6,3), (8,4)
//! - Both encoding and decoding operations
//! - Small vs large shard scenarios for SIMD optimization
//! - SIMD optimization for different shard sizes
use criterion::{BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main};
use ecstore::erasure_coding::{Erasure, calc_shard_size};
use std::time::Duration;
/// 基准测试配置结构体
/// Benchmark configuration structure
#[derive(Clone, Debug)]
struct BenchConfig {
/// 数据分片数量
/// Number of data shards
data_shards: usize,
/// 奇偶校验分片数量
/// Number of parity shards
parity_shards: usize,
/// 测试数据大小(字节)
/// Test data size (bytes)
data_size: usize,
/// 块大小(字节)
/// Block size (bytes)
block_size: usize,
/// 配置名称
/// Configuration name
name: String,
}
@@ -62,27 +60,27 @@ impl BenchConfig {
}
}
/// 生成测试数据
/// Generate test data
fn generate_test_data(size: usize) -> Vec<u8> {
    // Deterministic fill: the byte at offset `pos` is `pos % 256`, so the
    // sequence 0, 1, ..., 255 repeats across the whole buffer.
    let mut payload = Vec::with_capacity(size);
    for pos in 0..size {
        payload.push((pos % 256) as u8);
    }
    payload
}
/// 基准测试: 编码性能对比
/// Benchmark: Encoding performance
fn bench_encode_performance(c: &mut Criterion) {
let configs = vec![
// 小数据量测试 - 1KB
// Small data tests - 1KB
BenchConfig::new(4, 2, 1024, 1024),
BenchConfig::new(6, 3, 1024, 1024),
BenchConfig::new(8, 4, 1024, 1024),
// 中等数据量测试 - 64KB
// Medium data tests - 64KB
BenchConfig::new(4, 2, 64 * 1024, 64 * 1024),
BenchConfig::new(6, 3, 64 * 1024, 64 * 1024),
BenchConfig::new(8, 4, 64 * 1024, 64 * 1024),
// 大数据量测试 - 1MB
// Large data tests - 1MB
BenchConfig::new(4, 2, 1024 * 1024, 1024 * 1024),
BenchConfig::new(6, 3, 1024 * 1024, 1024 * 1024),
BenchConfig::new(8, 4, 1024 * 1024, 1024 * 1024),
// 超大数据量测试 - 16MB
// Extra large data tests - 16MB
BenchConfig::new(4, 2, 16 * 1024 * 1024, 16 * 1024 * 1024),
BenchConfig::new(6, 3, 16 * 1024 * 1024, 16 * 1024 * 1024),
];
@@ -90,13 +88,13 @@ fn bench_encode_performance(c: &mut Criterion) {
for config in configs {
let data = generate_test_data(config.data_size);
// 测试当前默认实现通常是SIMD
let mut group = c.benchmark_group("encode_current");
// Test SIMD encoding performance
let mut group = c.benchmark_group("encode_simd");
group.throughput(Throughput::Bytes(config.data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
group.bench_with_input(BenchmarkId::new("current_impl", &config.name), &(&data, &config), |b, (data, config)| {
group.bench_with_input(BenchmarkId::new("simd_impl", &config.name), &(&data, &config), |b, (data, config)| {
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
@@ -105,99 +103,55 @@ fn bench_encode_performance(c: &mut Criterion) {
});
group.finish();
// 如果SIMD feature启用测试专用的erasure实现对比
#[cfg(feature = "reed-solomon-simd")]
{
use ecstore::erasure_coding::ReedSolomonEncoder;
// Test direct SIMD implementation for large shards (>= 512 bytes)
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("encode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
let mut erasure_group = c.benchmark_group("encode_erasure_only");
erasure_group.throughput(Throughput::Bytes(config.data_size as u64));
erasure_group.sample_size(10);
erasure_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(BenchmarkId::new("simd_direct", &config.name), &(&data, &config), |b, (data, config)| {
b.iter(|| {
// Direct SIMD implementation
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
match reed_solomon_simd::ReedSolomonEncoder::new(config.data_shards, config.parity_shards, per_shard_size) {
Ok(mut encoder) => {
// Create properly sized buffer and fill with data
let mut buffer = vec![0u8; per_shard_size * config.data_shards];
let copy_len = data.len().min(buffer.len());
buffer[..copy_len].copy_from_slice(&data[..copy_len]);
erasure_group.bench_with_input(
BenchmarkId::new("erasure_impl", &config.name),
&(&data, &config),
|b, (data, config)| {
let encoder = ReedSolomonEncoder::new(config.data_shards, config.parity_shards).unwrap();
b.iter(|| {
// 创建编码所需的数据结构
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
let total_size = per_shard_size * (config.data_shards + config.parity_shards);
let mut buffer = vec![0u8; total_size];
buffer[..data.len()].copy_from_slice(data);
let slices: smallvec::SmallVec<[&mut [u8]; 16]> = buffer.chunks_exact_mut(per_shard_size).collect();
encoder.encode(black_box(slices)).unwrap();
black_box(&buffer);
});
},
);
erasure_group.finish();
}
// 如果使用SIMD feature测试直接SIMD实现对比
#[cfg(feature = "reed-solomon-simd")]
{
// 只对大shard测试SIMD小于512字节的shard SIMD性能不佳
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("encode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(
BenchmarkId::new("simd_impl", &config.name),
&(&data, &config),
|b, (data, config)| {
b.iter(|| {
// 直接使用SIMD实现
let per_shard_size = calc_shard_size(data.len(), config.data_shards);
match reed_solomon_simd::ReedSolomonEncoder::new(
config.data_shards,
config.parity_shards,
per_shard_size,
) {
Ok(mut encoder) => {
// 创建正确大小的缓冲区,并填充数据
let mut buffer = vec![0u8; per_shard_size * config.data_shards];
let copy_len = data.len().min(buffer.len());
buffer[..copy_len].copy_from_slice(&data[..copy_len]);
// 按正确的分片大小添加数据分片
for chunk in buffer.chunks_exact(per_shard_size) {
encoder.add_original_shard(black_box(chunk)).unwrap();
}
let result = encoder.encode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD不支持此配置跳过
black_box(());
}
// Add data shards with correct shard size
for chunk in buffer.chunks_exact(per_shard_size) {
encoder.add_original_shard(black_box(chunk)).unwrap();
}
});
},
);
simd_group.finish();
}
let result = encoder.encode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD doesn't support this configuration, skip
black_box(());
}
}
});
});
simd_group.finish();
}
}
}
/// 基准测试: 解码性能对比
/// Benchmark: Decoding performance
fn bench_decode_performance(c: &mut Criterion) {
let configs = vec![
// 中等数据量测试 - 64KB
// Medium data tests - 64KB
BenchConfig::new(4, 2, 64 * 1024, 64 * 1024),
BenchConfig::new(6, 3, 64 * 1024, 64 * 1024),
// 大数据量测试 - 1MB
// Large data tests - 1MB
BenchConfig::new(4, 2, 1024 * 1024, 1024 * 1024),
BenchConfig::new(6, 3, 1024 * 1024, 1024 * 1024),
// 超大数据量测试 - 16MB
// Extra large data tests - 16MB
BenchConfig::new(4, 2, 16 * 1024 * 1024, 16 * 1024 * 1024),
];
@@ -205,25 +159,25 @@ fn bench_decode_performance(c: &mut Criterion) {
let data = generate_test_data(config.data_size);
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
// 预先编码数据
// Pre-encode data
let encoded_shards = erasure.encode_data(&data).unwrap();
// 测试当前默认实现的解码性能
let mut group = c.benchmark_group("decode_current");
// Test SIMD decoding performance
let mut group = c.benchmark_group("decode_simd");
group.throughput(Throughput::Bytes(config.data_size as u64));
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
group.bench_with_input(
BenchmarkId::new("current_impl", &config.name),
BenchmarkId::new("simd_impl", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
let erasure = Erasure::new(config.data_shards, config.parity_shards, config.block_size);
b.iter(|| {
// 模拟数据丢失 - 丢失一个数据分片和一个奇偶分片
// Simulate data loss - lose one data shard and one parity shard
let mut shards_opt: Vec<Option<Vec<u8>>> = shards.iter().map(|shard| Some(shard.to_vec())).collect();
// 丢失最后一个数据分片和第一个奇偶分片
// Lose last data shard and first parity shard
shards_opt[config.data_shards - 1] = None;
shards_opt[config.data_shards] = None;
@@ -234,58 +188,52 @@ fn bench_decode_performance(c: &mut Criterion) {
);
group.finish();
// 如果使用混合模式默认测试SIMD解码性能
#[cfg(not(feature = "reed-solomon-erasure"))]
{
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("decode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
// Test direct SIMD decoding for large shards
let shard_size = calc_shard_size(config.data_size, config.data_shards);
if shard_size >= 512 {
let mut simd_group = c.benchmark_group("decode_simd_direct");
simd_group.throughput(Throughput::Bytes(config.data_size as u64));
simd_group.sample_size(10);
simd_group.measurement_time(Duration::from_secs(5));
simd_group.bench_with_input(
BenchmarkId::new("simd_impl", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
b.iter(|| {
let per_shard_size = calc_shard_size(config.data_size, config.data_shards);
match reed_solomon_simd::ReedSolomonDecoder::new(
config.data_shards,
config.parity_shards,
per_shard_size,
) {
Ok(mut decoder) => {
// 添加可用的分片(除了丢失的)
for (i, shard) in shards.iter().enumerate() {
if i != config.data_shards - 1 && i != config.data_shards {
if i < config.data_shards {
decoder.add_original_shard(i, black_box(shard)).unwrap();
} else {
let recovery_idx = i - config.data_shards;
decoder.add_recovery_shard(recovery_idx, black_box(shard)).unwrap();
}
simd_group.bench_with_input(
BenchmarkId::new("simd_direct", &config.name),
&(&encoded_shards, &config),
|b, (shards, config)| {
b.iter(|| {
let per_shard_size = calc_shard_size(config.data_size, config.data_shards);
match reed_solomon_simd::ReedSolomonDecoder::new(config.data_shards, config.parity_shards, per_shard_size)
{
Ok(mut decoder) => {
// Add available shards (except lost ones)
for (i, shard) in shards.iter().enumerate() {
if i != config.data_shards - 1 && i != config.data_shards {
if i < config.data_shards {
decoder.add_original_shard(i, black_box(shard)).unwrap();
} else {
let recovery_idx = i - config.data_shards;
decoder.add_recovery_shard(recovery_idx, black_box(shard)).unwrap();
}
}
}
let result = decoder.decode().unwrap();
black_box(result);
}
Err(_) => {
// SIMD不支持此配置跳过
black_box(());
}
let result = decoder.decode().unwrap();
black_box(result);
}
});
},
);
simd_group.finish();
}
Err(_) => {
// SIMD doesn't support this configuration, skip
black_box(());
}
}
});
},
);
simd_group.finish();
}
}
}
/// 基准测试: 不同分片大小对性能的影响
/// Benchmark: Impact of different shard sizes on performance
fn bench_shard_size_impact(c: &mut Criterion) {
let shard_sizes = vec![64, 128, 256, 512, 1024, 2048, 4096, 8192];
let data_shards = 4;
@@ -301,8 +249,8 @@ fn bench_shard_size_impact(c: &mut Criterion) {
group.throughput(Throughput::Bytes(total_data_size as u64));
// 测试当前实现
group.bench_with_input(BenchmarkId::new("current", format!("shard_{}B", shard_size)), &data, |b, data| {
// Test SIMD implementation
group.bench_with_input(BenchmarkId::new("simd", format!("shard_{}B", shard_size)), &data, |b, data| {
let erasure = Erasure::new(data_shards, parity_shards, total_data_size);
b.iter(|| {
let shards = erasure.encode_data(black_box(data)).unwrap();
@@ -313,19 +261,19 @@ fn bench_shard_size_impact(c: &mut Criterion) {
group.finish();
}
/// 基准测试: 编码配置对性能的影响
/// Benchmark: Impact of coding configurations on performance
fn bench_coding_configurations(c: &mut Criterion) {
let configs = vec![
(2, 1), // 最小冗余
(3, 2), // 中等冗余
(4, 2), // 常用配置
(6, 3), // 50%冗余
(8, 4), // 50%冗余,更多分片
(10, 5), // 50%冗余,大量分片
(12, 6), // 50%冗余,更大量分片
(2, 1), // Minimal redundancy
(3, 2), // Medium redundancy
(4, 2), // Common configuration
(6, 3), // 50% redundancy
(8, 4), // 50% redundancy, more shards
(10, 5), // 50% redundancy, many shards
(12, 6), // 50% redundancy, very many shards
];
let data_size = 1024 * 1024; // 1MB测试数据
let data_size = 1024 * 1024; // 1MB test data
let data = generate_test_data(data_size);
let mut group = c.benchmark_group("coding_configurations");
@@ -347,17 +295,17 @@ fn bench_coding_configurations(c: &mut Criterion) {
group.finish();
}
/// 基准测试: 内存使用模式
/// Benchmark: Memory usage patterns
fn bench_memory_patterns(c: &mut Criterion) {
let data_shards = 4;
let parity_shards = 2;
let block_size = 1024 * 1024; // 1MB
let block_size = 1024 * 1024; // 1MB block
let mut group = c.benchmark_group("memory_patterns");
group.sample_size(10);
group.measurement_time(Duration::from_secs(5));
// 测试重复使用同一个Erasure实例
// Test reusing the same Erasure instance
group.bench_function("reuse_erasure_instance", |b| {
let erasure = Erasure::new(data_shards, parity_shards, block_size);
let data = generate_test_data(block_size);
@@ -368,7 +316,7 @@ fn bench_memory_patterns(c: &mut Criterion) {
});
});
// 测试每次创建新的Erasure实例
// Test creating new Erasure instance each time
group.bench_function("new_erasure_instance", |b| {
let data = generate_test_data(block_size);
@@ -382,7 +330,7 @@ fn bench_memory_patterns(c: &mut Criterion) {
group.finish();
}
// 基准测试组配置
// Benchmark group configuration
criterion_group!(
benches,
bench_encode_performance,

View File

@@ -1,54 +1,48 @@
#!/bin/bash
# Reed-Solomon 实现性能比较脚本
#
# 这个脚本将运行不同的基准测试来比较SIMD模式和纯Erasure模式的性能
#
# 使用方法:
# ./run_benchmarks.sh [quick|full|comparison]
#
# quick - 快速测试主要场景
# full - 完整基准测试套件
# comparison - 专门对比两种实现模式
# Reed-Solomon SIMD 性能基准测试脚本
# 使用高性能 SIMD 实现进行纠删码性能测试
set -e
# 颜色输出
# ANSI 颜色码
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
PURPLE='\033[0;35m'
NC='\033[0m' # No Color
# 输出带颜色的
# 打印带颜色的
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
echo -e "${BLUE} $1${NC}"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
echo -e "${GREEN}$1${NC}"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
echo -e "${YELLOW}⚠️ $1${NC}"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
echo -e "${RED}$1${NC}"
}
# 检查是否安装了必要工具
# 检查系统要求
check_requirements() {
print_info "检查系统要求..."
# 检查 Rust
if ! command -v cargo &> /dev/null; then
print_error "cargo 未安装,请先安装 Rust 工具链"
print_error "Cargo 未找到,请确保已安装 Rust"
exit 1
fi
# 检查是否安装了 criterion
if ! grep -q "criterion" Cargo.toml; then
print_error "Cargo.toml 中未找到 criterion 依赖"
# 检查 criterion
if ! cargo --list | grep -q "bench"; then
print_error "未找到基准测试支持,请确保使用的是支持基准测试的 Rust 版本"
exit 1
fi
@@ -62,28 +56,15 @@ cleanup() {
print_success "清理完成"
}
# Run the pure Erasure mode benchmark.
# Builds and runs the `comparison_benchmark` bench target with the
# `reed-solomon-erasure` feature enabled. Everything after the bare `--` is
# handed to the benchmark binary rather than to cargo; `--save-baseline
# erasure_baseline` is presumably Criterion's option for storing this run under
# that baseline name (confirm against the bench harness in use).
run_erasure_benchmark() {
print_info "🏛️ 开始运行纯 Erasure 模式基准测试..."
echo "================================================"
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- --save-baseline erasure_baseline
print_success "纯 Erasure 模式基准测试完成"
}
# 运行SIMD模式基准测试
# 运行 SIMD 模式基准测试
run_simd_benchmark() {
print_info "🎯 开始运行SIMD模式基准测试..."
print_info "🎯 开始运行 SIMD 模式基准测试..."
echo "================================================"
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --save-baseline simd_baseline
print_success "SIMD模式基准测试完成"
print_success "SIMD 模式基准测试完成"
}
# 运行完整的基准测试套件
@@ -91,33 +72,42 @@ run_full_benchmark() {
print_info "🚀 开始运行完整基准测试套件..."
echo "================================================"
# 运行详细的基准测试使用默认纯Erasure模式
# 运行详细的基准测试
cargo bench --bench erasure_benchmark
print_success "完整基准测试套件完成"
}
# 运行性能对比测试
run_comparison_benchmark() {
print_info "📊 开始运行性能对比测试..."
# 运行性能测试
run_performance_test() {
print_info "📊 开始运行性能测试..."
echo "================================================"
print_info "步骤 1: 测试纯 Erasure 模式..."
print_info "步骤 1: 运行编码基准测试..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- --save-baseline erasure_baseline
-- encode --save-baseline encode_baseline
print_info "步骤 2: 测试SIMD模式并与 Erasure 模式对比..."
print_info "步骤 2: 运行解码基准测试..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- --baseline erasure_baseline
-- decode --save-baseline decode_baseline
print_success "性能对比测试完成"
print_success "性能测试完成"
}
# Run the large data set test.
# Runs the `erasure_benchmark` bench target. Arguments after the bare `--` go
# to the benchmark binary: `large_data` is presumably a benchmark-name filter
# and `--save-baseline large_data_baseline` presumably stores the results under
# that baseline name (both assume a Criterion harness; confirm).
run_large_data_test() {
print_info "🗂️ 开始运行大数据集测试..."
echo "================================================"
cargo bench --bench erasure_benchmark \
-- large_data --save-baseline large_data_baseline
print_success "大数据集测试完成"
}
# 生成比较报告
generate_comparison_report() {
print_info "📊 生成性能比较报告..."
print_info "📊 生成性能报告..."
if [ -d "target/criterion" ]; then
print_info "基准测试结果已保存到 target/criterion/ 目录"
@@ -138,49 +128,48 @@ generate_comparison_report() {
run_quick_test() {
print_info "🏃 运行快速性能测试..."
print_info "测试纯 Erasure 模式..."
print_info "测试 SIMD 编码性能..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-erasure \
-- encode_comparison --quick
-- encode --quick
print_info "测试SIMD模式..."
print_info "测试 SIMD 解码性能..."
cargo bench --bench comparison_benchmark \
--features reed-solomon-simd \
-- encode_comparison --quick
-- decode --quick
print_success "快速测试完成"
}
# 显示帮助信息
show_help() {
echo "Reed-Solomon 性能基准测试脚本"
echo "Reed-Solomon SIMD 性能基准测试脚本"
echo ""
echo "实现模式:"
echo " 🏛️ 纯 Erasure 模式(默认)- 稳定兼容的 reed-solomon-erasure 实现"
echo " 🎯 SIMD模式 - 高性能SIMD优化实现"
echo " 🎯 SIMD 模式 - 高性能 SIMD 优化的 reed-solomon-simd 实现"
echo ""
echo "使用方法:"
echo " $0 [command]"
echo ""
echo "命令:"
echo " quick 运行快速性能测试"
echo " full 运行完整基准测试套件默认Erasure模式"
echo " comparison 运行详细的实现模式对比测试"
echo " erasure 只测试纯 Erasure 模式"
echo " simd 只测试SIMD模式"
echo " full 运行完整基准测试套件"
echo " performance 运行详细的性能测试"
echo " simd 运行 SIMD 模式测试"
echo " large 运行大数据集测试"
echo " clean 清理测试结果"
echo " help 显示此帮助信息"
echo ""
echo "示例:"
echo " $0 quick # 快速测试两种模式"
echo " $0 comparison # 详细对比测试"
echo " $0 full # 完整测试套件默认Erasure模式"
echo " $0 simd # 只测试SIMD模式"
echo " $0 erasure # 只测试纯 Erasure 模式"
echo " $0 quick # 快速性能测试"
echo " $0 performance # 详细性能测试"
echo " $0 full # 完整测试套件"
echo " $0 simd # SIMD 模式测试"
echo " $0 large # 大数据集测试"
echo ""
echo "模式说明:"
echo " Erasure模式: 使用reed-solomon-erasure实现稳定可靠"
echo " SIMD模式: 使用reed-solomon-simd实现高性能优化"
echo "实现特性:"
echo " - 使用 reed-solomon-simd 高性能 SIMD 实现"
echo " - 支持编码器/解码器实例缓存"
echo " - 优化的内存管理和线程安全"
echo " - 跨平台 SIMD 指令支持"
}
# 显示测试配置信息
@@ -196,22 +185,22 @@ show_test_info() {
if [ -f "/proc/cpuinfo" ]; then
echo " - CPU 型号: $(grep 'model name' /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)"
if grep -q "avx2" /proc/cpuinfo; then
echo " - SIMD 支持: AVX2 ✅ (SIMD模式将利用SIMD优化)"
echo " - SIMD 支持: AVX2 ✅ (将使用高级 SIMD 优化)"
elif grep -q "sse4" /proc/cpuinfo; then
echo " - SIMD 支持: SSE4 ✅ (SIMD模式将利用SIMD优化)"
echo " - SIMD 支持: SSE4 ✅ (将使用 SIMD 优化)"
else
echo " - SIMD 支持: 未检测到高级 SIMD 特性"
echo " - SIMD 支持: 基础 SIMD 特性"
fi
fi
echo " - 默认模式: 纯Erasure模式 (稳定可靠)"
echo " - 高性能模式: SIMD模式 (性能优化)"
echo " - 实现: reed-solomon-simd (高性能 SIMD 优化)"
echo " - 特性: 实例缓存、线程安全、跨平台 SIMD"
echo ""
}
# 主函数
main() {
print_info "🧪 Reed-Solomon 实现性能基准测试"
print_info "🧪 Reed-Solomon SIMD 实现性能基准测试"
echo "================================================"
check_requirements
@@ -227,14 +216,9 @@ main() {
run_full_benchmark
generate_comparison_report
;;
"comparison")
"performance")
cleanup
run_comparison_benchmark
generate_comparison_report
;;
"erasure")
cleanup
run_erasure_benchmark
run_performance_test
generate_comparison_report
;;
"simd")
@@ -242,6 +226,11 @@ main() {
run_simd_benchmark
generate_comparison_report
;;
"large")
cleanup
run_large_data_test
generate_comparison_report
;;
"clean")
cleanup
;;
@@ -257,10 +246,7 @@ main() {
esac
print_success "✨ 基准测试执行完成!"
print_info "💡 提示: 推荐使用默认的纯Erasure模式对于高性能需求可考虑SIMD模式"
}
# 如果直接运行此脚本,调用主函数
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
# 启动脚本
main "$@"

View File

@@ -1,6 +1,7 @@
#![allow(unused_variables)]
#![allow(dead_code)]
// use error::Error;
use crate::StorageAPI;
use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::versioning_sys::BucketVersioningSys;
use crate::error::Error;
@@ -11,26 +12,25 @@ use crate::store_api::ObjectIO;
use crate::store_api::ObjectInfo;
use crate::store_api::ObjectOptions;
use crate::store_api::ObjectToDelete;
use crate::StorageAPI;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use aws_sdk_s3::config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use http::HeaderMap;
use http::Method;
use lazy_static::lazy_static;
// use std::time::SystemTime;
use once_cell::sync::Lazy;
use regex::Regex;
use rustfs_rsc::provider::StaticProvider;
use rustfs_rsc::Minio;
use rustfs_rsc::provider::StaticProvider;
use s3s::dto::DeleteMarkerReplicationStatus;
use s3s::dto::DeleteReplicationStatus;
use s3s::dto::ExistingObjectReplicationStatus;
@@ -43,14 +43,14 @@ use std::collections::HashSet;
use std::fmt;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use time::OffsetDateTime;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

View File

@@ -1,4 +1,4 @@
use super::{storageclass, Config, GLOBAL_StorageClass};
use super::{Config, GLOBAL_StorageClass, storageclass};
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};

View File

@@ -5,7 +5,7 @@ pub mod storageclass;
use crate::error::Result;
use crate::store::ECStore;
use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS};
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View File

@@ -1,27 +1,15 @@
//! Erasure coding implementation supporting multiple Reed-Solomon backends.
//! Erasure coding implementation using Reed-Solomon SIMD backend.
//!
//! This module provides erasure coding functionality with support for two different
//! Reed-Solomon implementations:
//! This module provides erasure coding functionality with high-performance SIMD
//! Reed-Solomon implementation:
//!
//! ## Reed-Solomon Implementations
//! ## Reed-Solomon Implementation
//!
//! ### Pure Erasure Mode (Default)
//! - **Stability**: Pure erasure implementation, mature and well-tested
//! - **Performance**: Good performance with consistent behavior
//! - **Compatibility**: Works with any shard size
//! - **Use case**: Default behavior, recommended for most production use cases
//!
//! ### SIMD Mode (`reed-solomon-simd` feature)
//! ### SIMD Mode (Only)
//! - **Performance**: Uses SIMD optimization for high-performance encoding/decoding
//! - **Compatibility**: Works with any shard size through SIMD implementation
//! - **Reliability**: High-performance SIMD implementation for large data processing
//! - **Use case**: Use when maximum performance is needed for large data processing
//!
//! ## Feature Flags
//!
//! - Default: Use pure reed-solomon-erasure implementation (stable and reliable)
//! - `reed-solomon-simd`: Use SIMD mode for optimal performance
//! - `reed-solomon-erasure`: Explicitly enable pure erasure mode (same as default)
//! - **Use case**: Optimized for maximum performance in large data processing scenarios
//!
//! ## Example
//!
@@ -35,8 +23,6 @@
//! ```
use bytes::{Bytes, BytesMut};
use reed_solomon_erasure::galois_8::ReedSolomon as ReedSolomonErasure;
#[cfg(feature = "reed-solomon-simd")]
use reed_solomon_simd;
use smallvec::SmallVec;
use std::io;
@@ -44,38 +30,23 @@ use tokio::io::AsyncRead;
use tracing::warn;
use uuid::Uuid;
/// Reed-Solomon encoder variants supporting different implementations.
#[allow(clippy::large_enum_variant)]
pub enum ReedSolomonEncoder {
/// SIMD mode: High-performance SIMD implementation (when reed-solomon-simd feature is enabled)
#[cfg(feature = "reed-solomon-simd")]
SIMD {
data_shards: usize,
parity_shards: usize,
// 使用RwLock确保线程安全实现Send + Sync
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
},
/// Pure erasure mode: default and when reed-solomon-erasure feature is specified
Erasure(Box<ReedSolomonErasure>),
/// Reed-Solomon encoder using SIMD implementation.
pub struct ReedSolomonEncoder {
data_shards: usize,
parity_shards: usize,
// 使用RwLock确保线程安全实现Send + Sync
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
}
impl Clone for ReedSolomonEncoder {
fn clone(&self) -> Self {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
..
} => ReedSolomonEncoder::SIMD {
data_shards: *data_shards,
parity_shards: *parity_shards,
// 为新实例创建空的缓存,不共享缓存
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
},
ReedSolomonEncoder::Erasure(encoder) => ReedSolomonEncoder::Erasure(encoder.clone()),
Self {
data_shards: self.data_shards,
parity_shards: self.parity_shards,
// 为新实例创建空的缓存,不共享缓存
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
}
}
}
@@ -83,81 +54,50 @@ impl Clone for ReedSolomonEncoder {
impl ReedSolomonEncoder {
/// Create a new Reed-Solomon encoder with specified data and parity shards.
pub fn new(data_shards: usize, parity_shards: usize) -> io::Result<Self> {
#[cfg(feature = "reed-solomon-simd")]
{
// SIMD mode when reed-solomon-simd feature is enabled
Ok(ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
})
}
#[cfg(not(feature = "reed-solomon-simd"))]
{
// Pure erasure mode when reed-solomon-simd feature is not enabled (default or reed-solomon-erasure)
let encoder = ReedSolomonErasure::new(data_shards, parity_shards)
.map_err(|e| io::Error::other(format!("Failed to create erasure encoder: {:?}", e)))?;
Ok(ReedSolomonEncoder::Erasure(Box::new(encoder)))
}
Ok(ReedSolomonEncoder {
data_shards,
parity_shards,
encoder_cache: std::sync::RwLock::new(None),
decoder_cache: std::sync::RwLock::new(None),
})
}
/// Encode data shards with parity.
pub fn encode(&self, shards: SmallVec<[&mut [u8]; 16]>) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
encoder_cache,
..
} => {
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
if shards_vec.is_empty() {
return Ok(());
}
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
if shards_vec.is_empty() {
return Ok(());
}
// 使用 SIMD 进行编码
let simd_result = self.encode_with_simd(*data_shards, *parity_shards, encoder_cache, &mut shards_vec);
// 使用 SIMD 进行编码
let simd_result = self.encode_with_simd(&mut shards_vec);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD encoding failed: {}", simd_error);
Err(simd_error)
}
}
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD encoding failed: {}", simd_error);
Err(simd_error)
}
ReedSolomonEncoder::Erasure(encoder) => encoder
.encode(shards)
.map_err(|e| io::Error::other(format!("Erasure encode error: {:?}", e))),
}
}
#[cfg(feature = "reed-solomon-simd")]
fn encode_with_simd(
&self,
data_shards: usize,
parity_shards: usize,
encoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
shards_vec: &mut [&mut [u8]],
) -> io::Result<()> {
fn encode_with_simd(&self, shards_vec: &mut [&mut [u8]]) -> io::Result<()> {
let shard_len = shards_vec[0].len();
// 获取或创建encoder
let mut encoder = {
let mut cache_guard = encoder_cache
let mut cache_guard = self
.encoder_cache
.write()
.map_err(|_| io::Error::other("Failed to acquire encoder cache lock"))?;
match cache_guard.take() {
Some(mut cached_encoder) => {
// 使用reset方法重置现有encoder以适应新的参数
if let Err(e) = cached_encoder.reset(data_shards, parity_shards, shard_len) {
if let Err(e) = cached_encoder.reset(self.data_shards, self.parity_shards, shard_len) {
warn!("Failed to reset SIMD encoder: {:?}, creating new one", e);
// 如果reset失败创建新的encoder
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
} else {
cached_encoder
@@ -165,14 +105,14 @@ impl ReedSolomonEncoder {
}
None => {
// 第一次使用创建新encoder
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
}
}
};
// 添加原始shards
for (i, shard) in shards_vec.iter().enumerate().take(data_shards) {
for (i, shard) in shards_vec.iter().enumerate().take(self.data_shards) {
encoder
.add_original_shard(shard)
.map_err(|e| io::Error::other(format!("Failed to add shard {}: {:?}", i, e)))?;
@@ -185,15 +125,16 @@ impl ReedSolomonEncoder {
// 将恢复shards复制到输出缓冲区
for (i, recovery_shard) in result.recovery_iter().enumerate() {
if i + data_shards < shards_vec.len() {
shards_vec[i + data_shards].copy_from_slice(recovery_shard);
if i + self.data_shards < shards_vec.len() {
shards_vec[i + self.data_shards].copy_from_slice(recovery_shard);
}
}
// 将encoder放回缓存在result被drop后encoder自动重置可以重用
drop(result); // 显式drop result确保encoder被重置
*encoder_cache
*self
.encoder_cache
.write()
.map_err(|_| io::Error::other("Failed to return encoder to cache"))? = Some(encoder);
@@ -202,39 +143,19 @@ impl ReedSolomonEncoder {
/// Reconstruct missing shards.
pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
match self {
#[cfg(feature = "reed-solomon-simd")]
ReedSolomonEncoder::SIMD {
data_shards,
parity_shards,
decoder_cache,
..
} => {
// 使用 SIMD 进行重构
let simd_result = self.reconstruct_with_simd(*data_shards, *parity_shards, decoder_cache, shards);
// 使用 SIMD 进行重构
let simd_result = self.reconstruct_with_simd(shards);
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD reconstruction failed: {}", simd_error);
Err(simd_error)
}
}
match simd_result {
Ok(()) => Ok(()),
Err(simd_error) => {
warn!("SIMD reconstruction failed: {}", simd_error);
Err(simd_error)
}
ReedSolomonEncoder::Erasure(encoder) => encoder
.reconstruct(shards)
.map_err(|e| io::Error::other(format!("Erasure reconstruct error: {:?}", e))),
}
}
#[cfg(feature = "reed-solomon-simd")]
fn reconstruct_with_simd(
&self,
data_shards: usize,
parity_shards: usize,
decoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
shards: &mut [Option<Vec<u8>>],
) -> io::Result<()> {
fn reconstruct_with_simd(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
// Find a valid shard to determine length
let shard_len = shards
.iter()
@@ -243,17 +164,18 @@ impl ReedSolomonEncoder {
// 获取或创建decoder
let mut decoder = {
let mut cache_guard = decoder_cache
let mut cache_guard = self
.decoder_cache
.write()
.map_err(|_| io::Error::other("Failed to acquire decoder cache lock"))?;
match cache_guard.take() {
Some(mut cached_decoder) => {
// 使用reset方法重置现有decoder
if let Err(e) = cached_decoder.reset(data_shards, parity_shards, shard_len) {
if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) {
warn!("Failed to reset SIMD decoder: {:?}, creating new one", e);
// 如果reset失败创建新的decoder
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
} else {
cached_decoder
@@ -261,7 +183,7 @@ impl ReedSolomonEncoder {
}
None => {
// 第一次使用创建新decoder
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
}
}
@@ -270,12 +192,12 @@ impl ReedSolomonEncoder {
// Add available shards (both data and parity)
for (i, shard_opt) in shards.iter().enumerate() {
if let Some(shard) = shard_opt {
if i < data_shards {
if i < self.data_shards {
decoder
.add_original_shard(i, shard)
.map_err(|e| io::Error::other(format!("Failed to add original shard for reconstruction: {:?}", e)))?;
} else {
let recovery_idx = i - data_shards;
let recovery_idx = i - self.data_shards;
decoder
.add_recovery_shard(recovery_idx, shard)
.map_err(|e| io::Error::other(format!("Failed to add recovery shard for reconstruction: {:?}", e)))?;
@@ -289,7 +211,7 @@ impl ReedSolomonEncoder {
// Fill in missing data shards from reconstruction result
for (i, shard_opt) in shards.iter_mut().enumerate() {
if shard_opt.is_none() && i < data_shards {
if shard_opt.is_none() && i < self.data_shards {
for (restored_index, restored_data) in result.restored_original_iter() {
if restored_index == i {
*shard_opt = Some(restored_data.to_vec());
@@ -302,7 +224,8 @@ impl ReedSolomonEncoder {
// 将decoder放回缓存在result被drop后decoder自动重置可以重用
drop(result); // 显式drop result确保decoder被重置
*decoder_cache
*self
.decoder_cache
.write()
.map_err(|_| io::Error::other("Failed to return decoder to cache"))? = Some(decoder);
@@ -592,19 +515,10 @@ mod tests {
fn test_encode_decode_roundtrip() {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode - SIMD with fallback
let block_size = 1024; // SIMD mode
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Use different test data based on feature
#[cfg(not(feature = "reed-solomon-simd"))]
let test_data = b"hello world".to_vec(); // Small data for erasure (default)
#[cfg(feature = "reed-solomon-simd")]
// Use sufficient test data for SIMD optimization
let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for SIMD
let data = &test_data;
@@ -632,13 +546,7 @@ mod tests {
fn test_encode_decode_large_1m() {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 512 * 3; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8192; // Pure erasure mode (default)
let erasure = Erasure::new(data_shards, parity_shards, block_size);
// Generate 1MB test data
@@ -704,16 +612,10 @@ mod tests {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
// Use test data suitable for both modes
// Use test data suitable for SIMD mode
let data =
b"Async error test data with sufficient length to meet requirements for proper testing and validation.".repeat(20); // ~2KB
@@ -747,13 +649,7 @@ mod tests {
let data_shards = 4;
let parity_shards = 2;
// Use different block sizes based on feature
#[cfg(feature = "reed-solomon-simd")]
let block_size = 1024; // SIMD mode
#[cfg(not(feature = "reed-solomon-simd"))]
let block_size = 8; // Pure erasure mode (default)
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
// Use test data that fits in exactly one block to avoid multi-block complexity
@@ -761,8 +657,6 @@ mod tests {
b"Channel async callback test data with sufficient length to ensure proper operation and validation requirements."
.repeat(8); // ~1KB
// let data = b"callback".to_vec(); // 8 bytes to fit exactly in one 8-byte block
let data_clone = data.clone(); // Clone for later comparison
let mut reader = Cursor::new(data);
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);
@@ -801,8 +695,7 @@ mod tests {
assert_eq!(&recovered, &data_clone);
}
// Tests specifically for SIMD mode
#[cfg(feature = "reed-solomon-simd")]
// SIMD mode specific tests
mod simd_tests {
use super::*;
@@ -1171,47 +1064,4 @@ mod tests {
assert_eq!(&recovered, &data_clone);
}
}
// Comparative tests between different implementations
// This module is compiled only when the `reed-solomon-simd` feature is disabled.
#[cfg(not(feature = "reed-solomon-simd"))]
mod comparative_tests {
use super::*;
/// Round-trip check: encode a payload, drop one data shard and one parity
/// shard, decode, and verify that the reassembled data shards equal the
/// original bytes.
#[test]
fn test_implementation_consistency() {
let data_shards = 4;
let parity_shards = 2;
let block_size = 2048; // Large enough for SIMD requirements
// Create test data that ensures each shard is >= 512 bytes (SIMD minimum)
let test_data = b"This is test data for comparing reed-solomon-simd and reed-solomon-erasure implementations to ensure they produce consistent results when given the same input parameters and data. This data needs to be sufficiently large to meet SIMD requirements.";
let data = test_data.repeat(50); // Create much larger data: ~13KB total, ~3.25KB per shard
// Test with erasure implementation (default)
let erasure_erasure = Erasure::new(data_shards, parity_shards, block_size);
let erasure_shards = erasure_erasure.encode_data(&data).unwrap();
// Test data integrity with erasure
// Wrap every shard in `Some` so individual shards can be marked missing with `None`.
let mut erasure_shards_opt: Vec<Option<Vec<u8>>> = erasure_shards.iter().map(|shard| Some(shard.to_vec())).collect();
// Lose some shards
erasure_shards_opt[1] = None; // Data shard
erasure_shards_opt[4] = None; // Parity shard
erasure_erasure.decode_data(&mut erasure_shards_opt).unwrap();
// Reassemble the payload from the first `data_shards` shards only.
let mut erasure_recovered = Vec::new();
for shard in erasure_shards_opt.iter().take(data_shards) {
erasure_recovered.extend_from_slice(shard.as_ref().unwrap());
}
// Cut the concatenated shards back to the original payload length before comparing.
erasure_recovered.truncate(data.len());
// Verify erasure implementation works correctly
assert_eq!(&erasure_recovered, &data, "Erasure implementation failed to recover data correctly");
// NOTE(review): only one `Erasure` instance is exercised above, so the
// "Both implementations" message below overstates what this test verifies.
println!("✅ Both implementations are available and working correctly");
println!("✅ Default (reed-solomon-erasure): Data recovery successful");
println!("✅ SIMD tests are available as separate test suite");
}
}
}

View File

@@ -1,8 +1,8 @@
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
use crate::disk::{
self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT,
CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
conv_part_err_to_int, has_part_err,
};
use crate::erasure_coding;
use crate::erasure_coding::bitrot_verify;
@@ -12,24 +12,24 @@ use crate::heal::data_usage_cache::DataUsageCache;
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
use crate::store_api::ObjectToDelete;
use crate::{
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{storageclass, GLOBAL_StorageClass},
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
config::{GLOBAL_StorageClass, storageclass},
disk::{
endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo,
DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp,
ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions,
RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions,
UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk,
},
error::{to_object_err, StorageError},
error::{StorageError, to_object_err},
global::{
get_global_deployment_id, is_dist_erasure, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP,
GLOBAL_LOCAL_DISK_SET_DRIVES,
GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id,
is_dist_erasure,
},
heal::{
data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo},
heal_commands::{
HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT,
HEAL_NORMAL_SCAN, HealOpts, HealScanMode, HealingTracker,
},
heal_ops::BG_HEALING_UUID,
},
@@ -42,7 +42,7 @@ use crate::{
};
use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation};
use crate::{
heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig},
store_api::ListObjectVersionsInfo,
};
use bytes::Bytes;
@@ -51,22 +51,22 @@ use chrono::Utc;
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use lock::{namespace_lock::NsLockMap, LockApi};
use lock::{LockApi, namespace_lock::NsLockMap};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use md5::{Digest as Md5Digest, Md5};
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{
file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries,
MetaCacheEntry, MetadataResolutionParams,
ObjectPartInfo,
RawFileInfo,
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, file_info_from_raw,
headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS},
merge_file_meta_versions,
};
use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader};
use rustfs_utils::{
crypto::{base64_decode, base64_encode, hex},
path::{encode_dir_object, has_suffix, path_join_buf, SLASH_SEPARATOR},
HashAlgorithm,
crypto::{base64_decode, base64_encode, hex},
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
};
use sha2::Sha256;
use std::hash::Hash;
@@ -82,7 +82,7 @@ use std::{
use time::OffsetDateTime;
use tokio::{
io::AsyncWrite,
sync::{broadcast, RwLock},
sync::{RwLock, broadcast},
};
use tokio::{
select,
@@ -5837,9 +5837,9 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
#[cfg(test)]
mod tests {
use super::*;
use crate::disk::error::DiskError;
use crate::disk::CHECK_PART_UNKNOWN;
use crate::disk::CHECK_PART_VOLUME_NOT_FOUND;
use crate::disk::error::DiskError;
use crate::store_api::CompletePart;
use rustfs_filemeta::ErasureInfo;
use std::collections::HashMap;

View File

@@ -8,10 +8,10 @@ use crate::{disk::DiskStore, heal::heal_commands::HealOpts};
use http::{HeaderMap, HeaderValue};
use madmin::heal_commands::HealResultItem;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{headers::AMZ_OBJECT_TAGGING, FileInfo, MetaCacheEntriesSorted, ObjectPartInfo};
use rustfs_filemeta::{FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, headers::AMZ_OBJECT_TAGGING};
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::path::decode_dir_object;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;

View File

@@ -1,24 +1,24 @@
use crate::StorageAPI;
use crate::bucket::metadata_sys::get_versioning_config;
use crate::bucket::versioning::VersioningApi;
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw};
use crate::disk::error::DiskError;
use crate::disk::{DiskInfo, DiskStore};
use crate::error::{
is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, Error, Result, StorageError,
Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err,
};
use crate::set_disk::SetDisks;
use crate::store::check_list_objs_args;
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
use crate::store_utils::is_reserved_or_invalid_bucket;
use crate::StorageAPI;
use crate::{store::ECStore, store_api::ListObjectsV2Info};
use futures::future::join_all;
use rand::seq::SliceRandom;
use rustfs_filemeta::{
merge_file_meta_versions, FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry,
MetadataResolutionParams,
FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams,
merge_file_meta_versions,
};
use rustfs_utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Receiver as B_Receiver};

View File

@@ -1,21 +1,21 @@
use crate::error::{is_err_config_not_found, Error, Result};
use crate::error::{Error, Result, is_err_config_not_found};
use crate::{
cache::{Cache, CacheEntity},
error::{is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user, Error as IamError},
store::{object::IAM_CONFIG_PREFIX, GroupInfo, MappedPolicy, Store, UserType},
error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user},
store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX},
sys::{
UpdateServiceAccountOpts, MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED,
STATUS_ENABLED,
MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED,
UpdateServiceAccountOpts,
},
};
use ecstore::global::get_global_action_cred;
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use policy::{
arn::ARN,
auth::{self, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign, Credentials, UserIdentity},
auth::{self, Credentials, UserIdentity, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign},
format::Format,
policy::{
default::DEFAULT_POLICIES, iam_policy_claim_name_sa, Policy, PolicyDoc, EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE,
EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, default::DEFAULT_POLICIES, iam_policy_claim_name_sa,
},
};
use rustfs_utils::crypto::base64_encode;
@@ -25,8 +25,8 @@ use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc,
atomic::{AtomicBool, AtomicI64, Ordering},
},
time::Duration,
};

View File

@@ -10,12 +10,12 @@ use http::StatusCode;
use hyper::Method;
use matchit::Params;
use rustfs_utils::net::bytes_stream;
use s3s::dto::StreamingBlob;
use s3s::s3_error;
use s3s::Body;
use s3s::S3Request;
use s3s::S3Response;
use s3s::S3Result;
use s3s::dto::StreamingBlob;
use s3s::s3_error;
use serde_urlencoded::from_bytes;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

View File

@@ -12,9 +12,9 @@ mod service;
mod storage;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown};
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
@@ -22,6 +22,7 @@ use common::{
// error::{Error, Result},
globals::set_global_addr,
};
use ecstore::StorageAPI;
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use ecstore::config as ecconfig;
@@ -29,12 +30,11 @@ use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::rpc::make_server;
use ecstore::store_api::BucketOptions;
use ecstore::StorageAPI;
use ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
set_global_endpoints,
store::{init_local_disks, ECStore},
store::{ECStore, init_local_disks},
update_erasure_type,
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
@@ -49,7 +49,7 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
@@ -60,13 +60,13 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio::signal::unix::{SignalKind, signal};
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tonic::{Request, Status, metadata::MetadataValue};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::{Span, instrument};
use tracing::{debug, error, info, warn};
use tracing::{instrument, Span};
const MI_B: usize = 1024 * 1024;

View File

@@ -16,8 +16,8 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
@@ -32,13 +32,13 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::utils::serialize;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::compress::is_compressible;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::compress::is_compressible;
use ecstore::error::StorageError;
use ecstore::new_object_layer_fn;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -58,11 +58,11 @@ use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -70,23 +70,23 @@ use rustfs_rio::CompressReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_utils::path::path_join_buf;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::dto::*;
use s3s::s3_error;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;