From 7bb7f9e309ee3c7c5f99c35173a4c50134812d9c Mon Sep 17 00:00:00 2001 From: houseme Date: Mon, 23 Jun 2025 12:47:58 +0800 Subject: [PATCH] improve code notify --- Cargo.lock | 1 + Cargo.toml | 1 + crates/notify/Cargo.toml | 1 + crates/notify/examples/full_demo.rs | 7 +- crates/notify/examples/full_demo_one.rs | 7 +- crates/notify/src/error.rs | 15 +- crates/notify/src/event.rs | 27 ++-- crates/notify/src/global.rs | 6 +- crates/notify/src/integration.rs | 199 ++++++++++-------------- crates/notify/src/notifier.rs | 46 ++++-- crates/notify/src/rules/config.rs | 7 +- crates/notify/src/rules/rules_map.rs | 96 ++++++++++-- crates/notify/src/store.rs | 87 +++++------ crates/notify/src/target/mod.rs | 3 +- crates/notify/src/target/mqtt.rs | 83 +++------- crates/notify/src/target/webhook.rs | 14 +- crates/utils/src/sys/user_agent.rs | 2 +- rustfs/src/main.rs | 8 +- 18 files changed, 306 insertions(+), 304 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8b1df0b9..07fc4225 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8389,6 +8389,7 @@ dependencies = [ "axum", "chrono", "const-str", + "dashmap 6.1.0", "ecstore", "form_urlencoded", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 79e32b82..758adb84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ clap = { version = "4.5.40", features = ["derive", "env"] } config = "0.15.11" const-str = { version = "0.6.2", features = ["std", "proc"] } crc32fast = "1.4.2" +dashmap = "6.1.0" datafusion = "46.0.1" derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 37a3a526..9737075b 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -11,6 +11,7 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } const-str = { workspace = true } +dashmap = { workspace = true } ecstore = { workspace = true } form_urlencoded = { workspace = true } once_cell = { workspace = true } diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 225d0bd0..78fe2735 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS}; use rustfs_notify::arn::TargetID; use rustfs_notify::factory::{ @@ -159,10 +160,8 @@ async fn main() -> Result<(), NotificationError> { system.load_bucket_notification_config("my-bucket", &bucket_config).await?; info!("\n---> Sending an event..."); - let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); - system - .send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event) - .await; + let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut)); + system.send_event(event).await; info!("✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target."); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 28d4b88a..6d4f291a 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS}; // Using Global Accessories use rustfs_notify::arn::TargetID; @@ -153,10 +154,8 @@ async fn main() -> Result<(), NotificationError> { // --- Send events --- info!("\n---> Sending an event..."); - let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); - system - .send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event) - .await; + let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut)); + system.send_event(event).await; info!("✅ Event sent. Both Webhook and MQTT targets should receive it."); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/notify/src/error.rs b/crates/notify/src/error.rs index a7a38e9d..6bd28032 100644 --- a/crates/notify/src/error.rs +++ b/crates/notify/src/error.rs @@ -1,5 +1,6 @@ use std::io; use thiserror::Error; +use crate::arn::TargetID; /// Error types for the store #[derive(Debug, Error)] @@ -96,8 +97,20 @@ pub enum NotificationError { #[error("Notification system has already been initialized")] AlreadyInitialized, - #[error("Io error: {0}")] + #[error("I/O error: {0}")] Io(std::io::Error), + + #[error("Failed to read configuration: {0}")] + ReadConfig(String), + + #[error("Failed to save configuration: {0}")] + SaveConfig(String), + + #[error("Target '{0}' not found")] + TargetNotFound(TargetID), + + #[error("Server not initialized")] + ServerNotInitialized, } impl From for TargetError { diff --git a/crates/notify/src/event.rs b/crates/notify/src/event.rs index 829c4f43..f4c06441 100644 --- a/crates/notify/src/event.rs +++ b/crates/notify/src/event.rs @@ -445,29 +445,20 @@ impl Event { }; let mut resp_elements = args.resp_elements.clone(); - resp_elements - .entry("x-amz-request-id".to_string()) - .or_insert_with(|| "".to_string()); - resp_elements - .entry("x-amz-id-2".to_string()) - .or_insert_with(|| "".to_string()); - // ... Filling of other response elements + initialize_response_elements(&mut resp_elements, &["x-amz-request-id", "x-amz-id-2"]); // URL encoding of object keys let key_name = form_urlencoded::byte_serialize(args.object.name.as_bytes()).collect::(); - - let principal_id = args.req_params.get("principalId").cloned().unwrap_or_default(); - let owner_identity = Identity { - principal_id: principal_id.clone(), - }; - let user_identity = Identity { principal_id }; + let principal_id = args.req_params.get("principalId").unwrap_or(&String::new()).to_string(); let mut s3_metadata = Metadata { schema_version: "1.0".to_string(), configuration_id: "Config".to_string(), // or from args bucket: Bucket { name: args.bucket_name.clone(), - owner_identity, + owner_identity: Identity { + principal_id: principal_id.clone(), + }, arn: format!("arn:aws:s3:::{}", args.bucket_name), }, object: Object { @@ -503,7 +494,7 @@ impl Event { aws_region: args.req_params.get("region").cloned().unwrap_or_default(), event_time: event_time.and_utc(), event_name: args.event_name, - user_identity, + user_identity: Identity { principal_id }, request_parameters: args.req_params, response_elements: resp_elements, s3: s3_metadata, @@ -516,6 +507,12 @@ impl Event { } } +fn initialize_response_elements(elements: &mut HashMap, keys: &[&str]) { + for key in keys { + elements.entry(key.to_string()).or_default(); + } +} + /// Represents a log of events for sending to targets #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventLog { diff --git a/crates/notify/src/global.rs b/crates/notify/src/global.rs index f2b95439..ebae7b84 100644 --- a/crates/notify/src/global.rs +++ b/crates/notify/src/global.rs @@ -52,9 +52,7 @@ impl Notifier { } // Create an event and send it - let event = Event::new(args.clone()); - notification_sys - .send_event(&args.bucket_name, &args.event_name.as_str(), &args.object.name.clone(), event) - .await; + let event = Arc::new(Event::new(args)); + notification_sys.send_event(event).await; } } diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 34bb9be6..d063bd6e 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -1,7 +1,7 @@ use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{ - error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, + error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, EventName, StoreError, Target, }; use ecstore::config::{Config, KVS}; @@ -173,6 +173,38 @@ impl NotificationSystem { self.notifier.target_list().read().await.keys() } + /// Checks if there are active subscribers for the given bucket and event name. + pub async fn has_subscriber(&self, bucket: &str, event_name: &EventName) -> bool { + self.notifier.has_subscriber(bucket, event_name).await + } + + async fn update_config_and_reload(&self, mut modifier: F) -> Result<(), NotificationError> + where + F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed + { + let Some(store) = ecstore::global::new_object_layer_fn() else { + return Err(NotificationError::ServerNotInitialized); + }; + + let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) + .await + .map_err(|e| NotificationError::ReadConfig(e.to_string()))?; + + if !modifier(&mut new_config) { + // If the closure indication has not changed, return in advance + info!("Configuration not changed, skipping save and reload."); + return Ok(()); + } + + if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { + error!("Failed to save config: {}", e); + return Err(NotificationError::SaveConfig(e.to_string())); + } + + info!("Configuration updated. Reloading system..."); + self.reload_config(new_config).await + } + /// Accurately remove a Target and its related resources through TargetID. /// /// This process includes: @@ -188,43 +220,23 @@ impl NotificationSystem { pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> { info!("Attempting to remove target: {}", target_id); - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - "errServerNotInitialized", - ))); - }; - - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - let mut changed = false; - if let Some(targets_of_type) = new_config.0.get_mut(target_type) { - if targets_of_type.remove(&target_id.name).is_some() { - info!("Removed target {} from the configuration.", target_id); - changed = true; + self.update_config_and_reload(|config| { + let mut changed = false; + if let Some(targets_of_type) = config.0.get_mut(target_type) { + if targets_of_type.remove(&target_id.name).is_some() { + info!("Remove target from configuration {}", target_id); + changed = true; + } + if targets_of_type.is_empty() { + config.0.remove(target_type); + } } - if targets_of_type.is_empty() { - new_config.0.remove(target_type); + if !changed { + warn!("Target {} not found in configuration", target_id); } - } - - if !changed { - warn!("Target {} was not found in the configuration.", target_id); - return Ok(()); - } - - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save config for target removal: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save config: {}", e))); - } - - info!( - "Configuration updated and persisted for target {} removal. Reloading system...", - target_id - ); - self.reload_config(new_config).await + changed + }) + .await } /// Set or update a Target configuration. @@ -241,49 +253,15 @@ impl NotificationSystem { /// If the target configuration is invalid, it returns Err(NotificationError::Configuration). pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> { info!("Setting config for target {} of type {}", target_name, target_type); - // 1. Get the storage handle - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - "errServerNotInitialized", - ))); - }; - - // 2. Read the latest configuration from storage - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - // 3. Modify the configuration copy - new_config - .0 - .entry(target_type.to_string()) - .or_default() - .insert(target_name.to_string(), kvs); - - // 4. Persist the new configuration - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save notification config: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save notification config: {}", e))); - } - - // 5. After the persistence is successful, the system will be reloaded to apply changes. - match self.reload_config(new_config).await { - Ok(_) => { - info!( - "Target {} of type {} configuration updated and reloaded successfully", - target_name, target_type - ); - Ok(()) - } - Err(e) => { - error!("Failed to reload config for target {} of type {}: {}", target_name, target_type, e); - Err(NotificationError::Configuration(format!( - "Configuration saved, but failed to reload: {}", - e - ))) - } - } + self.update_config_and_reload(|config| { + config + .0 + .entry(target_type.to_string()) + .or_default() + .insert(target_name.to_string(), kvs.clone()); + true // The configuration is always modified + }) + .await } /// Removes all notification configurations for a bucket. @@ -305,42 +283,22 @@ impl NotificationSystem { /// If the target configuration does not exist, it returns Ok(()) without making any changes. pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> { info!("Removing config for target {} of type {}", target_name, target_type); - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::new( - std::io::ErrorKind::Other, - "errServerNotInitialized", - ))); - }; - - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - let mut changed = false; - if let Some(targets) = new_config.0.get_mut(target_type) { - if targets.remove(target_name).is_some() { - changed = true; + self.update_config_and_reload(|config| { + let mut changed = false; + if let Some(targets) = config.0.get_mut(target_type) { + if targets.remove(target_name).is_some() { + changed = true; + } + if targets.is_empty() { + config.0.remove(target_type); + } } - if targets.is_empty() { - new_config.0.remove(target_type); + if !changed { + info!("Target {} of type {} not found, no changes made.", target_name, target_type); } - } - - if !changed { - info!("Target {} of type {} not found, no changes made.", target_name, target_type); - return Ok(()); - } - - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save config for target removal: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save config: {}", e))); - } - - info!( - "Configuration updated and persisted for target {} removal. Reloading system...", - target_name - ); - self.reload_config(new_config).await + changed + }) + .await } /// Enhanced event stream startup function, including monitoring and concurrency control @@ -355,6 +313,12 @@ impl NotificationSystem { stream::start_event_stream_with_batching(store, target, metrics, semaphore) } + /// Update configuration + async fn update_config(&self, new_config: Config) { + let mut config = self.config.write().await; + *config = new_config; + } + /// Reloads the configuration pub async fn reload_config(&self, new_config: Config) -> Result<(), NotificationError> { info!("Reload notification configuration starts"); @@ -367,10 +331,7 @@ impl NotificationSystem { } // Update the config - { - let mut config = self.config.write().await; - *config = new_config.clone(); - } + self.update_config(new_config.clone()).await; // Create a new target from configuration let targets: Vec> = self @@ -459,8 +420,8 @@ impl NotificationSystem { } /// Sends an event - pub async fn send_event(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) { - self.notifier.send(bucket_name, event_name, object_key, event).await; + pub async fn send_event(&self, event: Arc) { + self.notifier.send(event).await; } /// Obtain system status information diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index a3827f68..3231e19f 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -1,5 +1,6 @@ use crate::arn::TargetID; use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName}; +use dashmap::DashMap; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tracing::{debug, error, info, instrument, warn}; @@ -7,7 +8,7 @@ use tracing::{debug, error, info, instrument, warn}; /// Manages event notification to targets based on rules pub struct EventNotifier { target_list: Arc>, - bucket_rules_map: Arc>>, + bucket_rules_map: Arc>, } impl Default for EventNotifier { @@ -21,7 +22,7 @@ impl EventNotifier { pub fn new() -> Self { EventNotifier { target_list: Arc::new(RwLock::new(TargetList::new())), - bucket_rules_map: Arc::new(RwLock::new(HashMap::new())), + bucket_rules_map: Arc::new(DashMap::new()), } } @@ -40,8 +41,7 @@ impl EventNotifier { /// This method removes all rules associated with the specified bucket name. /// It will log a message indicating the removal of rules. pub async fn remove_rules_map(&self, bucket_name: &str) { - let mut rules_map = self.bucket_rules_map.write().await; - if rules_map.remove(bucket_name).is_some() { + if self.bucket_rules_map.remove(bucket_name).is_some() { info!("Removed all notification rules for bucket: {}", bucket_name); } } @@ -58,19 +58,17 @@ impl EventNotifier { /// Adds a rules map for a bucket pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) { - let mut bucket_rules_guard = self.bucket_rules_map.write().await; if rules_map.is_empty() { - bucket_rules_guard.remove(bucket_name); + self.bucket_rules_map.remove(bucket_name); } else { - bucket_rules_guard.insert(bucket_name.to_string(), rules_map); + self.bucket_rules_map.insert(bucket_name.to_string(), rules_map); } info!("Added rules for bucket: {}", bucket_name); } /// Removes notification rules for a bucket pub async fn remove_notification(&self, bucket_name: &str) { - let mut bucket_rules_guard = self.bucket_rules_map.write().await; - bucket_rules_guard.remove(bucket_name); + self.bucket_rules_map.remove(bucket_name); info!("Removed notification rules for bucket: {}", bucket_name); } @@ -83,12 +81,34 @@ impl EventNotifier { info!("Removed all targets and their streams"); } + /// Checks if there are active subscribers for the given bucket and event name. + /// + /// # Parameters + /// * `bucket_name` - bucket name. + /// * `event_name` - Event name. + /// + /// # Return value + /// Return `true` if at least one matching notification rule exists. + pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool { + // Rules to check if the bucket exists + if let Some(rules_map) = self.bucket_rules_map.get(bucket_name) { + // A composite event (such as ObjectCreatedAll) is expanded to multiple single events. + // We need to check whether any of these single events have the rules configured. + rules_map.has_subscriber(event_name) + } else { + // If no bucket is found, no subscribers + false + } + } + /// Sends an event to the appropriate targets based on the bucket rules #[instrument(skip(self, event))] - pub async fn send(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) { - let bucket_rules_guard = self.bucket_rules_map.read().await; - if let Some(rules) = bucket_rules_guard.get(bucket_name) { - let target_ids = rules.match_rules(EventName::from(event_name), object_key); + pub async fn send(&self, event: Arc) { + let bucket_name = &event.s3.bucket.name; + let object_key = &event.s3.object.key; + let event_name = event.event_name; + if let Some(rules) = self.bucket_rules_map.get(bucket_name) { + let target_ids = rules.match_rules(event_name, object_key); if target_ids.is_empty() { debug!("No matching targets for event in bucket: {}", bucket_name); return; diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 08ff8ed8..bdf61954 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -8,7 +8,6 @@ use crate::rules::NotificationConfiguration; use crate::EventName; use std::collections::HashMap; use std::io::Read; -// Assuming this is the XML config structure /// Configuration for bucket notifications. /// This struct now holds the parsed and validated rules in the new RulesMap format. @@ -98,11 +97,7 @@ impl BucketNotificationConfig { } // Expose the RulesMap for the notifier - pub fn get_rules_map(&self) -> &RulesMap { - &self.rules - } - - pub fn to_rules_map(&self) -> RulesMap { + pub fn get_rules_map(&self) -> RulesMap { self.rules.clone() } diff --git a/crates/notify/src/rules/rules_map.rs b/crates/notify/src/rules/rules_map.rs index 7ec1b3bb..86ac2172 100644 --- a/crates/notify/src/rules/rules_map.rs +++ b/crates/notify/src/rules/rules_map.rs @@ -9,32 +9,43 @@ use std::collections::HashMap; #[derive(Debug, Clone, Default)] pub struct RulesMap { map: HashMap, + /// A bitmask that represents the union of all event types in this map. + /// Used for quick checks in `has_subscriber`. + total_events_mask: u64, } impl RulesMap { + /// Create a new, empty RulesMap. pub fn new() -> Self { Default::default() } - /// Add rule configuration. - /// event_names: A set of event names。 - /// pattern: Object key pattern. - /// target_id: Notify the target. + /// Add a rule configuration to the map. /// - /// This method expands the composite event name. + /// This method handles composite event names (such as `s3:ObjectCreated:*`), expanding them as + /// Multiple specific event types and add rules for each event type. + /// + /// # Parameters + /// * `event_names` - List of event names associated with this rule. + /// * `pattern` - Matching pattern for object keys. If empty, the default is `*` (match all). + /// * `target_id` - The target ID of the notification. pub fn add_rule_config(&mut self, event_names: &[EventName], pattern: String, target_id: TargetID) { - let mut effective_pattern = pattern; - if effective_pattern.is_empty() { - effective_pattern = "*".to_string(); // Match all by default - } + let effective_pattern = if pattern.is_empty() { + "*".to_string() // Match all by default + } else { + pattern + }; for event_name_spec in event_names { + // Expand compound event types, for example ObjectCreatedAll -> [ObjectCreatedPut, ObjectCreatedPost, ...] for expanded_event_name in event_name_spec.expand() { // Make sure EventName::expand() returns Vec self.map .entry(expanded_event_name) .or_default() .add(effective_pattern.clone(), target_id.clone()); + // Update the total_events_mask to include this event type + self.total_events_mask |= expanded_event_name.mask(); } } } @@ -44,13 +55,17 @@ impl RulesMap { pub fn add_map(&mut self, other_map: &Self) { for (event_name, other_pattern_rules) in &other_map.map { let self_pattern_rules = self.map.entry(*event_name).or_default(); - // PatternRules::union 返回新的 PatternRules,我们需要修改现有的 + // PatternRules::union Returns the new PatternRules, we need to modify the existing ones let merged_rules = self_pattern_rules.union(other_pattern_rules); *self_pattern_rules = merged_rules; } + // Directly merge two masks. + self.total_events_mask |= other_map.total_events_mask; } /// Remove another rule defined in the RulesMap from the current RulesMap. + /// + /// After the rule is removed, `total_events_mask` is recalculated to ensure its accuracy. pub fn remove_map(&mut self, other_map: &Self) { let mut events_to_remove = Vec::new(); for (event_name, self_pattern_rules) in &mut self.map { @@ -64,10 +79,30 @@ impl RulesMap { for event_name in events_to_remove { self.map.remove(&event_name); } + // After removing the rule, recalculate total_events_mask. + self.recalculate_mask(); } - ///Rules matching the given event name and object key, returning all matching TargetIDs. + /// Checks whether any configured rules exist for a given event type. + /// + /// This method uses a bitmask for a quick check of O(1) complexity. + /// `event_name` can be a compound type, such as `ObjectCreatedAll`. + pub fn has_subscriber(&self, event_name: &EventName) -> bool { + // event_name.mask() will handle compound events correctly + (self.total_events_mask & event_name.mask()) != 0 + } + + /// Rules matching the given event and object keys and return all matching target IDs. + /// + /// # Notice + /// The `event_name` parameter should be a specific, non-compound event type. + /// Because this is taken from the `Event` object that actually occurs. pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet { + // Use bitmask to quickly determine whether there is a matching rule + if (self.total_events_mask & event_name.mask()) == 0 { + return TargetIdSet::new(); // No matching rules + } + // First try to directly match the event name if let Some(pattern_rules) = self.map.get(&event_name) { let targets = pattern_rules.match_targets(object_key); @@ -89,6 +124,7 @@ impl RulesMap { .map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key)) } + /// Check if RulesMap is empty. pub fn is_empty(&self) -> bool { self.map.is_empty() } @@ -97,4 +133,42 @@ impl RulesMap { pub fn inner(&self) -> &HashMap { &self.map } + + /// A private helper function that recalculates `total_events_mask` based on the content of the current `map`. + /// Called after the removal operation to ensure the accuracy of the mask. + fn recalculate_mask(&mut self) { + let mut new_mask = 0u64; + for event_name in self.map.keys() { + new_mask |= event_name.mask(); + } + self.total_events_mask = new_mask; + } + + /// Remove rules and optimize performance + #[allow(dead_code)] + pub fn remove_rule(&mut self, event_name: &EventName, pattern: &str) { + if let Some(pattern_rules) = self.map.get_mut(event_name) { + pattern_rules.rules.remove(pattern); + if pattern_rules.is_empty() { + self.map.remove(event_name); + } + } + self.recalculate_mask(); // Delay calculation mask + } + + /// Batch Delete Rules + #[allow(dead_code)] + pub fn remove_rules(&mut self, event_names: &[EventName]) { + for event_name in event_names { + self.map.remove(event_name); + } + self.recalculate_mask(); // Unified calculation of mask after batch processing + } + + /// Update rules and optimize performance + #[allow(dead_code)] + pub fn update_rule(&mut self, event_name: EventName, pattern: String, target_id: TargetID) { + self.map.entry(event_name).or_default().add(pattern, target_id); + self.total_events_mask |= event_name.mask(); // Update only the relevant bitmask + } } diff --git a/crates/notify/src/store.rs b/crates/notify/src/store.rs index 1e7bc554..f2a19193 100644 --- a/crates/notify/src/store.rs +++ b/crates/notify/src/store.rs @@ -125,7 +125,7 @@ pub trait Store: Send + Sync { fn open(&self) -> Result<(), Self::Error>; /// Stores a single item - fn put(&self, item: T) -> Result; + fn put(&self, item: Arc) -> Result; /// Stores multiple items in a single batch fn put_multiple(&self, items: Vec) -> Result; @@ -195,11 +195,7 @@ impl QueueStore { /// Reads a file for the given key fn read_file(&self, key: &Key) -> Result, StoreError> { let path = self.file_path(key); - debug!( - "Reading file for key: {},path: {}", - key.to_string(), - path.display() - ); + debug!("Reading file for key: {},path: {}", key.to_string(), path.display()); let data = std::fs::read(&path).map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { StoreError::NotFound @@ -240,13 +236,11 @@ impl QueueStore { }; std::fs::write(&path, &data).map_err(StoreError::Io)?; - let modified = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64; - let mut entries = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let modified = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64; + let mut entries = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; entries.insert(key.to_string(), modified); debug!("Wrote event to store: {}", key.to_string()); Ok(()) @@ -265,18 +259,16 @@ where let entries = std::fs::read_dir(&self.directory).map_err(StoreError::Io)?; // Get the write lock to update the internal state - let mut entries_map = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let mut entries_map = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; for entry in entries { let entry = entry.map_err(StoreError::Io)?; let metadata = entry.metadata().map_err(StoreError::Io)?; if metadata.is_file() { let modified = metadata.modified().map_err(StoreError::Io)?; - let unix_nano = modified - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as i64; + let unix_nano = modified.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64; let file_name = entry.file_name().to_string_lossy().to_string(); entries_map.insert(file_name, unix_nano); @@ -287,12 +279,13 @@ where Ok(()) } - fn put(&self, item: T) -> Result { + fn put(&self, item: Arc) -> Result { // Check storage limits { - let entries = self.entries.read().map_err(|_| { - StoreError::Internal("Failed to acquire read lock on entries".to_string()) - })?; + let entries = self + .entries + .read() + .map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?; if entries.len() as u64 >= self.entry_limit { return Err(StoreError::LimitExceeded); @@ -307,8 +300,7 @@ where compress: true, }; - let data = - serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; + let data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; self.write_file(&key, &data)?; Ok(key) @@ -317,9 +309,10 @@ where fn put_multiple(&self, items: Vec) -> Result { // Check storage limits { - let entries = self.entries.read().map_err(|_| { - StoreError::Internal("Failed to acquire read lock on entries".to_string()) - })?; + let entries = self + .entries + .read() + .map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?; if entries.len() as u64 >= self.entry_limit { return Err(StoreError::LimitExceeded); @@ -327,9 +320,7 @@ where } if items.is_empty() { // Or return an error, or a special key? - return Err(StoreError::Internal( - "Cannot put_multiple with empty items list".to_string(), - )); + return Err(StoreError::Internal("Cannot put_multiple with empty items list".to_string())); } let uuid = Uuid::new_v4(); let key = Key { @@ -348,8 +339,7 @@ where for item in items { // If items are Vec, and Event is large, this could be inefficient. // The current get_multiple deserializes one by one. - let item_data = - serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; + let item_data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?; buffer.extend_from_slice(&item_data); // If using JSON array: buffer = serde_json::to_vec(&items)? } @@ -374,9 +364,7 @@ where debug!("Reading items from store for key: {}", key.to_string()); let data = self.read_file(key)?; if data.is_empty() { - return Err(StoreError::Deserialization( - "Cannot deserialize empty data".to_string(), - )); + return Err(StoreError::Deserialization("Cannot deserialize empty data".to_string())); } let mut items = Vec::with_capacity(key.item_count); @@ -395,10 +383,7 @@ where match deserializer.next() { Some(Ok(item)) => items.push(item), Some(Err(e)) => { - return Err(StoreError::Deserialization(format!( - "Failed to deserialize item in batch: {}", - e - ))); + return Err(StoreError::Deserialization(format!("Failed to deserialize item in batch: {}", e))); } None => { // Reached end of stream sooner than item_count @@ -435,7 +420,10 @@ where std::fs::remove_file(&path).map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { // If file not found, still try to remove from entries map in case of inconsistency - warn!("File not found for key {} during del, but proceeding to remove from entries map.", key.to_string()); + warn!( + "File not found for key {} during del, but proceeding to remove from entries map.", + key.to_string() + ); StoreError::NotFound } else { StoreError::Io(e) @@ -443,17 +431,15 @@ where })?; // Get the write lock to update the internal state - let mut entries = self.entries.write().map_err(|_| { - StoreError::Internal("Failed to acquire write lock on entries".to_string()) - })?; + let mut entries = self + .entries + .write() + .map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?; if entries.remove(&key.to_string()).is_none() { // Key was not in the map, could be an inconsistency or already deleted. // This is not necessarily an error if the file deletion succeeded or was NotFound. - debug!( - "Key {} not found in entries map during del, might have been already removed.", - key - ); + debug!("Key {} not found in entries map during del, might have been already removed.", key); } debug!("Deleted event from store: {}", key.to_string()); Ok(()) @@ -492,7 +478,6 @@ where } fn boxed_clone(&self) -> Box + Send + Sync> { - Box::new(self.clone()) - as Box + Send + Sync> + Box::new(self.clone()) as Box + Send + Sync> } } diff --git a/crates/notify/src/target/mod.rs b/crates/notify/src/target/mod.rs index ab984d09..d730c9e9 100644 --- a/crates/notify/src/target/mod.rs +++ b/crates/notify/src/target/mod.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{Event, StoreError, TargetError}; @@ -21,7 +22,7 @@ pub trait Target: Send + Sync + 'static { async fn is_active(&self) -> Result; /// Saves an event (either sends it immediately or stores it for later) - async fn save(&self, event: Event) -> Result<(), TargetError>; + async fn save(&self, event: Arc) -> Result<(), TargetError>; /// Sends an event from the store async fn send_from_store(&self, key: Key) -> Result<(), TargetError>; diff --git a/crates/notify/src/target/mqtt.rs b/crates/notify/src/target/mqtt.rs index 82ce8f73..a47a316a 100644 --- a/crates/notify/src/target/mqtt.rs +++ b/crates/notify/src/target/mqtt.rs @@ -58,24 +58,19 @@ impl MQTTArgs { match self.broker.scheme() { "ws" | "wss" | "tcp" | "ssl" | "tls" | "tcps" | "mqtt" | "mqtts" => {} _ => { - return Err(TargetError::Configuration( - "unknown protocol in broker address".to_string(), - )); + return Err(TargetError::Configuration("unknown protocol in broker address".to_string())); } } if !self.queue_dir.is_empty() { let path = std::path::Path::new(&self.queue_dir); if !path.is_absolute() { - return Err(TargetError::Configuration( - "mqtt queueDir path should be absolute".to_string(), - )); + return Err(TargetError::Configuration("mqtt queueDir path should be absolute".to_string())); } if self.qos == QoS::AtMostOnce { return Err(TargetError::Configuration( - "QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set" - .to_string(), + "QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set".to_string(), )); } } @@ -107,21 +102,12 @@ impl MQTTTarget { let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string()); let queue_store = if !args.queue_dir.is_empty() { let base_path = PathBuf::from(&args.queue_dir); - let unique_dir_name = format!( - "rustfs-{}-{}-{}", - ChannelTargetType::Mqtt.as_str(), - target_id.name, - target_id.id - ) - .replace(":", "_"); + let unique_dir_name = + format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_"); // Ensure the directory name is valid for filesystem let specific_queue_path = base_path.join(unique_dir_name); debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target"); - let store = crate::store::QueueStore::::new( - specific_queue_path, - args.queue_limit, - STORE_EXTENSION, - ); + let store = crate::store::QueueStore::::new(specific_queue_path, args.queue_limit, STORE_EXTENSION); if let Err(e) = store.open() { error!( target_id = %target_id, @@ -130,10 +116,7 @@ impl MQTTTarget { ); return Err(TargetError::Storage(format!("{}", e))); } - Some(Box::new(store) - as Box< - dyn Store + Send + Sync, - >) + Some(Box::new(store) as Box + Send + Sync>) } else { None }; @@ -175,18 +158,13 @@ impl MQTTTarget { debug!(target_id = %target_id_clone, "Initializing MQTT background task."); let host = args_clone.broker.host_str().unwrap_or("localhost"); let port = args_clone.broker.port().unwrap_or(1883); - let mut mqtt_options = MqttOptions::new( - format!("rustfs_notify_{}", uuid::Uuid::new_v4()), - host, - port, - ); + let mut mqtt_options = MqttOptions::new(format!("rustfs_notify_{}", uuid::Uuid::new_v4()), host, port); mqtt_options .set_keep_alive(args_clone.keep_alive) .set_max_packet_size(100 * 1024 * 1024, 100 * 1024 * 1024); // 100MB if !args_clone.username.is_empty() { - mqtt_options - .set_credentials(args_clone.username.clone(), args_clone.password.clone()); + mqtt_options.set_credentials(args_clone.username.clone(), args_clone.password.clone()); } let (new_client, eventloop) = AsyncClient::new(mqtt_options, 10); @@ -206,12 +184,8 @@ impl MQTTTarget { *client_arc.lock().await = Some(new_client.clone()); info!(target_id = %target_id_clone, "Spawning MQTT event loop task."); - let task_handle = tokio::spawn(run_mqtt_event_loop( - eventloop, - connected_arc.clone(), - target_id_clone.clone(), - cancel_rx, - )); + let task_handle = + tokio::spawn(run_mqtt_event_loop(eventloop, connected_arc.clone(), target_id_clone.clone(), cancel_rx)); Ok(task_handle) }) .await @@ -266,17 +240,13 @@ impl MQTTTarget { records: vec![event.clone()], }; - let data = serde_json::to_vec(&log) - .map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?; + let data = + serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?; // Vec Convert to String, only for printing logs - let data_string = String::from_utf8(data.clone()).map_err(|e| { - TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)) - })?; - debug!( - "Sending event to mqtt target: {}, event log: {}", - self.id, data_string - ); + let data_string = String::from_utf8(data.clone()) + .map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?; + debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string); client .publish(&self.args.topic, self.args.qos, false, data) @@ -474,9 +444,7 @@ impl Target for MQTTTarget { if let Some(handle) = self.bg_task_manager.init_cell.get() { if handle.is_finished() { error!(target_id = %self.id, "MQTT background task has finished, possibly due to an error. Target is not active."); - return Err(TargetError::Network( - "MQTT background task terminated".to_string(), - )); + return Err(TargetError::Network("MQTT background task terminated".to_string())); } } debug!(target_id = %self.id, "MQTT client not yet initialized or task not running/connected."); @@ -495,7 +463,7 @@ impl Target for MQTTTarget { } #[instrument(skip(self, event), fields(target_id = %self.id))] - async fn save(&self, event: Event) -> Result<(), TargetError> { + async fn save(&self, event: Arc) -> Result<(), TargetError> { if let Some(store) = &self.store { debug!(target_id = %self.id, "Event saved to store start"); // If store is configured, ONLY put the event into the store. @@ -507,10 +475,7 @@ impl Target for MQTTTarget { } Err(e) => { error!(target_id = %self.id, error = %e, "Failed to save event to store"); - return Err(TargetError::Storage(format!( - "Failed to save event to store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to save event to store: {}", e))); } } } else { @@ -581,10 +546,7 @@ impl Target for MQTTTarget { error = %e, "Failed to get event from store" ); - return Err(TargetError::Storage(format!( - "Failed to get event from store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to get event from store: {}", e))); } }; @@ -608,10 +570,7 @@ impl Target for MQTTTarget { } Err(e) => { error!(target_id = %self.id, error = %e, "Failed to delete event from store after send."); - return Err(TargetError::Storage(format!( - "Failed to delete event from store: {}", - e - ))); + return Err(TargetError::Storage(format!("Failed to delete event from store: {}", e))); } } diff --git a/crates/notify/src/target/webhook.rs b/crates/notify/src/target/webhook.rs index 9413067d..9528b38f 100644 --- a/crates/notify/src/target/webhook.rs +++ b/crates/notify/src/target/webhook.rs @@ -221,24 +221,24 @@ impl WebhookTarget { .header("Content-Type", "application/json"); if !self.args.auth_token.is_empty() { - // 分割 auth_token 字符串,检查是否已包含认证类型 + // Split auth_token string to check if the authentication type is included let tokens: Vec<&str> = self.args.auth_token.split_whitespace().collect(); match tokens.len() { 2 => { - // 已经包含认证类型和令牌,如 "Bearer token123" + // Already include authentication type and token, such as "Bearer token123" req_builder = req_builder.header("Authorization", &self.args.auth_token); } 1 => { - // 只有令牌,需要添加 "Bearer" 前缀 + // Only tokens, need to add "Bearer" prefix req_builder = req_builder.header("Authorization", format!("Bearer {}", self.args.auth_token)); } _ => { - // 空字符串或其他情况,不添加认证头 + // Empty string or other situations, no authentication header is added } } } - // 发送请求 + // Send a request let resp = req_builder.body(data).send().await.map_err(|e| { if e.is_timeout() || e.is_connect() { TargetError::NotConnected @@ -271,7 +271,7 @@ impl Target for WebhookTarget { self.id.clone() } - // 确保 Future 是 Send + // Make sure Future is Send async fn is_active(&self) -> Result { let socket_addr = lookup_host(&self.addr) .await @@ -296,7 +296,7 @@ impl Target for WebhookTarget { } } - async fn save(&self, event: Event) -> Result<(), TargetError> { + async fn save(&self, event: Arc) -> Result<(), TargetError> { if let Some(store) = &self.store { // Call the store method directly, no longer need to acquire the lock store diff --git a/crates/utils/src/sys/user_agent.rs b/crates/utils/src/sys/user_agent.rs index 31f42fd7..efc0f62e 100644 --- a/crates/utils/src/sys/user_agent.rs +++ b/crates/utils/src/sys/user_agent.rs @@ -100,7 +100,7 @@ impl UserAgent { fn get_macos_platform(_sys: &System) -> String { let binding = System::os_version().unwrap_or("14.5.0".to_string()); let version = binding.split('.').collect::>(); - let major = version.get(0).unwrap_or(&"14").to_string(); + let major = version.first().unwrap_or(&"14").to_string(); let minor = version.get(1).unwrap_or(&"5").to_string(); let patch = version.get(2).unwrap_or(&"0").to_string(); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index b0695233..ee353765 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -502,11 +502,9 @@ async fn run(opt: config::Opt) -> Result<()> { }); // init store - let store = ECStore::new(server_addr.clone(), endpoint_pools.clone()) - .await - .inspect_err(|err| { - error!("ECStore::new {:?}", err); - })?; + let store = ECStore::new(server_addr, endpoint_pools.clone()).await.inspect_err(|err| { + error!("ECStore::new {:?}", err); + })?; ecconfig::init(); // config system configuration