diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a285ba6..ae3a308c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,7 +104,7 @@ jobs: name: Test and Lint needs: skip-check if: needs.skip-check.outputs.should_skip != 'true' - runs-on: ubicloud-standard-2 + runs-on: ubicloud-standard-4 timeout-minutes: 60 steps: - name: Checkout repository diff --git a/Cargo.lock b/Cargo.lock index 3b2d43a8..08641555 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7437,6 +7437,7 @@ dependencies = [ name = "rustfs-notify" version = "0.0.5" dependencies = [ + "arc-swap", "async-trait", "axum", "chrono", diff --git a/crates/audit/src/factory.rs b/crates/audit/src/factory.rs index ea8cd9b9..9beded31 100644 --- a/crates/audit/src/factory.rs +++ b/crates/audit/src/factory.rs @@ -60,8 +60,9 @@ impl TargetFactory for WebhookTargetFactory { let endpoint = config .lookup(WEBHOOK_ENDPOINT) .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; - let endpoint_url = Url::parse(&endpoint) - .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?; + let parsed_endpoint = endpoint.trim(); + let endpoint_url = Url::parse(parsed_endpoint) + .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?; let args = WebhookArgs { enable: true, // If we are here, it's already enabled. diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs index 6abb2bf8..59e6493f 100644 --- a/crates/config/src/notify/mod.rs +++ b/crates/config/src/notify/mod.rs @@ -51,6 +51,18 @@ pub const ENV_NOTIFY_TARGET_STREAM_CONCURRENCY: &str = "RUSTFS_NOTIFY_TARGET_STR /// Adjust this value based on your system's capabilities and expected load. pub const DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY: usize = 20; +/// Name of the environment variable that configures send concurrency. 
+/// Controls how many send operations are processed in parallel by the notification system. +/// Defaults to [`DEFAULT_NOTIFY_SEND_CONCURRENCY`] if not set. +/// Example: `RUSTFS_NOTIFY_SEND_CONCURRENCY=64`. +pub const ENV_NOTIFY_SEND_CONCURRENCY: &str = "RUSTFS_NOTIFY_SEND_CONCURRENCY"; + +/// Default concurrency for send operations in the notification system +/// This value is used if the environment variable `RUSTFS_NOTIFY_SEND_CONCURRENCY` is not set. +/// It defines how many send operations can be processed in parallel by the notification system at any given time. +/// Adjust this value based on your system's capabilities and expected load. +pub const DEFAULT_NOTIFY_SEND_CONCURRENCY: usize = 64; + #[allow(dead_code)] pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS]; diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 0f02b70a..a4626675 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -30,6 +30,7 @@ rustfs-config = { workspace = true, features = ["notify", "constants"] } rustfs-ecstore = { workspace = true } rustfs-targets = { workspace = true } rustfs-utils = { workspace = true } +arc-swap = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } futures = { workspace = true } diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index e15f5c5d..fb4d6312 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -60,8 +60,9 @@ impl TargetFactory for WebhookTargetFactory { let endpoint = config .lookup(WEBHOOK_ENDPOINT) .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; - let endpoint_url = Url::parse(&endpoint) - .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?; + let parsed_endpoint = endpoint.trim(); + let endpoint_url = Url::parse(parsed_endpoint) + .map_err(|e| 
TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?; let args = WebhookArgs { enable: true, // If we are here, it's already enabled. diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 790d43f9..ddce7560 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::notification_system_subscriber::NotificationSystemSubscriberView; use crate::{ Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, }; @@ -104,6 +105,8 @@ pub struct NotificationSystem { concurrency_limiter: Arc, /// Monitoring indicators metrics: Arc, + /// Subscriber view + subscriber_view: NotificationSystemSubscriberView, } impl NotificationSystem { @@ -112,6 +115,7 @@ impl NotificationSystem { let concurrency_limiter = rustfs_utils::get_env_usize(ENV_NOTIFY_TARGET_STREAM_CONCURRENCY, DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY); NotificationSystem { + subscriber_view: NotificationSystemSubscriberView::new(), notifier: Arc::new(EventNotifier::new()), registry: Arc::new(TargetRegistry::new()), config: Arc::new(RwLock::new(config)), @@ -188,8 +192,11 @@ impl NotificationSystem { } /// Checks if there are active subscribers for the given bucket and event name. 
- pub async fn has_subscriber(&self, bucket: &str, event_name: &EventName) -> bool { - self.notifier.has_subscriber(bucket, event_name).await + pub async fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool { + if !self.subscriber_view.has_subscriber(bucket, event) { + return false; + } + self.notifier.has_subscriber(bucket, event).await } async fn update_config_and_reload(&self, mut modifier: F) -> Result<(), NotificationError> @@ -236,15 +243,18 @@ impl NotificationSystem { pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> { info!("Attempting to remove target: {}", target_id); + let ttype = target_type.to_lowercase(); + let tname = target_id.name.to_lowercase(); + self.update_config_and_reload(|config| { let mut changed = false; - if let Some(targets_of_type) = config.0.get_mut(target_type) { - if targets_of_type.remove(&target_id.name).is_some() { + if let Some(targets_of_type) = config.0.get_mut(&ttype) { + if targets_of_type.remove(&tname).is_some() { info!("Removed target {} from configuration", target_id); changed = true; } if targets_of_type.is_empty() { - config.0.remove(target_type); + config.0.remove(&ttype); } } if !changed { @@ -269,20 +279,24 @@ impl NotificationSystem { /// If the target configuration is invalid, it returns Err(NotificationError::Configuration). 
pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> { info!("Setting config for target {} of type {}", target_name, target_type); + let ttype = target_type.to_lowercase(); + let tname = target_name.to_lowercase(); self.update_config_and_reload(|config| { - config - .0 - .entry(target_type.to_lowercase()) - .or_default() - .insert(target_name.to_lowercase(), kvs.clone()); + config.0.entry(ttype.clone()).or_default().insert(tname.clone(), kvs.clone()); true // The configuration is always modified }) .await } /// Removes all notification configurations for a bucket. - pub async fn remove_bucket_notification_config(&self, bucket_name: &str) { - self.notifier.remove_rules_map(bucket_name).await; + /// If the configuration is successfully removed, the entire notification system will be automatically reloaded. + /// + /// # Arguments + /// * `bucket` - The name of the bucket whose notification configuration is to be removed. + /// + pub async fn remove_bucket_notification_config(&self, bucket: &str) { + self.subscriber_view.clear_bucket(bucket); + self.notifier.remove_rules_map(bucket).await; } /// Removes a Target configuration. @@ -299,11 +313,28 @@ impl NotificationSystem { /// If the target configuration does not exist, it returns Ok(()) without making any changes. 
pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> { info!("Removing config for target {} of type {}", target_name, target_type); + + let ttype = target_type.to_lowercase(); + let tname = target_name.to_lowercase(); + + let target_id = TargetID { + id: tname.clone(), + name: ttype.clone(), + }; + + // Deletion is prohibited if bucket rules refer to it + if self.notifier.is_target_bound_to_any_bucket(&target_id).await { + return Err(NotificationError::Configuration(format!( + "Target is still bound to bucket rules and deletion is prohibited: type={} name={}", + ttype, tname + ))); + } + let config_result = self .update_config_and_reload(|config| { let mut changed = false; - if let Some(targets) = config.0.get_mut(&target_type.to_lowercase()) { - if targets.remove(&target_name.to_lowercase()).is_some() { + if let Some(targets) = config.0.get_mut(&ttype) { + if targets.remove(&tname).is_some() { changed = true; } if targets.is_empty() { @@ -319,8 +350,6 @@ impl NotificationSystem { .await; if config_result.is_ok() { - let target_id = TargetID::new(target_name.to_string(), target_type.to_string()); - // Remove from target list let target_list = self.notifier.target_list(); let mut target_list_guard = target_list.write().await; @@ -358,6 +387,9 @@ impl NotificationSystem { let _ = cancel_tx.send(()).await; } + // Clear the target_list and ensure that reload is a replacement reconstruction (solve the target_list len unchanged/residual problem) + self.notifier.remove_all_bucket_targets().await; + // Update the config self.update_config(new_config.clone()).await; @@ -388,15 +420,16 @@ impl NotificationSystem { // The storage of the cloned target and the target itself let store_clone = store.boxed_clone(); - let target_box = target.clone_dyn(); - let target_arc = Arc::from(target_box); - - // Add a reference to the monitoring metrics - let metrics = self.metrics.clone(); - let semaphore = 
self.concurrency_limiter.clone(); + // let target_box = target.clone_dyn(); + let target_arc = Arc::from(target.clone_dyn()); // Encapsulated enhanced version of start_event_stream - let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore); + let cancel_tx = self.enhanced_start_event_stream( + store_clone, + target_arc, + self.metrics.clone(), + self.concurrency_limiter.clone(), + ); // Start event stream processing and save cancel sender // let cancel_tx = start_event_stream(store_clone, target_clone); @@ -423,17 +456,18 @@ impl NotificationSystem { /// Loads the bucket notification configuration pub async fn load_bucket_notification_config( &self, - bucket_name: &str, - config: &BucketNotificationConfig, + bucket: &str, + cfg: &BucketNotificationConfig, ) -> Result<(), NotificationError> { - let arn_list = self.notifier.get_arn_list(&config.region).await; + self.subscriber_view.apply_bucket_config(bucket, cfg); + let arn_list = self.notifier.get_arn_list(&cfg.region).await; if arn_list.is_empty() { return Err(NotificationError::Configuration("No targets configured".to_string())); } info!("Available ARNs: {:?}", arn_list); // Validate the configuration against the available ARNs - if let Err(e) = config.validate(&config.region, &arn_list) { - debug!("Bucket notification config validation region:{} failed: {}", &config.region, e); + if let Err(e) = cfg.validate(&cfg.region, &arn_list) { + debug!("Bucket notification config validation region:{} failed: {}", &cfg.region, e); if !e.to_string().contains("ARN not found") { return Err(NotificationError::BucketNotification(e.to_string())); } else { @@ -441,9 +475,9 @@ impl NotificationSystem { } } - let rules_map = config.get_rules_map(); - self.notifier.add_rules_map(bucket_name, rules_map.clone()).await; - info!("Loaded notification config for bucket: {}", bucket_name); + let rules_map = cfg.get_rules_map(); + self.notifier.add_rules_map(bucket, rules_map.clone()).await; + 
info!("Loaded notification config for bucket: {}", bucket); Ok(()) } diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs index cc514dbe..4181e4d0 100644 --- a/crates/notify/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -23,6 +23,7 @@ mod event; pub mod factory; mod global; pub mod integration; +mod notification_system_subscriber; pub mod notifier; pub mod registry; pub mod rules; diff --git a/crates/notify/src/notification_system_subscriber.rs b/crates/notify/src/notification_system_subscriber.rs new file mode 100644 index 00000000..11014fb5 --- /dev/null +++ b/crates/notify/src/notification_system_subscriber.rs @@ -0,0 +1,74 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::BucketNotificationConfig; +use crate::rules::{BucketRulesSnapshot, DynRulesContainer, SubscriberIndex}; +use rustfs_targets::EventName; + +/// NotificationSystemSubscriberView - Provides an interface to manage and query +/// the subscription status of buckets in the notification system. +#[derive(Debug)] +pub struct NotificationSystemSubscriberView { + index: SubscriberIndex, +} + +impl NotificationSystemSubscriberView { + /// Creates a new NotificationSystemSubscriberView with an empty SubscriberIndex. + /// + /// Returns a new instance of NotificationSystemSubscriberView. + pub fn new() -> Self { + Self { + index: SubscriberIndex::default(), + } + } + + /// Checks if a bucket has any subscribers for a specific event. 
+ /// This is a quick check using the event mask in the snapshot. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to check. + /// * `event` - The event name to check for subscriptions. + /// + /// Returns `true` if there are subscribers for the event, `false` otherwise. + #[inline] + pub fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool { + self.index.has_subscriber(bucket, event) + } + + /// Builds and atomically replaces a bucket's subscription snapshot from the configuration. + /// + /// Core principle: masks and rules are calculated and stored together in the same update. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to update. + /// * `cfg` - The bucket notification configuration to compile into a snapshot. + pub fn apply_bucket_config(&self, bucket: &str, cfg: &BucketNotificationConfig) { + // Compile the mask and the rules in a single call so that both derive from the same configuration. + let snapshot: BucketRulesSnapshot = cfg.compile_snapshot(); + + // Debug assertion: catches inconsistencies introduced by future changes to the compile logic. + snapshot.debug_assert_mask_consistent(); + + self.index.store_snapshot(bucket, snapshot); + } + + /// Clears a bucket's subscription snapshot. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to clear. 
+ #[inline] + pub fn clear_bucket(&self, bucket: &str) { + self.index.clear_bucket(bucket); + } +} diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index 10aa5767..78beda9c 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -14,19 +14,21 @@ use crate::{error::NotificationError, event::Event, rules::RulesMap}; use hashbrown::HashMap; +use rustfs_config::notify::{DEFAULT_NOTIFY_SEND_CONCURRENCY, ENV_NOTIFY_SEND_CONCURRENCY}; use rustfs_targets::EventName; use rustfs_targets::Target; use rustfs_targets::arn::TargetID; use rustfs_targets::target::EntityTarget; use starshard::AsyncShardedHashMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, Semaphore}; use tracing::{debug, error, info, instrument, warn}; /// Manages event notification to targets based on rules pub struct EventNotifier { target_list: Arc>, bucket_rules_map: Arc>, + send_limiter: Arc, } impl Default for EventNotifier { @@ -37,16 +39,41 @@ impl Default for EventNotifier { impl EventNotifier { /// Creates a new EventNotifier + /// + /// # Returns + /// Returns a new instance of EventNotifier. pub fn new() -> Self { + let max_inflight = rustfs_utils::get_env_usize(ENV_NOTIFY_SEND_CONCURRENCY, DEFAULT_NOTIFY_SEND_CONCURRENCY); EventNotifier { target_list: Arc::new(RwLock::new(TargetList::new())), bucket_rules_map: Arc::new(AsyncShardedHashMap::new(0)), + send_limiter: Arc::new(Semaphore::new(max_inflight)), } } + /// Checks whether a TargetID is still referenced by any bucket's rules. + /// + /// # Arguments + /// * `target_id` - The TargetID to check. + /// + /// # Returns + /// Returns `true` if the TargetID is bound to any bucket, otherwise `false`. 
+ pub async fn is_target_bound_to_any_bucket(&self, target_id: &TargetID) -> bool { + // `AsyncShardedHashMap::iter()`: Traverse (bucket_name, rules_map) + let items = self.bucket_rules_map.iter().await; + for (_bucket, rules_map) in items { + if rules_map.contains_target_id(target_id) { + return true; + } + } + false + } + /// Returns a reference to the target list /// This method provides access to the target list for external use. /// + /// # Returns + /// Returns an `Arc>` representing the target list. pub fn target_list(&self) -> Arc> { Arc::clone(&self.target_list) } @@ -54,17 +81,23 @@ impl EventNotifier { /// Removes all notification rules for a bucket /// /// # Arguments - /// * `bucket_name` - The name of the bucket for which to remove rules + /// * `bucket` - The name of the bucket for which to remove rules /// /// This method removes all rules associated with the specified bucket name. /// It will log a message indicating the removal of rules. - pub async fn remove_rules_map(&self, bucket_name: &str) { - if self.bucket_rules_map.remove(&bucket_name.to_string()).await.is_some() { - info!("Removed all notification rules for bucket: {}", bucket_name); + pub async fn remove_rules_map(&self, bucket: &str) { + if self.bucket_rules_map.remove(&bucket.to_string()).await.is_some() { + info!("Removed all notification rules for bucket: {}", bucket); } } /// Returns a list of ARNs for the registered targets + /// + /// # Arguments + /// * `region` - The region to use for generating the ARNs + /// + /// # Returns + /// Returns a vector of strings representing the ARNs of the registered targets pub async fn get_arn_list(&self, region: &str) -> Vec { let target_list_guard = self.target_list.read().await; target_list_guard @@ -75,24 +108,37 @@ impl EventNotifier { } /// Adds a rules map for a bucket - pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) { + /// + /// # Arguments + /// * `bucket` - The name of the bucket for which to add the rules 
map + /// * `rules_map` - The rules map to add for the bucket + pub async fn add_rules_map(&self, bucket: &str, rules_map: RulesMap) { if rules_map.is_empty() { - self.bucket_rules_map.remove(&bucket_name.to_string()).await; + self.bucket_rules_map.remove(&bucket.to_string()).await; } else { - self.bucket_rules_map.insert(bucket_name.to_string(), rules_map).await; + self.bucket_rules_map.insert(bucket.to_string(), rules_map).await; } - info!("Added rules for bucket: {}", bucket_name); + info!("Added rules for bucket: {}", bucket); } /// Gets the rules map for a specific bucket. - pub async fn get_rules_map(&self, bucket_name: &str) -> Option { - self.bucket_rules_map.get(&bucket_name.to_string()).await + /// + /// # Arguments + /// * `bucket` - The name of the bucket for which to get the rules map + /// + /// # Returns + /// Returns `Some(RulesMap)` if rules exist for the bucket, otherwise returns `None`. + pub async fn get_rules_map(&self, bucket: &str) -> Option { + self.bucket_rules_map.get(&bucket.to_string()).await } /// Removes notification rules for a bucket - pub async fn remove_notification(&self, bucket_name: &str) { - self.bucket_rules_map.remove(&bucket_name.to_string()).await; - info!("Removed notification rules for bucket: {}", bucket_name); + /// + /// # Arguments + /// * `bucket` - The name of the bucket for which to remove notification rules + pub async fn remove_notification(&self, bucket: &str) { + self.bucket_rules_map.remove(&bucket.to_string()).await; + info!("Removed notification rules for bucket: {}", bucket); } /// Removes all targets @@ -125,69 +171,87 @@ impl EventNotifier { } /// Sends an event to the appropriate targets based on the bucket rules + /// + /// # Arguments + /// * `event` - The event to send #[instrument(skip_all)] pub async fn send(&self, event: Arc) { let bucket_name = &event.s3.bucket.name; let object_key = &event.s3.object.key; let event_name = event.event_name; - if let Some(rules) = 
self.bucket_rules_map.get(bucket_name).await { - let target_ids = rules.match_rules(event_name, object_key); - if target_ids.is_empty() { - debug!("No matching targets for event in bucket: {}", bucket_name); - return; - } - let target_ids_len = target_ids.len(); - let mut handles = vec![]; - // Use scope to limit the borrow scope of target_list - { - let target_list_guard = self.target_list.read().await; - info!("Sending event to targets: {:?}", target_ids); - for target_id in target_ids { - // `get` now returns Option> - if let Some(target_arc) = target_list_guard.get(&target_id) { - // Clone an Arc> (which is where target_list is stored) to move into an asynchronous task - // target_arc is already Arc, clone it for the async task - let cloned_target_for_task = target_arc.clone(); - let event_clone = event.clone(); - let target_name_for_task = cloned_target_for_task.name(); // Get the name before generating the task - debug!("Preparing to send event to target: {}", target_name_for_task); - // Use cloned data in closures to avoid borrowing conflicts - // Create an EntityTarget from the event - let entity_target: Arc> = Arc::new(EntityTarget { - object_name: object_key.to_string(), - bucket_name: bucket_name.to_string(), - event_name, - data: event_clone.clone().as_ref().clone(), - }); - let handle = tokio::spawn(async move { - if let Err(e) = cloned_target_for_task.save(entity_target.clone()).await { - error!("Failed to send event to target {}: {}", target_name_for_task, e); - } else { - debug!("Successfully saved event to target {}", target_name_for_task); - } - }); - handles.push(handle); - } else { - warn!("Target ID {:?} found in rules but not in target list.", target_id); - } - } - // target_list is automatically released here - } - - // Wait for all tasks to be completed - for handle in handles { - if let Err(e) = handle.await { - error!("Task for sending/saving event failed: {}", e); - } - } - info!("Event processing initiated for {} targets for bucket: {}", 
target_ids_len, bucket_name); - } else { + let Some(rules) = self.bucket_rules_map.get(bucket_name).await else { debug!("No rules found for bucket: {}", bucket_name); + return; + }; + + let target_ids = rules.match_rules(event_name, object_key); + if target_ids.is_empty() { + debug!("No matching targets for event in bucket: {}", bucket_name); + return; } + let target_ids_len = target_ids.len(); + let mut handles = vec![]; + + // Use scope to limit the borrow scope of target_list + let target_list_guard = self.target_list.read().await; + info!("Sending event to targets: {:?}", target_ids); + for target_id in target_ids { + // `get` now returns Option> + if let Some(target_arc) = target_list_guard.get(&target_id) { + // Clone an Arc> (which is where target_list is stored) to move into an asynchronous task + // target_arc is already Arc, clone it for the async task + let target_for_task = target_arc.clone(); + let limiter = self.send_limiter.clone(); + let event_clone = event.clone(); + let target_name_for_task = target_for_task.name(); // Get the name before generating the task + debug!("Preparing to send event to target: {}", target_name_for_task); + // Use cloned data in closures to avoid borrowing conflicts + // Create an EntityTarget from the event + let entity_target: Arc> = Arc::new(EntityTarget { + object_name: object_key.to_string(), + bucket_name: bucket_name.to_string(), + event_name, + data: event_clone.as_ref().clone(), + }); + let handle = tokio::spawn(async move { + let _permit = match limiter.acquire_owned().await { + Ok(p) => p, + Err(e) => { + error!("Failed to acquire send permit for target {}: {}", target_name_for_task, e); + return; + } + }; + if let Err(e) = target_for_task.save(entity_target.clone()).await { + error!("Failed to send event to target {}: {}", target_name_for_task, e); + } else { + debug!("Successfully saved event to target {}", target_name_for_task); + } + }); + handles.push(handle); + } else { + warn!("Target ID {:?} found in 
rules but not in target list.", target_id); + } + } + // target_list is automatically released here + drop(target_list_guard); + + // Wait for all tasks to be completed + for handle in handles { + if let Err(e) = handle.await { + error!("Task for sending/saving event failed: {}", e); + } + } + info!("Event processing initiated for {} targets for bucket: {}", target_ids_len, bucket_name); } /// Initializes the targets for buckets + /// + /// # Arguments + /// * `targets_to_init` - A vector of boxed targets to initialize + /// + /// # Returns + /// Returns `Ok(())` if initialization is successful, otherwise returns a `NotificationError`. #[instrument(skip(self, targets_to_init))] pub async fn init_bucket_targets( &self, @@ -218,6 +282,7 @@ impl EventNotifier { /// A thread-safe list of targets pub struct TargetList { + /// Map of TargetID to Target targets: HashMap + Send + Sync>>, } @@ -234,6 +299,12 @@ impl TargetList { } /// Adds a target to the list + /// + /// # Arguments + /// * `target` - The target to add + /// + /// # Returns + /// Returns `Ok(())` if the target was added successfully, or a `NotificationError` if an error occurred. pub fn add(&mut self, target: Arc + Send + Sync>) -> Result<(), NotificationError> { let id = target.id(); if self.targets.contains_key(&id) { @@ -251,6 +322,12 @@ impl TargetList { /// Removes a target by ID. Note: This does not stop its associated event stream. /// Stream cancellation should be handled by EventNotifier. + /// + /// # Arguments + /// * `id` - The ID of the target to remove + /// + /// # Returns + /// Returns the removed target if it existed, otherwise `None`. 
pub async fn remove_target_only(&mut self, id: &TargetID) -> Option + Send + Sync>> { if let Some(target_arc) = self.targets.remove(id) { if let Err(e) = target_arc.close().await { @@ -278,6 +355,12 @@ impl TargetList { } /// Returns a target by ID + /// + /// # Arguments + /// * `id` - The ID of the target to retrieve + /// + /// # Returns + /// Returns the target if it exists, otherwise `None`. pub fn get(&self, id: &TargetID) -> Option + Send + Sync>> { self.targets.get(id).cloned() } @@ -292,7 +375,7 @@ impl TargetList { self.targets.len() } - // is_empty can be derived from len() + /// Returns `true` if the list contains no targets. pub fn is_empty(&self) -> bool { self.targets.is_empty() } diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 5be48e8d..607e6aa0 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -15,13 +15,60 @@ use super::rules_map::RulesMap; use super::xml_config::ParseConfigError as BucketNotificationConfigError; use crate::rules::NotificationConfiguration; -use crate::rules::pattern_rules; -use crate::rules::target_id_set; -use hashbrown::HashMap; +use crate::rules::subscriber_snapshot::{BucketRulesSnapshot, DynRulesContainer, RuleEvents, RulesContainer}; use rustfs_targets::EventName; use rustfs_targets::arn::TargetID; use serde::{Deserialize, Serialize}; use std::io::Read; +use std::sync::Arc; + +/// A "rule view", only used for snapshot mask/consistency verification. +/// Each view holds a single event, which keeps the event_mask calculation simple and reliable. +#[derive(Debug)] +struct RuleView { + events: Vec, +} + +impl RuleEvents for RuleView { + fn subscribed_events(&self) -> &[EventName] { + &self.events + } +} + +/// Adapt RulesMap to RulesContainer. +/// Key point: The items returned by iter_rules are &dyn RuleEvents, so a RuleView list is cached in the container. 
+#[derive(Debug)] +struct CompiledRules { + // Keep RulesMap (can be used later if you want to make more complex judgments during the snapshot reading phase) + #[allow(dead_code)] + rules_map: RulesMap, + // for RulesContainer::iter_rules + rule_views: Vec, +} + +impl CompiledRules { + fn from_rules_map(rules_map: &RulesMap) -> Self { + let mut rule_views = Vec::new(); + + for ev in rules_map.iter_events() { + rule_views.push(RuleView { events: vec![ev] }); + } + + Self { + rules_map: rules_map.clone(), + rule_views, + } + } +} + +impl RulesContainer for CompiledRules { + type Rule = dyn RuleEvents; + + fn iter_rules<'a>(&'a self) -> Box + 'a> { + // Key: Convert &RuleView into &dyn RuleEvents + Box::new(self.rule_views.iter().map(|v| v as &dyn RuleEvents)) + } +} /// Configuration for bucket notifications. /// This struct now holds the parsed and validated rules in the new RulesMap format. @@ -119,11 +166,26 @@ impl BucketNotificationConfig { pub fn set_region(&mut self, region: &str) { self.region = region.to_string(); } -} -// Add a helper to PatternRules if not already present -impl pattern_rules::PatternRules { - pub fn inner(&self) -> &HashMap { - &self.rules + /// Compiles the current BucketNotificationConfig into a BucketRulesSnapshot. + /// This involves transforming the rules into a format suitable for runtime use, + /// and calculating the event mask based on the subscribed events of the rules. + /// + /// # Returns + /// A BucketRulesSnapshot containing the compiled rules and event mask. 
+ pub fn compile_snapshot(&self) -> BucketRulesSnapshot { + // 1) Generate container from RulesMap + let compiled = CompiledRules::from_rules_map(self.get_rules_map()); + let rules: Arc = Arc::new(compiled) as Arc; + + // 2) Calculate event_mask + let mut mask = 0u64; + for rule in rules.iter_rules() { + for ev in rule.subscribed_events() { + mask |= ev.mask(); + } + } + + BucketRulesSnapshot { event_mask: mask, rules } } } diff --git a/crates/notify/src/rules/mod.rs b/crates/notify/src/rules/mod.rs index 69b141f4..b976ddd9 100644 --- a/crates/notify/src/rules/mod.rs +++ b/crates/notify/src/rules/mod.rs @@ -12,22 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod config; pub mod pattern; -pub mod pattern_rules; -pub mod rules_map; -pub mod target_id_set; +mod pattern_rules; +mod rules_map; +mod subscriber_index; +mod subscriber_snapshot; +mod target_id_set; pub mod xml_config; // For XML structure definition and parsing - -pub mod config; // Definition and parsing for BucketNotificationConfig +// Definition and parsing for BucketNotificationConfig // Re-export key types from submodules for easy access to `crate::rules::TypeName` // Re-export key types from submodules for external use pub use config::BucketNotificationConfig; // Assume that BucketNotificationConfigError is also defined in config.rs // Or if it is still an alias for xml_config::ParseConfigError , adjust accordingly -pub use xml_config::ParseConfigError as BucketNotificationConfigError; - pub use pattern_rules::PatternRules; pub use rules_map::RulesMap; +pub use subscriber_index::*; +pub use subscriber_snapshot::*; pub use target_id_set::TargetIdSet; -pub use xml_config::{NotificationConfiguration, ParseConfigError}; +pub use xml_config::{NotificationConfiguration, ParseConfigError, ParseConfigError as BucketNotificationConfigError}; diff --git a/crates/notify/src/rules/pattern_rules.rs b/crates/notify/src/rules/pattern_rules.rs 
index 20b0fe93..06b31f07 100644 --- a/crates/notify/src/rules/pattern_rules.rs +++ b/crates/notify/src/rules/pattern_rules.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::pattern; -use super::target_id_set::TargetIdSet; +use crate::rules::TargetIdSet; +use crate::rules::pattern; use hashbrown::HashMap; use rayon::prelude::*; use rustfs_targets::arn::TargetID; @@ -27,31 +27,69 @@ pub struct PatternRules { } impl PatternRules { + /// Create a new, empty PatternRules. pub fn new() -> Self { Default::default() } /// Add rules: Pattern and Target ID. /// If the schema already exists, add target_id to the existing TargetIdSet. + /// + /// # Arguments + /// * `pattern` - The object name pattern. + /// * `target_id` - The TargetID to associate with the pattern. pub fn add(&mut self, pattern: String, target_id: TargetID) { self.rules.entry(pattern).or_default().insert(target_id); } /// Checks if there are any rules that match the given object name. + /// + /// # Arguments + /// * `object_name` - The object name to match against the patterns. + /// + /// # Returns + /// `true` if any pattern matches the object name, otherwise `false`. pub fn match_simple(&self, object_name: &str) -> bool { self.rules.keys().any(|p| pattern::match_simple(p, object_name)) } /// Returns all TargetIDs that match the object name. + /// + /// Performance optimization points: + /// 1) Small collections are serialized directly to avoid rayon scheduling/merging overhead + /// 2) When hitting, no longer temporarily allocate TargetIdSet for each rule, but directly extend + /// + /// # Arguments + /// * `object_name` - The object name to match against the patterns. + /// + /// # Returns + /// A TargetIdSet containing all TargetIDs that match the object name. 
pub fn match_targets(&self, object_name: &str) -> TargetIdSet { + let n = self.rules.len(); + if n == 0 { + return TargetIdSet::new(); + } + + // Experience Threshold: Serial is usually faster below this value (can be adjusted after benchmarking) + const PAR_THRESHOLD: usize = 128; + + if n < PAR_THRESHOLD { + let mut out = TargetIdSet::new(); + for (pattern_str, target_set) in self.rules.iter() { + if pattern::match_simple(pattern_str, object_name) { + out.extend(target_set.iter().cloned()); + } + } + return out; + } + // Parallel path: Each thread accumulates a local set and finally merges it to reduce frequent allocations self.rules .par_iter() - .filter_map(|(pattern_str, target_set)| { + .fold(TargetIdSet::new, |mut local, (pattern_str, target_set)| { if pattern::match_simple(pattern_str, object_name) { - Some(target_set.iter().cloned().collect::()) - } else { - None + local.extend(target_set.iter().cloned()); } + local }) .reduce(TargetIdSet::new, |mut acc, set| { acc.extend(set); @@ -65,6 +103,11 @@ impl PatternRules { /// Merge another PatternRules. /// Corresponding to Go's `Rules.Union`. + /// # Arguments + /// * `other` - The PatternRules to merge with. + /// + /// # Returns + /// A new PatternRules containing the union of both. pub fn union(&self, other: &Self) -> Self { let mut new_rules = self.clone(); for (pattern, their_targets) in &other.rules { @@ -76,6 +119,13 @@ impl PatternRules { /// Calculate the difference from another PatternRules. /// Corresponding to Go's `Rules.Difference`. + /// The result contains only the patterns and TargetIDs that are in `self` but not in `other`. + /// + /// # Arguments + /// * `other` - The PatternRules to compare against. + /// + /// # Returns + /// A new PatternRules containing the difference. 
pub fn difference(&self, other: &Self) -> Self { let mut result_rules = HashMap::new(); for (pattern, self_targets) in &self.rules { @@ -94,4 +144,59 @@ impl PatternRules { } PatternRules { rules: result_rules } } + + /// Merge another PatternRules into self in place. + /// Corresponding to Go's `Rules.UnionInPlace`. + /// # Arguments + /// * `other` - The PatternRules to merge with. + pub fn union_in_place(&mut self, other: &Self) { + for (pattern, their_targets) in &other.rules { + self.rules + .entry(pattern.clone()) + .or_default() + .extend(their_targets.iter().cloned()); + } + } + + /// Calculate the difference from another PatternRules in place. + /// Corresponding to Go's `Rules.DifferenceInPlace`. + /// The result contains only the patterns and TargetIDs that are in `self` but not in `other`. + /// # Arguments + /// * `other` - The PatternRules to compare against. + pub fn difference_in_place(&mut self, other: &Self) { + self.rules.retain(|pattern, self_targets| { + if let Some(other_targets) = other.rules.get(pattern) { + // Remove other_targets from self_targets + self_targets.retain(|tid| !other_targets.contains(tid)); + } + !self_targets.is_empty() + }); + } + + /// Remove a pattern and its associated TargetID set from the PatternRules. + /// + /// # Arguments + /// * `pattern` - The pattern to remove. + pub fn remove_pattern(&mut self, pattern: &str) -> bool { + self.rules.remove(pattern).is_some() + } + + /// Determine whether the current PatternRules contains the specified TargetID (referenced by any pattern). + /// + /// # Parameters + /// * `target_id` - The TargetID to check for existence within the PatternRules + /// + /// # Returns + /// * `true` if the TargetID exists in any of the patterns; `false` otherwise. + pub fn contains_target_id(&self, target_id: &TargetID) -> bool { + self.rules.values().any(|set| set.contains(target_id)) + } + + /// Expose the internal rules for use in scenarios such as BucketNotificationConfig::validate. 
+ /// + /// # Returns + /// A reference to the internal HashMap of patterns to TargetIdSets. + pub fn inner(&self) -> &HashMap { + &self.rules + } } diff --git a/crates/notify/src/rules/rules_map.rs b/crates/notify/src/rules/rules_map.rs index 59bb9c6c..c0f29675 100644 --- a/crates/notify/src/rules/rules_map.rs +++ b/crates/notify/src/rules/rules_map.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::pattern_rules::PatternRules; -use super::target_id_set::TargetIdSet; +use crate::rules::{PatternRules, TargetIdSet}; use hashbrown::HashMap; use rustfs_targets::EventName; use rustfs_targets::arn::TargetID; @@ -31,6 +30,9 @@ pub struct RulesMap { impl RulesMap { /// Create a new, empty RulesMap. + /// + /// # Returns + /// A new instance of RulesMap with an empty map and a total_events_mask set to 0. pub fn new() -> Self { Default::default() } @@ -67,12 +69,12 @@ impl RulesMap { /// Merge another RulesMap. /// `RulesMap.Add(rulesMap2 RulesMap) corresponding to Go + /// + /// # Parameters + /// * `other_map` - The other RulesMap to be merged into the current one. pub fn add_map(&mut self, other_map: &Self) { for (event_name, other_pattern_rules) in &other_map.map { - let self_pattern_rules = self.map.entry(*event_name).or_default(); - // PatternRules::union Returns the new PatternRules, we need to modify the existing ones - let merged_rules = self_pattern_rules.union(other_pattern_rules); - *self_pattern_rules = merged_rules; + self.map.entry(*event_name).or_default().union_in_place(other_pattern_rules); } // Directly merge two masks. self.total_events_mask |= other_map.total_events_mask; @@ -81,11 +83,14 @@ impl RulesMap { /// Remove another rule defined in the RulesMap from the current RulesMap. /// /// After the rule is removed, `total_events_mask` is recalculated to ensure its accuracy. 
+ /// + /// # Parameters + /// * `other_map` - The other RulesMap containing rules to be removed from the current one. pub fn remove_map(&mut self, other_map: &Self) { let mut events_to_remove = Vec::new(); for (event_name, self_pattern_rules) in &mut self.map { if let Some(other_pattern_rules) = other_map.map.get(event_name) { - *self_pattern_rules = self_pattern_rules.difference(other_pattern_rules); + self_pattern_rules.difference_in_place(other_pattern_rules); if self_pattern_rules.is_empty() { events_to_remove.push(*event_name); } @@ -102,6 +107,9 @@ impl RulesMap { /// /// This method uses a bitmask for a quick check of O(1) complexity. /// `event_name` can be a compound type, such as `ObjectCreatedAll`. + /// + /// # Parameters + /// * `event_name` - The event name to check for subscribers. pub fn has_subscriber(&self, event_name: &EventName) -> bool { // event_name.mask() will handle compound events correctly (self.total_events_mask & event_name.mask()) != 0 @@ -112,39 +120,54 @@ impl RulesMap { /// # Notice /// The `event_name` parameter should be a specific, non-compound event type. /// Because this is taken from the `Event` object that actually occurs. + /// + /// # Parameters + /// * `event_name` - The specific event name to match against. + /// * `object_key` - The object key to match against the patterns in the rules. + /// + /// # Returns + /// * A set of TargetIDs that match the given event and object key. 
pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet { // Use bitmask to quickly determine whether there is a matching rule if (self.total_events_mask & event_name.mask()) == 0 { return TargetIdSet::new(); // No matching rules } - // First try to directly match the event name - if let Some(pattern_rules) = self.map.get(&event_name) { - let targets = pattern_rules.match_targets(object_key); - if !targets.is_empty() { - return targets; - } - } - // Go's RulesMap[eventName] is directly retrieved, and if it does not exist, it is empty Rules. - // Rust's HashMap::get returns Option. If the event name does not exist, there is no rule. - // Compound events (such as ObjectCreatedAll) have been expanded as a single event when add_rule_config. - // Therefore, a single event name should be used when querying. - // If event_name itself is a single type, look it up directly. - // If event_name is a compound type, Go's logic is expanded when added. - // Here match_rules should receive events that may already be single. - // If the caller passes in a compound event, it should expand itself or handle this function first. - // Assume that event_name is already a specific event that can be used for searching. + // In Go, RulesMap[eventName] returns empty rules if the key doesn't exist. + // Rust's HashMap::get returns Option, so missing key means no rules. + // Compound events like ObjectCreatedAll are expanded into specific events during add_rule_config. + // Thus, queries should use specific event names. + // If event_name is compound, expansion happens at addition time. + // match_rules assumes event_name is already a specific event for lookup. + // Callers should expand compound events before calling this method. self.map .get(&event_name) .map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key)) } /// Check if RulesMap is empty. 
+ /// + /// # Returns + /// * `true` if there are no rules in the map; `false` otherwise pub fn is_empty(&self) -> bool { self.map.is_empty() } + /// Determine whether the current RulesMap contains the specified TargetID (referenced by any event / pattern). + /// + /// # Parameters + /// * `target_id` - The TargetID to check for existence within the RulesMap + /// + /// # Returns + /// * `true` if the TargetID exists in any of the PatternRules; `false` otherwise. + pub fn contains_target_id(&self, target_id: &TargetID) -> bool { + self.map.values().any(|pr| pr.contains_target_id(target_id)) + } + /// Returns a clone of internal rules for use in scenarios such as BucketNotificationConfig::validate. + /// + /// # Returns + /// A reference to the internal HashMap of EventName to PatternRules. pub fn inner(&self) -> &HashMap { &self.map } @@ -160,18 +183,32 @@ impl RulesMap { } /// Remove rules and optimize performance + /// + /// # Parameters + /// * `event_name` - The EventName from which to remove the rule. + /// * `pattern` - The pattern of the rule to be removed. #[allow(dead_code)] pub fn remove_rule(&mut self, event_name: &EventName, pattern: &str) { + let mut remove_event = false; + if let Some(pattern_rules) = self.map.get_mut(event_name) { - pattern_rules.rules.remove(pattern); + pattern_rules.remove_pattern(pattern); if pattern_rules.is_empty() { - self.map.remove(event_name); + remove_event = true; } } + + if remove_event { + self.map.remove(event_name); + } + self.recalculate_mask(); // Delay calculation mask } - /// Batch Delete Rules + /// Batch Delete Rules and Optimize Performance + /// + /// # Parameters + /// * `event_names` - A slice of EventNames to be removed. #[allow(dead_code)] pub fn remove_rules(&mut self, event_names: &[EventName]) { for event_name in event_names { @@ -181,9 +218,27 @@ impl RulesMap { } /// Update rules and optimize performance + /// + /// # Parameters + /// * `event_name` - The EventName to update. 
+ /// * `pattern` - The pattern of the rule to be updated. + /// * `target_id` - The TargetID to be added. #[allow(dead_code)] pub fn update_rule(&mut self, event_name: EventName, pattern: String, target_id: TargetID) { self.map.entry(event_name).or_default().add(pattern, target_id); self.total_events_mask |= event_name.mask(); // Update only the relevant bitmask } + + /// Iterate all EventName keys contained in this RulesMap. + /// + /// Used by snapshot compilation to compute bucket event_mask. + /// + /// # Returns + /// An iterator over all EventName keys in the RulesMap. + #[inline] + pub fn iter_events(&self) -> impl Iterator + '_ { + // `inner()` is already used by config.rs, so we reuse it here. + // If the key type is `EventName`, `.copied()` is the cheapest way to return values. + self.inner().keys().copied() + } } diff --git a/crates/notify/src/rules/subscriber_index.rs b/crates/notify/src/rules/subscriber_index.rs new file mode 100644 index 00000000..205bc58a --- /dev/null +++ b/crates/notify/src/rules/subscriber_index.rs @@ -0,0 +1,131 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::rules::{BucketRulesSnapshot, BucketSnapshotRef, DynRulesContainer}; +use arc_swap::ArcSwap; +use rustfs_targets::EventName; +use starshard::ShardedHashMap; +use std::fmt; +use std::sync::Arc; + +/// A global bucket -> snapshot index. 
+/// +/// Read path: lock-free load (ArcSwap) +/// Write path: atomic replacement after building a new snapshot +pub struct SubscriberIndex { + // Use starshard for sharding to reduce lock competition when the number of buckets is large + inner: ShardedHashMap>>>, + // Cache an "empty rule container" for empty snapshots (avoids building every time) + empty_rules: Arc, +} + +/// Avoid deriving fields that do not support Debug +impl fmt::Debug for SubscriberIndex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SubscriberIndex").finish_non_exhaustive() + } +} + +impl SubscriberIndex { + /// Create a new SubscriberIndex. + /// + /// # Arguments + /// * `empty_rules` - An Arc to an empty rules container used for empty snapshots + /// + /// Returns a new instance of SubscriberIndex. + pub fn new(empty_rules: Arc) -> Self { + Self { + inner: ShardedHashMap::new(64), + empty_rules, + } + } + + /// Get the current snapshot of a bucket. + /// If it does not exist, return empty snapshot. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to load. + /// + /// Returns the snapshot reference for the specified bucket. + pub fn load_snapshot(&self, bucket: &str) -> BucketSnapshotRef { + match self.inner.get(&bucket.to_string()) { + Some(cell) => cell.load_full(), + None => Arc::new(BucketRulesSnapshot::empty(self.empty_rules.clone())), + } + } + + /// Quickly determine whether the bucket has a subscription to an event. + /// This judgment can be consistent with subsequent rule matching when reading the same snapshot. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to check. + /// * `event` - The event name to check for subscriptions. + /// + /// Returns `true` if there are subscribers for the event, `false` otherwise. 
+ #[inline] + pub fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool { + let snap = self.load_snapshot(bucket); + if snap.event_mask == 0 { + return false; + } + snap.has_event(event) + } + + /// Atomically update a bucket's snapshot (whole package replacement). + /// + /// - The caller first builds the complete `BucketRulesSnapshot` (including event\_mask and rules). + /// - This method ensures that the read path will not observe intermediate states. + /// + /// # Arguments + /// * `bucket` - The name of the bucket to update. + /// * `new_snapshot` - The new snapshot to store for the bucket. + pub fn store_snapshot(&self, bucket: &str, new_snapshot: BucketRulesSnapshot) { + let key = bucket.to_string(); + + let cell = self.inner.get(&key).unwrap_or_else(|| { + // Insert a default cell (empty snapshot) + let init = Arc::new(ArcSwap::from_pointee(BucketRulesSnapshot::empty(self.empty_rules.clone()))); + self.inner.insert(key.clone(), init.clone()); + init + }); + + cell.store(Arc::new(new_snapshot)); + } + + /// Delete the bucket's subscription view (make it empty). + /// + /// # Arguments + /// * `bucket` - The name of the bucket to clear. 
+ pub fn clear_bucket(&self, bucket: &str) { + if let Some(cell) = self.inner.get(&bucket.to_string()) { + cell.store(Arc::new(BucketRulesSnapshot::empty(self.empty_rules.clone()))); + } + } +} + +impl Default for SubscriberIndex { + fn default() -> Self { + // An available empty rule container is required; here it is implemented using minimal empty + #[derive(Debug)] + struct EmptyRules; + impl crate::rules::subscriber_snapshot::RulesContainer for EmptyRules { + type Rule = dyn crate::rules::subscriber_snapshot::RuleEvents; + fn iter_rules<'a>(&'a self) -> Box + 'a> { + Box::new(std::iter::empty()) + } + } + + Self::new(Arc::new(EmptyRules) as Arc) + } +} diff --git a/crates/notify/src/rules/subscriber_snapshot.rs b/crates/notify/src/rules/subscriber_snapshot.rs new file mode 100644 index 00000000..4eed5d28 --- /dev/null +++ b/crates/notify/src/rules/subscriber_snapshot.rs @@ -0,0 +1,117 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use rustfs_targets::EventName; +use std::sync::Arc; + +/// Let the rules structure provide "what events it is subscribed to". +/// This way BucketRulesSnapshot does not need to know the internal shape of rules. +pub trait RuleEvents { + fn subscribed_events(&self) -> &[EventName]; +} + +/// Let the rules container provide the ability to iterate over all rules (abstracting only to the minimum necessary). 
+pub trait RulesContainer { + type Rule: RuleEvents + ?Sized; + fn iter_rules<'a>(&'a self) -> Box + 'a>; + + /// Fast empty judgment for snapshots (fix missing `rules.is_empty()`) + fn is_empty(&self) -> bool { + self.iter_rules().next().is_none() + } +} + +/// Represents a bucket's notification subscription view snapshot (immutable). +/// +/// - `event_mask`: Quickly determine whether there is a subscription to a certain type of event (bitset/flags). +/// - `rules`: precise rule mapping (prefix/suffix/pattern -> targets). +/// +/// The read path only reads this snapshot to ensure consistency. +#[derive(Debug, Clone)] +pub struct BucketRulesSnapshot +where + R: RulesContainer + ?Sized, +{ + pub event_mask: u64, + pub rules: Arc, +} + +impl BucketRulesSnapshot +where + R: RulesContainer + ?Sized, +{ + /// Create an empty snapshot with no subscribed events and no rules. + /// + /// # Arguments + /// * `rules` - An Arc to a rules container (can be an empty container). + /// + /// # Returns + /// An instance of `BucketRulesSnapshot` with an empty event mask. + #[inline] + pub fn empty(rules: Arc) -> Self { + Self { event_mask: 0, rules } + } + + /// Check if the snapshot has any subscribers for the specified event. + /// + /// # Arguments + /// * `event` - The event name to check for subscriptions. + /// + /// # Returns + /// `true` if there are subscribers for the event, `false` otherwise. + #[inline] + pub fn has_event(&self, event: &EventName) -> bool { + (self.event_mask & event.mask()) != 0 + } + + /// Check if the snapshot is empty (no subscribed events or rules). + /// + /// # Returns + /// `true` if the snapshot is empty, `false` otherwise. + #[inline] + pub fn is_empty(&self) -> bool { + self.event_mask == 0 || self.rules.is_empty() + } + + /// [debug] Assert that `event_mask` is consistent with the event declared in `rules`. + /// + /// Constraints: + /// - only runs in debug builds (release incurs no cost). 
+ /// - If the rule contains compound events (\*All / Everything), rely on `EventName::mask()` to automatically expand. + #[inline] + pub fn debug_assert_mask_consistent(&self) { + #[cfg(debug_assertions)] + { + let mut recomputed = 0u64; + for rule in self.rules.iter_rules() { + for ev in rule.subscribed_events() { + recomputed |= ev.mask(); + } + } + + debug_assert!( + recomputed == self.event_mask, + "BucketRulesSnapshot.event_mask inconsistent: stored={:#x}, recomputed={:#x}", + self.event_mask, + recomputed + ); + } + } +} + +/// Unify trait-object snapshot types (fix Sized / missing generic arguments) +pub type DynRulesContainer = dyn RulesContainer + Send + Sync; + +/// Expose Arc form to facilitate sharing. +pub type BucketSnapshotRef = Arc>; diff --git a/crates/notify/src/rules/xml_config.rs b/crates/notify/src/rules/xml_config.rs index 134f0db2..698167d6 100644 --- a/crates/notify/src/rules/xml_config.rs +++ b/crates/notify/src/rules/xml_config.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::pattern; +use crate::rules::pattern; use hashbrown::HashSet; use rustfs_targets::EventName; use rustfs_targets::arn::{ARN, ArnError, TargetIDError}; diff --git a/crates/notify/src/stream.rs b/crates/notify/src/stream.rs index 9b37c13b..8c70d3c2 100644 --- a/crates/notify/src/stream.rs +++ b/crates/notify/src/stream.rs @@ -13,18 +13,23 @@ // limitations under the License. 
use crate::{Event, integration::NotificationMetrics}; -use rustfs_targets::StoreError; -use rustfs_targets::Target; -use rustfs_targets::TargetError; -use rustfs_targets::store::{Key, Store}; -use rustfs_targets::target::EntityTarget; +use rustfs_targets::{ + StoreError, Target, TargetError, + store::{Key, Store}, + target::EntityTarget, +}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{Semaphore, mpsc}; use tokio::time::sleep; use tracing::{debug, error, info, warn}; -/// Streams events from the store to the target +/// Streams events from the store to the target with retry logic +/// +/// # Arguments +/// - `store`: The event store +/// - `target`: The target to send events to +/// - `cancel_rx`: Receiver to listen for cancellation signals pub async fn stream_events( store: &mut (dyn Store + Send), target: &dyn Target, @@ -67,6 +72,7 @@ pub async fn stream_events( match target.send_from_store(key.clone()).await { Ok(_) => { info!("Successfully sent event for target: {}", target.name()); + // send_from_store deletes the event from store on success success = true; } Err(e) => { @@ -104,6 +110,13 @@ pub async fn stream_events( } /// Starts the event streaming process for a target +/// +/// # Arguments +/// - `store`: The event store +/// - `target`: The target to send events to +/// +/// # Returns +/// A sender to signal cancellation of the event stream pub fn start_event_stream( mut store: Box + Send>, target: Arc + Send + Sync>, @@ -119,6 +132,15 @@ pub fn start_event_stream( } /// Start event stream with batch processing +/// +/// # Arguments +/// - `store`: The event store +/// - `target`: The target to send events to clients +/// - `metrics`: Metrics for monitoring +/// - `semaphore`: Semaphore to limit concurrency +/// +/// # Returns +/// A sender to signal cancellation of the event stream pub fn start_event_stream_with_batching( mut store: Box, Error = StoreError, Key = Key> + Send>, target: Arc + Send + Sync>, @@ -136,6 +158,16 @@ 
pub fn start_event_stream_with_batching( } /// Event stream processing with batch processing +/// +/// # Arguments +/// - `store`: The event store +/// - `target`: The target to send events to clients +/// - `cancel_rx`: Receiver to listen for cancellation signals +/// - `metrics`: Metrics for monitoring +/// - `semaphore`: Semaphore to limit concurrency +/// +/// # Notes +/// This function processes events in batches to improve efficiency. pub async fn stream_events_with_batching( store: &mut (dyn Store, Error = StoreError, Key = Key> + Send), target: &dyn Target, @@ -231,7 +263,17 @@ pub async fn stream_events_with_batching( } } -/// Processing event batches +/// Processing event batches for targets +/// # Arguments +/// - `batch`: The batch of events to process +/// - `batch_keys`: The corresponding keys of the events in the batch +/// - `target`: The target to send events to clients +/// - `max_retries`: Maximum number of retries for sending an event +/// - `base_delay`: Base delay duration for retries +/// - `metrics`: Metrics for monitoring +/// - `semaphore`: Semaphore to limit concurrency +/// # Notes +/// This function processes a batch of events, sending each event to the target with retry async fn process_batch( batch: &mut Vec>, batch_keys: &mut Vec, @@ -262,6 +304,7 @@ async fn process_batch( // Retry logic while retry_count < max_retries && !success { + // After sending successfully, the event in the storage is deleted synchronously. 
match target.send_from_store(key.clone()).await { Ok(_) => { info!("Successfully sent event for target: {}, Key: {}", target.name(), key.to_string()); diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index e2c5baf7..2aa2642c 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -39,9 +39,9 @@ use rustfs_config::{ ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA, }, }; -use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default}; +use rustfs_utils::{get_env_opt_str, get_env_u64, get_env_usize, get_local_ip_with_default}; use smallvec::SmallVec; -use std::{borrow::Cow, env, fs, io::IsTerminal, time::Duration}; +use std::{borrow::Cow, fs, io::IsTerminal, time::Duration}; use tracing::info; use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; @@ -574,8 +574,8 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> Result s, + Err(e) => { + warn!("Failed to create socket for {:?}: {}, falling back to IPv4", server_addr, e); + let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port()); + server_addr = ipv4_addr; + socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))? + } + }; + // If address is IPv6 try to enable dual-stack; on failure, switch to IPv4 socket. if server_addr.is_ipv6() { if let Err(e) = socket.set_only_v6(false) { - warn!("Failed to set IPV6_V6ONLY=false, falling back to IPv4-only: {}", e); - // Fallback to a new IPv4 socket if setting dual-stack fails. 
+ warn!("Failed to set IPV6_V6ONLY=false, attempting IPv4 fallback: {}", e); let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port()); server_addr = ipv4_addr; socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; @@ -140,8 +150,27 @@ pub async fn start_http_server( socket.set_reuse_address(true)?; // Set the socket to non-blocking before passing it to Tokio. socket.set_nonblocking(true)?; - socket.bind(&server_addr.into())?; - socket.listen(backlog)?; + + // Attempt bind; if bind fails for IPv6, try IPv4 fallback once more. + if let Err(bind_err) = socket.bind(&server_addr.into()) { + warn!("Failed to bind to {}: {}.", server_addr, bind_err); + if server_addr.is_ipv6() { + // Try IPv4 fallback + let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port()); + server_addr = ipv4_addr; + socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; + socket.set_reuse_address(true)?; + socket.set_nonblocking(true)?; + socket.bind(&server_addr.into())?; + // [FIX] Ensure fallback socket is moved to listening state as well. + socket.listen(backlog)?; + } else { + return Err(bind_err); + } + } else { + // Listen on the socket when initial bind succeeded + socket.listen(backlog)?; + } TcpListener::from_std(socket.into())? };