diff --git a/Cargo.lock b/Cargo.lock index e1b3a4cc..2ec45197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8316,6 +8316,7 @@ dependencies = [ "axum", "chrono", "const-str", + "dashmap 6.1.0", "ecstore", "form_urlencoded", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 562e71fd..c688354c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,8 +59,6 @@ rustfs-notify = { path = "crates/notify", version = "0.0.1" } rustfs-utils = { path = "crates/utils", version = "0.0.1" } rustfs-rio = { path = "crates/rio", version = "0.0.1" } rustfs-filemeta = { path = "crates/filemeta", version = "0.0.1" } -rustfs-disk = { path = "crates/disk", version = "0.0.1" } -rustfs-error = { path = "crates/error", version = "0.0.1" } workers = { path = "./common/workers", version = "0.0.1" } aes-gcm = { version = "0.10.3", features = ["std"] } arc-swap = "1.7.1" @@ -84,14 +82,13 @@ cfg-if = "1.0.0" chacha20poly1305 = { version = "0.10.1" } chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.40", features = ["derive", "env"] } -config = "0.15.11" const-str = { version = "0.6.2", features = ["std", "proc"] } crc32fast = "1.4.2" +dashmap = "6.1.0" datafusion = "46.0.1" derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } dirs = "6.0.0" -dotenvy = "0.15.7" flatbuffers = "25.2.10" flexi_logger = { version = "0.30.2", features = ["trc", "dont_minimize_extra_stacks"] } form_urlencoded = "1.2.1" @@ -200,7 +197,6 @@ shadow-rs = { version = "1.1.1", default-features = false } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_urlencoded = "0.7.1" -serde_with = "3.12.0" sha2 = "0.10.9" siphasher = "1.0.1" smallvec = { version = "1.15.1", features = ["serde"] } diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 37a3a526..9737075b 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -11,6 +11,7 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } const-str = { workspace = true } +dashmap = { workspace = true } ecstore = { workspace = true } form_urlencoded = { workspace = true } once_cell = { workspace = true } diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 8331150a..181f850e 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -7,6 +7,7 @@ use rustfs_notify::factory::{ use rustfs_notify::store::DEFAULT_LIMIT; use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; use rustfs_notify::{initialize, notification_system}; +use std::sync::Arc; use std::time::Duration; use tracing::info; @@ -159,10 +160,8 @@ async fn main() -> Result<(), NotificationError> { system.load_bucket_notification_config("my-bucket", &bucket_config).await?; info!("\n---> Sending an event..."); - let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); - system - .send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event) - .await; + let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut)); + system.send_event(event).await; info!("✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target."); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 93a40c3d..381287bc 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -1,4 +1,5 @@ use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS}; +use std::sync::Arc; // Using Global Accessories use rustfs_notify::arn::TargetID; use rustfs_notify::factory::{ @@ -153,10 +154,8 @@ async fn main() -> Result<(), NotificationError> { // --- Send events --- info!("\n---> Sending an event..."); - let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut); - system - .send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event) - .await; + let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut)); + system.send_event(event).await; info!("✅ Event sent. Both Webhook and MQTT targets should receive it."); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/crates/notify/src/error.rs b/crates/notify/src/error.rs index a7a38e9d..6522aeb5 100644 --- a/crates/notify/src/error.rs +++ b/crates/notify/src/error.rs @@ -1,3 +1,4 @@ +use crate::arn::TargetID; use std::io; use thiserror::Error; @@ -96,8 +97,20 @@ pub enum NotificationError { #[error("Notification system has already been initialized")] AlreadyInitialized, - #[error("Io error: {0}")] + #[error("I/O error: {0}")] Io(std::io::Error), + + #[error("Failed to read configuration: {0}")] + ReadConfig(String), + + #[error("Failed to save configuration: {0}")] + SaveConfig(String), + + #[error("Target '{0}' not found")] + TargetNotFound(TargetID), + + #[error("Server not initialized")] + ServerNotInitialized, } impl From for TargetError { diff --git a/crates/notify/src/event.rs b/crates/notify/src/event.rs index 6c7c30f4..f4c06441 100644 --- a/crates/notify/src/event.rs +++ b/crates/notify/src/event.rs @@ -445,25 +445,20 @@ impl Event { }; let mut resp_elements = args.resp_elements.clone(); - resp_elements.entry("x-amz-request-id".to_string()).or_default(); - resp_elements.entry("x-amz-id-2".to_string()).or_default(); - // ... Filling of other response elements + initialize_response_elements(&mut resp_elements, &["x-amz-request-id", "x-amz-id-2"]); // URL encoding of object keys let key_name = form_urlencoded::byte_serialize(args.object.name.as_bytes()).collect::(); - - let principal_id = args.req_params.get("principalId").cloned().unwrap_or_default(); - let owner_identity = Identity { - principal_id: principal_id.clone(), - }; - let user_identity = Identity { principal_id }; + let principal_id = args.req_params.get("principalId").unwrap_or(&String::new()).to_string(); let mut s3_metadata = Metadata { schema_version: "1.0".to_string(), configuration_id: "Config".to_string(), // or from args bucket: Bucket { name: args.bucket_name.clone(), - owner_identity, + owner_identity: Identity { + principal_id: principal_id.clone(), + }, arn: format!("arn:aws:s3:::{}", args.bucket_name), }, object: Object { @@ -499,7 +494,7 @@ impl Event { aws_region: args.req_params.get("region").cloned().unwrap_or_default(), event_time: event_time.and_utc(), event_name: args.event_name, - user_identity, + user_identity: Identity { principal_id }, request_parameters: args.req_params, response_elements: resp_elements, s3: s3_metadata, @@ -512,6 +507,12 @@ impl Event { } } +fn initialize_response_elements(elements: &mut HashMap, keys: &[&str]) { + for key in keys { + elements.entry(key.to_string()).or_default(); + } +} + /// Represents a log of events for sending to targets #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EventLog { diff --git a/crates/notify/src/global.rs b/crates/notify/src/global.rs index 71418390..ebae7b84 100644 --- a/crates/notify/src/global.rs +++ b/crates/notify/src/global.rs @@ -52,9 +52,7 @@ impl Notifier { } // Create an event and send it - let event = Event::new(args.clone()); - notification_sys - .send_event(&args.bucket_name, args.event_name.as_str(), args.object.name.as_str(), event) - .await; + let event = Arc::new(Event::new(args)); + notification_sys.send_event(event).await; } } diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 669cbf90..7fa9cfae 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -1,7 +1,7 @@ use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{ - Event, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, + Event, EventName, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, }; use ecstore::config::{Config, KVS}; @@ -173,6 +173,38 @@ impl NotificationSystem { self.notifier.target_list().read().await.keys() } + /// Checks if there are active subscribers for the given bucket and event name. + pub async fn has_subscriber(&self, bucket: &str, event_name: &EventName) -> bool { + self.notifier.has_subscriber(bucket, event_name).await + } + + async fn update_config_and_reload(&self, mut modifier: F) -> Result<(), NotificationError> + where + F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed + { + let Some(store) = ecstore::global::new_object_layer_fn() else { + return Err(NotificationError::ServerNotInitialized); + }; + + let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) + .await + .map_err(|e| NotificationError::ReadConfig(e.to_string()))?; + + if !modifier(&mut new_config) { + // If the closure indication has not changed, return in advance + info!("Configuration not changed, skipping save and reload."); + return Ok(()); + } + + if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { + error!("Failed to save config: {}", e); + return Err(NotificationError::SaveConfig(e.to_string())); + } + + info!("Configuration updated. Reloading system..."); + self.reload_config(new_config).await + } + /// Accurately remove a Target and its related resources through TargetID. /// /// This process includes: @@ -188,40 +220,23 @@ impl NotificationSystem { pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> { info!("Attempting to remove target: {}", target_id); - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized"))); - }; - - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - let mut changed = false; - if let Some(targets_of_type) = new_config.0.get_mut(target_type) { - if targets_of_type.remove(&target_id.name).is_some() { - info!("Removed target {} from the configuration.", target_id); - changed = true; + self.update_config_and_reload(|config| { + let mut changed = false; + if let Some(targets_of_type) = config.0.get_mut(target_type) { + if targets_of_type.remove(&target_id.name).is_some() { + info!("Removed target {} from configuration", target_id); + changed = true; + } + if targets_of_type.is_empty() { + config.0.remove(target_type); + } } - if targets_of_type.is_empty() { - new_config.0.remove(target_type); + if !changed { + warn!("Target {} not found in configuration", target_id); } - } - - if !changed { - warn!("Target {} was not found in the configuration.", target_id); - return Ok(()); - } - - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save config for target removal: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save config: {}", e))); - } - - info!( - "Configuration updated and persisted for target {} removal. Reloading system...", - target_id - ); - self.reload_config(new_config).await + changed + }) + .await } /// Set or update a Target configuration. @@ -238,46 +253,15 @@ impl NotificationSystem { /// If the target configuration is invalid, it returns Err(NotificationError::Configuration). pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> { info!("Setting config for target {} of type {}", target_name, target_type); - // 1. Get the storage handle - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized"))); - }; - - // 2. Read the latest configuration from storage - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - // 3. Modify the configuration copy - new_config - .0 - .entry(target_type.to_string()) - .or_default() - .insert(target_name.to_string(), kvs); - - // 4. Persist the new configuration - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save notification config: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save notification config: {}", e))); - } - - // 5. After the persistence is successful, the system will be reloaded to apply changes. - match self.reload_config(new_config).await { - Ok(_) => { - info!( - "Target {} of type {} configuration updated and reloaded successfully", - target_name, target_type - ); - Ok(()) - } - Err(e) => { - error!("Failed to reload config for target {} of type {}: {}", target_name, target_type, e); - Err(NotificationError::Configuration(format!( - "Configuration saved, but failed to reload: {}", - e - ))) - } - } + self.update_config_and_reload(|config| { + config + .0 + .entry(target_type.to_string()) + .or_default() + .insert(target_name.to_string(), kvs.clone()); + true // The configuration is always modified + }) + .await } /// Removes all notification configurations for a bucket. @@ -299,39 +283,22 @@ impl NotificationSystem { /// If the target configuration does not exist, it returns Ok(()) without making any changes. pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> { info!("Removing config for target {} of type {}", target_name, target_type); - let Some(store) = ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::Io(std::io::Error::other("errServerNotInitialized"))); - }; - - let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone()) - .await - .map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?; - - let mut changed = false; - if let Some(targets) = new_config.0.get_mut(target_type) { - if targets.remove(target_name).is_some() { - changed = true; + self.update_config_and_reload(|config| { + let mut changed = false; + if let Some(targets) = config.0.get_mut(target_type) { + if targets.remove(target_name).is_some() { + changed = true; + } + if targets.is_empty() { + config.0.remove(target_type); + } } - if targets.is_empty() { - new_config.0.remove(target_type); + if !changed { + info!("Target {} of type {} not found, no changes made.", target_name, target_type); } - } - - if !changed { - info!("Target {} of type {} not found, no changes made.", target_name, target_type); - return Ok(()); - } - - if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await { - error!("Failed to save config for target removal: {}", e); - return Err(NotificationError::Configuration(format!("Failed to save config: {}", e))); - } - - info!( - "Configuration updated and persisted for target {} removal. Reloading system...", - target_name - ); - self.reload_config(new_config).await + changed + }) + .await } /// Enhanced event stream startup function, including monitoring and concurrency control @@ -346,6 +313,12 @@ impl NotificationSystem { stream::start_event_stream_with_batching(store, target, metrics, semaphore) } + /// Update configuration + async fn update_config(&self, new_config: Config) { + let mut config = self.config.write().await; + *config = new_config; + } + /// Reloads the configuration pub async fn reload_config(&self, new_config: Config) -> Result<(), NotificationError> { info!("Reload notification configuration starts"); @@ -358,10 +331,7 @@ impl NotificationSystem { } // Update the config - { - let mut config = self.config.write().await; - *config = new_config.clone(); - } + self.update_config(new_config.clone()).await; // Create a new target from configuration let targets: Vec> = self @@ -450,8 +420,8 @@ impl NotificationSystem { } /// Sends an event - pub async fn send_event(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) { - self.notifier.send(bucket_name, event_name, object_key, event).await; + pub async fn send_event(&self, event: Arc) { + self.notifier.send(event).await; } /// Obtain system status information diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index db6959d3..4447c305 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -1,5 +1,6 @@ use crate::arn::TargetID; use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target}; +use dashmap::DashMap; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tracing::{debug, error, info, instrument, warn}; @@ -7,7 +8,7 @@ use tracing::{debug, error, info, instrument, warn}; /// Manages event notification to targets based on rules pub struct EventNotifier { target_list: Arc>, - bucket_rules_map: Arc>>, + bucket_rules_map: Arc>, } impl Default for EventNotifier { @@ -21,7 +22,7 @@ impl EventNotifier { pub fn new() -> Self { EventNotifier { target_list: Arc::new(RwLock::new(TargetList::new())), - bucket_rules_map: Arc::new(RwLock::new(HashMap::new())), + bucket_rules_map: Arc::new(DashMap::new()), } } @@ -40,8 +41,7 @@ impl EventNotifier { /// This method removes all rules associated with the specified bucket name. /// It will log a message indicating the removal of rules. pub async fn remove_rules_map(&self, bucket_name: &str) { - let mut rules_map = self.bucket_rules_map.write().await; - if rules_map.remove(bucket_name).is_some() { + if self.bucket_rules_map.remove(bucket_name).is_some() { info!("Removed all notification rules for bucket: {}", bucket_name); } } @@ -58,19 +58,17 @@ impl EventNotifier { /// Adds a rules map for a bucket pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) { - let mut bucket_rules_guard = self.bucket_rules_map.write().await; if rules_map.is_empty() { - bucket_rules_guard.remove(bucket_name); + self.bucket_rules_map.remove(bucket_name); } else { - bucket_rules_guard.insert(bucket_name.to_string(), rules_map); + self.bucket_rules_map.insert(bucket_name.to_string(), rules_map); } info!("Added rules for bucket: {}", bucket_name); } /// Removes notification rules for a bucket pub async fn remove_notification(&self, bucket_name: &str) { - let mut bucket_rules_guard = self.bucket_rules_map.write().await; - bucket_rules_guard.remove(bucket_name); + self.bucket_rules_map.remove(bucket_name); info!("Removed notification rules for bucket: {}", bucket_name); } @@ -83,12 +81,34 @@ impl EventNotifier { info!("Removed all targets and their streams"); } + /// Checks if there are active subscribers for the given bucket and event name. + /// + /// # Parameters + /// * `bucket_name` - bucket name. + /// * `event_name` - Event name. + /// + /// # Return value + /// Return `true` if at least one matching notification rule exists. + pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool { + // Rules to check if the bucket exists + if let Some(rules_map) = self.bucket_rules_map.get(bucket_name) { + // A composite event (such as ObjectCreatedAll) is expanded to multiple single events. + // We need to check whether any of these single events have the rules configured. + rules_map.has_subscriber(event_name) + } else { + // If no bucket is found, no subscribers + false + } + } + /// Sends an event to the appropriate targets based on the bucket rules #[instrument(skip(self, event))] - pub async fn send(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) { - let bucket_rules_guard = self.bucket_rules_map.read().await; - if let Some(rules) = bucket_rules_guard.get(bucket_name) { - let target_ids = rules.match_rules(EventName::from(event_name), object_key); + pub async fn send(&self, event: Arc) { + let bucket_name = &event.s3.bucket.name; + let object_key = &event.s3.object.key; + let event_name = event.event_name; + if let Some(rules) = self.bucket_rules_map.get(bucket_name) { + let target_ids = rules.match_rules(event_name, object_key); if target_ids.is_empty() { debug!("No matching targets for event in bucket: {}", bucket_name); return; diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 2f47326f..11b684af 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -8,7 +8,6 @@ use crate::rules::pattern_rules; use crate::rules::target_id_set; use std::collections::HashMap; use std::io::Read; -// Assuming this is the XML config structure /// Configuration for bucket notifications. /// This struct now holds the parsed and validated rules in the new RulesMap format. @@ -102,10 +101,6 @@ impl BucketNotificationConfig { &self.rules } - pub fn to_rules_map(&self) -> RulesMap { - self.rules.clone() - } - /// Sets the region for the configuration pub fn set_region(&mut self, region: &str) { self.region = region.to_string(); diff --git a/crates/notify/src/rules/rules_map.rs b/crates/notify/src/rules/rules_map.rs index 7ec1b3bb..86ac2172 100644 --- a/crates/notify/src/rules/rules_map.rs +++ b/crates/notify/src/rules/rules_map.rs @@ -9,32 +9,43 @@ use std::collections::HashMap; #[derive(Debug, Clone, Default)] pub struct RulesMap { map: HashMap, + /// A bitmask that represents the union of all event types in this map. + /// Used for quick checks in `has_subscriber`. + total_events_mask: u64, } impl RulesMap { + /// Create a new, empty RulesMap. pub fn new() -> Self { Default::default() } - /// Add rule configuration. - /// event_names: A set of event names。 - /// pattern: Object key pattern. - /// target_id: Notify the target. + /// Add a rule configuration to the map. /// - /// This method expands the composite event name. + /// This method handles composite event names (such as `s3:ObjectCreated:*`), expanding them as + /// Multiple specific event types and add rules for each event type. + /// + /// # Parameters + /// * `event_names` - List of event names associated with this rule. + /// * `pattern` - Matching pattern for object keys. If empty, the default is `*` (match all). + /// * `target_id` - The target ID of the notification. pub fn add_rule_config(&mut self, event_names: &[EventName], pattern: String, target_id: TargetID) { - let mut effective_pattern = pattern; - if effective_pattern.is_empty() { - effective_pattern = "*".to_string(); // Match all by default - } + let effective_pattern = if pattern.is_empty() { + "*".to_string() // Match all by default + } else { + pattern + }; for event_name_spec in event_names { + // Expand compound event types, for example ObjectCreatedAll -> [ObjectCreatedPut, ObjectCreatedPost, ...] for expanded_event_name in event_name_spec.expand() { // Make sure EventName::expand() returns Vec self.map .entry(expanded_event_name) .or_default() .add(effective_pattern.clone(), target_id.clone()); + // Update the total_events_mask to include this event type + self.total_events_mask |= expanded_event_name.mask(); } } } @@ -44,13 +55,17 @@ impl RulesMap { pub fn add_map(&mut self, other_map: &Self) { for (event_name, other_pattern_rules) in &other_map.map { let self_pattern_rules = self.map.entry(*event_name).or_default(); - // PatternRules::union 返回新的 PatternRules,我们需要修改现有的 + // PatternRules::union Returns the new PatternRules, we need to modify the existing ones let merged_rules = self_pattern_rules.union(other_pattern_rules); *self_pattern_rules = merged_rules; } + // Directly merge two masks. + self.total_events_mask |= other_map.total_events_mask; } /// Remove another rule defined in the RulesMap from the current RulesMap. + /// + /// After the rule is removed, `total_events_mask` is recalculated to ensure its accuracy. pub fn remove_map(&mut self, other_map: &Self) { let mut events_to_remove = Vec::new(); for (event_name, self_pattern_rules) in &mut self.map { @@ -64,10 +79,30 @@ impl RulesMap { for event_name in events_to_remove { self.map.remove(&event_name); } + // After removing the rule, recalculate total_events_mask. + self.recalculate_mask(); } - ///Rules matching the given event name and object key, returning all matching TargetIDs. + /// Checks whether any configured rules exist for a given event type. + /// + /// This method uses a bitmask for a quick check of O(1) complexity. + /// `event_name` can be a compound type, such as `ObjectCreatedAll`. + pub fn has_subscriber(&self, event_name: &EventName) -> bool { + // event_name.mask() will handle compound events correctly + (self.total_events_mask & event_name.mask()) != 0 + } + + /// Rules matching the given event and object keys and return all matching target IDs. + /// + /// # Notice + /// The `event_name` parameter should be a specific, non-compound event type. + /// Because this is taken from the `Event` object that actually occurs. pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet { + // Use bitmask to quickly determine whether there is a matching rule + if (self.total_events_mask & event_name.mask()) == 0 { + return TargetIdSet::new(); // No matching rules + } + // First try to directly match the event name if let Some(pattern_rules) = self.map.get(&event_name) { let targets = pattern_rules.match_targets(object_key); @@ -89,6 +124,7 @@ impl RulesMap { .map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key)) } + /// Check if RulesMap is empty. pub fn is_empty(&self) -> bool { self.map.is_empty() } @@ -97,4 +133,42 @@ impl RulesMap { pub fn inner(&self) -> &HashMap { &self.map } + + /// A private helper function that recalculates `total_events_mask` based on the content of the current `map`. + /// Called after the removal operation to ensure the accuracy of the mask. + fn recalculate_mask(&mut self) { + let mut new_mask = 0u64; + for event_name in self.map.keys() { + new_mask |= event_name.mask(); + } + self.total_events_mask = new_mask; + } + + /// Remove rules and optimize performance + #[allow(dead_code)] + pub fn remove_rule(&mut self, event_name: &EventName, pattern: &str) { + if let Some(pattern_rules) = self.map.get_mut(event_name) { + pattern_rules.rules.remove(pattern); + if pattern_rules.is_empty() { + self.map.remove(event_name); + } + } + self.recalculate_mask(); // Delay calculation mask + } + + /// Batch Delete Rules + #[allow(dead_code)] + pub fn remove_rules(&mut self, event_names: &[EventName]) { + for event_name in event_names { + self.map.remove(event_name); + } + self.recalculate_mask(); // Unified calculation of mask after batch processing + } + + /// Update rules and optimize performance + #[allow(dead_code)] + pub fn update_rule(&mut self, event_name: EventName, pattern: String, target_id: TargetID) { + self.map.entry(event_name).or_default().add(pattern, target_id); + self.total_events_mask |= event_name.mask(); // Update only the relevant bitmask + } } diff --git a/crates/notify/src/store.rs b/crates/notify/src/store.rs index f3816cc7..fde4776a 100644 --- a/crates/notify/src/store.rs +++ b/crates/notify/src/store.rs @@ -125,7 +125,7 @@ pub trait Store: Send + Sync { fn open(&self) -> Result<(), Self::Error>; /// Stores a single item - fn put(&self, item: T) -> Result; + fn put(&self, item: Arc) -> Result; /// Stores multiple items in a single batch fn put_multiple(&self, items: Vec) -> Result; @@ -279,7 +279,7 @@ where Ok(()) } - fn put(&self, item: T) -> Result { + fn put(&self, item: Arc) -> Result { // Check storage limits { let entries = self diff --git a/crates/notify/src/target/mod.rs b/crates/notify/src/target/mod.rs index ab984d09..b4004280 100644 --- a/crates/notify/src/target/mod.rs +++ b/crates/notify/src/target/mod.rs @@ -2,6 +2,7 @@ use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{Event, StoreError, TargetError}; use async_trait::async_trait; +use std::sync::Arc; pub mod mqtt; pub mod webhook; @@ -21,7 +22,7 @@ pub trait Target: Send + Sync + 'static { async fn is_active(&self) -> Result; /// Saves an event (either sends it immediately or stores it for later) - async fn save(&self, event: Event) -> Result<(), TargetError>; + async fn save(&self, event: Arc) -> Result<(), TargetError>; /// Sends an event from the store async fn send_from_store(&self, key: Key) -> Result<(), TargetError>; diff --git a/crates/notify/src/target/mqtt.rs b/crates/notify/src/target/mqtt.rs index 319cd5c8..f7b589ba 100644 --- a/crates/notify/src/target/mqtt.rs +++ b/crates/notify/src/target/mqtt.rs @@ -463,7 +463,7 @@ impl Target for MQTTTarget { } #[instrument(skip(self, event), fields(target_id = %self.id))] - async fn save(&self, event: Event) -> Result<(), TargetError> { + async fn save(&self, event: Arc) -> Result<(), TargetError> { if let Some(store) = &self.store { debug!(target_id = %self.id, "Event saved to store start"); // If store is configured, ONLY put the event into the store. diff --git a/crates/notify/src/target/webhook.rs b/crates/notify/src/target/webhook.rs index 29c5f7cb..91316026 100644 --- a/crates/notify/src/target/webhook.rs +++ b/crates/notify/src/target/webhook.rs @@ -221,24 +221,24 @@ impl WebhookTarget { .header("Content-Type", "application/json"); if !self.args.auth_token.is_empty() { - // 分割 auth_token 字符串,检查是否已包含认证类型 + // Split auth_token string to check if the authentication type is included let tokens: Vec<&str> = self.args.auth_token.split_whitespace().collect(); match tokens.len() { 2 => { - // 已经包含认证类型和令牌,如 "Bearer token123" + // Already include authentication type and token, such as "Bearer token123" req_builder = req_builder.header("Authorization", &self.args.auth_token); } 1 => { - // 只有令牌,需要添加 "Bearer" 前缀 + // Only tokens, need to add "Bearer" prefix req_builder = req_builder.header("Authorization", format!("Bearer {}", self.args.auth_token)); } _ => { - // 空字符串或其他情况,不添加认证头 + // Empty string or other situations, no authentication header is added } } } - // 发送请求 + // Send a request let resp = req_builder.body(data).send().await.map_err(|e| { if e.is_timeout() || e.is_connect() { TargetError::NotConnected @@ -271,7 +271,7 @@ impl Target for WebhookTarget { self.id.clone() } - // 确保 Future 是 Send + // Make sure Future is Send async fn is_active(&self) -> Result { let socket_addr = lookup_host(&self.addr) .await @@ -296,7 +296,7 @@ impl Target for WebhookTarget { } } - async fn save(&self, event: Event) -> Result<(), TargetError> { + async fn save(&self, event: Arc) -> Result<(), TargetError> { if let Some(store) = &self.store { // Call the store method directly, no longer need to acquire the lock store diff --git a/crates/utils/src/sys/user_agent.rs b/crates/utils/src/sys/user_agent.rs index efc0f62e..99d53ffc 100644 --- a/crates/utils/src/sys/user_agent.rs +++ b/crates/utils/src/sys/user_agent.rs @@ -55,13 +55,12 @@ impl UserAgent { /// Obtain operating system platform information fn get_os_platform() -> String { - let sys = System::new_all(); if cfg!(target_os = "windows") { - Self::get_windows_platform(&sys) + Self::get_windows_platform() } else if cfg!(target_os = "macos") { - Self::get_macos_platform(&sys) + Self::get_macos_platform() } else if cfg!(target_os = "linux") { - Self::get_linux_platform(&sys) + Self::get_linux_platform() } else { "Unknown".to_string() } @@ -69,9 +68,9 @@ impl UserAgent { /// Get Windows platform information #[cfg(windows)] - fn get_windows_platform(sys: &System) -> String { + fn get_windows_platform() -> String { // Priority to using sysinfo to get versions - if let Some(version) = sys.os_version() { + if let Some(version) = System::os_version() { format!("Windows NT {}", version) } else { // Fallback to cmd /c ver @@ -91,13 +90,13 @@ impl UserAgent { } #[cfg(not(windows))] - fn get_windows_platform(_sys: &System) -> String { + fn get_windows_platform() -> String { "N/A".to_string() } /// Get macOS platform information #[cfg(target_os = "macos")] - fn get_macos_platform(_sys: &System) -> String { + fn get_macos_platform() -> String { let binding = System::os_version().unwrap_or("14.5.0".to_string()); let version = binding.split('.').collect::>(); let major = version.first().unwrap_or(&"14").to_string(); @@ -112,20 +111,18 @@ impl UserAgent { } #[cfg(not(target_os = "macos"))] - fn get_macos_platform(_sys: &System) -> String { + fn get_macos_platform() -> String { "N/A".to_string() } /// Get Linux platform information #[cfg(target_os = "linux")] - fn get_linux_platform(sys: &System) -> String { - let name = sys.name().unwrap_or("Linux".to_string()); - let version = sys.os_version().unwrap_or("Unknown".to_string()); - format!("X11; {} {}", name, version) + fn get_linux_platform() -> String { + format!("X11; {}", System::long_os_version().unwrap_or("Linux Unknown".to_string())) } #[cfg(not(target_os = "linux"))] - fn get_linux_platform(_sys: &System) -> String { + fn get_linux_platform() -> String { "N/A".to_string() } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e2dae171..bc495d9b 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -12,9 +12,9 @@ mod service; mod storage; use crate::auth::IAMAuth; -use crate::console::{CONSOLE_CONFIG, init_console_cfg}; +use crate::console::{init_console_cfg, CONSOLE_CONFIG}; // Ensure the correct path for parse_license is imported -use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, wait_for_shutdown}; +use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT}; use bytes::Bytes; use chrono::Datelike; use clap::Parser; @@ -22,7 +22,6 @@ use common::{ // error::{Error, Result}, globals::set_global_addr, }; -use ecstore::StorageAPI; use ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use ecstore::cmd::bucket_replication::init_bucket_replication_pool; use ecstore::config as ecconfig; @@ -30,11 +29,12 @@ use ecstore::config::GLOBAL_ConfigSys; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::rpc::make_server; use ecstore::store_api::BucketOptions; +use ecstore::StorageAPI; use ecstore::{ endpoints::EndpointServerPools, heal::data_scanner::init_data_scanner, set_global_endpoints, - store::{ECStore, init_local_disks}, + store::{init_local_disks, ECStore}, update_erasure_type, }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; @@ -49,7 +49,7 @@ use iam::init_iam_sys; use license::init_license; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_obs::{SystemObserver, init_obs, set_global_guard}; +use rustfs_obs::{init_obs, set_global_guard, SystemObserver}; use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; @@ -60,13 +60,12 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; -use tokio::signal::unix::{SignalKind, signal}; +use tokio::signal::unix::{signal, SignalKind}; use tokio_rustls::TlsAcceptor; -use tonic::{Request, Status, metadata::MetadataValue}; +use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; -use tracing::{Span, instrument}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn, Span}; const MI_B: usize = 1024 * 1024; @@ -119,9 +118,6 @@ async fn main() -> Result<()> { async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); - // Initialize event notifier - // event::init_event_notifier(opt.event_config).await; - let server_addr = parse_and_resolve_address(opt.address.as_str()).map_err(Error::other)?; let server_port = server_addr.port(); let server_address = server_addr.to_string(); @@ -510,9 +506,6 @@ async fn run(opt: config::Opt) -> Result<()> { // config system configuration GLOBAL_ConfigSys.init(store.clone()).await?; - // event system configuration - // GLOBAL_EVENT_SYS.init(store.clone()).await?; - // Initialize event notifier event::init_event_notifier().await;