mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
feat: migrate to reed-solomon-simd only implementation
- Remove reed-solomon-erasure dependency and all related code - Simplify ReedSolomonEncoder from enum to struct with SIMD-only implementation - Eliminate all conditional compilation (#[cfg(feature = ...)]) - Add instance caching with RwLock-based encoder/decoder reuse - Implement reset mechanism to avoid unnecessary allocations - Ensure thread safety with proper cache management - Update documentation and benchmark scripts for SIMD-only approach - Apply code formatting across all files Breaking Changes: - Removes support for reed-solomon-erasure feature flag - API remains compatible but implementation is now SIMD-only Performance Impact: - Improved encoding/decoding performance through SIMD optimization - Reduced memory allocations via instance caching - Enhanced thread safety and concurrency support
This commit is contained in:
125
Cargo.lock
generated
125
Cargo.lock
generated
@@ -52,17 +52,6 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.7.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
|
||||
dependencies = [
|
||||
"getrandom 0.2.16",
|
||||
"once_cell",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.8.12"
|
||||
@@ -301,7 +290,7 @@ version = "54.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a12fcdb3f1d03f69d3ec26ac67645a8fe3f878d77b5ebb0b15d64a116c212985"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow-buffer",
|
||||
"arrow-data",
|
||||
"arrow-schema",
|
||||
@@ -446,7 +435,7 @@ version = "54.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69efcd706420e52cd44f5c4358d279801993846d1c2a8e52111853d61d55a619"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"arrow-data",
|
||||
@@ -819,7 +808,7 @@ dependencies = [
|
||||
"http 0.2.12",
|
||||
"http 1.3.1",
|
||||
"http-body 0.4.6",
|
||||
"lru 0.12.5",
|
||||
"lru",
|
||||
"percent-encoding",
|
||||
"regex-lite",
|
||||
"sha2 0.10.9",
|
||||
@@ -2381,7 +2370,7 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.11",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2395,7 +2384,7 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.11",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2442,7 +2431,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"log",
|
||||
"object_store",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"parquet",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -2472,7 +2461,7 @@ dependencies = [
|
||||
"futures",
|
||||
"itertools 0.14.0",
|
||||
"log",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2503,7 +2492,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f53d7ec508e1b3f68bd301cee3f649834fad51eff9240d898a4b2614cfd0a7a"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
"base64 0.22.1",
|
||||
@@ -2584,7 +2573,7 @@ dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"object_store",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"rand 0.8.5",
|
||||
"tempfile",
|
||||
"url",
|
||||
@@ -2659,7 +2648,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "adfc2d074d5ee4d9354fdcc9283d5b2b9037849237ddecb8942a29144b77ca05"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"datafusion-common",
|
||||
"datafusion-doc",
|
||||
@@ -2680,7 +2669,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1cbceba0f98d921309a9121b702bcd49289d383684cccabf9a92cda1602f3bbb"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"datafusion-common",
|
||||
"datafusion-expr-common",
|
||||
@@ -2720,7 +2709,7 @@ dependencies = [
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
"datafusion-physical-plan",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"paste",
|
||||
]
|
||||
|
||||
@@ -2787,7 +2776,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1447c2c6bc8674a16be4786b4abf528c302803fafa186aa6275692570e64d85"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
@@ -2809,7 +2798,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69f8c25dcd069073a75b3d2840a79d0f81e64bdd2c05f2d3d18939afb36a7dcb"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"datafusion-common",
|
||||
"datafusion-expr-common",
|
||||
@@ -2842,7 +2831,7 @@ version = "46.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88cc160df00e413e370b3b259c8ea7bfbebc134d32de16325950e9e923846b7f"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow",
|
||||
"arrow-ord",
|
||||
"arrow-schema",
|
||||
@@ -2861,7 +2850,7 @@ dependencies = [
|
||||
"indexmap 2.9.0",
|
||||
"itertools 0.14.0",
|
||||
"log",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
@@ -3412,7 +3401,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"generational-box",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"rustc-hash 1.1.0",
|
||||
"tracing",
|
||||
"warnings",
|
||||
@@ -3646,7 +3635,6 @@ dependencies = [
|
||||
"policy",
|
||||
"protos",
|
||||
"rand 0.9.1",
|
||||
"reed-solomon-erasure",
|
||||
"reed-solomon-simd",
|
||||
"regex",
|
||||
"reqwest",
|
||||
@@ -4244,7 +4232,7 @@ version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a673cf4fb0ea6a91aa86c08695756dfe875277a912cdbf33db9a9f62d47ed82b"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
@@ -4612,9 +4600,6 @@ name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
dependencies = [
|
||||
"ahash 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
@@ -4622,7 +4607,7 @@ version = "0.14.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"allocator-api2",
|
||||
]
|
||||
|
||||
@@ -5723,15 +5708,6 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.7.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
|
||||
dependencies = [
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.5"
|
||||
@@ -6611,7 +6587,7 @@ dependencies = [
|
||||
"futures",
|
||||
"humantime",
|
||||
"itertools 0.13.0",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"snafu",
|
||||
"tokio",
|
||||
@@ -6821,17 +6797,6 @@ version = "2.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
|
||||
dependencies = [
|
||||
"instant",
|
||||
"lock_api",
|
||||
"parking_lot_core 0.8.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.4"
|
||||
@@ -6839,21 +6804,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.11",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"instant",
|
||||
"libc",
|
||||
"redox_syscall 0.2.16",
|
||||
"smallvec",
|
||||
"winapi",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6875,7 +6826,7 @@ version = "54.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfb15796ac6f56b429fd99e33ba133783ad75b27c36b4b5ce06f1f82cc97754e"
|
||||
dependencies = [
|
||||
"ahash 0.8.12",
|
||||
"ahash",
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"arrow-cast",
|
||||
@@ -7554,7 +7505,7 @@ dependencies = [
|
||||
"derive_builder",
|
||||
"futures",
|
||||
"lazy_static",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"s3s",
|
||||
"snafu",
|
||||
"tokio",
|
||||
@@ -7840,15 +7791,6 @@ dependencies = [
|
||||
"syn 2.0.103",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.3.5"
|
||||
@@ -7878,21 +7820,6 @@ dependencies = [
|
||||
"thiserror 2.0.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reed-solomon-erasure"
|
||||
version = "6.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"libm",
|
||||
"lru 0.7.8",
|
||||
"parking_lot 0.11.2",
|
||||
"smallvec",
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reed-solomon-simd"
|
||||
version = "3.0.1"
|
||||
@@ -9485,7 +9412,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf776ba3fa74f83bf4b63c3dcbbf82173db2632ed8452cb2d891d33f459de70f"
|
||||
dependencies = [
|
||||
"new_debug_unreachable",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"phf_shared 0.11.3",
|
||||
"precomputed-hash",
|
||||
"serde",
|
||||
@@ -9732,7 +9659,7 @@ dependencies = [
|
||||
"ndk-sys",
|
||||
"objc",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"raw-window-handle 0.5.2",
|
||||
"raw-window-handle 0.6.2",
|
||||
"scopeguard",
|
||||
@@ -10001,7 +9928,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot 0.12.4",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
|
||||
@@ -167,7 +167,7 @@ flate2 = "1.1.1"
|
||||
zstd = "0.13.3"
|
||||
lz4 = "1.28.1"
|
||||
rdkafka = { version = "0.37.0", features = ["tokio"] }
|
||||
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
|
||||
|
||||
reed-solomon-simd = { version = "3.0.0" }
|
||||
regex = { version = "1.11.1" }
|
||||
reqwest = { version = "0.12.20", default-features = false, features = [
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use rsa::Pkcs1v15Encrypt;
|
||||
use rsa::{
|
||||
RsaPrivateKey, RsaPublicKey,
|
||||
pkcs8::{DecodePrivateKey, DecodePublicKey},
|
||||
rand_core::OsRng,
|
||||
RsaPrivateKey, RsaPublicKey,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::{Error, Result};
|
||||
@@ -58,8 +58,8 @@ static TEST_PRIVATE_KEY: &str = "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhk
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rsa::{
|
||||
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
|
||||
RsaPrivateKey,
|
||||
pkcs8::{EncodePrivateKey, EncodePublicKey, LineEnding},
|
||||
};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
#[test]
|
||||
|
||||
@@ -5,7 +5,7 @@ use rustfs_notify::factory::{
|
||||
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
use rustfs_notify::store::DEFAULT_LIMIT;
|
||||
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
|
||||
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
|
||||
use rustfs_notify::{initialize, notification_system};
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
|
||||
@@ -6,7 +6,7 @@ use rustfs_notify::factory::{
|
||||
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
use rustfs_notify::store::DEFAULT_LIMIT;
|
||||
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
|
||||
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
|
||||
use rustfs_notify::{initialize, notification_system};
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use axum::routing::get;
|
||||
use axum::{
|
||||
Router,
|
||||
extract::Json,
|
||||
http::{HeaderMap, Response, StatusCode},
|
||||
routing::post,
|
||||
Router,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
@@ -41,7 +41,7 @@ impl TargetID {
|
||||
ARN {
|
||||
target_id: self.clone(),
|
||||
region: region.to_string(),
|
||||
service: DEFAULT_ARN_SERVICE.to_string(), // Default Service
|
||||
service: DEFAULT_ARN_SERVICE.to_string(), // Default Service
|
||||
partition: DEFAULT_ARN_PARTITION.to_string(), // Default partition
|
||||
}
|
||||
}
|
||||
@@ -112,7 +112,7 @@ impl ARN {
|
||||
ARN {
|
||||
target_id,
|
||||
region,
|
||||
service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs
|
||||
service: DEFAULT_ARN_SERVICE.to_string(), // Default is sqs
|
||||
partition: DEFAULT_ARN_PARTITION.to_string(), // Default is rustfs partition
|
||||
}
|
||||
}
|
||||
@@ -121,16 +121,10 @@ impl ARN {
|
||||
/// Returns the ARN string in the format "{ARN_PREFIX}:{region}:{target_id}"
|
||||
#[allow(clippy::inherent_to_string)]
|
||||
pub fn to_arn_string(&self) -> String {
|
||||
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
|
||||
{
|
||||
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
format!(
|
||||
"{}:{}:{}",
|
||||
ARN_PREFIX,
|
||||
self.region,
|
||||
self.target_id.to_id_string()
|
||||
)
|
||||
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
|
||||
}
|
||||
|
||||
/// Parsing ARN from string
|
||||
@@ -162,8 +156,7 @@ impl ARN {
|
||||
|
||||
impl fmt::Display for ARN {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty()
|
||||
{
|
||||
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
|
||||
// Returns an empty string if all parts are empty
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::store::DEFAULT_LIMIT;
|
||||
use crate::{
|
||||
error::TargetError,
|
||||
target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target},
|
||||
target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS};
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use crate::arn::TargetID;
|
||||
use crate::store::{Key, Store};
|
||||
use crate::{
|
||||
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event,
|
||||
StoreError, Target,
|
||||
Event, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry,
|
||||
rules::BucketNotificationConfig, stream,
|
||||
};
|
||||
use ecstore::config::{Config, KVS};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, RwLock, Semaphore};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Notify the system of monitoring indicators
|
||||
|
||||
@@ -26,7 +26,7 @@ pub use rules::BucketNotificationConfig;
|
||||
use std::io::IsTerminal;
|
||||
pub use target::Target;
|
||||
|
||||
use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
|
||||
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
|
||||
|
||||
/// Initialize the tracing log system
|
||||
///
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::arn::TargetID;
|
||||
use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName};
|
||||
use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use super::rules_map::RulesMap;
|
||||
// Keep for existing structure if any, or remove if not used
|
||||
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
|
||||
use crate::EventName;
|
||||
use crate::arn::TargetID;
|
||||
use crate::rules::NotificationConfiguration;
|
||||
use crate::rules::pattern_rules;
|
||||
use crate::rules::target_id_set;
|
||||
use crate::rules::NotificationConfiguration;
|
||||
use crate::EventName;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
// Assuming this is the XML config structure
|
||||
|
||||
@@ -78,10 +78,7 @@ mod tests {
|
||||
assert_eq!(new_pattern(Some(""), Some("b")), "*b");
|
||||
assert_eq!(new_pattern(None, None), "");
|
||||
assert_eq!(new_pattern(Some("prefix"), Some("suffix")), "prefix*suffix");
|
||||
assert_eq!(
|
||||
new_pattern(Some("prefix/"), Some("/suffix")),
|
||||
"prefix/*suffix"
|
||||
); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
|
||||
assert_eq!(new_pattern(Some("prefix/"), Some("/suffix")), "prefix/*suffix"); // prefix/* + */suffix -> prefix/**/suffix -> prefix/*/suffix
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -23,9 +23,7 @@ impl PatternRules {
|
||||
|
||||
/// Checks if there are any rules that match the given object name.
|
||||
pub fn match_simple(&self, object_name: &str) -> bool {
|
||||
self.rules
|
||||
.keys()
|
||||
.any(|p| pattern::match_simple(p, object_name))
|
||||
self.rules.keys().any(|p| pattern::match_simple(p, object_name))
|
||||
}
|
||||
|
||||
/// Returns all TargetIDs that match the object name.
|
||||
@@ -61,8 +59,7 @@ impl PatternRules {
|
||||
for (pattern, self_targets) in &self.rules {
|
||||
match other.rules.get(pattern) {
|
||||
Some(other_targets) => {
|
||||
let diff_targets: TargetIdSet =
|
||||
self_targets.difference(other_targets).cloned().collect();
|
||||
let diff_targets: TargetIdSet = self_targets.difference(other_targets).cloned().collect();
|
||||
if !diff_targets.is_empty() {
|
||||
result_rules.insert(pattern.clone(), diff_targets);
|
||||
}
|
||||
@@ -73,8 +70,6 @@ impl PatternRules {
|
||||
}
|
||||
}
|
||||
}
|
||||
PatternRules {
|
||||
rules: result_rules,
|
||||
}
|
||||
PatternRules { rules: result_rules }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::pattern;
|
||||
use crate::arn::{ArnError, TargetIDError, ARN};
|
||||
use crate::arn::{ARN, ArnError, TargetIDError};
|
||||
use crate::event::EventName;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::error::StoreError;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use snap::raw::{Decoder, Encoder};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{
|
||||
@@ -195,11 +195,7 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
|
||||
/// Reads a file for the given key
|
||||
fn read_file(&self, key: &Key) -> Result<Vec<u8>, StoreError> {
|
||||
let path = self.file_path(key);
|
||||
debug!(
|
||||
"Reading file for key: {},path: {}",
|
||||
key.to_string(),
|
||||
path.display()
|
||||
);
|
||||
debug!("Reading file for key: {},path: {}", key.to_string(), path.display());
|
||||
let data = std::fs::read(&path).map_err(|e| {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
StoreError::NotFound
|
||||
@@ -240,13 +236,11 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
|
||||
};
|
||||
|
||||
std::fs::write(&path, &data).map_err(StoreError::Io)?;
|
||||
let modified = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos() as i64;
|
||||
let mut entries = self.entries.write().map_err(|_| {
|
||||
StoreError::Internal("Failed to acquire write lock on entries".to_string())
|
||||
})?;
|
||||
let modified = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
|
||||
let mut entries = self
|
||||
.entries
|
||||
.write()
|
||||
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
|
||||
entries.insert(key.to_string(), modified);
|
||||
debug!("Wrote event to store: {}", key.to_string());
|
||||
Ok(())
|
||||
@@ -265,18 +259,16 @@ where
|
||||
|
||||
let entries = std::fs::read_dir(&self.directory).map_err(StoreError::Io)?;
|
||||
// Get the write lock to update the internal state
|
||||
let mut entries_map = self.entries.write().map_err(|_| {
|
||||
StoreError::Internal("Failed to acquire write lock on entries".to_string())
|
||||
})?;
|
||||
let mut entries_map = self
|
||||
.entries
|
||||
.write()
|
||||
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
|
||||
for entry in entries {
|
||||
let entry = entry.map_err(StoreError::Io)?;
|
||||
let metadata = entry.metadata().map_err(StoreError::Io)?;
|
||||
if metadata.is_file() {
|
||||
let modified = metadata.modified().map_err(StoreError::Io)?;
|
||||
let unix_nano = modified
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos() as i64;
|
||||
let unix_nano = modified.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
|
||||
|
||||
let file_name = entry.file_name().to_string_lossy().to_string();
|
||||
entries_map.insert(file_name, unix_nano);
|
||||
@@ -290,9 +282,10 @@ where
|
||||
fn put(&self, item: T) -> Result<Self::Key, Self::Error> {
|
||||
// Check storage limits
|
||||
{
|
||||
let entries = self.entries.read().map_err(|_| {
|
||||
StoreError::Internal("Failed to acquire read lock on entries".to_string())
|
||||
})?;
|
||||
let entries = self
|
||||
.entries
|
||||
.read()
|
||||
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
|
||||
|
||||
if entries.len() as u64 >= self.entry_limit {
|
||||
return Err(StoreError::LimitExceeded);
|
||||
@@ -307,8 +300,7 @@ where
|
||||
compress: true,
|
||||
};
|
||||
|
||||
let data =
|
||||
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
|
||||
let data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
|
||||
self.write_file(&key, &data)?;
|
||||
|
||||
Ok(key)
|
||||
@@ -317,9 +309,10 @@ where
|
||||
fn put_multiple(&self, items: Vec<T>) -> Result<Self::Key, Self::Error> {
|
||||
// Check storage limits
|
||||
{
|
||||
let entries = self.entries.read().map_err(|_| {
|
||||
StoreError::Internal("Failed to acquire read lock on entries".to_string())
|
||||
})?;
|
||||
let entries = self
|
||||
.entries
|
||||
.read()
|
||||
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
|
||||
|
||||
if entries.len() as u64 >= self.entry_limit {
|
||||
return Err(StoreError::LimitExceeded);
|
||||
@@ -327,9 +320,7 @@ where
|
||||
}
|
||||
if items.is_empty() {
|
||||
// Or return an error, or a special key?
|
||||
return Err(StoreError::Internal(
|
||||
"Cannot put_multiple with empty items list".to_string(),
|
||||
));
|
||||
return Err(StoreError::Internal("Cannot put_multiple with empty items list".to_string()));
|
||||
}
|
||||
let uuid = Uuid::new_v4();
|
||||
let key = Key {
|
||||
@@ -348,8 +339,7 @@ where
|
||||
for item in items {
|
||||
// If items are Vec<Event>, and Event is large, this could be inefficient.
|
||||
// The current get_multiple deserializes one by one.
|
||||
let item_data =
|
||||
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
|
||||
let item_data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
|
||||
buffer.extend_from_slice(&item_data);
|
||||
// If using JSON array: buffer = serde_json::to_vec(&items)?
|
||||
}
|
||||
@@ -374,9 +364,7 @@ where
|
||||
debug!("Reading items from store for key: {}", key.to_string());
|
||||
let data = self.read_file(key)?;
|
||||
if data.is_empty() {
|
||||
return Err(StoreError::Deserialization(
|
||||
"Cannot deserialize empty data".to_string(),
|
||||
));
|
||||
return Err(StoreError::Deserialization("Cannot deserialize empty data".to_string()));
|
||||
}
|
||||
let mut items = Vec::with_capacity(key.item_count);
|
||||
|
||||
@@ -395,10 +383,7 @@ where
|
||||
match deserializer.next() {
|
||||
Some(Ok(item)) => items.push(item),
|
||||
Some(Err(e)) => {
|
||||
return Err(StoreError::Deserialization(format!(
|
||||
"Failed to deserialize item in batch: {}",
|
||||
e
|
||||
)));
|
||||
return Err(StoreError::Deserialization(format!("Failed to deserialize item in batch: {}", e)));
|
||||
}
|
||||
None => {
|
||||
// Reached end of stream sooner than item_count
|
||||
@@ -435,7 +420,10 @@ where
|
||||
std::fs::remove_file(&path).map_err(|e| {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
// If file not found, still try to remove from entries map in case of inconsistency
|
||||
warn!("File not found for key {} during del, but proceeding to remove from entries map.", key.to_string());
|
||||
warn!(
|
||||
"File not found for key {} during del, but proceeding to remove from entries map.",
|
||||
key.to_string()
|
||||
);
|
||||
StoreError::NotFound
|
||||
} else {
|
||||
StoreError::Io(e)
|
||||
@@ -443,17 +431,15 @@ where
|
||||
})?;
|
||||
|
||||
// Get the write lock to update the internal state
|
||||
let mut entries = self.entries.write().map_err(|_| {
|
||||
StoreError::Internal("Failed to acquire write lock on entries".to_string())
|
||||
})?;
|
||||
let mut entries = self
|
||||
.entries
|
||||
.write()
|
||||
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
|
||||
|
||||
if entries.remove(&key.to_string()).is_none() {
|
||||
// Key was not in the map, could be an inconsistency or already deleted.
|
||||
// This is not necessarily an error if the file deletion succeeded or was NotFound.
|
||||
debug!(
|
||||
"Key {} not found in entries map during del, might have been already removed.",
|
||||
key
|
||||
);
|
||||
debug!("Key {} not found in entries map during del, might have been already removed.", key);
|
||||
}
|
||||
debug!("Deleted event from store: {}", key.to_string());
|
||||
Ok(())
|
||||
@@ -492,7 +478,6 @@ where
|
||||
}
|
||||
|
||||
fn boxed_clone(&self) -> Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync> {
|
||||
Box::new(self.clone())
|
||||
as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
|
||||
Box::new(self.clone()) as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use crate::{
|
||||
error::TargetError, integration::NotificationMetrics,
|
||||
Event, StoreError,
|
||||
error::TargetError,
|
||||
integration::NotificationMetrics,
|
||||
store::{Key, Store},
|
||||
target::Target,
|
||||
Event,
|
||||
StoreError,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, Semaphore};
|
||||
use tokio::sync::{Semaphore, mpsc};
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
use crate::store::{Key, STORE_EXTENSION};
|
||||
use crate::target::ChannelTargetType;
|
||||
use crate::{
|
||||
arn::TargetID, error::TargetError,
|
||||
StoreError, Target,
|
||||
arn::TargetID,
|
||||
error::TargetError,
|
||||
event::{Event, EventLog},
|
||||
store::Store,
|
||||
StoreError,
|
||||
Target,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use rumqttc::{mqttbytes::Error as MqttBytesError, ConnectionError};
|
||||
use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS};
|
||||
use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError};
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::{mpsc, Mutex, OnceCell};
|
||||
use tokio::sync::{Mutex, OnceCell, mpsc};
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
use url::Url;
|
||||
use urlencoding;
|
||||
@@ -58,24 +58,19 @@ impl MQTTArgs {
|
||||
match self.broker.scheme() {
|
||||
"ws" | "wss" | "tcp" | "ssl" | "tls" | "tcps" | "mqtt" | "mqtts" => {}
|
||||
_ => {
|
||||
return Err(TargetError::Configuration(
|
||||
"unknown protocol in broker address".to_string(),
|
||||
));
|
||||
return Err(TargetError::Configuration("unknown protocol in broker address".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
if !self.queue_dir.is_empty() {
|
||||
let path = std::path::Path::new(&self.queue_dir);
|
||||
if !path.is_absolute() {
|
||||
return Err(TargetError::Configuration(
|
||||
"mqtt queueDir path should be absolute".to_string(),
|
||||
));
|
||||
return Err(TargetError::Configuration("mqtt queueDir path should be absolute".to_string()));
|
||||
}
|
||||
|
||||
if self.qos == QoS::AtMostOnce {
|
||||
return Err(TargetError::Configuration(
|
||||
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set"
|
||||
.to_string(),
|
||||
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -107,21 +102,12 @@ impl MQTTTarget {
|
||||
let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string());
|
||||
let queue_store = if !args.queue_dir.is_empty() {
|
||||
let base_path = PathBuf::from(&args.queue_dir);
|
||||
let unique_dir_name = format!(
|
||||
"rustfs-{}-{}-{}",
|
||||
ChannelTargetType::Mqtt.as_str(),
|
||||
target_id.name,
|
||||
target_id.id
|
||||
)
|
||||
.replace(":", "_");
|
||||
let unique_dir_name =
|
||||
format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_");
|
||||
// Ensure the directory name is valid for filesystem
|
||||
let specific_queue_path = base_path.join(unique_dir_name);
|
||||
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target");
|
||||
let store = crate::store::QueueStore::<Event>::new(
|
||||
specific_queue_path,
|
||||
args.queue_limit,
|
||||
STORE_EXTENSION,
|
||||
);
|
||||
let store = crate::store::QueueStore::<Event>::new(specific_queue_path, args.queue_limit, STORE_EXTENSION);
|
||||
if let Err(e) = store.open() {
|
||||
error!(
|
||||
target_id = %target_id,
|
||||
@@ -130,10 +116,7 @@ impl MQTTTarget {
|
||||
);
|
||||
return Err(TargetError::Storage(format!("{}", e)));
|
||||
}
|
||||
Some(Box::new(store)
|
||||
as Box<
|
||||
dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync,
|
||||
>)
|
||||
Some(Box::new(store) as Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -175,18 +158,13 @@ impl MQTTTarget {
|
||||
debug!(target_id = %target_id_clone, "Initializing MQTT background task.");
|
||||
let host = args_clone.broker.host_str().unwrap_or("localhost");
|
||||
let port = args_clone.broker.port().unwrap_or(1883);
|
||||
let mut mqtt_options = MqttOptions::new(
|
||||
format!("rustfs_notify_{}", uuid::Uuid::new_v4()),
|
||||
host,
|
||||
port,
|
||||
);
|
||||
let mut mqtt_options = MqttOptions::new(format!("rustfs_notify_{}", uuid::Uuid::new_v4()), host, port);
|
||||
mqtt_options
|
||||
.set_keep_alive(args_clone.keep_alive)
|
||||
.set_max_packet_size(100 * 1024 * 1024, 100 * 1024 * 1024); // 100MB
|
||||
|
||||
if !args_clone.username.is_empty() {
|
||||
mqtt_options
|
||||
.set_credentials(args_clone.username.clone(), args_clone.password.clone());
|
||||
mqtt_options.set_credentials(args_clone.username.clone(), args_clone.password.clone());
|
||||
}
|
||||
|
||||
let (new_client, eventloop) = AsyncClient::new(mqtt_options, 10);
|
||||
@@ -206,12 +184,8 @@ impl MQTTTarget {
|
||||
*client_arc.lock().await = Some(new_client.clone());
|
||||
|
||||
info!(target_id = %target_id_clone, "Spawning MQTT event loop task.");
|
||||
let task_handle = tokio::spawn(run_mqtt_event_loop(
|
||||
eventloop,
|
||||
connected_arc.clone(),
|
||||
target_id_clone.clone(),
|
||||
cancel_rx,
|
||||
));
|
||||
let task_handle =
|
||||
tokio::spawn(run_mqtt_event_loop(eventloop, connected_arc.clone(), target_id_clone.clone(), cancel_rx));
|
||||
Ok(task_handle)
|
||||
})
|
||||
.await
|
||||
@@ -266,17 +240,13 @@ impl MQTTTarget {
|
||||
records: vec![event.clone()],
|
||||
};
|
||||
|
||||
let data = serde_json::to_vec(&log)
|
||||
.map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
|
||||
let data =
|
||||
serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
|
||||
|
||||
// Vec<u8> Convert to String, only for printing logs
|
||||
let data_string = String::from_utf8(data.clone()).map_err(|e| {
|
||||
TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e))
|
||||
})?;
|
||||
debug!(
|
||||
"Sending event to mqtt target: {}, event log: {}",
|
||||
self.id, data_string
|
||||
);
|
||||
let data_string = String::from_utf8(data.clone())
|
||||
.map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?;
|
||||
debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string);
|
||||
|
||||
client
|
||||
.publish(&self.args.topic, self.args.qos, false, data)
|
||||
@@ -474,9 +444,7 @@ impl Target for MQTTTarget {
|
||||
if let Some(handle) = self.bg_task_manager.init_cell.get() {
|
||||
if handle.is_finished() {
|
||||
error!(target_id = %self.id, "MQTT background task has finished, possibly due to an error. Target is not active.");
|
||||
return Err(TargetError::Network(
|
||||
"MQTT background task terminated".to_string(),
|
||||
));
|
||||
return Err(TargetError::Network("MQTT background task terminated".to_string()));
|
||||
}
|
||||
}
|
||||
debug!(target_id = %self.id, "MQTT client not yet initialized or task not running/connected.");
|
||||
@@ -507,10 +475,7 @@ impl Target for MQTTTarget {
|
||||
}
|
||||
Err(e) => {
|
||||
error!(target_id = %self.id, error = %e, "Failed to save event to store");
|
||||
return Err(TargetError::Storage(format!(
|
||||
"Failed to save event to store: {}",
|
||||
e
|
||||
)));
|
||||
return Err(TargetError::Storage(format!("Failed to save event to store: {}", e)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -581,10 +546,7 @@ impl Target for MQTTTarget {
|
||||
error = %e,
|
||||
"Failed to get event from store"
|
||||
);
|
||||
return Err(TargetError::Storage(format!(
|
||||
"Failed to get event from store: {}",
|
||||
e
|
||||
)));
|
||||
return Err(TargetError::Storage(format!("Failed to get event from store: {}", e)));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -608,10 +570,7 @@ impl Target for MQTTTarget {
|
||||
}
|
||||
Err(e) => {
|
||||
error!(target_id = %self.id, error = %e, "Failed to delete event from store after send.");
|
||||
return Err(TargetError::Storage(format!(
|
||||
"Failed to delete event from store: {}",
|
||||
e
|
||||
)));
|
||||
return Err(TargetError::Storage(format!("Failed to delete event from store: {}", e)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
use crate::store::STORE_EXTENSION;
|
||||
use crate::target::ChannelTargetType;
|
||||
use crate::{
|
||||
arn::TargetID, error::TargetError,
|
||||
StoreError, Target,
|
||||
arn::TargetID,
|
||||
error::TargetError,
|
||||
event::{Event, EventLog},
|
||||
store::{Key, Store},
|
||||
StoreError,
|
||||
Target,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use reqwest::{Client, StatusCode, Url};
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
/// audit related metric descriptors
|
||||
///
|
||||
/// This module contains the metric descriptors for the audit subsystem.
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
const TARGET_ID: &str = "target_id";
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// bucket level s3 metric descriptor
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, new_histogram_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, new_histogram_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref BUCKET_API_TRAFFIC_SENT_BYTES_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Bucket copy metric descriptor
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
/// Bucket level replication metric descriptor
|
||||
pub const BUCKET_L: &str = "bucket";
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Metric descriptors related to cluster configuration
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref CONFIG_RRS_PARITY_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Erasure code set related metric descriptors
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
/// The label for the pool ID
|
||||
pub const POOL_ID_L: &str = "pool_id";
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Cluster health-related metric descriptors
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref HEALTH_DRIVES_OFFLINE_COUNT_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// IAM related metric descriptors
|
||||
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref LAST_SYNC_DURATION_MILLIS_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Notify the relevant metric descriptor
|
||||
use crate::metrics::{new_counter_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref NOTIFICATION_CURRENT_SEND_IN_PROGRESS_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Descriptors of metrics related to cluster object and bucket usage
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
/// Bucket labels
|
||||
pub const BUCKET_LABEL: &str = "bucket";
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// ILM-related metric descriptors
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref ILM_EXPIRY_PENDING_TASKS_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// A descriptor for metrics related to webhook logs
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
/// Define label constants for webhook metrics
|
||||
/// name label
|
||||
|
||||
@@ -23,6 +23,6 @@ pub use entry::descriptor::MetricDescriptor;
|
||||
pub use entry::metric_name::MetricName;
|
||||
pub use entry::metric_type::MetricType;
|
||||
pub use entry::namespace::MetricNamespace;
|
||||
pub use entry::subsystem::subsystems;
|
||||
pub use entry::subsystem::MetricSubsystem;
|
||||
pub use entry::subsystem::subsystems;
|
||||
pub use entry::{new_counter_md, new_gauge_md, new_histogram_md};
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Copy the relevant metric descriptor
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref REPLICATION_AVERAGE_ACTIVE_WORKERS_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName, MetricSubsystem};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, MetricSubsystem, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref API_REJECTED_AUTH_TOTAL_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Scanner-related metric descriptors
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref SCANNER_BUCKET_SCANS_FINISHED_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// CPU system-related metric descriptors
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref SYS_CPU_AVG_IDLE_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Drive-related metric descriptors
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
/// drive related labels
|
||||
pub const DRIVE_LABEL: &str = "drive";
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Memory-related metric descriptors
|
||||
use crate::metrics::{new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref MEM_TOTAL_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// Network-related metric descriptors
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref INTERNODE_ERRORS_TOTAL_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/// process related metric descriptors
|
||||
use crate::metrics::{new_counter_md, new_gauge_md, subsystems, MetricDescriptor, MetricName};
|
||||
use crate::metrics::{MetricDescriptor, MetricName, new_counter_md, new_gauge_md, subsystems};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref PROCESS_LOCKS_READ_TOTAL_MD: MetricDescriptor =
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
use crate::OtelConfig;
|
||||
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
|
||||
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
|
||||
use nu_ansi_term::Color;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry::{global, KeyValue};
|
||||
use opentelemetry::{KeyValue, global};
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::logs::SdkLoggerProvider;
|
||||
use opentelemetry_sdk::{
|
||||
Resource,
|
||||
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
|
||||
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
|
||||
Resource,
|
||||
};
|
||||
use opentelemetry_semantic_conventions::{
|
||||
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
|
||||
SCHEMA_URL,
|
||||
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
|
||||
};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
|
||||
@@ -28,7 +28,7 @@ use tracing_error::ErrorLayer;
|
||||
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_subscriber::fmt::time::LocalTime;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
|
||||
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
/// A guard object that manages the lifecycle of OpenTelemetry components.
|
||||
///
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
mod user_agent;
|
||||
|
||||
pub use user_agent::get_user_agent;
|
||||
pub use user_agent::ServiceType;
|
||||
pub use user_agent::get_user_agent;
|
||||
|
||||
@@ -11,9 +11,7 @@ rust-version.workspace = true
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
default = ["reed-solomon-simd"]
|
||||
reed-solomon-simd = []
|
||||
reed-solomon-erasure = []
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
rustfs-config = { workspace = true, features = ["constants"] }
|
||||
@@ -40,7 +38,6 @@ http.workspace = true
|
||||
highway = { workspace = true }
|
||||
url.workspace = true
|
||||
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
|
||||
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
|
||||
reed-solomon-simd = { version = "3.0.0" }
|
||||
transform-stream = "0.3.1"
|
||||
lazy_static.workspace = true
|
||||
|
||||
@@ -1,38 +1,15 @@
|
||||
# ECStore - Erasure Coding Storage
|
||||
|
||||
ECStore provides erasure coding functionality for the RustFS project, supporting multiple Reed-Solomon implementations for optimal performance and compatibility.
|
||||
ECStore provides erasure coding functionality for the RustFS project, using high-performance Reed-Solomon SIMD implementation for optimal performance.
|
||||
|
||||
## Reed-Solomon Implementations
|
||||
## Reed-Solomon Implementation
|
||||
|
||||
### Available Backends
|
||||
### SIMD Backend (Only)
|
||||
|
||||
#### `reed-solomon-erasure` (Default)
|
||||
- **Stability**: Mature and well-tested implementation
|
||||
- **Performance**: Good performance with SIMD acceleration when available
|
||||
- **Compatibility**: Works with any shard size
|
||||
- **Memory**: Efficient memory usage
|
||||
- **Use case**: Recommended for production use
|
||||
|
||||
#### `reed-solomon-simd` (Optional)
|
||||
- **Performance**: Optimized SIMD implementation for maximum speed
|
||||
- **Limitations**: Has restrictions on shard sizes (must be >= 64 bytes typically)
|
||||
- **Memory**: May use more memory for small shards
|
||||
- **Use case**: Best for large data blocks where performance is critical
|
||||
|
||||
### Feature Flags
|
||||
|
||||
Configure the Reed-Solomon implementation using Cargo features:
|
||||
|
||||
```toml
|
||||
# Use default implementation (reed-solomon-erasure)
|
||||
ecstore = "0.0.1"
|
||||
|
||||
# Use SIMD implementation for maximum performance
|
||||
ecstore = { version = "0.0.1", features = ["reed-solomon-simd"], default-features = false }
|
||||
|
||||
# Use traditional implementation explicitly
|
||||
ecstore = { version = "0.0.1", features = ["reed-solomon-erasure"], default-features = false }
|
||||
```
|
||||
- **Performance**: Uses SIMD optimization for high-performance encoding/decoding
|
||||
- **Compatibility**: Works with any shard size through SIMD implementation
|
||||
- **Reliability**: High-performance SIMD implementation for large data processing
|
||||
- **Use case**: Optimized for maximum performance in large data processing scenarios
|
||||
|
||||
### Usage Example
|
||||
|
||||
@@ -68,42 +45,52 @@ assert_eq!(&recovered, data);
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
### When to use `reed-solomon-simd`
|
||||
- Large block sizes (>= 1KB recommended)
|
||||
- High-throughput scenarios
|
||||
- CPU-intensive workloads where encoding/decoding is the bottleneck
|
||||
|
||||
### When to use `reed-solomon-erasure`
|
||||
- Small block sizes
|
||||
- Memory-constrained environments
|
||||
- General-purpose usage
|
||||
- Production deployments requiring maximum stability
|
||||
### SIMD Implementation Benefits
|
||||
- **High Throughput**: Optimized for large block sizes (>= 1KB recommended)
|
||||
- **CPU Optimization**: Leverages modern CPU SIMD instructions
|
||||
- **Scalability**: Excellent performance for high-throughput scenarios
|
||||
|
||||
### Implementation Details
|
||||
|
||||
#### `reed-solomon-erasure`
|
||||
- **Instance Reuse**: The encoder instance is cached and reused across multiple operations
|
||||
- **Thread Safety**: Thread-safe with interior mutability
|
||||
- **Memory Efficiency**: Lower memory footprint for small data
|
||||
|
||||
#### `reed-solomon-simd`
|
||||
- **Instance Creation**: New encoder/decoder instances are created for each operation
|
||||
- **API Design**: The SIMD implementation's API is designed for single-use instances
|
||||
- **Performance Trade-off**: While instances are created per operation, the SIMD optimizations provide significant performance benefits for large data blocks
|
||||
- **Optimization**: Future versions may implement instance pooling if the underlying API supports reuse
|
||||
- **Instance Caching**: Encoder/decoder instances are cached and reused for optimal performance
|
||||
- **Thread Safety**: Thread-safe with RwLock-based caching
|
||||
- **SIMD Optimization**: Leverages CPU SIMD instructions for maximum performance
|
||||
- **Reset Capability**: Cached instances are reset for different parameters, avoiding unnecessary allocations
|
||||
|
||||
### Performance Tips
|
||||
|
||||
1. **Batch Operations**: When possible, batch multiple small operations into larger blocks
|
||||
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for SIMD implementations
|
||||
2. **Block Size Optimization**: Use block sizes that are multiples of 64 bytes for optimal SIMD performance
|
||||
3. **Memory Allocation**: Pre-allocate buffers when processing multiple blocks
|
||||
4. **Feature Selection**: Choose the appropriate feature based on your data size and performance requirements
|
||||
4. **Cache Warming**: Initial operations may be slower due to cache setup, subsequent operations benefit from caching
|
||||
|
||||
## Cross-Platform Compatibility
|
||||
|
||||
Both implementations support:
|
||||
- x86_64 with SIMD acceleration
|
||||
- aarch64 (ARM64) with optimizations
|
||||
The SIMD implementation supports:
|
||||
- x86_64 with advanced SIMD instructions (AVX2, SSE)
|
||||
- aarch64 (ARM64) with NEON SIMD optimizations
|
||||
- Other architectures with fallback implementations
|
||||
|
||||
The `reed-solomon-erasure` implementation provides better cross-platform compatibility and is recommended for most use cases.
|
||||
The implementation automatically selects the best available SIMD instructions for the target platform, providing optimal performance across different architectures.
|
||||
|
||||
## Testing and Benchmarking
|
||||
|
||||
Run performance benchmarks:
|
||||
```bash
|
||||
# Run erasure coding benchmarks
|
||||
cargo bench --bench erasure_benchmark
|
||||
|
||||
# Run comparison benchmarks
|
||||
cargo bench --bench comparison_benchmark
|
||||
|
||||
# Generate benchmark reports
|
||||
./run_benchmarks.sh
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
All operations return `Result` types with comprehensive error information:
|
||||
- Encoding errors: Invalid parameters, insufficient memory
|
||||
- Decoding errors: Too many missing shards, corrupted data
|
||||
- Configuration errors: Invalid shard counts, unsupported parameters
|
||||
@@ -12,7 +12,7 @@
|
||||
//! cargo bench --bench comparison_benchmark --features reed-solomon-simd
|
||||
//!
|
||||
//! # 测试强制 erasure-only 模式
|
||||
//! cargo bench --bench comparison_benchmark --features reed-solomon-erasure
|
||||
//! cargo bench --bench comparison_benchmark
|
||||
//!
|
||||
//! # 生成对比报告
|
||||
//! cargo bench --bench comparison_benchmark -- --save-baseline erasure
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Reed-Solomon erasure coding performance benchmarks.
|
||||
//!
|
||||
//! This benchmark compares the performance of different Reed-Solomon implementations:
|
||||
//! - Default (Pure erasure): Stable reed-solomon-erasure implementation
|
||||
//! - SIMD mode: High-performance reed-solomon-simd implementation
|
||||
//! - `reed-solomon-simd` feature: SIMD mode with optimized performance
|
||||
//!
|
||||
//! ## Running Benchmarks
|
||||
@@ -235,7 +235,7 @@ fn bench_decode_performance(c: &mut Criterion) {
|
||||
group.finish();
|
||||
|
||||
// 如果使用混合模式(默认),测试SIMD解码性能
|
||||
#[cfg(not(feature = "reed-solomon-erasure"))]
|
||||
|
||||
{
|
||||
let shard_size = calc_shard_size(config.data_size, config.data_shards);
|
||||
if shard_size >= 512 {
|
||||
|
||||
@@ -1,54 +1,48 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Reed-Solomon 实现性能比较脚本
|
||||
#
|
||||
# 这个脚本将运行不同的基准测试来比较SIMD模式和纯Erasure模式的性能
|
||||
#
|
||||
# 使用方法:
|
||||
# ./run_benchmarks.sh [quick|full|comparison]
|
||||
#
|
||||
# quick - 快速测试主要场景
|
||||
# full - 完整基准测试套件
|
||||
# comparison - 专门对比两种实现模式
|
||||
# Reed-Solomon SIMD 性能基准测试脚本
|
||||
# 使用高性能 SIMD 实现进行纠删码性能测试
|
||||
|
||||
set -e
|
||||
|
||||
# 颜色输出
|
||||
# ANSI 颜色码
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
BLUE='\033[0;34m'
|
||||
PURPLE='\033[0;35m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# 输出带颜色的信息
|
||||
# 打印带颜色的消息
|
||||
print_info() {
|
||||
echo -e "${BLUE}[INFO]${NC} $1"
|
||||
echo -e "${BLUE}ℹ️ $1${NC}"
|
||||
}
|
||||
|
||||
print_success() {
|
||||
echo -e "${GREEN}[SUCCESS]${NC} $1"
|
||||
echo -e "${GREEN}✅ $1${NC}"
|
||||
}
|
||||
|
||||
print_warning() {
|
||||
echo -e "${YELLOW}[WARNING]${NC} $1"
|
||||
echo -e "${YELLOW}⚠️ $1${NC}"
|
||||
}
|
||||
|
||||
print_error() {
|
||||
echo -e "${RED}[ERROR]${NC} $1"
|
||||
echo -e "${RED}❌ $1${NC}"
|
||||
}
|
||||
|
||||
# 检查是否安装了必要工具
|
||||
# 检查系统要求
|
||||
check_requirements() {
|
||||
print_info "检查系统要求..."
|
||||
|
||||
# 检查 Rust
|
||||
if ! command -v cargo &> /dev/null; then
|
||||
print_error "cargo 未安装,请先安装 Rust 工具链"
|
||||
print_error "Cargo 未找到,请确保已安装 Rust"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# 检查是否安装了 criterion
|
||||
if ! grep -q "criterion" Cargo.toml; then
|
||||
print_error "Cargo.toml 中未找到 criterion 依赖"
|
||||
# 检查 criterion
|
||||
if ! cargo --list | grep -q "bench"; then
|
||||
print_error "未找到基准测试支持,请确保使用的是支持基准测试的 Rust 版本"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -62,28 +56,15 @@ cleanup() {
|
||||
print_success "清理完成"
|
||||
}
|
||||
|
||||
# 运行纯 Erasure 模式基准测试
|
||||
run_erasure_benchmark() {
|
||||
print_info "🏛️ 开始运行纯 Erasure 模式基准测试..."
|
||||
echo "================================================"
|
||||
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-erasure \
|
||||
-- --save-baseline erasure_baseline
|
||||
|
||||
print_success "纯 Erasure 模式基准测试完成"
|
||||
}
|
||||
|
||||
# 运行SIMD模式基准测试
|
||||
# 运行 SIMD 模式基准测试
|
||||
run_simd_benchmark() {
|
||||
print_info "🎯 开始运行SIMD模式基准测试..."
|
||||
print_info "🎯 开始运行 SIMD 模式基准测试..."
|
||||
echo "================================================"
|
||||
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-simd \
|
||||
-- --save-baseline simd_baseline
|
||||
|
||||
print_success "SIMD模式基准测试完成"
|
||||
print_success "SIMD 模式基准测试完成"
|
||||
}
|
||||
|
||||
# 运行完整的基准测试套件
|
||||
@@ -91,33 +72,42 @@ run_full_benchmark() {
|
||||
print_info "🚀 开始运行完整基准测试套件..."
|
||||
echo "================================================"
|
||||
|
||||
# 运行详细的基准测试(使用默认纯Erasure模式)
|
||||
# 运行详细的基准测试
|
||||
cargo bench --bench erasure_benchmark
|
||||
|
||||
print_success "完整基准测试套件完成"
|
||||
}
|
||||
|
||||
# 运行性能对比测试
|
||||
run_comparison_benchmark() {
|
||||
print_info "📊 开始运行性能对比测试..."
|
||||
# 运行性能测试
|
||||
run_performance_test() {
|
||||
print_info "📊 开始运行性能测试..."
|
||||
echo "================================================"
|
||||
|
||||
print_info "步骤 1: 测试纯 Erasure 模式..."
|
||||
print_info "步骤 1: 运行编码基准测试..."
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-erasure \
|
||||
-- --save-baseline erasure_baseline
|
||||
-- encode --save-baseline encode_baseline
|
||||
|
||||
print_info "步骤 2: 测试SIMD模式并与 Erasure 模式对比..."
|
||||
print_info "步骤 2: 运行解码基准测试..."
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-simd \
|
||||
-- --baseline erasure_baseline
|
||||
-- decode --save-baseline decode_baseline
|
||||
|
||||
print_success "性能对比测试完成"
|
||||
print_success "性能测试完成"
|
||||
}
|
||||
|
||||
# 运行大数据集测试
|
||||
run_large_data_test() {
|
||||
print_info "🗂️ 开始运行大数据集测试..."
|
||||
echo "================================================"
|
||||
|
||||
cargo bench --bench erasure_benchmark \
|
||||
-- large_data --save-baseline large_data_baseline
|
||||
|
||||
print_success "大数据集测试完成"
|
||||
}
|
||||
|
||||
# 生成比较报告
|
||||
generate_comparison_report() {
|
||||
print_info "📊 生成性能比较报告..."
|
||||
print_info "📊 生成性能报告..."
|
||||
|
||||
if [ -d "target/criterion" ]; then
|
||||
print_info "基准测试结果已保存到 target/criterion/ 目录"
|
||||
@@ -138,49 +128,48 @@ generate_comparison_report() {
|
||||
run_quick_test() {
|
||||
print_info "🏃 运行快速性能测试..."
|
||||
|
||||
print_info "测试纯 Erasure 模式..."
|
||||
print_info "测试 SIMD 编码性能..."
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-erasure \
|
||||
-- encode_comparison --quick
|
||||
-- encode --quick
|
||||
|
||||
print_info "测试SIMD模式..."
|
||||
print_info "测试 SIMD 解码性能..."
|
||||
cargo bench --bench comparison_benchmark \
|
||||
--features reed-solomon-simd \
|
||||
-- encode_comparison --quick
|
||||
-- decode --quick
|
||||
|
||||
print_success "快速测试完成"
|
||||
}
|
||||
|
||||
# 显示帮助信息
|
||||
show_help() {
|
||||
echo "Reed-Solomon 性能基准测试脚本"
|
||||
echo "Reed-Solomon SIMD 性能基准测试脚本"
|
||||
echo ""
|
||||
echo "实现模式:"
|
||||
echo " 🏛️ 纯 Erasure 模式(默认)- 稳定兼容的 reed-solomon-erasure 实现"
|
||||
echo " 🎯 SIMD模式 - 高性能SIMD优化实现"
|
||||
echo " 🎯 SIMD 模式 - 高性能 SIMD 优化的 reed-solomon-simd 实现"
|
||||
echo ""
|
||||
echo "使用方法:"
|
||||
echo " $0 [command]"
|
||||
echo ""
|
||||
echo "命令:"
|
||||
echo " quick 运行快速性能测试"
|
||||
echo " full 运行完整基准测试套件(默认Erasure模式)"
|
||||
echo " comparison 运行详细的实现模式对比测试"
|
||||
echo " erasure 只测试纯 Erasure 模式"
|
||||
echo " simd 只测试SIMD模式"
|
||||
echo " full 运行完整基准测试套件"
|
||||
echo " performance 运行详细的性能测试"
|
||||
echo " simd 运行 SIMD 模式测试"
|
||||
echo " large 运行大数据集测试"
|
||||
echo " clean 清理测试结果"
|
||||
echo " help 显示此帮助信息"
|
||||
echo ""
|
||||
echo "示例:"
|
||||
echo " $0 quick # 快速测试两种模式"
|
||||
echo " $0 comparison # 详细对比测试"
|
||||
echo " $0 full # 完整测试套件(默认Erasure模式)"
|
||||
echo " $0 simd # 只测试SIMD模式"
|
||||
echo " $0 erasure # 只测试纯 Erasure 模式"
|
||||
echo " $0 quick # 快速性能测试"
|
||||
echo " $0 performance # 详细性能测试"
|
||||
echo " $0 full # 完整测试套件"
|
||||
echo " $0 simd # SIMD 模式测试"
|
||||
echo " $0 large # 大数据集测试"
|
||||
echo ""
|
||||
echo "模式说明:"
|
||||
echo " Erasure模式: 使用reed-solomon-erasure实现,稳定可靠"
|
||||
echo " SIMD模式: 使用reed-solomon-simd实现,高性能优化"
|
||||
echo "实现特性:"
|
||||
echo " - 使用 reed-solomon-simd 高性能 SIMD 实现"
|
||||
echo " - 支持编码器/解码器实例缓存"
|
||||
echo " - 优化的内存管理和线程安全"
|
||||
echo " - 跨平台 SIMD 指令支持"
|
||||
}
|
||||
|
||||
# 显示测试配置信息
|
||||
@@ -196,22 +185,22 @@ show_test_info() {
|
||||
if [ -f "/proc/cpuinfo" ]; then
|
||||
echo " - CPU 型号: $(grep 'model name' /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)"
|
||||
if grep -q "avx2" /proc/cpuinfo; then
|
||||
echo " - SIMD 支持: AVX2 ✅ (SIMD模式将利用SIMD优化)"
|
||||
echo " - SIMD 支持: AVX2 ✅ (将使用高级 SIMD 优化)"
|
||||
elif grep -q "sse4" /proc/cpuinfo; then
|
||||
echo " - SIMD 支持: SSE4 ✅ (SIMD模式将利用SIMD优化)"
|
||||
echo " - SIMD 支持: SSE4 ✅ (将使用 SIMD 优化)"
|
||||
else
|
||||
echo " - SIMD 支持: 未检测到高级 SIMD 特性"
|
||||
echo " - SIMD 支持: 基础 SIMD 特性"
|
||||
fi
|
||||
fi
|
||||
|
||||
echo " - 默认模式: 纯Erasure模式 (稳定可靠)"
|
||||
echo " - 高性能模式: SIMD模式 (性能优化)"
|
||||
echo " - 实现: reed-solomon-simd (高性能 SIMD 优化)"
|
||||
echo " - 特性: 实例缓存、线程安全、跨平台 SIMD"
|
||||
echo ""
|
||||
}
|
||||
|
||||
# 主函数
|
||||
main() {
|
||||
print_info "🧪 Reed-Solomon 实现性能基准测试"
|
||||
print_info "🧪 Reed-Solomon SIMD 实现性能基准测试"
|
||||
echo "================================================"
|
||||
|
||||
check_requirements
|
||||
@@ -227,14 +216,9 @@ main() {
|
||||
run_full_benchmark
|
||||
generate_comparison_report
|
||||
;;
|
||||
"comparison")
|
||||
"performance")
|
||||
cleanup
|
||||
run_comparison_benchmark
|
||||
generate_comparison_report
|
||||
;;
|
||||
"erasure")
|
||||
cleanup
|
||||
run_erasure_benchmark
|
||||
run_performance_test
|
||||
generate_comparison_report
|
||||
;;
|
||||
"simd")
|
||||
@@ -242,6 +226,11 @@ main() {
|
||||
run_simd_benchmark
|
||||
generate_comparison_report
|
||||
;;
|
||||
"large")
|
||||
cleanup
|
||||
run_large_data_test
|
||||
generate_comparison_report
|
||||
;;
|
||||
"clean")
|
||||
cleanup
|
||||
;;
|
||||
@@ -257,10 +246,7 @@ main() {
|
||||
esac
|
||||
|
||||
print_success "✨ 基准测试执行完成!"
|
||||
print_info "💡 提示: 推荐使用默认的纯Erasure模式,对于高性能需求可考虑SIMD模式"
|
||||
}
|
||||
|
||||
# 如果直接运行此脚本,调用主函数
|
||||
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
|
||||
main "$@"
|
||||
fi
|
||||
# 启动脚本
|
||||
main "$@"
|
||||
@@ -1,6 +1,7 @@
|
||||
#![allow(unused_variables)]
|
||||
#![allow(dead_code)]
|
||||
// use error::Error;
|
||||
use crate::StorageAPI;
|
||||
use crate::bucket::metadata_sys::get_replication_config;
|
||||
use crate::bucket::versioning_sys::BucketVersioningSys;
|
||||
use crate::error::Error;
|
||||
@@ -11,26 +12,25 @@ use crate::store_api::ObjectIO;
|
||||
use crate::store_api::ObjectInfo;
|
||||
use crate::store_api::ObjectOptions;
|
||||
use crate::store_api::ObjectToDelete;
|
||||
use crate::StorageAPI;
|
||||
use aws_sdk_s3::Client as S3Client;
|
||||
use aws_sdk_s3::Config;
|
||||
use aws_sdk_s3::config::BehaviorVersion;
|
||||
use aws_sdk_s3::config::Credentials;
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::Client as S3Client;
|
||||
use aws_sdk_s3::Config;
|
||||
use bytes::Bytes;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use http::HeaderMap;
|
||||
use http::Method;
|
||||
use lazy_static::lazy_static;
|
||||
// use std::time::SystemTime;
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use rustfs_rsc::provider::StaticProvider;
|
||||
use rustfs_rsc::Minio;
|
||||
use rustfs_rsc::provider::StaticProvider;
|
||||
use s3s::dto::DeleteMarkerReplicationStatus;
|
||||
use s3s::dto::DeleteReplicationStatus;
|
||||
use s3s::dto::ExistingObjectReplicationStatus;
|
||||
@@ -43,14 +43,14 @@ use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::iter::Iterator;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::vec;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::task;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::{storageclass, Config, GLOBAL_StorageClass};
|
||||
use super::{Config, GLOBAL_StorageClass, storageclass};
|
||||
use crate::disk::RUSTFS_META_BUCKET;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
|
||||
|
||||
@@ -5,7 +5,7 @@ pub mod storageclass;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::store::ECStore;
|
||||
use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS};
|
||||
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
|
||||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -1,27 +1,15 @@
|
||||
//! Erasure coding implementation supporting multiple Reed-Solomon backends.
|
||||
//! Erasure coding implementation using Reed-Solomon SIMD backend.
|
||||
//!
|
||||
//! This module provides erasure coding functionality with support for two different
|
||||
//! Reed-Solomon implementations:
|
||||
//! This module provides erasure coding functionality with high-performance SIMD
|
||||
//! Reed-Solomon implementation:
|
||||
//!
|
||||
//! ## Reed-Solomon Implementations
|
||||
//! ## Reed-Solomon Implementation
|
||||
//!
|
||||
//! ### Pure Erasure Mode (Default)
|
||||
//! - **Stability**: Pure erasure implementation, mature and well-tested
|
||||
//! - **Performance**: Good performance with consistent behavior
|
||||
//! - **Compatibility**: Works with any shard size
|
||||
//! - **Use case**: Default behavior, recommended for most production use cases
|
||||
//!
|
||||
//! ### SIMD Mode (`reed-solomon-simd` feature)
|
||||
//! ### SIMD Mode (Only)
|
||||
//! - **Performance**: Uses SIMD optimization for high-performance encoding/decoding
|
||||
//! - **Compatibility**: Works with any shard size through SIMD implementation
|
||||
//! - **Reliability**: High-performance SIMD implementation for large data processing
|
||||
//! - **Use case**: Use when maximum performance is needed for large data processing
|
||||
//!
|
||||
//! ## Feature Flags
|
||||
//!
|
||||
//! - Default: Use pure reed-solomon-erasure implementation (stable and reliable)
|
||||
//! - `reed-solomon-simd`: Use SIMD mode for optimal performance
|
||||
//! - `reed-solomon-erasure`: Explicitly enable pure erasure mode (same as default)
|
||||
//! - **Use case**: Optimized for maximum performance in large data processing scenarios
|
||||
//!
|
||||
//! ## Example
|
||||
//!
|
||||
@@ -35,8 +23,6 @@
|
||||
//! ```
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use reed_solomon_erasure::galois_8::ReedSolomon as ReedSolomonErasure;
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
use reed_solomon_simd;
|
||||
use smallvec::SmallVec;
|
||||
use std::io;
|
||||
@@ -44,38 +30,23 @@ use tokio::io::AsyncRead;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Reed-Solomon encoder variants supporting different implementations.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum ReedSolomonEncoder {
|
||||
/// SIMD mode: High-performance SIMD implementation (when reed-solomon-simd feature is enabled)
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
SIMD {
|
||||
data_shards: usize,
|
||||
parity_shards: usize,
|
||||
// 使用RwLock确保线程安全,实现Send + Sync
|
||||
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
|
||||
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
|
||||
},
|
||||
/// Pure erasure mode: default and when reed-solomon-erasure feature is specified
|
||||
Erasure(Box<ReedSolomonErasure>),
|
||||
/// Reed-Solomon encoder using SIMD implementation.
|
||||
pub struct ReedSolomonEncoder {
|
||||
data_shards: usize,
|
||||
parity_shards: usize,
|
||||
// 使用RwLock确保线程安全,实现Send + Sync
|
||||
encoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
|
||||
decoder_cache: std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
|
||||
}
|
||||
|
||||
impl Clone for ReedSolomonEncoder {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
ReedSolomonEncoder::SIMD {
|
||||
data_shards,
|
||||
parity_shards,
|
||||
..
|
||||
} => ReedSolomonEncoder::SIMD {
|
||||
data_shards: *data_shards,
|
||||
parity_shards: *parity_shards,
|
||||
// 为新实例创建空的缓存,不共享缓存
|
||||
encoder_cache: std::sync::RwLock::new(None),
|
||||
decoder_cache: std::sync::RwLock::new(None),
|
||||
},
|
||||
ReedSolomonEncoder::Erasure(encoder) => ReedSolomonEncoder::Erasure(encoder.clone()),
|
||||
Self {
|
||||
data_shards: self.data_shards,
|
||||
parity_shards: self.parity_shards,
|
||||
// 为新实例创建空的缓存,不共享缓存
|
||||
encoder_cache: std::sync::RwLock::new(None),
|
||||
decoder_cache: std::sync::RwLock::new(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -83,81 +54,50 @@ impl Clone for ReedSolomonEncoder {
|
||||
impl ReedSolomonEncoder {
|
||||
/// Create a new Reed-Solomon encoder with specified data and parity shards.
|
||||
pub fn new(data_shards: usize, parity_shards: usize) -> io::Result<Self> {
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
{
|
||||
// SIMD mode when reed-solomon-simd feature is enabled
|
||||
Ok(ReedSolomonEncoder::SIMD {
|
||||
data_shards,
|
||||
parity_shards,
|
||||
encoder_cache: std::sync::RwLock::new(None),
|
||||
decoder_cache: std::sync::RwLock::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
{
|
||||
// Pure erasure mode when reed-solomon-simd feature is not enabled (default or reed-solomon-erasure)
|
||||
let encoder = ReedSolomonErasure::new(data_shards, parity_shards)
|
||||
.map_err(|e| io::Error::other(format!("Failed to create erasure encoder: {:?}", e)))?;
|
||||
Ok(ReedSolomonEncoder::Erasure(Box::new(encoder)))
|
||||
}
|
||||
Ok(ReedSolomonEncoder {
|
||||
data_shards,
|
||||
parity_shards,
|
||||
encoder_cache: std::sync::RwLock::new(None),
|
||||
decoder_cache: std::sync::RwLock::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
/// Encode data shards with parity.
|
||||
pub fn encode(&self, shards: SmallVec<[&mut [u8]; 16]>) -> io::Result<()> {
|
||||
match self {
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
ReedSolomonEncoder::SIMD {
|
||||
data_shards,
|
||||
parity_shards,
|
||||
encoder_cache,
|
||||
..
|
||||
} => {
|
||||
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
|
||||
if shards_vec.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut shards_vec: Vec<&mut [u8]> = shards.into_vec();
|
||||
if shards_vec.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 使用 SIMD 进行编码
|
||||
let simd_result = self.encode_with_simd(*data_shards, *parity_shards, encoder_cache, &mut shards_vec);
|
||||
// 使用 SIMD 进行编码
|
||||
let simd_result = self.encode_with_simd(&mut shards_vec);
|
||||
|
||||
match simd_result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(simd_error) => {
|
||||
warn!("SIMD encoding failed: {}", simd_error);
|
||||
Err(simd_error)
|
||||
}
|
||||
}
|
||||
match simd_result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(simd_error) => {
|
||||
warn!("SIMD encoding failed: {}", simd_error);
|
||||
Err(simd_error)
|
||||
}
|
||||
ReedSolomonEncoder::Erasure(encoder) => encoder
|
||||
.encode(shards)
|
||||
.map_err(|e| io::Error::other(format!("Erasure encode error: {:?}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
fn encode_with_simd(
|
||||
&self,
|
||||
data_shards: usize,
|
||||
parity_shards: usize,
|
||||
encoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonEncoder>>,
|
||||
shards_vec: &mut [&mut [u8]],
|
||||
) -> io::Result<()> {
|
||||
fn encode_with_simd(&self, shards_vec: &mut [&mut [u8]]) -> io::Result<()> {
|
||||
let shard_len = shards_vec[0].len();
|
||||
|
||||
// 获取或创建encoder
|
||||
let mut encoder = {
|
||||
let mut cache_guard = encoder_cache
|
||||
let mut cache_guard = self
|
||||
.encoder_cache
|
||||
.write()
|
||||
.map_err(|_| io::Error::other("Failed to acquire encoder cache lock"))?;
|
||||
|
||||
match cache_guard.take() {
|
||||
Some(mut cached_encoder) => {
|
||||
// 使用reset方法重置现有encoder以适应新的参数
|
||||
if let Err(e) = cached_encoder.reset(data_shards, parity_shards, shard_len) {
|
||||
if let Err(e) = cached_encoder.reset(self.data_shards, self.parity_shards, shard_len) {
|
||||
warn!("Failed to reset SIMD encoder: {:?}, creating new one", e);
|
||||
// 如果reset失败,创建新的encoder
|
||||
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
|
||||
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
|
||||
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
|
||||
} else {
|
||||
cached_encoder
|
||||
@@ -165,14 +105,14 @@ impl ReedSolomonEncoder {
|
||||
}
|
||||
None => {
|
||||
// 第一次使用,创建新encoder
|
||||
reed_solomon_simd::ReedSolomonEncoder::new(data_shards, parity_shards, shard_len)
|
||||
reed_solomon_simd::ReedSolomonEncoder::new(self.data_shards, self.parity_shards, shard_len)
|
||||
.map_err(|e| io::Error::other(format!("Failed to create SIMD encoder: {:?}", e)))?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// 添加原始shards
|
||||
for (i, shard) in shards_vec.iter().enumerate().take(data_shards) {
|
||||
for (i, shard) in shards_vec.iter().enumerate().take(self.data_shards) {
|
||||
encoder
|
||||
.add_original_shard(shard)
|
||||
.map_err(|e| io::Error::other(format!("Failed to add shard {}: {:?}", i, e)))?;
|
||||
@@ -185,15 +125,16 @@ impl ReedSolomonEncoder {
|
||||
|
||||
// 将恢复shards复制到输出缓冲区
|
||||
for (i, recovery_shard) in result.recovery_iter().enumerate() {
|
||||
if i + data_shards < shards_vec.len() {
|
||||
shards_vec[i + data_shards].copy_from_slice(recovery_shard);
|
||||
if i + self.data_shards < shards_vec.len() {
|
||||
shards_vec[i + self.data_shards].copy_from_slice(recovery_shard);
|
||||
}
|
||||
}
|
||||
|
||||
// 将encoder放回缓存(在result被drop后encoder自动重置,可以重用)
|
||||
drop(result); // 显式drop result,确保encoder被重置
|
||||
|
||||
*encoder_cache
|
||||
*self
|
||||
.encoder_cache
|
||||
.write()
|
||||
.map_err(|_| io::Error::other("Failed to return encoder to cache"))? = Some(encoder);
|
||||
|
||||
@@ -202,39 +143,19 @@ impl ReedSolomonEncoder {
|
||||
|
||||
/// Reconstruct missing shards.
|
||||
pub fn reconstruct(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
|
||||
match self {
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
ReedSolomonEncoder::SIMD {
|
||||
data_shards,
|
||||
parity_shards,
|
||||
decoder_cache,
|
||||
..
|
||||
} => {
|
||||
// 使用 SIMD 进行重构
|
||||
let simd_result = self.reconstruct_with_simd(*data_shards, *parity_shards, decoder_cache, shards);
|
||||
// 使用 SIMD 进行重构
|
||||
let simd_result = self.reconstruct_with_simd(shards);
|
||||
|
||||
match simd_result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(simd_error) => {
|
||||
warn!("SIMD reconstruction failed: {}", simd_error);
|
||||
Err(simd_error)
|
||||
}
|
||||
}
|
||||
match simd_result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(simd_error) => {
|
||||
warn!("SIMD reconstruction failed: {}", simd_error);
|
||||
Err(simd_error)
|
||||
}
|
||||
ReedSolomonEncoder::Erasure(encoder) => encoder
|
||||
.reconstruct(shards)
|
||||
.map_err(|e| io::Error::other(format!("Erasure reconstruct error: {:?}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
fn reconstruct_with_simd(
|
||||
&self,
|
||||
data_shards: usize,
|
||||
parity_shards: usize,
|
||||
decoder_cache: &std::sync::RwLock<Option<reed_solomon_simd::ReedSolomonDecoder>>,
|
||||
shards: &mut [Option<Vec<u8>>],
|
||||
) -> io::Result<()> {
|
||||
fn reconstruct_with_simd(&self, shards: &mut [Option<Vec<u8>>]) -> io::Result<()> {
|
||||
// Find a valid shard to determine length
|
||||
let shard_len = shards
|
||||
.iter()
|
||||
@@ -243,17 +164,18 @@ impl ReedSolomonEncoder {
|
||||
|
||||
// 获取或创建decoder
|
||||
let mut decoder = {
|
||||
let mut cache_guard = decoder_cache
|
||||
let mut cache_guard = self
|
||||
.decoder_cache
|
||||
.write()
|
||||
.map_err(|_| io::Error::other("Failed to acquire decoder cache lock"))?;
|
||||
|
||||
match cache_guard.take() {
|
||||
Some(mut cached_decoder) => {
|
||||
// 使用reset方法重置现有decoder
|
||||
if let Err(e) = cached_decoder.reset(data_shards, parity_shards, shard_len) {
|
||||
if let Err(e) = cached_decoder.reset(self.data_shards, self.parity_shards, shard_len) {
|
||||
warn!("Failed to reset SIMD decoder: {:?}, creating new one", e);
|
||||
// 如果reset失败,创建新的decoder
|
||||
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
|
||||
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
|
||||
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
|
||||
} else {
|
||||
cached_decoder
|
||||
@@ -261,7 +183,7 @@ impl ReedSolomonEncoder {
|
||||
}
|
||||
None => {
|
||||
// 第一次使用,创建新decoder
|
||||
reed_solomon_simd::ReedSolomonDecoder::new(data_shards, parity_shards, shard_len)
|
||||
reed_solomon_simd::ReedSolomonDecoder::new(self.data_shards, self.parity_shards, shard_len)
|
||||
.map_err(|e| io::Error::other(format!("Failed to create SIMD decoder: {:?}", e)))?
|
||||
}
|
||||
}
|
||||
@@ -270,12 +192,12 @@ impl ReedSolomonEncoder {
|
||||
// Add available shards (both data and parity)
|
||||
for (i, shard_opt) in shards.iter().enumerate() {
|
||||
if let Some(shard) = shard_opt {
|
||||
if i < data_shards {
|
||||
if i < self.data_shards {
|
||||
decoder
|
||||
.add_original_shard(i, shard)
|
||||
.map_err(|e| io::Error::other(format!("Failed to add original shard for reconstruction: {:?}", e)))?;
|
||||
} else {
|
||||
let recovery_idx = i - data_shards;
|
||||
let recovery_idx = i - self.data_shards;
|
||||
decoder
|
||||
.add_recovery_shard(recovery_idx, shard)
|
||||
.map_err(|e| io::Error::other(format!("Failed to add recovery shard for reconstruction: {:?}", e)))?;
|
||||
@@ -289,7 +211,7 @@ impl ReedSolomonEncoder {
|
||||
|
||||
// Fill in missing data shards from reconstruction result
|
||||
for (i, shard_opt) in shards.iter_mut().enumerate() {
|
||||
if shard_opt.is_none() && i < data_shards {
|
||||
if shard_opt.is_none() && i < self.data_shards {
|
||||
for (restored_index, restored_data) in result.restored_original_iter() {
|
||||
if restored_index == i {
|
||||
*shard_opt = Some(restored_data.to_vec());
|
||||
@@ -302,7 +224,8 @@ impl ReedSolomonEncoder {
|
||||
// 将decoder放回缓存(在result被drop后decoder自动重置,可以重用)
|
||||
drop(result); // 显式drop result,确保decoder被重置
|
||||
|
||||
*decoder_cache
|
||||
*self
|
||||
.decoder_cache
|
||||
.write()
|
||||
.map_err(|_| io::Error::other("Failed to return decoder to cache"))? = Some(decoder);
|
||||
|
||||
@@ -592,19 +515,10 @@ mod tests {
|
||||
fn test_encode_decode_roundtrip() {
|
||||
let data_shards = 4;
|
||||
let parity_shards = 2;
|
||||
|
||||
// Use different block sizes based on feature
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
let block_size = 8; // Pure erasure mode (default)
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
let block_size = 1024; // SIMD mode - SIMD with fallback
|
||||
|
||||
let block_size = 1024; // SIMD mode
|
||||
let erasure = Erasure::new(data_shards, parity_shards, block_size);
|
||||
|
||||
// Use different test data based on feature
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
let test_data = b"hello world".to_vec(); // Small data for erasure (default)
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
// Use sufficient test data for SIMD optimization
|
||||
let test_data = b"SIMD mode test data for encoding and decoding roundtrip verification with sufficient length to ensure shard size requirements are met for proper SIMD optimization.".repeat(20); // ~3KB for SIMD
|
||||
|
||||
let data = &test_data;
|
||||
@@ -632,13 +546,7 @@ mod tests {
|
||||
fn test_encode_decode_large_1m() {
|
||||
let data_shards = 4;
|
||||
let parity_shards = 2;
|
||||
|
||||
// Use different block sizes based on feature
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
let block_size = 512 * 3; // SIMD mode
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
let block_size = 8192; // Pure erasure mode (default)
|
||||
|
||||
let erasure = Erasure::new(data_shards, parity_shards, block_size);
|
||||
|
||||
// Generate 1MB test data
|
||||
@@ -704,16 +612,10 @@ mod tests {
|
||||
|
||||
let data_shards = 4;
|
||||
let parity_shards = 2;
|
||||
|
||||
// Use different block sizes based on feature
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
let block_size = 1024; // SIMD mode
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
let block_size = 8; // Pure erasure mode (default)
|
||||
|
||||
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
|
||||
|
||||
// Use test data suitable for both modes
|
||||
// Use test data suitable for SIMD mode
|
||||
let data =
|
||||
b"Async error test data with sufficient length to meet requirements for proper testing and validation.".repeat(20); // ~2KB
|
||||
|
||||
@@ -747,13 +649,7 @@ mod tests {
|
||||
|
||||
let data_shards = 4;
|
||||
let parity_shards = 2;
|
||||
|
||||
// Use different block sizes based on feature
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
let block_size = 1024; // SIMD mode
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
let block_size = 8; // Pure erasure mode (default)
|
||||
|
||||
let erasure = Arc::new(Erasure::new(data_shards, parity_shards, block_size));
|
||||
|
||||
// Use test data that fits in exactly one block to avoid multi-block complexity
|
||||
@@ -761,8 +657,6 @@ mod tests {
|
||||
b"Channel async callback test data with sufficient length to ensure proper operation and validation requirements."
|
||||
.repeat(8); // ~1KB
|
||||
|
||||
// let data = b"callback".to_vec(); // 8 bytes to fit exactly in one 8-byte block
|
||||
|
||||
let data_clone = data.clone(); // Clone for later comparison
|
||||
let mut reader = Cursor::new(data);
|
||||
let (tx, mut rx) = mpsc::channel::<Vec<Bytes>>(8);
|
||||
@@ -801,8 +695,7 @@ mod tests {
|
||||
assert_eq!(&recovered, &data_clone);
|
||||
}
|
||||
|
||||
// Tests specifically for SIMD mode
|
||||
#[cfg(feature = "reed-solomon-simd")]
|
||||
// SIMD mode specific tests
|
||||
mod simd_tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1171,47 +1064,4 @@ mod tests {
|
||||
assert_eq!(&recovered, &data_clone);
|
||||
}
|
||||
}
|
||||
|
||||
// Comparative tests between different implementations
|
||||
#[cfg(not(feature = "reed-solomon-simd"))]
|
||||
mod comparative_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_implementation_consistency() {
|
||||
let data_shards = 4;
|
||||
let parity_shards = 2;
|
||||
let block_size = 2048; // Large enough for SIMD requirements
|
||||
|
||||
// Create test data that ensures each shard is >= 512 bytes (SIMD minimum)
|
||||
let test_data = b"This is test data for comparing reed-solomon-simd and reed-solomon-erasure implementations to ensure they produce consistent results when given the same input parameters and data. This data needs to be sufficiently large to meet SIMD requirements.";
|
||||
let data = test_data.repeat(50); // Create much larger data: ~13KB total, ~3.25KB per shard
|
||||
|
||||
// Test with erasure implementation (default)
|
||||
let erasure_erasure = Erasure::new(data_shards, parity_shards, block_size);
|
||||
let erasure_shards = erasure_erasure.encode_data(&data).unwrap();
|
||||
|
||||
// Test data integrity with erasure
|
||||
let mut erasure_shards_opt: Vec<Option<Vec<u8>>> = erasure_shards.iter().map(|shard| Some(shard.to_vec())).collect();
|
||||
|
||||
// Lose some shards
|
||||
erasure_shards_opt[1] = None; // Data shard
|
||||
erasure_shards_opt[4] = None; // Parity shard
|
||||
|
||||
erasure_erasure.decode_data(&mut erasure_shards_opt).unwrap();
|
||||
|
||||
let mut erasure_recovered = Vec::new();
|
||||
for shard in erasure_shards_opt.iter().take(data_shards) {
|
||||
erasure_recovered.extend_from_slice(shard.as_ref().unwrap());
|
||||
}
|
||||
erasure_recovered.truncate(data.len());
|
||||
|
||||
// Verify erasure implementation works correctly
|
||||
assert_eq!(&erasure_recovered, &data, "Erasure implementation failed to recover data correctly");
|
||||
|
||||
println!("✅ Both implementations are available and working correctly");
|
||||
println!("✅ Default (reed-solomon-erasure): Data recovery successful");
|
||||
println!("✅ SIMD tests are available as separate test suite");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
|
||||
use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
|
||||
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
|
||||
use crate::disk::{
|
||||
self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT,
|
||||
CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
|
||||
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
|
||||
conv_part_err_to_int, has_part_err,
|
||||
};
|
||||
use crate::erasure_coding;
|
||||
use crate::erasure_coding::bitrot_verify;
|
||||
@@ -12,24 +12,24 @@ use crate::heal::data_usage_cache::DataUsageCache;
|
||||
use crate::heal::heal_ops::{HealEntryFn, HealSequence};
|
||||
use crate::store_api::ObjectToDelete;
|
||||
use crate::{
|
||||
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
|
||||
config::{storageclass, GLOBAL_StorageClass},
|
||||
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
|
||||
config::{GLOBAL_StorageClass, storageclass},
|
||||
disk::{
|
||||
endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo,
|
||||
DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp,
|
||||
ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
|
||||
CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions,
|
||||
RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, ReadMultipleReq, ReadMultipleResp, ReadOptions,
|
||||
UpdateMetadataOpts, endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk,
|
||||
},
|
||||
error::{to_object_err, StorageError},
|
||||
error::{StorageError, to_object_err},
|
||||
global::{
|
||||
get_global_deployment_id, is_dist_erasure, GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP,
|
||||
GLOBAL_LOCAL_DISK_SET_DRIVES,
|
||||
GLOBAL_BackgroundHealState, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id,
|
||||
is_dist_erasure,
|
||||
},
|
||||
heal::{
|
||||
data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
|
||||
data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo},
|
||||
heal_commands::{
|
||||
HealOpts, HealScanMode, HealingTracker, DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE,
|
||||
DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, HEAL_NORMAL_SCAN,
|
||||
DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT,
|
||||
HEAL_NORMAL_SCAN, HealOpts, HealScanMode, HealingTracker,
|
||||
},
|
||||
heal_ops::BG_HEALING_UUID,
|
||||
},
|
||||
@@ -42,7 +42,7 @@ use crate::{
|
||||
};
|
||||
use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation};
|
||||
use crate::{
|
||||
heal::data_scanner::{globalHealConfig, HEAL_DELETE_DANGLING},
|
||||
heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig},
|
||||
store_api::ListObjectVersionsInfo,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
@@ -51,22 +51,22 @@ use chrono::Utc;
|
||||
use futures::future::join_all;
|
||||
use glob::Pattern;
|
||||
use http::HeaderMap;
|
||||
use lock::{namespace_lock::NsLockMap, LockApi};
|
||||
use lock::{LockApi, namespace_lock::NsLockMap};
|
||||
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
use rand::{Rng, seq::SliceRandom};
|
||||
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use rustfs_filemeta::{
|
||||
file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries,
|
||||
MetaCacheEntry, MetadataResolutionParams,
|
||||
ObjectPartInfo,
|
||||
RawFileInfo,
|
||||
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
|
||||
RawFileInfo, file_info_from_raw,
|
||||
headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS},
|
||||
merge_file_meta_versions,
|
||||
};
|
||||
use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader};
|
||||
use rustfs_utils::{
|
||||
crypto::{base64_decode, base64_encode, hex},
|
||||
path::{encode_dir_object, has_suffix, path_join_buf, SLASH_SEPARATOR},
|
||||
HashAlgorithm,
|
||||
crypto::{base64_decode, base64_encode, hex},
|
||||
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
|
||||
};
|
||||
use sha2::Sha256;
|
||||
use std::hash::Hash;
|
||||
@@ -82,7 +82,7 @@ use std::{
|
||||
use time::OffsetDateTime;
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
sync::{broadcast, RwLock},
|
||||
sync::{RwLock, broadcast},
|
||||
};
|
||||
use tokio::{
|
||||
select,
|
||||
@@ -5837,9 +5837,9 @@ fn get_complete_multipart_md5(parts: &[CompletePart]) -> String {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::disk::CHECK_PART_UNKNOWN;
|
||||
use crate::disk::CHECK_PART_VOLUME_NOT_FOUND;
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::store_api::CompletePart;
|
||||
use rustfs_filemeta::ErasureInfo;
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -8,10 +8,10 @@ use crate::{disk::DiskStore, heal::heal_commands::HealOpts};
|
||||
use http::{HeaderMap, HeaderValue};
|
||||
use madmin::heal_commands::HealResultItem;
|
||||
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use rustfs_filemeta::{headers::AMZ_OBJECT_TAGGING, FileInfo, MetaCacheEntriesSorted, ObjectPartInfo};
|
||||
use rustfs_filemeta::{FileInfo, MetaCacheEntriesSorted, ObjectPartInfo, headers::AMZ_OBJECT_TAGGING};
|
||||
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
|
||||
use rustfs_utils::path::decode_dir_object;
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::path::decode_dir_object;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
|
||||
@@ -1,24 +1,24 @@
|
||||
use crate::StorageAPI;
|
||||
use crate::bucket::metadata_sys::get_versioning_config;
|
||||
use crate::bucket::versioning::VersioningApi;
|
||||
use crate::cache_value::metacache_set::{list_path_raw, ListPathRawOptions};
|
||||
use crate::cache_value::metacache_set::{ListPathRawOptions, list_path_raw};
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::disk::{DiskInfo, DiskStore};
|
||||
use crate::error::{
|
||||
is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err, Error, Result, StorageError,
|
||||
Error, Result, StorageError, is_all_not_found, is_all_volume_not_found, is_err_bucket_not_found, to_object_err,
|
||||
};
|
||||
use crate::set_disk::SetDisks;
|
||||
use crate::store::check_list_objs_args;
|
||||
use crate::store_api::{ListObjectVersionsInfo, ListObjectsInfo, ObjectInfo, ObjectOptions};
|
||||
use crate::store_utils::is_reserved_or_invalid_bucket;
|
||||
use crate::StorageAPI;
|
||||
use crate::{store::ECStore, store_api::ListObjectsV2Info};
|
||||
use futures::future::join_all;
|
||||
use rand::seq::SliceRandom;
|
||||
use rustfs_filemeta::{
|
||||
merge_file_meta_versions, FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry,
|
||||
MetadataResolutionParams,
|
||||
FileInfo, MetaCacheEntries, MetaCacheEntriesSorted, MetaCacheEntriesSortedResult, MetaCacheEntry, MetadataResolutionParams,
|
||||
merge_file_meta_versions,
|
||||
};
|
||||
use rustfs_utils::path::{self, base_dir_from_prefix, SLASH_SEPARATOR};
|
||||
use rustfs_utils::path::{self, SLASH_SEPARATOR, base_dir_from_prefix};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast::{self, Receiver as B_Receiver};
|
||||
|
||||
@@ -1,21 +1,21 @@
|
||||
use crate::error::{is_err_config_not_found, Error, Result};
|
||||
use crate::error::{Error, Result, is_err_config_not_found};
|
||||
use crate::{
|
||||
cache::{Cache, CacheEntity},
|
||||
error::{is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user, Error as IamError},
|
||||
store::{object::IAM_CONFIG_PREFIX, GroupInfo, MappedPolicy, Store, UserType},
|
||||
error::{Error as IamError, is_err_no_such_group, is_err_no_such_policy, is_err_no_such_user},
|
||||
store::{GroupInfo, MappedPolicy, Store, UserType, object::IAM_CONFIG_PREFIX},
|
||||
sys::{
|
||||
UpdateServiceAccountOpts, MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED,
|
||||
STATUS_ENABLED,
|
||||
MAX_SVCSESSION_POLICY_SIZE, SESSION_POLICY_NAME, SESSION_POLICY_NAME_EXTRACTED, STATUS_DISABLED, STATUS_ENABLED,
|
||||
UpdateServiceAccountOpts,
|
||||
},
|
||||
};
|
||||
use ecstore::global::get_global_action_cred;
|
||||
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
|
||||
use policy::{
|
||||
arn::ARN,
|
||||
auth::{self, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign, Credentials, UserIdentity},
|
||||
auth::{self, Credentials, UserIdentity, get_claims_from_token_with_secret, is_secret_key_valid, jwt_sign},
|
||||
format::Format,
|
||||
policy::{
|
||||
default::DEFAULT_POLICIES, iam_policy_claim_name_sa, Policy, PolicyDoc, EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE,
|
||||
EMBEDDED_POLICY_TYPE, INHERITED_POLICY_TYPE, Policy, PolicyDoc, default::DEFAULT_POLICIES, iam_policy_claim_name_sa,
|
||||
},
|
||||
};
|
||||
use rustfs_utils::crypto::base64_encode;
|
||||
@@ -25,8 +25,8 @@ use serde_json::Value;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicI64, Ordering},
|
||||
Arc,
|
||||
atomic::{AtomicBool, AtomicI64, Ordering},
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -10,12 +10,12 @@ use http::StatusCode;
|
||||
use hyper::Method;
|
||||
use matchit::Params;
|
||||
use rustfs_utils::net::bytes_stream;
|
||||
use s3s::dto::StreamingBlob;
|
||||
use s3s::s3_error;
|
||||
use s3s::Body;
|
||||
use s3s::S3Request;
|
||||
use s3s::S3Response;
|
||||
use s3s::S3Result;
|
||||
use s3s::dto::StreamingBlob;
|
||||
use s3s::s3_error;
|
||||
use serde_urlencoded::from_bytes;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
@@ -12,9 +12,9 @@ mod service;
|
||||
mod storage;
|
||||
|
||||
use crate::auth::IAMAuth;
|
||||
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
|
||||
use crate::console::{CONSOLE_CONFIG, init_console_cfg};
|
||||
// Ensure the correct path for parse_license is imported
|
||||
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
|
||||
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown};
|
||||
use bytes::Bytes;
|
||||
use chrono::Datelike;
|
||||
use clap::Parser;
|
||||
@@ -22,6 +22,7 @@ use common::{
|
||||
// error::{Error, Result},
|
||||
globals::set_global_addr,
|
||||
};
|
||||
use ecstore::StorageAPI;
|
||||
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
|
||||
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
|
||||
use ecstore::config as ecconfig;
|
||||
@@ -29,12 +30,11 @@ use ecstore::config::GLOBAL_ConfigSys;
|
||||
use ecstore::heal::background_heal_ops::init_auto_heal;
|
||||
use ecstore::rpc::make_server;
|
||||
use ecstore::store_api::BucketOptions;
|
||||
use ecstore::StorageAPI;
|
||||
use ecstore::{
|
||||
endpoints::EndpointServerPools,
|
||||
heal::data_scanner::init_data_scanner,
|
||||
set_global_endpoints,
|
||||
store::{init_local_disks, ECStore},
|
||||
store::{ECStore, init_local_disks},
|
||||
update_erasure_type,
|
||||
};
|
||||
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
|
||||
@@ -49,7 +49,7 @@ use iam::init_iam_sys;
|
||||
use license::init_license;
|
||||
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
|
||||
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
|
||||
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
|
||||
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
|
||||
use rustfs_utils::net::parse_and_resolve_address;
|
||||
use rustls::ServerConfig;
|
||||
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
|
||||
@@ -60,13 +60,13 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tonic::{metadata::MetadataValue, Request, Status};
|
||||
use tonic::{Request, Status, metadata::MetadataValue};
|
||||
use tower_http::cors::CorsLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing::{Span, instrument};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{instrument, Span};
|
||||
|
||||
const MI_B: usize = 1024 * 1024;
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ use bytes::Bytes;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
|
||||
use datafusion::arrow::json::writer::JsonArray;
|
||||
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
|
||||
use datafusion::arrow::json::writer::JsonArray;
|
||||
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
|
||||
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
|
||||
use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
|
||||
@@ -32,13 +32,13 @@ use ecstore::bucket::tagging::decode_tags;
|
||||
use ecstore::bucket::tagging::encode_tags;
|
||||
use ecstore::bucket::utils::serialize;
|
||||
use ecstore::bucket::versioning_sys::BucketVersioningSys;
|
||||
use ecstore::cmd::bucket_replication::ReplicationStatusType;
|
||||
use ecstore::cmd::bucket_replication::ReplicationType;
|
||||
use ecstore::cmd::bucket_replication::get_must_replicate_options;
|
||||
use ecstore::cmd::bucket_replication::must_replicate;
|
||||
use ecstore::cmd::bucket_replication::schedule_replication;
|
||||
use ecstore::cmd::bucket_replication::ReplicationStatusType;
|
||||
use ecstore::cmd::bucket_replication::ReplicationType;
|
||||
use ecstore::compress::is_compressible;
|
||||
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
|
||||
use ecstore::compress::is_compressible;
|
||||
use ecstore::error::StorageError;
|
||||
use ecstore::new_object_layer_fn;
|
||||
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
|
||||
@@ -58,11 +58,11 @@ use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
use lazy_static::lazy_static;
|
||||
use policy::auth;
|
||||
use policy::policy::action::Action;
|
||||
use policy::policy::action::S3Action;
|
||||
use policy::policy::BucketPolicy;
|
||||
use policy::policy::BucketPolicyArgs;
|
||||
use policy::policy::Validator;
|
||||
use policy::policy::action::Action;
|
||||
use policy::policy::action::S3Action;
|
||||
use query::instance::make_rustfsms;
|
||||
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
|
||||
@@ -70,23 +70,23 @@ use rustfs_rio::CompressReader;
|
||||
use rustfs_rio::HashReader;
|
||||
use rustfs_rio::Reader;
|
||||
use rustfs_rio::WarpReader;
|
||||
use rustfs_utils::path::path_join_buf;
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::path::path_join_buf;
|
||||
use rustfs_zip::CompressionFormat;
|
||||
use s3s::dto::*;
|
||||
use s3s::s3_error;
|
||||
use s3s::S3;
|
||||
use s3s::S3Error;
|
||||
use s3s::S3ErrorCode;
|
||||
use s3s::S3Result;
|
||||
use s3s::S3;
|
||||
use s3s::dto::*;
|
||||
use s3s::s3_error;
|
||||
use s3s::{S3Request, S3Response};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_tar::Archive;
|
||||
|
||||
Reference in New Issue
Block a user