Feature/event (#513)

* improve code for notify

* fix

* cargo fmt

* improve code and create `DEFAULT_DELIMITER`

* fix

* fix

* improve code for notify

* fmt

* Update crates/notify/src/registry.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update crates/notify/src/factory.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix cllipy

* fix

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
houseme
2025-06-27 00:28:28 +08:00
committed by GitHub
parent 831cb0b6d9
commit 26f84a9696
8 changed files with 65 additions and 49 deletions

1
Cargo.lock generated
View File

@@ -8333,7 +8333,6 @@ dependencies = [
"async-trait",
"axum",
"chrono",
"const-str",
"dashmap 6.1.0",
"ecstore",
"form_urlencoded",

View File

@@ -18,6 +18,6 @@ workspace = true
[features]
default = []
constants = ["dep:const-str"]
notify = []
notify = ["dep:const-str"]
observability = []

View File

@@ -11,7 +11,6 @@ rustfs-config = { workspace = true, features = ["notify"] }
rustfs-utils = { workspace = true, features = ["path", "sys"] }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
const-str = { workspace = true }
dashmap = { workspace = true }
ecstore = { workspace = true }
form_urlencoded = { workspace = true }

View File

@@ -7,17 +7,17 @@ rust-version.workspace = true
version.workspace = true
[dependencies]
base64-simd= { workspace = true , optional = true}
base64-simd = { workspace = true, optional = true }
blake3 = { workspace = true, optional = true }
crc32fast.workspace = true
hex-simd= { workspace = true , optional = true}
hex-simd = { workspace = true, optional = true }
highway = { workspace = true, optional = true }
lazy_static= { workspace = true , optional = true}
lazy_static = { workspace = true, optional = true }
local-ip-address = { workspace = true, optional = true }
md-5 = { workspace = true, optional = true }
netif= { workspace = true , optional = true}
netif = { workspace = true, optional = true }
nix = { workspace = true, optional = true }
regex= { workspace = true, optional = true }
regex = { workspace = true, optional = true }
rustfs-config = { workspace = true, features = ["constants"] }
rustls = { workspace = true, optional = true }
rustls-pemfile = { workspace = true, optional = true }
@@ -27,7 +27,7 @@ siphasher = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util", "macros"] }
tracing = { workspace = true }
url = { workspace = true , optional = true}
url = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
brotli = { workspace = true, optional = true }
zstd = { workspace = true, optional = true }
@@ -38,13 +38,13 @@ futures = { workspace = true, optional = true }
transform-stream = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
sysinfo = { workspace = true, optional = true }
hyper.workspace = true
hyper-util.workspace = true
hyper-util = { workspace = true, optional = true }
common.workspace = true
sha1 = { workspace = true }
sha1 = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
hmac.workspace = true
s3s.workspace = true
hmac = { workspace = true, optional = true }
s3s = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
[dev-dependencies]
tempfile = { workspace = true }
@@ -60,14 +60,15 @@ workspace = true
default = ["ip"] # features that are enabled by default
ip = ["dep:local-ip-address"] # ip characteristics and their dependencies
tls = ["dep:rustls", "dep:rustls-pemfile", "dep:rustls-pki-types"] # tls characteristics and their dependencies
net = ["ip", "dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes"] # empty network features
net = ["ip", "dep:url", "dep:netif", "dep:lazy_static", "dep:futures", "dep:transform-stream", "dep:bytes", "dep:s3s", "dep:hyper", "dep:hyper-util"] # empty network features
io = ["dep:tokio"]
path = []
notify = ["dep:hyper", "dep:s3s"] # file system notification features
compress = ["dep:flate2", "dep:brotli", "dep:snap", "dep:lz4", "dep:zstd"]
string = ["dep:regex", "dep:lazy_static", "dep:rand"]
crypto = ["dep:base64-simd","dep:hex-simd"]
crypto = ["dep:base64-simd", "dep:hex-simd", "dep:hmac", "dep:hyper"]
hash = ["dep:highway", "dep:md-5", "dep:sha2", "dep:blake3", "dep:serde", "dep:siphasher"]
os = ["dep:nix", "dep:tempfile", "winapi"] # operating system utilities
integration = [] # integration test features
sys = ["dep:sysinfo"] # system information features
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys"] # all features
full = ["ip", "tls", "net", "io", "hash", "os", "integration", "path", "crypto", "string", "compress", "sys", "notify"] # all features

View File

@@ -51,5 +51,10 @@ pub use crypto::*;
#[cfg(feature = "compress")]
pub use compress::*;
#[cfg(feature = "notify")]
mod notify;
#[cfg(feature = "sys")]
pub mod sys;
#[cfg(feature = "notify")]
pub use notify::*;

View File

@@ -14,6 +14,18 @@ pub fn extract_req_params<T>(req: &S3Request<T>) -> HashMap<String, String> {
params
}
/// Extract request parameters from hyper::HeaderMap, mainly header information.
/// This function is useful when you have a raw HTTP request and need to extract parameters.
pub fn extract_req_params_header(head: &HeaderMap) -> HashMap<String, String> {
let mut params = HashMap::new();
for (key, value) in head.iter() {
if let Ok(val_str) = value.to_str() {
params.insert(key.as_str().to_string(), val_str.to_string());
}
}
params
}
/// Extract response elements from S3Response, mainly header information.
#[allow(dead_code)]
pub fn extract_resp_elements<T>(resp: &S3Response<T>) -> HashMap<String, String> {

View File

@@ -16,9 +16,11 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
@@ -33,13 +35,13 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::utils::serialize;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
use ecstore::compress::is_compressible;
use ecstore::compress::MIN_COMPRESSIBLE_SIZE;
use ecstore::error::StorageError;
use ecstore::new_object_layer_fn;
use ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
@@ -54,17 +56,15 @@ use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::bucket::lifecycle::bucket_lifecycle_ops::validate_transition_tier;
use futures::StreamExt;
use http::HeaderMap;
use lazy_static::lazy_static;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use policy::policy::BucketPolicy;
use policy::policy::BucketPolicyArgs;
use policy::policy::Validator;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
@@ -73,23 +73,23 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_utils::CompressionAlgorithm;
use rustfs_zip::CompressionFormat;
use s3s::S3;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
@@ -102,6 +102,7 @@ use tracing::warn;
use uuid::Uuid;
use ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_notify::EventName;
macro_rules! try_ {
($result:expr) => {
@@ -224,25 +225,25 @@ impl FS {
// // store.put_object(bucket, object, data, opts);
// let output = PutObjectOutput {
// e_tag,
// ..Default::default()
// };
let output = PutObjectOutput {
e_tag,
..Default::default()
};
// let event_args = rustfs_notify::event::EventArgs {
// event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型
// bucket_name: bucket.clone(),
// object: _obj_info.clone(), // clone() 或传递所需字段
// req_params: crate::storage::global::extract_req_params(&req), // 假设有一个辅助函数来提取请求参数
// resp_elements: crate::storage::global::extract_resp_elements(&output), // 假设有一个辅助函数来提取响应元素
// host: crate::storage::global::get_request_host(&req.headers), // 假设的辅助函数
// user_agent: crate::storage::global::get_request_user_agent(&req.headers), // 假设的辅助函数
// };
//
// // 异步调用,不会阻塞当前请求的响应
// tokio::spawn(async move {
// rustfs_notify::notifier::GLOBAL_NOTIFIER.notify(event_args).await;
// });
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPut, // 或者其他相应的事件类型
bucket_name: bucket.clone(),
object: _obj_info.clone(), // clone() 或传递所需字段
req_params: rustfs_utils::extract_req_params_header(&req.headers), // 假设有一个辅助函数来提取请求参数
resp_elements: rustfs_utils::extract_resp_elements(&output), // 假设有一个辅助函数来提取响应元素
host: rustfs_utils::get_request_host(&req.headers), // 假设的辅助函数
user_agent: rustfs_utils::get_request_user_agent(&req.headers), // 假设的辅助函数
};
// 异步调用,不会阻塞当前请求的响应
tokio::spawn(async move {
rustfs_notify::global::notifier_instance().notify(event_args).await;
});
}
}

View File

@@ -1,5 +1,4 @@
pub mod access;
pub mod ecfs;
// pub mod error;
mod global;
pub mod options;