From 26f84a9696acdaa2e2a96a6127722a2c58dfff96 Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 27 Jun 2025 00:28:28 +0800 Subject: [PATCH] Feature/event (#513) * improve code for notify * fix * cargo fmt * improve code and create `DEFAULT_DELIMITER` * fix * fix * improve code for notify * fmt * Update crates/notify/src/registry.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/notify/src/factory.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix cllipy * fix --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Cargo.lock | 1 - crates/config/Cargo.toml | 2 +- crates/notify/Cargo.toml | 1 - crates/utils/Cargo.toml | 29 ++++----- crates/utils/src/lib.rs | 5 ++ .../utils/src/notify/mod.rs | 12 ++++ rustfs/src/storage/ecfs.rs | 63 ++++++++++--------- rustfs/src/storage/mod.rs | 1 - 8 files changed, 65 insertions(+), 49 deletions(-) rename rustfs/src/storage/global.rs => crates/utils/src/notify/mod.rs (74%) diff --git a/Cargo.lock b/Cargo.lock index 546350f9..2f50fe8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8333,7 +8333,6 @@ dependencies = [ "async-trait", "axum", "chrono", - "const-str", "dashmap 6.1.0", "ecstore", "form_urlencoded", diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index 3069ba94..1e9556b8 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -18,6 +18,6 @@ workspace = true [features] default = [] constants = ["dep:const-str"] -notify = [] +notify = ["dep:const-str"] observability = [] diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index cbd02707..ac3e8464 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -11,7 +11,6 @@ rustfs-config = { workspace = true, features = ["notify"] } rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } -const-str = { workspace = true } dashmap = { workspace = true } ecstore = { workspace = true } form_urlencoded = { workspace = true } diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 08339cbe..96f9b475 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -7,17 +7,17 @@ rust-version.workspace = true version.workspace = true [dependencies] -base64-simd= { workspace = true , optional = true} +base64-simd = { workspace = true, optional = true } blake3 = { workspace = true, optional = true } crc32fast.workspace = true -hex-simd= { workspace = true , optional = true} +hex-simd = { workspace = true, optional = true } highway = { workspace = true, optional = true } -lazy_static= { workspace = true , optional = true} +lazy_static = { workspace = true, optional = true } local-ip-address = { workspace = true, optional = true } md-5 = { workspace = true, optional = true } -netif= { workspace = true , optional = true} +netif = { workspace = true, optional = true } nix = { workspace = true, optional = true } -regex= { workspace = true, optional = true } +regex = { workspace = true, optional = true } rustfs-config = { workspace = true, features = ["constants"] } rustls = { workspace = true, optional = true } rustls-pemfile = { workspace = true, optional = true } @@ -27,7 +27,7 @@ siphasher = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true, optional = true, features = ["io-util", "macros"] } tracing = { workspace = true } -url = { workspace = true , optional = true} +url = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } brotli = { workspace = true, optional = true } zstd = { workspace = true, optional = true } @@ -38,13 +38,13 @@ futures = { workspace = true, optional = true } transform-stream = { workspace = true, optional = true } bytes = { workspace = true, optional = true } sysinfo = { workspace = true, optional = true } -hyper.workspace = true -hyper-util.workspace = true +hyper-util = { workspace = true, optional = true } common.workspace = true -sha1 = { workspace = true } +sha1 = { workspace = true, optional = true } sha2 = { workspace = true, optional = true } -hmac.workspace = true -s3s.workspace = true +hmac = { workspace = true, optional = true } +s3s = { workspace = true, optional = true } +hyper = { workspace = true, optional = true } [dev-dependencies] tempfile = { workspace = true } @@ -60,14 +60,15 @@ workspace = true default = ["ip"] # features that are enabled by default ip = ["dep:local-ip-address"] # ip characteristics and their dependencies tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies -net = ["ip", "dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes"] # empty network features +net = ["ip", "dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hyper-util"] # empty network features io = ["dep:tokio"] path = [] +notify = ["dep:hyper", "dep:s3s"] # file system notification features compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"] string = ["dep:regex", "dep:lazy_static", "dep:rand"] -crypto = ["dep:base64-simd","dep:hex-simd"] +crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper"] hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher"] os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities integration = [] # integration test features sys = ["dep:sysinfo"] # system information features -full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys"] # all features +full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify"] # all features diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 19d1a9b9..fd385ff7 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -51,5 +51,10 @@ pub use crypto::*; #[cfg(feature = "compress")] pub use compress::*; +#[cfg(feature = "notify")] +mod notify; #[cfg(feature = "sys")] pub mod sys; + +#[cfg(feature = "notify")] +pub use notify::*; diff --git a/rustfs/src/storage/global.rs b/crates/utils/src/notify/mod.rs similarity index 74% rename from rustfs/src/storage/global.rs rename to crates/utils/src/notify/mod.rs index a6d7ad58..7fb5771d 100644 --- a/rustfs/src/storage/global.rs +++ b/crates/utils/src/notify/mod.rs @@ -14,6 +14,18 @@ pub fn extract_req_params(req: &S3Request) -> HashMap { params } +/// Extract request parameters from hyper::HeaderMap, mainly header information. +/// This function is useful when you have a raw HTTP request and need to extract parameters. +pub fn extract_req_params_header(head: &HeaderMap) -> HashMap { + let mut params = HashMap::new(); + for (key, value) in head.iter() { + if let Ok(val_str) = value.to_str() { + params.insert(key.as_str().to_string(), val_str.to_string()); + } + } + params +} + /// Extract response elements from S3Response, mainly header information. #[allow(dead_code)] pub fn extract_resp_elements(resp: &S3Response) -> HashMap { diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 2e0d8b67..45d45a95 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -16,9 +16,11 @@ use bytes::Bytes; use chrono::DateTime; use chrono::Utc; use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder; -use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; use datafusion::arrow::json::writer::JsonArray; +use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; +// use ecstore::store_api::RESERVED_METADATA_PREFIX; +use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier; use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG; use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG; use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG; @@ -33,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags; use ecstore::bucket::tagging::encode_tags; use ecstore::bucket::utils::serialize; use ecstore::bucket::versioning_sys::BucketVersioningSys; -use ecstore::cmd::bucket_replication::ReplicationStatusType; -use ecstore::cmd::bucket_replication::ReplicationType; use ecstore::cmd::bucket_replication::get_must_replicate_options; use ecstore::cmd::bucket_replication::must_replicate; use ecstore::cmd::bucket_replication::schedule_replication; -use ecstore::compress::MIN_COMPRESSIBLE_SIZE; +use ecstore::cmd::bucket_replication::ReplicationStatusType; +use ecstore::cmd::bucket_replication::ReplicationType; use ecstore::compress::is_compressible; +use ecstore::compress::MIN_COMPRESSIBLE_SIZE; use ecstore::error::StorageError; use ecstore::new_object_layer_fn; use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE; @@ -54,17 +56,15 @@ use ecstore::store_api::ObjectOptions; use ecstore::store_api::ObjectToDelete; use ecstore::store_api::PutObjReader; use ecstore::store_api::StorageAPI; -// use ecstore::store_api::RESERVED_METADATA_PREFIX; -use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier; use futures::StreamExt; use http::HeaderMap; use lazy_static::lazy_static; use policy::auth; +use policy::policy::action::Action; +use policy::policy::action::S3Action; use policy::policy::BucketPolicy; use policy::policy::BucketPolicyArgs; use policy::policy::Validator; -use policy::policy::action::Action; -use policy::policy::action::S3Action; use query::instance::make_rustfsms; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING}; @@ -73,23 +73,23 @@ use rustfs_rio::EtagReader; use rustfs_rio::HashReader; use rustfs_rio::Reader; use rustfs_rio::WarpReader; -use rustfs_utils::CompressionAlgorithm; use rustfs_utils::path::path_join_buf; +use rustfs_utils::CompressionAlgorithm; use rustfs_zip::CompressionFormat; -use s3s::S3; +use s3s::dto::*; +use s3s::s3_error; use s3s::S3Error; use s3s::S3ErrorCode; use s3s::S3Result; -use s3s::dto::*; -use s3s::s3_error; +use s3s::S3; use s3s::{S3Request, S3Response}; use std::collections::HashMap; use std::fmt::Debug; use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_tar::Archive; @@ -102,6 +102,7 @@ use tracing::warn; use uuid::Uuid; use ecstore::bucket::lifecycle::lifecycle::Lifecycle; +use rustfs_notify::EventName; macro_rules! try_ { ($result:expr) => { @@ -224,25 +225,25 @@ impl FS { // // store.put_object(bucket, object, data, opts); - // let output = PutObjectOutput { - // e_tag, - // ..Default::default() - // }; + let output = PutObjectOutput { + e_tag, + ..Default::default() + }; - // let event_args = rustfs_notify::event::EventArgs { - // event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型 - // bucket_name: bucket.clone(), - // object: _obj_info.clone(), // clone() 或传递所需字段 - // req_params: crate::storage::global::extract_req_params(&req), // 假设有一个辅助函数来提取请求参数 - // resp_elements: crate::storage::global::extract_resp_elements(&output), // 假设有一个辅助函数来提取响应元素 - // host: crate::storage::global::get_request_host(&req.headers), // 假设的辅助函数 - // user_agent: crate::storage::global::get_request_user_agent(&req.headers), // 假设的辅助函数 - // }; - // - // // 异步调用,不会阻塞当前请求的响应 - // tokio::spawn(async move { - // rustfs_notify::notifier::GLOBAL_NOTIFIER.notify(event_args).await; - // }); + let event_args = rustfs_notify::event::EventArgs { + event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型 + bucket_name: bucket.clone(), + object: _obj_info.clone(), // clone() 或传递所需字段 + req_params: rustfs_utils::extract_req_params_header(&req.headers), // 假设有一个辅助函数来提取请求参数 + resp_elements: rustfs_utils::extract_resp_elements(&output), // 假设有一个辅助函数来提取响应元素 + host: rustfs_utils::get_request_host(&req.headers), // 假设的辅助函数 + user_agent: rustfs_utils::get_request_user_agent(&req.headers), // 假设的辅助函数 + }; + + // 异步调用,不会阻塞当前请求的响应 + tokio::spawn(async move { + rustfs_notify::global::notifier_instance().notify(event_args).await; + }); } } diff --git a/rustfs/src/storage/mod.rs b/rustfs/src/storage/mod.rs index 21af29ec..90261803 100644 --- a/rustfs/src/storage/mod.rs +++ b/rustfs/src/storage/mod.rs @@ -1,5 +1,4 @@ pub mod access; pub mod ecfs; // pub mod error; -mod global; pub mod options;