diff --git a/.gitignore b/.gitignore index 677845ec..b93bd2a0 100644 --- a/.gitignore +++ b/.gitignore @@ -17,5 +17,4 @@ deploy/certs/* .rustfs.sys .cargo profile.json -.docker/openobserve-otel/data -rustfs \ No newline at end of file +.docker/openobserve-otel/data \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5ccf3d7c..08b92977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,16 +506,6 @@ dependencies = [ "zbus 5.7.1", ] -[[package]] -name = "assert-json-diff" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "async-broadcast" version = "0.7.2" @@ -1839,15 +1829,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" -[[package]] -name = "colored" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "combine" version = "4.6.7" @@ -5989,30 +5970,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "mockito" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7760e0e418d9b7e5777c0374009ca4c93861b9066f18cb334a20ce50ab63aa48" -dependencies = [ - "assert-json-diff", - "bytes", - "colored", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "log", - "rand 0.9.1", - "regex", - "serde_json", - "serde_urlencoded", - "similar", - "tokio", -] - [[package]] name = "muda" version = "0.11.5" @@ -8431,13 +8388,12 @@ dependencies = [ "chrono", "const-str", "ecstore", - "libc", - "mockito", "once_cell", "quick-xml", "reqwest", "rumqttc", "rustfs-config", + "rustfs-utils", "serde", "serde_json", "snap", @@ -9288,12 +9244,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" -[[package]] -name = "similar" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" - [[package]] name = "simple_asn1" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 1a931d99..a888d171 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,14 +121,12 @@ keyring = { version = "3.6.2", features = [ "sync-secret-service", ] } lazy_static = "1.5.0" -libc = "0.2.174" libsystemd = { version = "0.7.2" } local-ip-address = "0.6.5" matchit = "0.8.4" md-5 = "0.10.6" mime = "0.3.17" mime_guess = "2.0.5" -mockito = "1.7.0" netif = "0.1.6" nix = { version = "0.30.1", features = ["fs"] } nu-ansi-term = "0.50.1" @@ -171,7 +169,7 @@ rdkafka = { version = "0.37.0", features = ["tokio"] } reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] } reed-solomon-simd = { version = "3.0.0" } regex = { version = "1.11.1" } -reqwest = { version = "0.12.19", default-features = false, features = [ +reqwest = { version = "0.12.20", default-features = false, features = [ "rustls-tls", "charset", "http2", diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index a4ead1ab..308991ad 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -10,11 +10,10 @@ use bytes::Bytes; use rmp::Marker; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; +use std::convert::TryFrom; use std::hash::Hasher; use std::io::{Read, Write}; use std::{collections::HashMap, io::Cursor}; -use std::convert::TryFrom; -use std::fs::File; use time::OffsetDateTime; use tokio::io::AsyncRead; use uuid::Uuid; @@ -518,11 +517,7 @@ impl FileMeta { let has_vid = { if !version_id.is_empty() { let id = Uuid::parse_str(version_id)?; - if !id.is_nil() { - Some(id) - } else { - None - } + if !id.is_nil() { Some(id) } else { None } } else { None } @@ -1253,11 +1248,7 @@ impl FileMetaVersionHeader { cur.read_exact(&mut buf)?; self.version_id = { let id = Uuid::from_bytes(buf); - if id.is_nil() { - None - } else { - Some(id) - } + if id.is_nil() { None } else { Some(id) } }; // mod_time @@ -1431,11 +1422,7 @@ impl MetaObject { cur.read_exact(&mut buf)?; self.version_id = { let id = Uuid::from_bytes(buf); - if id.is_nil() { - None - } else { - Some(id) - } + if id.is_nil() { None } else { Some(id) } }; } "DDir" => { @@ -1444,11 +1431,7 @@ impl MetaObject { cur.read_exact(&mut buf)?; self.data_dir = { let id = Uuid::from_bytes(buf); - if id.is_nil() { - None - } else { - Some(id) - } + if id.is_nil() { None } else { Some(id) } }; } "EcAlgo" => { @@ -2017,11 +2000,7 @@ impl MetaDeleteMarker { cur.read_exact(&mut buf)?; self.version_id = { let id = Uuid::from_bytes(buf); - if id.is_nil() { - None - } else { - Some(id) - } + if id.is_nil() { None } else { Some(id) } }; } diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index fcc3e926..d9e07e0b 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -12,7 +12,6 @@ async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } const-str = { workspace = true } ecstore = { workspace = true } -libc = { workspace = true } once_cell = { workspace = true } quick-xml = { workspace = true, features = ["serialize", "async-tokio"] } reqwest = { workspace = true } @@ -31,9 +30,9 @@ wildmatch = { workspace = true, features = ["serde"] } [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } -mockito = "1.7" reqwest = { workspace = true, default-features = false, features = ["rustls-tls", "charset", "http2", "system-proxy", "stream", "json", "blocking"] } axum = { workspace = true } +rustfs-utils = { workspace = true, features = ["path"] } [lints] workspace = true diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 17151d15..047ccfea 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -1,8 +1,6 @@ -use notify::arn::TargetID; -use notify::global::notification_system; -use notify::{ - init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, -}; +use rustfs_notify::arn::TargetID; +use rustfs_notify::global::notification_system; +use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError}; use std::time::Duration; use tracing::info; @@ -12,18 +10,24 @@ async fn main() -> Result<(), NotificationError> { let system = notification_system(); - // --- 初始配置 (Webhook 和 MQTT) --- - let mut config = notify::Config::new(); - + // --- Initial configuration (Webhook and MQTT) --- + let mut config = rustfs_notify::Config::new(); + let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root"); + println!("Current project root: {}", current_root.display()); // Webhook target configuration - let mut webhook_kvs = notify::KVS::new(); + let mut webhook_kvs = rustfs_notify::KVS::new(); webhook_kvs.set("enable", "on"); webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook"); webhook_kvs.set("auth_token", "secret-token"); // webhook_kvs.set("queue_dir", "/tmp/data/webhook"); webhook_kvs.set( "queue_dir", - "/Users/qun/Documents/rust/rustfs/notify/logs/webhook", + current_root + .clone() + .join("../../deploy/logs/notify/webhook") + .to_str() + .unwrap() + .to_string(), ); webhook_kvs.set("queue_limit", "10000"); let mut webhook_targets = std::collections::HashMap::new(); @@ -31,7 +35,7 @@ async fn main() -> Result<(), NotificationError> { config.insert("notify_webhook".to_string(), webhook_targets); // MQTT target configuration - let mut mqtt_kvs = notify::KVS::new(); + let mut mqtt_kvs = rustfs_notify::KVS::new(); mqtt_kvs.set("enable", "on"); mqtt_kvs.set("broker", "mqtt://localhost:1883"); mqtt_kvs.set("topic", "rustfs/events"); @@ -41,7 +45,11 @@ async fn main() -> Result<(), NotificationError> { // webhook_kvs.set("queue_dir", "/tmp/data/mqtt"); mqtt_kvs.set( "queue_dir", - "/Users/qun/Documents/rust/rustfs/notify/logs/mqtt", + current_root + .join("../../deploy/logs/notify/mqtt") + .to_str() + .unwrap() + .to_string(), ); mqtt_kvs.set("queue_limit", "10000"); @@ -49,35 +57,32 @@ async fn main() -> Result<(), NotificationError> { mqtt_targets.insert("1".to_string(), mqtt_kvs); config.insert("notify_mqtt".to_string(), mqtt_targets); - // 加载配置并初始化系统 + // Load the configuration and initialize the system *system.config.write().await = config; system.init().await?; info!("✅ System initialized with Webhook and MQTT targets."); - // --- 1. 查询当前活动的 Target --- + // --- Query the currently active Target --- let active_targets = system.get_active_targets().await; info!("\n---> Currently active targets: {:?}", active_targets); assert_eq!(active_targets.len(), 2); tokio::time::sleep(Duration::from_secs(1)).await; - // --- 2. 精确删除一个 Target (例如 MQTT) --- + // --- Exactly delete a Target (e.g. MQTT) --- info!("\n---> Removing MQTT target..."); let mqtt_target_id = TargetID::new("1".to_string(), "mqtt".to_string()); system.remove_target(&mqtt_target_id, "notify_mqtt").await?; info!("✅ MQTT target removed."); - // --- 3. 再次查询活动的 Target --- + // --- Query the activity's Target again --- let active_targets_after_removal = system.get_active_targets().await; - info!( - "\n---> Active targets after removal: {:?}", - active_targets_after_removal - ); + info!("\n---> Active targets after removal: {:?}", active_targets_after_removal); assert_eq!(active_targets_after_removal.len(), 1); assert_eq!(active_targets_after_removal[0].id, "1".to_string()); - // --- 4. 发送事件进行验证 --- - // 配置一个规则,指向 Webhook 和已删除的 MQTT + // --- Send events for verification --- + // Configure a rule to point to the Webhook and deleted MQTT let mut bucket_config = BucketNotificationConfig::new("us-east-1"); bucket_config.add_rule( &[EventName::ObjectCreatedPut], @@ -87,20 +92,16 @@ async fn main() -> Result<(), NotificationError> { bucket_config.add_rule( &[EventName::ObjectCreatedPut], "*".to_string(), - TargetID::new("1".to_string(), "mqtt".to_string()), // 这个规则会匹配,但找不到 Target + TargetID::new("1".to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found ); - system - .load_bucket_notification_config("my-bucket", &bucket_config) - .await?; + system.load_bucket_notification_config("my-bucket", &bucket_config).await?; info!("\n---> Sending an event..."); let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); system .send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event) .await; - info!( - "✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target." - ); + info!("✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target."); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 51d07b69..4e22fbe1 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -1,9 +1,8 @@ -use notify::arn::TargetID; -use notify::global::notification_system; -// 1. 使用全局访问器 -use notify::{ - init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS, -}; +// Using Global Accessories +use rustfs_config::notify; +use rustfs_notify::arn::TargetID; +use rustfs_notify::global::notification_system; +use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS}; use std::time::Duration; use tracing::info; @@ -11,11 +10,12 @@ use tracing::info; async fn main() -> Result<(), NotificationError> { init_logger(LogLevel::Debug); - // 获取全局 NotificationSystem 实例 + // Get global NotificationSystem instance let system = notification_system(); - // --- 初始配置 --- - let mut config = notify::Config::new(); + // --- Initial configuration --- + let mut config = rustfs_notify::Config::new(); + let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root"); // Webhook target let mut webhook_kvs = KVS::new(); webhook_kvs.set("enable", "on"); @@ -23,20 +23,25 @@ async fn main() -> Result<(), NotificationError> { // webhook_kvs.set("queue_dir", "./logs/webhook"); webhook_kvs.set( "queue_dir", - "/Users/qun/Documents/rust/rustfs/notify/logs/webhook", + current_root + .clone() + .join("/deploy/logs/notify/webhook") + .to_str() + .unwrap() + .to_string(), ); let mut webhook_targets = std::collections::HashMap::new(); webhook_targets.insert("1".to_string(), webhook_kvs); config.insert("notify_webhook".to_string(), webhook_targets); - // 加载初始配置并初始化系统 + // Load the initial configuration and initialize the system *system.config.write().await = config; system.init().await?; info!("✅ System initialized with Webhook target."); tokio::time::sleep(Duration::from_secs(1)).await; - // --- 2. 动态更新系统配置:添加一个 MQTT Target --- + // --- Dynamically update system configuration: Add an MQTT Target --- info!("\n---> Dynamically adding MQTT target..."); let mut mqtt_kvs = KVS::new(); mqtt_kvs.set("enable", "on"); @@ -47,18 +52,13 @@ async fn main() -> Result<(), NotificationError> { mqtt_kvs.set("password", "123456"); mqtt_kvs.set("queue_limit", "10000"); // mqtt_kvs.set("queue_dir", "./logs/mqtt"); - mqtt_kvs.set( - "queue_dir", - "/Users/qun/Documents/rust/rustfs/notify/logs/mqtt", - ); - system - .set_target_config("notify_mqtt", "1", mqtt_kvs) - .await?; + mqtt_kvs.set("queue_dir", current_root.join("/deploy/logs/notify/mqtt").to_str().unwrap().to_string()); + system.set_target_config("notify_mqtt", "1", mqtt_kvs).await?; info!("✅ MQTT target added and system reloaded."); tokio::time::sleep(Duration::from_secs(1)).await; - // --- 3. 加载和管理 Bucket 配置 --- + // --- Loading and managing Bucket configurations --- info!("\n---> Loading bucket notification config..."); let mut bucket_config = BucketNotificationConfig::new("us-east-1"); bucket_config.add_rule( @@ -71,12 +71,10 @@ async fn main() -> Result<(), NotificationError> { "*".to_string(), TargetID::new("1".to_string(), "mqtt".to_string()), ); - system - .load_bucket_notification_config("my-bucket", &bucket_config) - .await?; + system.load_bucket_notification_config("my-bucket", &bucket_config).await?; info!("✅ Bucket 'my-bucket' config loaded."); - // --- 发送事件 --- + // --- Send events --- info!("\n---> Sending an event..."); let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); system @@ -86,7 +84,7 @@ async fn main() -> Result<(), NotificationError> { tokio::time::sleep(Duration::from_secs(2)).await; - // --- 动态移除配置 --- + // --- Dynamically remove configuration --- info!("\n---> Dynamically removing Webhook target..."); system.remove_target_config("notify_webhook", "1").await?; info!("✅ Webhook target removed and system reloaded."); diff --git a/crates/notify/examples/webhook.rs b/crates/notify/examples/webhook.rs index fea52016..362fe3f4 100644 --- a/crates/notify/examples/webhook.rs +++ b/crates/notify/examples/webhook.rs @@ -16,40 +16,37 @@ struct ResetParams { reason: Option, } -// 定义一个全局变量 统计接受到数据条数 +// Define a global variable and count the number of data received use std::sync::atomic::{AtomicU64, Ordering}; static WEBHOOK_COUNT: AtomicU64 = AtomicU64::new(0); #[tokio::main] async fn main() { - // 构建应用 + // Build an application let app = Router::new() .route("/webhook", post(receive_webhook)) - .route( - "/webhook/reset/{reason}", - get(reset_webhook_count_with_path), - ) + .route("/webhook/reset/{reason}", get(reset_webhook_count_with_path)) .route("/webhook/reset", get(reset_webhook_count)) .route("/webhook", get(receive_webhook)); - // 启动服务器 + // Start the server let addr = "0.0.0.0:3020"; let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); println!("Server running on {}", addr); - // 服务启动后进行自检 + // Self-checking after the service is started tokio::spawn(async move { - // 给服务器一点时间启动 + // Give the server some time to start tokio::time::sleep(std::time::Duration::from_secs(1)).await; match is_service_active(addr).await { - Ok(true) => println!("服务健康检查:成功 - 服务正常运行"), - Ok(false) => eprintln!("服务健康检查:失败 - 服务未响应"), - Err(e) => eprintln!("服务健康检查错误:{}", e), + Ok(true) => println!("Service health check: Successful - Service is running normally"), + Ok(false) => eprintln!("Service Health Check: Failed - Service Not Responded"), + Err(e) => eprintln!("Service health check errors:{}", e), } }); - // 创建关闭信号处理 + // Create a shutdown signal processing tokio::select! { result = axum::serve(listener, app) => { if let Err(e) = result { @@ -62,16 +59,14 @@ async fn main() { } } -/// 创建一个方法重置 WEBHOOK_COUNT 的值 -async fn reset_webhook_count_with_path( - axum::extract::Path(reason): axum::extract::Path, -) -> Response { - // 输出当前计数器的值 +/// Create a method to reset the value of WEBHOOK_COUNT +async fn reset_webhook_count_with_path(axum::extract::Path(reason): axum::extract::Path) -> Response { + // Output the value of the current counter let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst); println!("Current webhook count: {}", current_count); println!("Reset webhook count, reason: {}", reason); - // 将计数器重置为 0 + // Reset the counter to 0 WEBHOOK_COUNT.store(0, Ordering::SeqCst); println!("Webhook count has been reset to 0."); @@ -85,17 +80,14 @@ async fn reset_webhook_count_with_path( .unwrap() } -/// 创建一个方法重置 WEBHOOK_COUNT 的值 -/// 可以通过调用此方法来重置计数器 -async fn reset_webhook_count( - Query(params): Query, - headers: HeaderMap, -) -> Response { - // 输出当前计数器的值 +/// Create a method to reset the value of WEBHOOK_COUNT +/// You can reset the counter by calling this method +async fn reset_webhook_count(Query(params): Query, headers: HeaderMap) -> Response { + // Output the value of the current counter let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst); println!("Current webhook count: {}", current_count); - let reason = params.reason.unwrap_or_else(|| "未提供原因".to_string()); + let reason = params.reason.unwrap_or_else(|| "Reason not provided".to_string()); println!("Reset webhook count, reason: {}", reason); for header in headers { @@ -104,51 +96,41 @@ async fn reset_webhook_count( } println!("Reset webhook count printed headers"); - // 将计数器重置为 0 + // Reset the counter to 0 WEBHOOK_COUNT.store(0, Ordering::SeqCst); println!("Webhook count has been reset to 0."); Response::builder() .header("Foo", "Bar") .status(StatusCode::OK) - .body(format!( - "Webhook count reset successfully current_count:{}", - current_count - )) + .body(format!("Webhook count reset successfully current_count:{}", current_count)) .unwrap() } async fn is_service_active(addr: &str) -> Result { let socket_addr = tokio::net::lookup_host(addr) .await - .map_err(|e| format!("无法解析主机:{}", e))? + .map_err(|e| format!("Unable to resolve host:{}", e))? .next() - .ok_or_else(|| "未找到地址".to_string())?; + .ok_or_else(|| "Address not found".to_string())?; - println!("正在检查服务状态:{}", socket_addr); + println!("Checking service status:{}", socket_addr); - match tokio::time::timeout( - std::time::Duration::from_secs(5), - tokio::net::TcpStream::connect(socket_addr), - ) - .await - { + match tokio::time::timeout(std::time::Duration::from_secs(5), tokio::net::TcpStream::connect(socket_addr)).await { Ok(Ok(_)) => Ok(true), Ok(Err(e)) => { if e.kind() == std::io::ErrorKind::ConnectionRefused { Ok(false) } else { - Err(format!("连接失败:{}", e)) + Err(format!("Connection failed:{}", e)) } } - Err(_) => Err("连接超时".to_string()), + Err(_) => Err("Connection timeout".to_string()), } } async fn receive_webhook(Json(payload): Json) -> StatusCode { let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); + let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards"); // get the number of seconds since the unix era let seconds = since_the_epoch.as_secs(); @@ -157,20 +139,14 @@ async fn receive_webhook(Json(payload): Json) -> StatusCode { let (year, month, day, hour, minute, second) = convert_seconds_to_date(seconds); // output result - println!( - "current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}", - year, month, day, hour, minute, second - ); + println!("current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}", year, month, day, hour, minute, second); println!( "received a webhook request time:{} content:\n {}", seconds, serde_json::to_string_pretty(&payload).unwrap() ); WEBHOOK_COUNT.fetch_add(1, Ordering::SeqCst); - println!( - "Total webhook requests received: {}", - WEBHOOK_COUNT.load(Ordering::SeqCst) - ); + println!("Total webhook requests received: {}", WEBHOOK_COUNT.load(Ordering::SeqCst)); StatusCode::OK } @@ -221,12 +197,5 @@ fn convert_seconds_to_date(seconds: u64) -> (u32, u32, u32, u32, u32, u32) { // calculate the number of seconds second += total_seconds; - ( - year as u32, - month as u32, - day as u32, - hour as u32, - minute as u32, - second as u32, - ) + (year as u32, month as u32, day as u32, hour as u32, minute as u32, second as u32) } diff --git a/crates/notify/src/event.rs b/crates/notify/src/event.rs index f0e258b2..db08bef7 100644 --- a/crates/notify/src/event.rs +++ b/crates/notify/src/event.rs @@ -3,23 +3,23 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; -/// 当解析事件名称字符串失败时返回的错误。 +/// Error returned when parsing event name string fails。 #[derive(Debug, Clone, PartialEq, Eq)] pub struct ParseEventNameError(String); impl fmt::Display for ParseEventNameError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "无效的事件名称:{}", self.0) + write!(f, "Invalid event name:{}", self.0) } } impl std::error::Error for ParseEventNameError {} -/// 表示对象上发生的事件类型。 -/// 基于 AWS S3 事件类型,并包含 RustFS 扩展。 +/// Represents the type of event that occurs on the object. +/// Based on AWS S3 event type and includes RustFS extension. #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum EventName { - // 单一事件类型 (值为 1-32 以兼容掩码逻辑) + // Single event type (values are 1-32 for compatible mask logic) ObjectAccessedGet = 1, ObjectAccessedGetRetention = 2, ObjectAccessedGetLegalHold = 3, @@ -48,23 +48,23 @@ pub enum EventName { ObjectRestoreCompleted = 26, ObjectTransitionFailed = 27, ObjectTransitionComplete = 28, - ScannerManyVersions = 29, // 对应 Go 的 ObjectManyVersions - ScannerLargeVersions = 30, // 对应 Go 的 ObjectLargeVersions - ScannerBigPrefix = 31, // 对应 Go 的 PrefixManyFolders - LifecycleDelMarkerExpirationDelete = 32, // 对应 Go 的 ILMDelMarkerExpirationDelete + ScannerManyVersions = 29, // ObjectManyVersions corresponding to Go + ScannerLargeVersions = 30, // ObjectLargeVersions corresponding to Go + ScannerBigPrefix = 31, // PrefixManyFolders corresponding to Go + LifecycleDelMarkerExpirationDelete = 32, // ILMDelMarkerExpirationDelete corresponding to Go - // 复合 "All" 事件类型 (没有用于掩码的顺序值) + // Compound "All" event type (no sequential value for mask) ObjectAccessedAll, ObjectCreatedAll, ObjectRemovedAll, ObjectReplicationAll, ObjectRestoreAll, ObjectTransitionAll, - ObjectScannerAll, // 新增,来自 Go - Everything, // 新增,来自 Go + ObjectScannerAll, // New, from Go + Everything, // New, from Go } -// 用于 Everything.expand() 的单一事件类型顺序数组 +// Single event type sequential array for Everything.expand() const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [ EventName::ObjectAccessedGet, EventName::ObjectAccessedGetRetention, @@ -103,7 +103,7 @@ const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [ const LAST_SINGLE_TYPE_VALUE: u32 = EventName::LifecycleDelMarkerExpirationDelete as u32; impl EventName { - /// 解析字符串为 EventName。 + /// The parsed string is EventName. pub fn parse(s: &str) -> Result { match s { "s3:BucketCreated:*" => Ok(EventName::BucketCreated), @@ -115,9 +115,7 @@ impl EventName { "s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead), "s3:ObjectAccessed:Attributes" => Ok(EventName::ObjectAccessedAttributes), "s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll), - "s3:ObjectCreated:CompleteMultipartUpload" => { - Ok(EventName::ObjectCreatedCompleteMultipartUpload) - } + "s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload), "s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy), "s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost), "s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut), @@ -127,25 +125,15 @@ impl EventName { "s3:ObjectCreated:DeleteTagging" => Ok(EventName::ObjectCreatedDeleteTagging), "s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll), "s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete), - "s3:ObjectRemoved:DeleteMarkerCreated" => { - Ok(EventName::ObjectRemovedDeleteMarkerCreated) - } + "s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated), "s3:ObjectRemoved:NoOP" => Ok(EventName::ObjectRemovedNoOP), "s3:ObjectRemoved:DeleteAllVersions" => Ok(EventName::ObjectRemovedDeleteAllVersions), - "s3:LifecycleDelMarkerExpiration:Delete" => { - Ok(EventName::LifecycleDelMarkerExpirationDelete) - } + "s3:LifecycleDelMarkerExpiration:Delete" => Ok(EventName::LifecycleDelMarkerExpirationDelete), "s3:Replication:*" => Ok(EventName::ObjectReplicationAll), "s3:Replication:OperationFailedReplication" => Ok(EventName::ObjectReplicationFailed), - "s3:Replication:OperationCompletedReplication" => { - Ok(EventName::ObjectReplicationComplete) - } - "s3:Replication:OperationMissedThreshold" => { - Ok(EventName::ObjectReplicationMissedThreshold) - } - "s3:Replication:OperationReplicatedAfterThreshold" => { - Ok(EventName::ObjectReplicationReplicatedAfterThreshold) - } + "s3:Replication:OperationCompletedReplication" => Ok(EventName::ObjectReplicationComplete), + "s3:Replication:OperationMissedThreshold" => Ok(EventName::ObjectReplicationMissedThreshold), + "s3:Replication:OperationReplicatedAfterThreshold" => Ok(EventName::ObjectReplicationReplicatedAfterThreshold), "s3:Replication:OperationNotTracked" => Ok(EventName::ObjectReplicationNotTracked), "s3:ObjectRestore:*" => Ok(EventName::ObjectRestoreAll), "s3:ObjectRestore:Post" => Ok(EventName::ObjectRestorePost), @@ -156,12 +144,12 @@ impl EventName { "s3:Scanner:ManyVersions" => Ok(EventName::ScannerManyVersions), "s3:Scanner:LargeVersions" => Ok(EventName::ScannerLargeVersions), "s3:Scanner:BigPrefix" => Ok(EventName::ScannerBigPrefix), - // ObjectScannerAll 和 Everything 不能从字符串解析,因为 Go 版本也没有定义它们的字符串表示 + // ObjectScannerAll and Everything cannot be parsed from strings, because the Go version also does not define their string representation. _ => Err(ParseEventNameError(s.to_string())), } } - /// 返回事件类型的字符串表示。 + /// Returns a string representation of the event type. pub fn as_str(&self) -> &'static str { match self { EventName::BucketCreated => "s3:BucketCreated:*", @@ -173,9 +161,7 @@ impl EventName { EventName::ObjectAccessedHead => "s3:ObjectAccessed:Head", EventName::ObjectAccessedAttributes => "s3:ObjectAccessed:Attributes", EventName::ObjectCreatedAll => "s3:ObjectCreated:*", - EventName::ObjectCreatedCompleteMultipartUpload => { - "s3:ObjectCreated:CompleteMultipartUpload" - } + EventName::ObjectCreatedCompleteMultipartUpload => "s3:ObjectCreated:CompleteMultipartUpload", EventName::ObjectCreatedCopy => "s3:ObjectCreated:Copy", EventName::ObjectCreatedPost => "s3:ObjectCreated:Post", EventName::ObjectCreatedPut => "s3:ObjectCreated:Put", @@ -188,19 +174,13 @@ impl EventName { EventName::ObjectRemovedDeleteMarkerCreated => "s3:ObjectRemoved:DeleteMarkerCreated", EventName::ObjectRemovedNoOP => "s3:ObjectRemoved:NoOP", EventName::ObjectRemovedDeleteAllVersions => "s3:ObjectRemoved:DeleteAllVersions", - EventName::LifecycleDelMarkerExpirationDelete => { - "s3:LifecycleDelMarkerExpiration:Delete" - } + EventName::LifecycleDelMarkerExpirationDelete => "s3:LifecycleDelMarkerExpiration:Delete", EventName::ObjectReplicationAll => "s3:Replication:*", EventName::ObjectReplicationFailed => "s3:Replication:OperationFailedReplication", EventName::ObjectReplicationComplete => "s3:Replication:OperationCompletedReplication", EventName::ObjectReplicationNotTracked => "s3:Replication:OperationNotTracked", - EventName::ObjectReplicationMissedThreshold => { - "s3:Replication:OperationMissedThreshold" - } - EventName::ObjectReplicationReplicatedAfterThreshold => { - "s3:Replication:OperationReplicatedAfterThreshold" - } + EventName::ObjectReplicationMissedThreshold => "s3:Replication:OperationMissedThreshold", + EventName::ObjectReplicationReplicatedAfterThreshold => "s3:Replication:OperationReplicatedAfterThreshold", EventName::ObjectRestoreAll => "s3:ObjectRestore:*", EventName::ObjectRestorePost => "s3:ObjectRestore:Post", EventName::ObjectRestoreCompleted => "s3:ObjectRestore:Completed", @@ -210,13 +190,13 @@ impl EventName { EventName::ScannerManyVersions => "s3:Scanner:ManyVersions", EventName::ScannerLargeVersions => "s3:Scanner:LargeVersions", EventName::ScannerBigPrefix => "s3:Scanner:BigPrefix", - // Go 的 String() 对 ObjectScannerAll 和 Everything 返回 "" - EventName::ObjectScannerAll => "s3:Scanner:*", // 遵循 Go Expand 中的模式 - EventName::Everything => "", // Go String() 对未处理的返回 "" + // Go's String() returns "" for ObjectScannerAll and Everything + EventName::ObjectScannerAll => "s3:Scanner:*", // Follow the pattern in Go Expand + EventName::Everything => "", // Go String() returns "" to unprocessed } } - /// 返回缩写事件类型的扩展值。 + /// Returns the extended value of the abbreviation event type. pub fn expand(&self) -> Vec { match self { EventName::ObjectAccessedAll => vec![ @@ -249,41 +229,35 @@ impl EventName { EventName::ObjectReplicationMissedThreshold, EventName::ObjectReplicationReplicatedAfterThreshold, ], - EventName::ObjectRestoreAll => vec![ - EventName::ObjectRestorePost, - EventName::ObjectRestoreCompleted, - ], - EventName::ObjectTransitionAll => vec![ - EventName::ObjectTransitionFailed, - EventName::ObjectTransitionComplete, - ], + EventName::ObjectRestoreAll => vec![EventName::ObjectRestorePost, EventName::ObjectRestoreCompleted], + EventName::ObjectTransitionAll => vec![EventName::ObjectTransitionFailed, EventName::ObjectTransitionComplete], EventName::ObjectScannerAll => vec![ - // 新增 + // New EventName::ScannerManyVersions, EventName::ScannerLargeVersions, EventName::ScannerBigPrefix, ], EventName::Everything => { - // 新增 + // New SINGLE_EVENT_NAMES_IN_ORDER.to_vec() } - // 单一类型直接返回自身 + // A single type returns to itself directly _ => vec![*self], } } - /// 返回类型的掩码。 - /// 复合 "All" 类型会被展开。 + /// Returns the mask of type. + /// The compound "All" type will be expanded. pub fn mask(&self) -> u64 { let value = *self as u32; if value > 0 && value <= LAST_SINGLE_TYPE_VALUE { - // 是单一类型 + // It's a single type 1u64 << (value - 1) } else { - // 是复合类型 + // It's a compound type let mut mask = 0u64; for n in self.expand() { - mask |= n.mask(); // 递归调用 mask + mask |= n.mask(); // Recursively call mask } mask } @@ -296,7 +270,7 @@ impl fmt::Display for EventName { } } -/// 根据字符串转换为 `EventName` +/// Convert to `EventName` according to string impl From<&str> for EventName { fn from(event_str: &str) -> Self { EventName::parse(event_str).unwrap_or_else(|e| panic!("{}", e)) @@ -399,28 +373,16 @@ impl Event { pub fn new_test_event(bucket: &str, key: &str, event_name: EventName) -> Self { let mut user_metadata = HashMap::new(); user_metadata.insert("x-amz-meta-test".to_string(), "value".to_string()); - user_metadata.insert( - "x-amz-storage-storage-options".to_string(), - "value".to_string(), - ); + user_metadata.insert("x-amz-storage-storage-options".to_string(), "value".to_string()); user_metadata.insert("x-amz-meta-".to_string(), "value".to_string()); user_metadata.insert("x-rustfs-meta-".to_string(), "rustfs-value".to_string()); user_metadata.insert("x-request-id".to_string(), "request-id-123".to_string()); user_metadata.insert("x-bucket".to_string(), "bucket".to_string()); user_metadata.insert("x-object".to_string(), "object".to_string()); - user_metadata.insert( - "x-rustfs-origin-endpoint".to_string(), - "http://127.0.0.1".to_string(), - ); + user_metadata.insert("x-rustfs-origin-endpoint".to_string(), "http://127.0.0.1".to_string()); user_metadata.insert("x-rustfs-user-metadata".to_string(), "metadata".to_string()); - user_metadata.insert( - "x-rustfs-deployment-id".to_string(), - "deployment-id-123".to_string(), - ); - user_metadata.insert( - "x-rustfs-origin-endpoint-code".to_string(), - "http://127.0.0.1".to_string(), - ); + user_metadata.insert("x-rustfs-deployment-id".to_string(), "deployment-id-123".to_string()); + user_metadata.insert("x-rustfs-origin-endpoint-code".to_string(), "http://127.0.0.1".to_string()); user_metadata.insert("x-rustfs-bucket-name".to_string(), "bucket".to_string()); user_metadata.insert("x-rustfs-object-key".to_string(), key.to_string()); user_metadata.insert("x-rustfs-object-size".to_string(), "1024".to_string()); @@ -466,7 +428,7 @@ impl Event { }, } } - /// 返回事件掩码 + /// Return event mask pub fn mask(&self) -> u64 { self.event_name.mask() } diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs index 610f30ba..26e0906a 100644 --- a/crates/notify/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -1,4 +1,4 @@ -//! RustFs Notify - A flexible and extensible event notification system for object storage. +//! RustFS Notify - A flexible and extensible event notification system for object storage. //! //! This library provides a Rust implementation of a storage bucket notification system, //! similar to RustFS's notification system. It supports sending events to various targets @@ -35,7 +35,7 @@ use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter}; /// /// # Example /// ``` -/// notify::init_logger(notify::LogLevel::Info); +/// rustfs_notify::init_logger(rustfs_notify::LogLevel::Info); /// ``` pub fn init_logger(level: LogLevel) { let filter = EnvFilter::default().add_directive(level.into()); diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 8beeb361..08ff8ed8 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -39,7 +39,7 @@ impl BucketNotificationConfig { /// Parses notification configuration from XML. /// `arn_list` is a list of valid ARN strings for validation. - pub fn from_xml( + pub fn from_xml( reader: R, current_region: &str, arn_list: &[String], @@ -72,11 +72,7 @@ impl BucketNotificationConfig { /// However, Go's Config has a Validate method. /// The primary validation now happens during `from_xml` via `NotificationConfiguration::validate`. /// This method could re-check against an updated arn_list or region if needed. - pub fn validate( - &self, - current_region: &str, - arn_list: &[String], - ) -> Result<(), BucketNotificationConfigError> { + pub fn validate(&self, current_region: &str, arn_list: &[String]) -> Result<(), BucketNotificationConfigError> { if self.region != current_region { return Err(BucketNotificationConfigError::RegionMismatch { config_region: self.region.clone(), @@ -93,9 +89,7 @@ impl BucketNotificationConfig { // Construct the ARN string for this target_id and self.region let arn_to_check = target_id.to_arn(&self.region); // Assuming TargetID has to_arn if !arn_list.contains(&arn_to_check.to_arn_string()) { - return Err(BucketNotificationConfigError::ArnNotFound( - arn_to_check.to_arn_string(), - )); + return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_arn_string())); } } } diff --git a/crates/notify/src/rules/rules_map.rs b/crates/notify/src/rules/rules_map.rs index 74b7501f..7ec1b3bb 100644 --- a/crates/notify/src/rules/rules_map.rs +++ b/crates/notify/src/rules/rules_map.rs @@ -22,12 +22,7 @@ impl RulesMap { /// target_id: Notify the target. /// /// This method expands the composite event name. - pub fn add_rule_config( - &mut self, - event_names: &[EventName], - pattern: String, - target_id: TargetID, - ) { + pub fn add_rule_config(&mut self, event_names: &[EventName], pattern: String, target_id: TargetID) { let mut effective_pattern = pattern; if effective_pattern.is_empty() { effective_pattern = "*".to_string(); // Match all by default @@ -55,8 +50,7 @@ impl RulesMap { } } - /// 从当前 RulesMap 中移除另一个 RulesMap 中定义的规则。 - /// 对应 Go 的 `RulesMap.Remove(rulesMap2 RulesMap)` + /// Remove another rule defined in the RulesMap from the current RulesMap. pub fn remove_map(&mut self, other_map: &Self) { let mut events_to_remove = Vec::new(); for (event_name, self_pattern_rules) in &mut self.map { @@ -72,24 +66,24 @@ impl RulesMap { } } - /// 匹配给定事件名称和对象键的规则,返回所有匹配的 TargetID。 + ///Rules matching the given event name and object key, returning all matching TargetIDs. pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet { - // 首先尝试直接匹配事件名称 + // First try to directly match the event name if let Some(pattern_rules) = self.map.get(&event_name) { let targets = pattern_rules.match_targets(object_key); if !targets.is_empty() { return targets; } } - // Go 的 RulesMap[eventName] 直接获取,如果不存在则为空 Rules。 - // Rust 的 HashMap::get 返回 Option。如果事件名不存在,则没有规则。 - // 复合事件(如 ObjectCreatedAll)在 add_rule_config 时已展开为单一事件。 - // 因此,查询时应使用单一事件名称。 - // 如果 event_name 本身就是单一类型,则直接查找。 - // 如果 event_name 是复合类型,Go 的逻辑是在添加时展开。 - // 这里的 match_rules 应该接收已经可能是单一的事件。 - // 如果调用者传入的是复合事件,它应该先自行展开或此函数处理。 - // 假设 event_name 已经是具体的、可用于查找的事件。 + // Go's RulesMap[eventName] is directly retrieved, and if it does not exist, it is empty Rules. + // Rust's HashMap::get returns Option. If the event name does not exist, there is no rule. + // Compound events (such as ObjectCreatedAll) have been expanded as a single event when add_rule_config. + // Therefore, a single event name should be used when querying. + // If event_name itself is a single type, look it up directly. + // If event_name is a compound type, Go's logic is expanded when added. + // Here match_rules should receive events that may already be single. + // If the caller passes in a compound event, it should expand itself or handle this function first. + // Assume that event_name is already a specific event that can be used for searching. self.map .get(&event_name) .map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key)) @@ -99,7 +93,7 @@ impl RulesMap { self.map.is_empty() } - /// 返回内部规则的克隆,用于 BucketNotificationConfig::validate 等场景。 + /// Returns a clone of internal rules for use in scenarios such as BucketNotificationConfig::validate. pub fn inner(&self) -> &HashMap { &self.map } diff --git a/crates/notify/src/rules/xml_config.rs b/crates/notify/src/rules/xml_config.rs index cd9258cf..b1f6f471 100644 --- a/crates/notify/src/rules/xml_config.rs +++ b/crates/notify/src/rules/xml_config.rs @@ -9,7 +9,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum ParseConfigError { #[error("XML parsing error:{0}")] - XmlError(#[from] quick_xml::errors::Error), + XmlError(#[from] quick_xml::errors::serialize::DeError), #[error("Invalid filter value:{0}")] InvalidFilterValue(String), #[error("Invalid filter name: {0}, only 'prefix' or 'suffix' is allowed")] @@ -193,10 +193,10 @@ impl QueueConfig { pub struct LambdaConfigDetail { #[serde(rename = "CloudFunction")] pub arn: String, - // 根据 AWS S3 文档, 通常还包含 Id, Event, Filter - // 但为了严格对应提供的 Go `lambda` 结构体,这里只包含 ARN。 - // 如果需要完整支持,可以添加其他字段。 - // 例如: + // According to AWS S3 documentation, usually also contains Id, Event, Filter + // But in order to strictly correspond to the Go `lambda` structure provided, only ARN is included here. + // If full support is required, additional fields can be added. + // For example: // #[serde(rename = "Id", skip_serializing_if = "Option::is_none")] // pub id: Option, // #[serde(rename = "Event", default, skip_serializing_if = "Vec::is_empty")] @@ -211,7 +211,7 @@ pub struct LambdaConfigDetail { pub struct TopicConfigDetail { #[serde(rename = "Topic")] pub arn: String, - // 类似于 LambdaConfigDetail,可以根据需要扩展以包含 Id, Event, Filter 等字段。 + // Similar to LambdaConfigDetail, it can be extended to include fields such as Id, Event, Filter, etc. } #[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)] @@ -236,8 +236,8 @@ pub struct NotificationConfiguration { } impl NotificationConfiguration { - pub fn from_reader(reader: R) -> Result { - let config: NotificationConfiguration = quick_xml::reader::Reader::from_reader(reader)?; + pub fn from_reader(reader: R) -> Result { + let config: NotificationConfiguration = quick_xml::de::from_reader(reader)?; Ok(config) } @@ -268,7 +268,7 @@ impl NotificationConfiguration { if self.xmlns.is_none() { self.xmlns = Some("http://s3.amazonaws.com/doc/2006-03-01/".to_string()); } - // 注意:如果 LambdaConfigDetail 和 TopicConfigDetail 将来包含区域等信息, - // 也可能需要在这里设置默认值。但根据当前定义,它们只包含 ARN 字符串。 + // Note: If LambdaConfigDetail and TopicConfigDetail contain information such as regions in the future, + // You may also need to set the default value here. But according to the current definition, they only contain ARN strings. } } diff --git a/crates/notify/src/stream.rs b/crates/notify/src/stream.rs index f8bb1dfd..02cdc2e9 100644 --- a/crates/notify/src/stream.rs +++ b/crates/notify/src/stream.rs @@ -13,7 +13,7 @@ use tracing::{debug, error, info, warn}; /// Streams events from the store to the target pub async fn stream_events( - store: &mut (dyn Store + Send), + store: &mut (dyn Store + Send), target: &dyn Target, mut cancel_rx: mpsc::Receiver<()>, ) { @@ -42,10 +42,7 @@ pub async fn stream_events( for key in keys { // Check for cancellation before processing each event if cancel_rx.try_recv().is_ok() { - info!( - "Cancellation received during processing for target: {}", - target.name() - ); + info!("Cancellation received during processing for target: {}", target.name()); return; } @@ -70,7 +67,7 @@ pub async fn stream_events( TargetError::Timeout(_) => { warn!("Timeout for target {}, retrying...", target.name()); retry_count += 1; - sleep(Duration::from_secs((retry_count * 5) as u64)).await; // 指数退避 + sleep(Duration::from_secs((retry_count * 5) as u64)).await; // Exponential backoff } _ => { // Permanent error, skip this event @@ -84,11 +81,7 @@ pub async fn stream_events( // Remove event from store if successfully sent if retry_count >= MAX_RETRIES && !success { - warn!( - "Max retries exceeded for event {}, target: {}, skipping", - key.to_string(), - target.name() - ); + warn!("Max retries exceeded for event {}, target: {}, skipping", key.to_string(), target.name()); } } @@ -120,10 +113,7 @@ pub fn start_event_stream_with_batching( semaphore: Arc, ) -> mpsc::Sender<()> { let (cancel_tx, cancel_rx) = mpsc::channel(1); - debug!( - "Starting event stream with batching for target: {}", - target.name() - ); + debug!("Starting event stream with batching for target: {}", target.name()); tokio::spawn(async move { stream_events_with_batching(&mut *store, &*target, cancel_rx, metrics, semaphore).await; info!("Event stream stopped for target: {}", target.name()); @@ -132,7 +122,7 @@ pub fn start_event_stream_with_batching( cancel_tx } -/// 带批处理的事件流处理 +/// Event stream processing with batch processing pub async fn stream_events_with_batching( store: &mut (dyn Store + Send), target: &dyn Target, @@ -140,10 +130,7 @@ pub async fn stream_events_with_batching( metrics: Arc, semaphore: Arc, ) { - info!( - "Starting event stream with batching for target: {}", - target.name() - ); + info!("Starting event stream with batching for target: {}", target.name()); // Configuration parameters const DEFAULT_BATCH_SIZE: usize = 1; @@ -160,106 +147,64 @@ pub async fn stream_events_with_batching( let mut last_flush = Instant::now(); loop { - // 检查取消信号 + // Check the cancel signal if cancel_rx.try_recv().is_ok() { info!("Cancellation received for target: {}", target.name()); return; } - // 获取存储中的事件列表 + // Get a list of events in storage let keys = store.list(); - debug!( - "Found {} keys in store for target: {}", - keys.len(), - target.name() - ); + debug!("Found {} keys in store for target: {}", keys.len(), target.name()); if keys.is_empty() { - // 如果批处理中有数据且超时,则刷新批处理 + // If there is data in the batch and timeout, refresh the batch if !batch.is_empty() && last_flush.elapsed() >= BATCH_TIMEOUT { - process_batch( - &mut batch, - &mut batch_keys, - target, - MAX_RETRIES, - BASE_RETRY_DELAY, - &metrics, - &semaphore, - ) - .await; + process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore).await; last_flush = Instant::now(); } - // 无事件,等待后再检查 + // No event, wait before checking tokio::time::sleep(Duration::from_millis(500)).await; continue; } - // 处理每个事件 + // Handle each event for key in keys { - // 再次检查取消信号 + // Check the cancel signal again if cancel_rx.try_recv().is_ok() { - info!( - "Cancellation received during processing for target: {}", - target.name() - ); + info!("Cancellation received during processing for target: {}", target.name()); - // 在退出前处理已收集的批次 + // Processing collected batches before exiting if !batch.is_empty() { - process_batch( - &mut batch, - &mut batch_keys, - target, - MAX_RETRIES, - BASE_RETRY_DELAY, - &metrics, - &semaphore, - ) - .await; + process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore).await; } return; } - // 尝试从存储中获取事件 + // Try to get events from storage match store.get(&key) { Ok(event) => { - // 添加到批处理 + // Add to batch batch.push(event); batch_keys.push(key); metrics.increment_processing(); - // 如果批次已满或距离上次刷新已经过了足够时间,则处理批次 + // If the batch is full or enough time has passed since the last refresh, the batch will be processed if batch.len() >= batch_size || last_flush.elapsed() >= BATCH_TIMEOUT { - process_batch( - &mut batch, - &mut batch_keys, - target, - MAX_RETRIES, - BASE_RETRY_DELAY, - &metrics, - &semaphore, - ) - .await; + process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore) + .await; last_flush = Instant::now(); } } Err(e) => { - error!( - "Failed to target: {}, get event {} from store: {}", - target.name(), - key.to_string(), - e - ); - // 可以考虑删除无法读取的事件,防止无限循环尝试读取 + error!("Failed to target: {}, get event {} from store: {}", target.name(), key.to_string(), e); + // Consider deleting unreadable events to prevent infinite loops from trying to read match store.del(&key) { Ok(_) => { info!("Deleted corrupted event {} from store", key.to_string()); } Err(del_err) => { - error!( - "Failed to delete corrupted event {}: {}", - key.to_string(), - del_err - ); + error!("Failed to delete corrupted event {}: {}", key.to_string(), del_err); } } @@ -268,12 +213,12 @@ pub async fn stream_events_with_batching( } } - // 小延迟再进行下一轮检查 + // A small delay will be conducted to check the next round tokio::time::sleep(Duration::from_millis(100)).await; } } -/// 处理事件批次 +/// Processing event batches async fn process_batch( batch: &mut Vec, batch_keys: &mut Vec, @@ -283,16 +228,12 @@ async fn process_batch( metrics: &Arc, semaphore: &Arc, ) { - debug!( - "Processing batch of {} events for target: {}", - batch.len(), - target.name() - ); + debug!("Processing batch of {} events for target: {}", batch.len(), target.name()); if batch.is_empty() { return; } - // 获取信号量许可,限制并发 + // Obtain semaphore permission to limit concurrency let permit = match semaphore.clone().acquire_owned().await { Ok(permit) => permit, Err(e) => { @@ -301,30 +242,26 @@ async fn process_batch( } }; - // 处理批次中的每个事件 + // Handle every event in the batch for (_event, key) in batch.iter().zip(batch_keys.iter()) { let mut retry_count = 0; let mut success = false; - // 重试逻辑 + // Retry logic while retry_count < max_retries && !success { match target.send_from_store(key.clone()).await { Ok(_) => { - info!( - "Successfully sent event for target: {}, Key: {}", - target.name(), - key.to_string() - ); + info!("Successfully sent event for target: {}, Key: {}", target.name(), key.to_string()); success = true; metrics.increment_processed(); } Err(e) => { - // 根据错误类型采用不同的重试策略 + // Different retry strategies are adopted according to the error type match &e { TargetError::NotConnected => { warn!("Target {} not connected, retrying...", target.name()); retry_count += 1; - tokio::time::sleep(base_delay * (1 << retry_count)).await; // 指数退避 + tokio::time::sleep(base_delay * (1 << retry_count)).await; // Exponential backoff } TargetError::Timeout(_) => { warn!("Timeout for target {}, retrying...", target.name()); @@ -332,7 +269,7 @@ async fn process_batch( tokio::time::sleep(base_delay * (1 << retry_count)).await; } _ => { - // 永久性错误,跳过此事件 + // Permanent error, skip this event error!("Permanent error for target {}: {}", target.name(), e); metrics.increment_failed(); break; @@ -342,21 +279,17 @@ async fn process_batch( } } - // 处理最大重试次数耗尽的情况 + // Handle the situation where the maximum number of retry exhaustion is exhausted if retry_count >= max_retries && !success { - warn!( - "Max retries exceeded for event {}, target: {}, skipping", - key.to_string(), - target.name() - ); + warn!("Max retries exceeded for event {}, target: {}, skipping", key.to_string(), target.name()); metrics.increment_failed(); } } - // 清空已处理的批次 + // Clear processed batches batch.clear(); batch_keys.clear(); - // 释放信号量许可(通过 drop) + // Release semaphore permission (via drop) drop(permit); } diff --git a/crates/notify/src/target/constants.rs b/crates/notify/src/target/constants.rs index 4ac7b315..8dd0ab52 100644 --- a/crates/notify/src/target/constants.rs +++ b/crates/notify/src/target/constants.rs @@ -1,22 +1,22 @@ #[allow(dead_code)] -const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; +pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; #[allow(dead_code)] -const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; +pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; #[allow(dead_code)] -const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; +pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; #[allow(dead_code)] -const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; +pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; #[allow(dead_code)] -const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; +pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; #[allow(dead_code)] -const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; +pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; #[allow(dead_code)] -const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; +pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; #[allow(dead_code)] -const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; +pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; #[allow(dead_code)] -const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; -const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; +pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; +pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; // Webhook constants pub const WEBHOOK_ENDPOINT: &str = "endpoint"; diff --git a/crates/notify/src/utils.rs b/crates/notify/src/utils.rs index 03303863..afb63c6f 100644 --- a/crates/notify/src/utils.rs +++ b/crates/notify/src/utils.rs @@ -1,17 +1,14 @@ -use std::env; use std::fmt; - #[cfg(unix)] -use libc::uname; -#[cfg(unix)] -use std::ffi::CStr; +use std::os::unix::process::ExitStatusExt; #[cfg(windows)] -use std::process::Command; +use std::os::windows::process::ExitStatusExt; +use std::{env, process}; -// 定义 Rustfs 版本 +// Define Rustfs version const RUSTFS_VERSION: &str = "1.0.0"; -// 业务类型枚举 +// Business Type Enumeration #[derive(Debug, Clone, PartialEq)] pub enum ServiceType { Basis, @@ -33,7 +30,7 @@ impl ServiceType { } } -// UserAgent 结构体 +// UserAgent structure struct UserAgent { os_platform: String, arch: String, @@ -42,7 +39,7 @@ struct UserAgent { } impl UserAgent { - // 创建新的 UserAgent 实例,接受业务类型参数 + // Create a new UserAgent instance and accept business type parameters fn new(service: ServiceType) -> Self { let os_platform = Self::get_os_platform(); let arch = env::consts::ARCH.to_string(); @@ -56,7 +53,7 @@ impl UserAgent { } } - // 获取操作系统平台信息 + // Obtain operating system platform information fn get_os_platform() -> String { if cfg!(target_os = "windows") { Self::get_windows_platform() @@ -69,14 +66,18 @@ impl UserAgent { } } - // 获取 Windows 平台信息 + // Get Windows platform information #[cfg(windows)] fn get_windows_platform() -> String { - // 使用 cmd /c ver 获取版本 - let output = Command::new("cmd") + // Use cmd /c ver to get the version + let output = process::Command::new("cmd") .args(&["/C", "ver"]) .output() - .unwrap_or_default(); + .unwrap_or_else(|_| process::Output { + status: process::ExitStatus::from_raw(0), + stdout: Vec::new(), + stderr: Vec::new(), + }); let version = String::from_utf8_lossy(&output.stdout); let version = version .lines() @@ -92,27 +93,29 @@ impl UserAgent { "N/A".to_string() } - // 获取 macOS 平台信息 + // Get macOS platform information #[cfg(target_os = "macos")] fn get_macos_platform() -> String { - unsafe { - let mut name = std::mem::zeroed(); - if uname(&mut name) == 0 { - let release = CStr::from_ptr(name.release.as_ptr()).to_string_lossy(); - // 映射内核版本(如 23.5.0)到 User-Agent 格式(如 14_5_0) - let major = release - .split('.') - .next() - .unwrap_or("14") - .parse::() - .unwrap_or(14); - let minor = if major >= 20 { major - 9 } else { 14 }; - let patch = release.split('.').nth(1).unwrap_or("0"); - format!("Macintosh; Intel Mac OS X {}_{}_{}", minor, patch, 0) - } else { - "Macintosh; Intel Mac OS X 14_5_0".to_string() - } - } + let output = process::Command::new("sw_vers") + .args(&["-productVersion"]) + .output() + .unwrap_or_else(|_| process::Output { + status: process::ExitStatus::from_raw(0), + stdout: Vec::new(), + stderr: Vec::new(), + }); + let version = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let parts: Vec<&str> = version.split('.').collect(); + let major = parts.get(0).unwrap_or(&"10").parse::().unwrap_or(10); + let minor = parts.get(1).map_or("15", |&m| m); + let patch = parts.get(2).map_or("0", |&p| p); + + // Detect whether it is an Apple Silicon chip + let arch = env::consts::ARCH; + let cpu_info = if arch == "aarch64" { "Apple" } else { "Intel" }; + + // Convert to User-Agent format + format!("Macintosh; {} Mac OS X {}_{}_{}", cpu_info, major, minor, patch) } #[cfg(not(target_os = "macos"))] @@ -120,17 +123,22 @@ impl UserAgent { "N/A".to_string() } - // 获取 Linux 平台信息 + // Get Linux platform information #[cfg(target_os = "linux")] fn get_linux_platform() -> String { - unsafe { - let mut name = std::mem::zeroed(); - if uname(&mut name) == 0 { - let release = CStr::from_ptr(name.release.as_ptr()).to_string_lossy(); - format!("X11; Linux {}", release) - } else { - "X11; Linux Unknown".to_string() - } + let output = process::Command::new("uname") + .arg("-r") + .output() + .unwrap_or_else(|_| process::Output { + status: process::ExitStatus::from_raw(0), + stdout: Vec::new(), + stderr: Vec::new(), + }); + if output.status.success() { + let release = String::from_utf8_lossy(&output.stdout).trim().to_string(); + format!("X11; Linux {}", release) + } else { + "X11; Linux Unknown".to_string() } } @@ -140,15 +148,11 @@ impl UserAgent { } } -// 实现 Display trait 以格式化 User-Agent +// Implement Display trait to format User-Agent impl fmt::Display for UserAgent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.service == ServiceType::Basis { - return write!( - f, - "Mozilla/5.0 ({}; {}) Rustfs/{}", - self.os_platform, self.arch, self.version - ); + return write!(f, "Mozilla/5.0 ({}; {}) Rustfs/{}", self.os_platform, self.arch, self.version); } write!( f, @@ -161,7 +165,7 @@ impl fmt::Display for UserAgent { } } -// 获取 User-Agent 字符串,接受业务类型参数 +// Get the User-Agent string and accept business type parameters pub fn get_user_agent(service: ServiceType) -> String { UserAgent::new(service).to_string() } diff --git a/crates/utils/src/dirs.rs b/crates/utils/src/dirs.rs new file mode 100644 index 00000000..e2bcaebd --- /dev/null +++ b/crates/utils/src/dirs.rs @@ -0,0 +1,60 @@ +use std::env; +use std::path::{Path, PathBuf}; + +/// Get the absolute path to the current project +/// +/// This function will try the following method to get the project path: +/// 1. Use the `CARGO_MANIFEST_DIR` environment variable to get the project root directory. +/// 2. Use `std::env::current_exe()` to get the executable file path and deduce the project root directory. +/// 3. Use `std::env::current_dir()` to get the current working directory and try to deduce the project root directory. +/// +/// If all methods fail, an error is returned. +/// +/// # Returns +/// - `Ok(PathBuf)`: The absolute path of the project that was successfully obtained. +/// - `Err(String)`: Error message for the failed path. +pub fn get_project_root() -> Result { + // Try to get the project root directory through the CARGO_MANIFEST_DIR environment variable + if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") { + let project_root = Path::new(&manifest_dir).to_path_buf(); + println!("Get the project root directory with CARGO_MANIFEST_DIR:{}", project_root.display()); + return Ok(project_root); + } + + // Try to deduce the project root directory through the current executable file path + if let Ok(current_exe) = env::current_exe() { + let mut project_root = current_exe; + // Assume that the project root directory is in the parent directory of the parent directory of the executable path (usually target/debug or target/release) + project_root.pop(); // Remove the executable file name + project_root.pop(); // Remove target/debug or target/release + println!("Deduce the project root directory through current_exe:{}", project_root.display()); + return Ok(project_root); + } + + // Try to deduce the project root directory from the current working directory + if let Ok(mut current_dir) = env::current_dir() { + // Assume that the project root directory is in the parent directory of the current working directory + current_dir.pop(); + println!("Deduce the project root directory through current_dir:{}", current_dir.display()); + return Ok(current_dir); + } + + // If all methods fail, return an error + Err("The project root directory cannot be obtained. Please check the running environment and project structure.".to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_project_root() { + match get_project_root() { + Ok(path) => { + assert!(path.exists(), "The project root directory does not exist:{}", path.display()); + println!("The test is passed, the project root directory:{}", path.display()); + } + Err(e) => panic!("Failed to get the project root directory:{}", e), + } + } +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index c9fcfbd3..0f9433ba 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -28,6 +28,9 @@ pub mod crypto; #[cfg(feature = "compress")] pub mod compress; +#[cfg(feature = "path")] +pub mod dirs; + #[cfg(feature = "tls")] pub use certs::*; #[cfg(feature = "hash")] diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index 5897e296..7642d691 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -8,7 +8,6 @@ use rustfs_utils::path::SLASH_SEPARATOR; use std::collections::HashSet; use std::sync::Arc; use tracing::{error, warn}; -use crate::disk::fs::SLASH_SEPARATOR; pub const CONFIG_PREFIX: &str = "config"; const CONFIG_FILE: &str = "config.json"; diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 1dee2301..a235feef 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -1,9 +1,8 @@ -use core::slice::SlicePattern; use crate::bitrot::{create_bitrot_reader, create_bitrot_writer}; use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS}; use crate::disk::{ - self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, - CHECK_PART_SUCCESS, + self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, + CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, }; use crate::erasure_coding; use crate::erasure_coding::bitrot_verify; @@ -17,8 +16,8 @@ use crate::{ config::{storageclass, GLOBAL_StorageClass}, disk::{ endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo, - DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions, - UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, + DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp, + ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET, }, error::{to_object_err, StorageError}, global::{ @@ -58,10 +57,10 @@ use md5::{Digest as Md5Digest, Md5}; use rand::{seq::SliceRandom, Rng}; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::{ - file_info_from_raw, - headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, - merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, - MetadataResolutionParams, ObjectPartInfo, RawFileInfo, + file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, + MetaCacheEntry, MetadataResolutionParams, + ObjectPartInfo, + RawFileInfo, }; use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader}; use rustfs_utils::{ @@ -80,7 +79,6 @@ use std::{ sync::Arc, time::Duration, }; -use sha2::digest::HashReader; use time::OffsetDateTime; use tokio::{ io::AsyncWrite, @@ -95,7 +93,6 @@ use tracing::error; use tracing::{debug, info, warn}; use uuid::Uuid; use workers::workers::Workers; -use crate::disk::fs::SLASH_SEPARATOR; pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024; @@ -404,11 +401,7 @@ impl SetDisks { } } - if max >= write_quorum { - data_dir - } else { - None - } + if max >= write_quorum { data_dir } else { None } } #[allow(dead_code)] @@ -739,11 +732,7 @@ impl SetDisks { fn common_time(times: &[Option], quorum: usize) -> Option { let (time, count) = Self::common_time_and_occurrence(times); - if count >= quorum { - time - } else { - None - } + if count >= quorum { time } else { None } } fn common_time_and_occurrence(times: &[Option]) -> (Option, usize) { @@ -784,11 +773,7 @@ impl SetDisks { fn common_etag(etags: &[Option], quorum: usize) -> Option { let (etag, count) = Self::common_etags(etags); - if count >= quorum { - etag - } else { - None - } + if count >= quorum { etag } else { None } } fn common_etags(etags: &[Option]) -> (Option, usize) { @@ -4069,7 +4054,7 @@ impl StorageAPI for SetDisks { async fn local_storage_info(&self) -> madmin::StorageInfo { let disks = self.get_disks_internal().await; - let mut local_disks: Vec>> = Vec::new(); + let mut local_disks: Vec>> = Vec::new(); let mut local_endpoints = Vec::new(); for (i, ep) in self.set_endpoints.iter().enumerate() { diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 8cbac345..246ff0e5 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -18,7 +18,6 @@ use std::fmt::Debug; use std::io::Cursor; use std::str::FromStr as _; use std::sync::Arc; -use sha2::digest::HashReader; use time::OffsetDateTime; use tokio::io::{AsyncRead, AsyncReadExt}; use tracing::warn; @@ -904,7 +903,7 @@ pub trait StorageAPI: ObjectIO { opts: &HealOpts, ) -> Result<(HealResultItem, Option)>; async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, hs: Arc, is_meta: bool) - -> Result<()>; + -> Result<()>; async fn get_pool_and_set(&self, id: &str) -> Result<(Option, Option, Option)>; async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>; } diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index 4a35302a..378fdd99 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -25,7 +25,6 @@ use tokio::sync::broadcast::{self, Receiver as B_Receiver}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{error, warn}; use uuid::Uuid; -use crate::disk::fs::SLASH_SEPARATOR; const MAX_OBJECT_LIST: i32 = 1000; // const MAX_DELETE_LIST: i32 = 1000; @@ -838,11 +837,7 @@ impl ECStore { if fiter(&fi) { let item = ObjectInfoOrErr { item: Some(ObjectInfo::from_file_info(&fi, &bucket, &fi.name, { - if let Some(v) = &vcf { - v.versioned(&fi.name) - } else { - false - } + if let Some(v) = &vcf { v.versioned(&fi.name) } else { false } })), err: None, }; @@ -854,11 +849,7 @@ impl ECStore { } else { let item = ObjectInfoOrErr { item: Some(ObjectInfo::from_file_info(&fi, &bucket, &fi.name, { - if let Some(v) = &vcf { - v.versioned(&fi.name) - } else { - false - } + if let Some(v) = &vcf { v.versioned(&fi.name) } else { false } })), err: None, }; @@ -894,11 +885,7 @@ impl ECStore { if fiter(fi) { let item = ObjectInfoOrErr { item: Some(ObjectInfo::from_file_info(fi, &bucket, &fi.name, { - if let Some(v) = &vcf { - v.versioned(&fi.name) - } else { - false - } + if let Some(v) = &vcf { v.versioned(&fi.name) } else { false } })), err: None, }; @@ -910,11 +897,7 @@ impl ECStore { } else { let item = ObjectInfoOrErr { item: Some(ObjectInfo::from_file_info(fi, &bucket, &fi.name, { - if let Some(v) = &vcf { - v.versioned(&fi.name) - } else { - false - } + if let Some(v) = &vcf { v.versioned(&fi.name) } else { false } })), err: None, };