Fix notification event stream cleanup, add bounded send concurrency, and reduce overhead (#1224)

This commit is contained in:
houseme
2025-12-22 00:57:05 +08:00
committed by GitHub
parent 1c51e204ab
commit 08f1a31f3f
20 changed files with 921 additions and 169 deletions

View File

@@ -104,7 +104,7 @@ jobs:
name: Test and Lint
needs: skip-check
if: needs.skip-check.outputs.should_skip != 'true'
runs-on: ubicloud-standard-2
runs-on: ubicloud-standard-4
timeout-minutes: 60
steps:
- name: Checkout repository

1
Cargo.lock generated
View File

@@ -7437,6 +7437,7 @@ dependencies = [
name = "rustfs-notify"
version = "0.0.5"
dependencies = [
"arc-swap",
"async-trait",
"axum",
"chrono",

View File

@@ -60,8 +60,9 @@ impl TargetFactory for WebhookTargetFactory {
let endpoint = config
.lookup(WEBHOOK_ENDPOINT)
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
let endpoint_url = Url::parse(&endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?;
let parsed_endpoint = endpoint.trim();
let endpoint_url = Url::parse(parsed_endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?;
let args = WebhookArgs {
enable: true, // If we are here, it's already enabled.

View File

@@ -51,6 +51,18 @@ pub const ENV_NOTIFY_TARGET_STREAM_CONCURRENCY: &str = "RUSTFS_NOTIFY_TARGET_STR
/// Adjust this value based on your system's capabilities and expected load.
pub const DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY: usize = 20;
/// Name of the environment variable that configures send concurrency.
/// Controls how many send operations are processed in parallel by the notification system.
/// Defaults to [`DEFAULT_NOTIFY_SEND_CONCURRENCY`] if not set.
/// Example: `RUSTFS_NOTIFY_SEND_CONCURRENCY=64`.
pub const ENV_NOTIFY_SEND_CONCURRENCY: &str = "RUSTFS_NOTIFY_SEND_CONCURRENCY";
/// Default concurrency for send operations in the notification system
/// This value is used if the environment variable `RUSTFS_NOTIFY_SEND_CONCURRENCY` is not set.
/// It defines how many send operations can be processed in parallel by the notification system at any given time.
/// Adjust this value based on your system's capabilities and expected load.
pub const DEFAULT_NOTIFY_SEND_CONCURRENCY: usize = 64;
#[allow(dead_code)]
pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS];

View File

@@ -30,6 +30,7 @@ rustfs-config = { workspace = true, features = ["notify", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-targets = { workspace = true }
rustfs-utils = { workspace = true }
arc-swap = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
futures = { workspace = true }

View File

@@ -60,8 +60,9 @@ impl TargetFactory for WebhookTargetFactory {
let endpoint = config
.lookup(WEBHOOK_ENDPOINT)
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
let endpoint_url = Url::parse(&endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?;
let parsed_endpoint = endpoint.trim();
let endpoint_url = Url::parse(parsed_endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?;
let args = WebhookArgs {
enable: true, // If we are here, it's already enabled.

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::notification_system_subscriber::NotificationSystemSubscriberView;
use crate::{
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
@@ -104,6 +105,8 @@ pub struct NotificationSystem {
concurrency_limiter: Arc<Semaphore>,
/// Monitoring indicators
metrics: Arc<NotificationMetrics>,
/// Subscriber view
subscriber_view: NotificationSystemSubscriberView,
}
impl NotificationSystem {
@@ -112,6 +115,7 @@ impl NotificationSystem {
let concurrency_limiter =
rustfs_utils::get_env_usize(ENV_NOTIFY_TARGET_STREAM_CONCURRENCY, DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY);
NotificationSystem {
subscriber_view: NotificationSystemSubscriberView::new(),
notifier: Arc::new(EventNotifier::new()),
registry: Arc::new(TargetRegistry::new()),
config: Arc::new(RwLock::new(config)),
@@ -188,8 +192,11 @@ impl NotificationSystem {
}
/// Checks if there are active subscribers for the given bucket and event name.
pub async fn has_subscriber(&self, bucket: &str, event_name: &EventName) -> bool {
self.notifier.has_subscriber(bucket, event_name).await
pub async fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool {
if !self.subscriber_view.has_subscriber(bucket, event) {
return false;
}
self.notifier.has_subscriber(bucket, event).await
}
async fn update_config_and_reload<F>(&self, mut modifier: F) -> Result<(), NotificationError>
@@ -236,15 +243,18 @@ impl NotificationSystem {
pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> {
info!("Attempting to remove target: {}", target_id);
let ttype = target_type.to_lowercase();
let tname = target_id.name.to_lowercase();
self.update_config_and_reload(|config| {
let mut changed = false;
if let Some(targets_of_type) = config.0.get_mut(target_type) {
if targets_of_type.remove(&target_id.name).is_some() {
if let Some(targets_of_type) = config.0.get_mut(&ttype) {
if targets_of_type.remove(&tname).is_some() {
info!("Removed target {} from configuration", target_id);
changed = true;
}
if targets_of_type.is_empty() {
config.0.remove(target_type);
config.0.remove(&ttype);
}
}
if !changed {
@@ -269,20 +279,24 @@ impl NotificationSystem {
/// If the target configuration is invalid, it returns Err(NotificationError::Configuration).
pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> {
info!("Setting config for target {} of type {}", target_name, target_type);
let ttype = target_type.to_lowercase();
let tname = target_name.to_lowercase();
self.update_config_and_reload(|config| {
config
.0
.entry(target_type.to_lowercase())
.or_default()
.insert(target_name.to_lowercase(), kvs.clone());
config.0.entry(ttype.clone()).or_default().insert(tname.clone(), kvs.clone());
true // The configuration is always modified
})
.await
}
/// Removes all notification configurations for a bucket.
pub async fn remove_bucket_notification_config(&self, bucket_name: &str) {
self.notifier.remove_rules_map(bucket_name).await;
/// If the configuration is successfully removed, the entire notification system will be automatically reloaded.
///
/// # Arguments
/// * `bucket` - The name of the bucket whose notification configuration is to be removed.
///
pub async fn remove_bucket_notification_config(&self, bucket: &str) {
self.subscriber_view.clear_bucket(bucket);
self.notifier.remove_rules_map(bucket).await;
}
/// Removes a Target configuration.
@@ -299,11 +313,28 @@ impl NotificationSystem {
/// If the target configuration does not exist, it returns Ok(()) without making any changes.
pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> {
info!("Removing config for target {} of type {}", target_name, target_type);
let ttype = target_type.to_lowercase();
let tname = target_name.to_lowercase();
let target_id = TargetID {
id: tname.clone(),
name: ttype.clone(),
};
// Deletion is prohibited if bucket rules refer to it
if self.notifier.is_target_bound_to_any_bucket(&target_id).await {
return Err(NotificationError::Configuration(format!(
"Target is still bound to bucket rules and deletion is prohibited: type={} name={}",
ttype, tname
)));
}
let config_result = self
.update_config_and_reload(|config| {
let mut changed = false;
if let Some(targets) = config.0.get_mut(&target_type.to_lowercase()) {
if targets.remove(&target_name.to_lowercase()).is_some() {
if let Some(targets) = config.0.get_mut(&ttype) {
if targets.remove(&tname).is_some() {
changed = true;
}
if targets.is_empty() {
@@ -319,8 +350,6 @@ impl NotificationSystem {
.await;
if config_result.is_ok() {
let target_id = TargetID::new(target_name.to_string(), target_type.to_string());
// Remove from target list
let target_list = self.notifier.target_list();
let mut target_list_guard = target_list.write().await;
@@ -358,6 +387,9 @@ impl NotificationSystem {
let _ = cancel_tx.send(()).await;
}
// Clear the target_list and ensure that reload is a replacement reconstruction (solve the target_list len unchanged/residual problem)
self.notifier.remove_all_bucket_targets().await;
// Update the config
self.update_config(new_config.clone()).await;
@@ -388,15 +420,16 @@ impl NotificationSystem {
// The storage of the cloned target and the target itself
let store_clone = store.boxed_clone();
let target_box = target.clone_dyn();
let target_arc = Arc::from(target_box);
// Add a reference to the monitoring metrics
let metrics = self.metrics.clone();
let semaphore = self.concurrency_limiter.clone();
// let target_box = target.clone_dyn();
let target_arc = Arc::from(target.clone_dyn());
// Encapsulated enhanced version of start_event_stream
let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore);
let cancel_tx = self.enhanced_start_event_stream(
store_clone,
target_arc,
self.metrics.clone(),
self.concurrency_limiter.clone(),
);
// Start event stream processing and save cancel sender
// let cancel_tx = start_event_stream(store_clone, target_clone);
@@ -423,17 +456,18 @@ impl NotificationSystem {
/// Loads the bucket notification configuration
pub async fn load_bucket_notification_config(
&self,
bucket_name: &str,
config: &BucketNotificationConfig,
bucket: &str,
cfg: &BucketNotificationConfig,
) -> Result<(), NotificationError> {
let arn_list = self.notifier.get_arn_list(&config.region).await;
self.subscriber_view.apply_bucket_config(bucket, cfg);
let arn_list = self.notifier.get_arn_list(&cfg.region).await;
if arn_list.is_empty() {
return Err(NotificationError::Configuration("No targets configured".to_string()));
}
info!("Available ARNs: {:?}", arn_list);
// Validate the configuration against the available ARNs
if let Err(e) = config.validate(&config.region, &arn_list) {
debug!("Bucket notification config validation region:{} failed: {}", &config.region, e);
if let Err(e) = cfg.validate(&cfg.region, &arn_list) {
debug!("Bucket notification config validation region:{} failed: {}", &cfg.region, e);
if !e.to_string().contains("ARN not found") {
return Err(NotificationError::BucketNotification(e.to_string()));
} else {
@@ -441,9 +475,9 @@ impl NotificationSystem {
}
}
let rules_map = config.get_rules_map();
self.notifier.add_rules_map(bucket_name, rules_map.clone()).await;
info!("Loaded notification config for bucket: {}", bucket_name);
let rules_map = cfg.get_rules_map();
self.notifier.add_rules_map(bucket, rules_map.clone()).await;
info!("Loaded notification config for bucket: {}", bucket);
Ok(())
}

View File

@@ -23,6 +23,7 @@ mod event;
pub mod factory;
mod global;
pub mod integration;
mod notification_system_subscriber;
pub mod notifier;
pub mod registry;
pub mod rules;

View File

@@ -0,0 +1,74 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::BucketNotificationConfig;
use crate::rules::{BucketRulesSnapshot, DynRulesContainer, SubscriberIndex};
use rustfs_targets::EventName;
/// NotificationSystemSubscriberView - Provides an interface to manage and query
/// the subscription status of buckets in the notification system.
#[derive(Debug)]
pub struct NotificationSystemSubscriberView {
index: SubscriberIndex,
}
impl NotificationSystemSubscriberView {
/// Creates a new NotificationSystemSubscriberView with an empty SubscriberIndex.
///
/// Returns a new instance of NotificationSystemSubscriberView.
pub fn new() -> Self {
Self {
index: SubscriberIndex::default(),
}
}
/// Checks if a bucket has any subscribers for a specific event.
/// This is a quick check using the event mask in the snapshot.
///
/// # Arguments
/// * `bucket` - The name of the bucket to check.
/// * `event` - The event name to check for subscriptions.
///
/// Returns `true` if there are subscribers for the event, `false` otherwise.
#[inline]
pub fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool {
self.index.has_subscriber(bucket, event)
}
/// Builds and atomically replaces a bucket's subscription snapshot from the configuration.
///
/// Core principle: masks and rules are calculated and stored together in the same update.
///
/// # Arguments
/// * `bucket` - The name of the bucket to update.
/// * `cfg` - The bucket notification configuration to compile into a snapshot.
pub fn apply_bucket_config(&self, bucket: &str, cfg: &BucketNotificationConfig) {
// *It is recommended to merge compile into one function to ensure the same origin.
let snapshot: BucketRulesSnapshot<DynRulesContainer> = cfg.compile_snapshot();
// *debug to prevent inconsistencies from being introduced when modifying the compile logic in the future.
snapshot.debug_assert_mask_consistent();
self.index.store_snapshot(bucket, snapshot);
}
/// Clears a bucket's subscription snapshot.
///
/// #Arguments
/// * `bucket` - The name of the bucket to clear.
#[inline]
pub fn clear_bucket(&self, bucket: &str) {
self.index.clear_bucket(bucket);
}
}

View File

@@ -14,19 +14,21 @@
use crate::{error::NotificationError, event::Event, rules::RulesMap};
use hashbrown::HashMap;
use rustfs_config::notify::{DEFAULT_NOTIFY_SEND_CONCURRENCY, ENV_NOTIFY_SEND_CONCURRENCY};
use rustfs_targets::EventName;
use rustfs_targets::Target;
use rustfs_targets::arn::TargetID;
use rustfs_targets::target::EntityTarget;
use starshard::AsyncShardedHashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, Semaphore};
use tracing::{debug, error, info, instrument, warn};
/// Manages event notification to targets based on rules
pub struct EventNotifier {
target_list: Arc<RwLock<TargetList>>,
bucket_rules_map: Arc<AsyncShardedHashMap<String, RulesMap, rustc_hash::FxBuildHasher>>,
send_limiter: Arc<Semaphore>,
}
impl Default for EventNotifier {
@@ -37,16 +39,41 @@ impl Default for EventNotifier {
impl EventNotifier {
/// Creates a new EventNotifier
///
/// # Returns
/// Returns a new instance of EventNotifier.
pub fn new() -> Self {
let max_inflight = rustfs_utils::get_env_usize(ENV_NOTIFY_SEND_CONCURRENCY, DEFAULT_NOTIFY_SEND_CONCURRENCY);
EventNotifier {
target_list: Arc::new(RwLock::new(TargetList::new())),
bucket_rules_map: Arc::new(AsyncShardedHashMap::new(0)),
send_limiter: Arc::new(Semaphore::new(max_inflight)),
}
}
/// Checks whether a TargetID is still referenced by any bucket's rules.
///
/// # Arguments
/// * `target_id` - The TargetID to check.
///
/// # Returns
/// Returns `true` if the TargetID is bound to any bucket, otherwise `false`.
pub async fn is_target_bound_to_any_bucket(&self, target_id: &TargetID) -> bool {
// `AsyncShardedHashMap::iter()`: Traverse (bucket_name, rules_map)
let items = self.bucket_rules_map.iter().await;
for (_bucket, rules_map) in items {
if rules_map.contains_target_id(target_id) {
return true;
}
}
false
}
/// Returns a reference to the target list
/// This method provides access to the target list for external use.
///
/// # Returns
/// Returns an `Arc<RwLock<TargetList>>` representing the target list.
pub fn target_list(&self) -> Arc<RwLock<TargetList>> {
Arc::clone(&self.target_list)
}
@@ -54,17 +81,23 @@ impl EventNotifier {
/// Removes all notification rules for a bucket
///
/// # Arguments
/// * `bucket_name` - The name of the bucket for which to remove rules
/// * `bucket` - The name of the bucket for which to remove rules
///
/// This method removes all rules associated with the specified bucket name.
/// It will log a message indicating the removal of rules.
pub async fn remove_rules_map(&self, bucket_name: &str) {
if self.bucket_rules_map.remove(&bucket_name.to_string()).await.is_some() {
info!("Removed all notification rules for bucket: {}", bucket_name);
pub async fn remove_rules_map(&self, bucket: &str) {
if self.bucket_rules_map.remove(&bucket.to_string()).await.is_some() {
info!("Removed all notification rules for bucket: {}", bucket);
}
}
/// Returns a list of ARNs for the registered targets
///
/// # Arguments
/// * `region` - The region to use for generating the ARNs
///
/// # Returns
/// Returns a vector of strings representing the ARNs of the registered targets
pub async fn get_arn_list(&self, region: &str) -> Vec<String> {
let target_list_guard = self.target_list.read().await;
target_list_guard
@@ -75,24 +108,37 @@ impl EventNotifier {
}
/// Adds a rules map for a bucket
pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) {
///
/// # Arguments
/// * `bucket` - The name of the bucket for which to add the rules map
/// * `rules_map` - The rules map to add for the bucket
pub async fn add_rules_map(&self, bucket: &str, rules_map: RulesMap) {
if rules_map.is_empty() {
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
self.bucket_rules_map.remove(&bucket.to_string()).await;
} else {
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map).await;
self.bucket_rules_map.insert(bucket.to_string(), rules_map).await;
}
info!("Added rules for bucket: {}", bucket_name);
info!("Added rules for bucket: {}", bucket);
}
/// Gets the rules map for a specific bucket.
pub async fn get_rules_map(&self, bucket_name: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(&bucket_name.to_string()).await
///
/// # Arguments
/// * `bucket` - The name of the bucket for which to get the rules map
///
/// # Returns
/// Returns `Some(RulesMap)` if rules exist for the bucket, otherwise returns `None`.
pub async fn get_rules_map(&self, bucket: &str) -> Option<RulesMap> {
self.bucket_rules_map.get(&bucket.to_string()).await
}
/// Removes notification rules for a bucket
pub async fn remove_notification(&self, bucket_name: &str) {
self.bucket_rules_map.remove(&bucket_name.to_string()).await;
info!("Removed notification rules for bucket: {}", bucket_name);
///
/// # Arguments
/// * `bucket` - The name of the bucket for which to remove notification rules
pub async fn remove_notification(&self, bucket: &str) {
self.bucket_rules_map.remove(&bucket.to_string()).await;
info!("Removed notification rules for bucket: {}", bucket);
}
/// Removes all targets
@@ -125,69 +171,87 @@ impl EventNotifier {
}
/// Sends an event to the appropriate targets based on the bucket rules
///
/// # Arguments
/// * `event` - The event to send
#[instrument(skip_all)]
pub async fn send(&self, event: Arc<Event>) {
let bucket_name = &event.s3.bucket.name;
let object_key = &event.s3.object.key;
let event_name = event.event_name;
if let Some(rules) = self.bucket_rules_map.get(bucket_name).await {
let target_ids = rules.match_rules(event_name, object_key);
if target_ids.is_empty() {
debug!("No matching targets for event in bucket: {}", bucket_name);
return;
}
let target_ids_len = target_ids.len();
let mut handles = vec![];
// Use scope to limit the borrow scope of target_list
{
let target_list_guard = self.target_list.read().await;
info!("Sending event to targets: {:?}", target_ids);
for target_id in target_ids {
// `get` now returns Option<Arc<dyn Target + Send + Sync>>
if let Some(target_arc) = target_list_guard.get(&target_id) {
// Clone an Arc<Box<dyn Target>> (which is where target_list is stored) to move into an asynchronous task
// target_arc is already Arc, clone it for the async task
let cloned_target_for_task = target_arc.clone();
let event_clone = event.clone();
let target_name_for_task = cloned_target_for_task.name(); // Get the name before generating the task
debug!("Preparing to send event to target: {}", target_name_for_task);
// Use cloned data in closures to avoid borrowing conflicts
// Create an EntityTarget from the event
let entity_target: Arc<EntityTarget<Event>> = Arc::new(EntityTarget {
object_name: object_key.to_string(),
bucket_name: bucket_name.to_string(),
event_name,
data: event_clone.clone().as_ref().clone(),
});
let handle = tokio::spawn(async move {
if let Err(e) = cloned_target_for_task.save(entity_target.clone()).await {
error!("Failed to send event to target {}: {}", target_name_for_task, e);
} else {
debug!("Successfully saved event to target {}", target_name_for_task);
}
});
handles.push(handle);
} else {
warn!("Target ID {:?} found in rules but not in target list.", target_id);
}
}
// target_list is automatically released here
}
// Wait for all tasks to be completed
for handle in handles {
if let Err(e) = handle.await {
error!("Task for sending/saving event failed: {}", e);
}
}
info!("Event processing initiated for {} targets for bucket: {}", target_ids_len, bucket_name);
} else {
let Some(rules) = self.bucket_rules_map.get(bucket_name).await else {
debug!("No rules found for bucket: {}", bucket_name);
return;
};
let target_ids = rules.match_rules(event_name, object_key);
if target_ids.is_empty() {
debug!("No matching targets for event in bucket: {}", bucket_name);
return;
}
let target_ids_len = target_ids.len();
let mut handles = vec![];
// Use scope to limit the borrow scope of target_list
let target_list_guard = self.target_list.read().await;
info!("Sending event to targets: {:?}", target_ids);
for target_id in target_ids {
// `get` now returns Option<Arc<dyn Target + Send + Sync>>
if let Some(target_arc) = target_list_guard.get(&target_id) {
// Clone an Arc<Box<dyn Target>> (which is where target_list is stored) to move into an asynchronous task
// target_arc is already Arc, clone it for the async task
let target_for_task = target_arc.clone();
let limiter = self.send_limiter.clone();
let event_clone = event.clone();
let target_name_for_task = target_for_task.name(); // Get the name before generating the task
debug!("Preparing to send event to target: {}", target_name_for_task);
// Use cloned data in closures to avoid borrowing conflicts
// Create an EntityTarget from the event
let entity_target: Arc<EntityTarget<Event>> = Arc::new(EntityTarget {
object_name: object_key.to_string(),
bucket_name: bucket_name.to_string(),
event_name,
data: event_clone.as_ref().clone(),
});
let handle = tokio::spawn(async move {
let _permit = match limiter.acquire_owned().await {
Ok(p) => p,
Err(e) => {
error!("Failed to acquire send permit for target {}: {}", target_name_for_task, e);
return;
}
};
if let Err(e) = target_for_task.save(entity_target.clone()).await {
error!("Failed to send event to target {}: {}", target_name_for_task, e);
} else {
debug!("Successfully saved event to target {}", target_name_for_task);
}
});
handles.push(handle);
} else {
warn!("Target ID {:?} found in rules but not in target list.", target_id);
}
}
// target_list is automatically released here
drop(target_list_guard);
// Wait for all tasks to be completed
for handle in handles {
if let Err(e) = handle.await {
error!("Task for sending/saving event failed: {}", e);
}
}
info!("Event processing initiated for {} targets for bucket: {}", target_ids_len, bucket_name);
}
/// Initializes the targets for buckets
///
/// # Arguments
/// * `targets_to_init` - A vector of boxed targets to initialize
///
/// # Returns
/// Returns `Ok(())` if initialization is successful, otherwise returns a `NotificationError`.
#[instrument(skip(self, targets_to_init))]
pub async fn init_bucket_targets(
&self,
@@ -218,6 +282,7 @@ impl EventNotifier {
/// A thread-safe list of targets
pub struct TargetList {
/// Map of TargetID to Target
targets: HashMap<TargetID, Arc<dyn Target<Event> + Send + Sync>>,
}
@@ -234,6 +299,12 @@ impl TargetList {
}
/// Adds a target to the list
///
/// # Arguments
/// * `target` - The target to add
///
/// # Returns
/// Returns `Ok(())` if the target was added successfully, or a `NotificationError` if an error occurred.
pub fn add(&mut self, target: Arc<dyn Target<Event> + Send + Sync>) -> Result<(), NotificationError> {
let id = target.id();
if self.targets.contains_key(&id) {
@@ -251,6 +322,12 @@ impl TargetList {
/// Removes a target by ID. Note: This does not stop its associated event stream.
/// Stream cancellation should be handled by EventNotifier.
///
/// # Arguments
/// * `id` - The ID of the target to remove
///
/// # Returns
/// Returns the removed target if it existed, otherwise `None`.
pub async fn remove_target_only(&mut self, id: &TargetID) -> Option<Arc<dyn Target<Event> + Send + Sync>> {
if let Some(target_arc) = self.targets.remove(id) {
if let Err(e) = target_arc.close().await {
@@ -278,6 +355,12 @@ impl TargetList {
}
/// Returns a target by ID
///
/// # Arguments
/// * `id` - The ID of the target to retrieve
///
/// # Returns
/// Returns the target if it exists, otherwise `None`.
pub fn get(&self, id: &TargetID) -> Option<Arc<dyn Target<Event> + Send + Sync>> {
self.targets.get(id).cloned()
}
@@ -292,7 +375,7 @@ impl TargetList {
self.targets.len()
}
// is_empty can be derived from len()
/// is_empty can be derived from len()
pub fn is_empty(&self) -> bool {
self.targets.is_empty()
}

View File

@@ -15,13 +15,60 @@
use super::rules_map::RulesMap;
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use hashbrown::HashMap;
use crate::rules::subscriber_snapshot::{BucketRulesSnapshot, DynRulesContainer, RuleEvents, RulesContainer};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::io::Read;
use std::sync::Arc;
/// A "rule view", only used for snapshot mask/consistency verification.
/// Here we choose to generate the view by "single event" to ensure that event_mask calculation is reliable and simple.
#[derive(Debug)]
struct RuleView {
events: Vec<EventName>,
}
impl RuleEvents for RuleView {
fn subscribed_events(&self) -> &[EventName] {
&self.events
}
}
/// Adapt RulesMap to RulesContainer.
/// Key point: The items returned by iter_rules are &dyn RuleEvents, so a RuleView list is cached in the container.
#[derive(Debug)]
struct CompiledRules {
// Keep RulesMap (can be used later if you want to make more complex judgments during the snapshot reading phase)
#[allow(dead_code)]
rules_map: RulesMap,
// for RulesContainer::iter_rules
rule_views: Vec<RuleView>,
}
impl CompiledRules {
fn from_rules_map(rules_map: &RulesMap) -> Self {
let mut rule_views = Vec::new();
for ev in rules_map.iter_events() {
rule_views.push(RuleView { events: vec![ev] });
}
Self {
rules_map: rules_map.clone(),
rule_views,
}
}
}
impl RulesContainer for CompiledRules {
type Rule = dyn RuleEvents;
fn iter_rules<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Self::Rule> + 'a> {
// Key: Convert &RuleView into &dyn RuleEvents
Box::new(self.rule_views.iter().map(|v| v as &dyn RuleEvents))
}
}
/// Configuration for bucket notifications.
/// This struct now holds the parsed and validated rules in the new RulesMap format.
@@ -119,11 +166,26 @@ impl BucketNotificationConfig {
pub fn set_region(&mut self, region: &str) {
self.region = region.to_string();
}
}
// Add a helper to PatternRules if not already present
impl pattern_rules::PatternRules {
pub fn inner(&self) -> &HashMap<String, target_id_set::TargetIdSet> {
&self.rules
/// Compiles the current BucketNotificationConfig into a BucketRulesSnapshot.
/// This involves transforming the rules into a format suitable for runtime use,
/// and calculating the event mask based on the subscribed events of the rules.
///
/// # Returns
/// A BucketRulesSnapshot containing the compiled rules and event mask.
pub fn compile_snapshot(&self) -> BucketRulesSnapshot<DynRulesContainer> {
// 1) Generate container from RulesMap
let compiled = CompiledRules::from_rules_map(self.get_rules_map());
let rules: Arc<DynRulesContainer> = Arc::new(compiled) as Arc<DynRulesContainer>;
// 2) Calculate event_mask
let mut mask = 0u64;
for rule in rules.iter_rules() {
for ev in rule.subscribed_events() {
mask |= ev.mask();
}
}
BucketRulesSnapshot { event_mask: mask, rules }
}
}

View File

@@ -12,22 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod config;
pub mod pattern;
pub mod pattern_rules;
pub mod rules_map;
pub mod target_id_set;
mod pattern_rules;
mod rules_map;
mod subscriber_index;
mod subscriber_snapshot;
mod target_id_set;
pub mod xml_config; // For XML structure definition and parsing
pub mod config; // Definition and parsing for BucketNotificationConfig
// Definition and parsing for BucketNotificationConfig
// Re-export key types from submodules for easy access to `crate::rules::TypeName`
// Re-export key types from submodules for external use
pub use config::BucketNotificationConfig;
// Assume that BucketNotificationConfigError is also defined in config.rs
// Or if it is still an alias for xml_config::ParseConfigError , adjust accordingly
pub use xml_config::ParseConfigError as BucketNotificationConfigError;
pub use pattern_rules::PatternRules;
pub use rules_map::RulesMap;
pub use subscriber_index::*;
pub use subscriber_snapshot::*;
pub use target_id_set::TargetIdSet;
pub use xml_config::{NotificationConfiguration, ParseConfigError};
pub use xml_config::{NotificationConfiguration, ParseConfigError, ParseConfigError as BucketNotificationConfigError};

View File

@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::pattern;
use super::target_id_set::TargetIdSet;
use crate::rules::TargetIdSet;
use crate::rules::pattern;
use hashbrown::HashMap;
use rayon::prelude::*;
use rustfs_targets::arn::TargetID;
@@ -27,31 +27,69 @@ pub struct PatternRules {
}
impl PatternRules {
/// Create a new, empty PatternRules.
pub fn new() -> Self {
Default::default()
}
/// Add rules: Pattern and Target ID.
/// If the schema already exists, add target_id to the existing TargetIdSet.
///
/// # Arguments
/// * `pattern` - The object name pattern.
/// * `target_id` - The TargetID to associate with the pattern.
pub fn add(&mut self, pattern: String, target_id: TargetID) {
self.rules.entry(pattern).or_default().insert(target_id);
}
/// Checks if there are any rules that match the given object name.
///
/// # Arguments
/// * `object_name` - The object name to match against the patterns.
///
/// # Returns
/// `true` if any pattern matches the object name, otherwise `false`.
pub fn match_simple(&self, object_name: &str) -> bool {
self.rules.keys().any(|p| pattern::match_simple(p, object_name))
}
/// Returns all TargetIDs that match the object name.
///
/// Performance optimization points:
/// 1) Small collections are serialized directly to avoid rayon scheduling/merging overhead
/// 2) When hitting, no longer temporarily allocate TargetIdSet for each rule, but directly extend
///
/// # Arguments
/// * `object_name` - The object name to match against the patterns.
///
/// # Returns
/// A TargetIdSet containing all TargetIDs that match the object name.
pub fn match_targets(&self, object_name: &str) -> TargetIdSet {
let n = self.rules.len();
if n == 0 {
return TargetIdSet::new();
}
// Experience Threshold: Serial is usually faster below this value (can be adjusted after benchmarking)
const PAR_THRESHOLD: usize = 128;
if n < PAR_THRESHOLD {
let mut out = TargetIdSet::new();
for (pattern_str, target_set) in self.rules.iter() {
if pattern::match_simple(pattern_str, object_name) {
out.extend(target_set.iter().cloned());
}
}
return out;
}
// Parallel path: Each thread accumulates a local set and finally merges it to reduce frequent allocations
self.rules
.par_iter()
.filter_map(|(pattern_str, target_set)| {
.fold(TargetIdSet::new, |mut local, (pattern_str, target_set)| {
if pattern::match_simple(pattern_str, object_name) {
Some(target_set.iter().cloned().collect::<TargetIdSet>())
} else {
None
local.extend(target_set.iter().cloned());
}
local
})
.reduce(TargetIdSet::new, |mut acc, set| {
acc.extend(set);
@@ -65,6 +103,11 @@ impl PatternRules {
/// Merge another PatternRules.
/// Corresponding to Go's `Rules.Union`.
/// # Arguments
/// * `other` - The PatternRules to merge with.
///
/// # Returns
/// A new PatternRules containing the union of both.
pub fn union(&self, other: &Self) -> Self {
let mut new_rules = self.clone();
for (pattern, their_targets) in &other.rules {
@@ -76,6 +119,13 @@ impl PatternRules {
/// Calculate the difference from another PatternRules.
/// Corresponding to Go's `Rules.Difference`.
/// The result contains only the patterns and TargetIDs that are in `self` but not in `other`.
///
/// # Arguments
/// * `other` - The PatternRules to compare against.
///
/// # Returns
/// A new PatternRules containing the difference.
pub fn difference(&self, other: &Self) -> Self {
let mut result_rules = HashMap::new();
for (pattern, self_targets) in &self.rules {
@@ -94,4 +144,59 @@ impl PatternRules {
}
PatternRules { rules: result_rules }
}
/// Merge another PatternRules into self in place.
/// Corresponding to Go's `Rules.UnionInPlace`.
/// # Arguments
/// * `other` - The PatternRules to merge with.
pub fn union_in_place(&mut self, other: &Self) {
for (pattern, their_targets) in &other.rules {
self.rules
.entry(pattern.clone())
.or_default()
.extend(their_targets.iter().cloned());
}
}
/// Calculate the difference from another PatternRules in place.
/// Corresponding to Go's `Rules.DifferenceInPlace`.
/// The result contains only the patterns and TargetIDs that are in `self` but not in `other`.
/// # Arguments
/// * `other` - The PatternRules to compare against.
pub fn difference_in_place(&mut self, other: &Self) {
self.rules.retain(|pattern, self_targets| {
if let Some(other_targets) = other.rules.get(pattern) {
// Remove other_targets from self_targets
self_targets.retain(|tid| !other_targets.contains(tid));
}
!self_targets.is_empty()
});
}
/// Remove a pattern and its associated TargetID set from the PatternRules.
///
/// # Arguments
/// * `pattern` - The pattern to remove.
pub fn remove_pattern(&mut self, pattern: &str) -> bool {
self.rules.remove(pattern).is_some()
}
/// Determine whether the current PatternRules contains the specified TargetID (referenced by any pattern).
///
/// # Parameters
/// * `target_id` - The TargetID to check for existence within the PatternRules
///
/// # Returns
/// * `true` if the TargetID exists in any of the patterns; `false` otherwise.
pub fn contains_target_id(&self, target_id: &TargetID) -> bool {
self.rules.values().any(|set| set.contains(target_id))
}
/// Expose the internal rules for use in scenarios such as BucketNotificationConfig::validate.
///
/// # Returns
/// A reference to the internal HashMap of patterns to TargetIdSets.
pub fn inner(&self) -> &HashMap<String, TargetIdSet> {
&self.rules
}
}

View File

@@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::pattern_rules::PatternRules;
use super::target_id_set::TargetIdSet;
use crate::rules::{PatternRules, TargetIdSet};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
@@ -31,6 +30,9 @@ pub struct RulesMap {
impl RulesMap {
/// Create a new, empty RulesMap.
///
/// # Returns
/// A new instance of RulesMap with an empty map and a total_events_mask set to 0.
pub fn new() -> Self {
Default::default()
}
@@ -67,12 +69,12 @@ impl RulesMap {
/// Merge another RulesMap.
/// `RulesMap.Add(rulesMap2 RulesMap) corresponding to Go
///
/// # Parameters
/// * `other_map` - The other RulesMap to be merged into the current one.
pub fn add_map(&mut self, other_map: &Self) {
for (event_name, other_pattern_rules) in &other_map.map {
let self_pattern_rules = self.map.entry(*event_name).or_default();
// PatternRules::union Returns the new PatternRules, we need to modify the existing ones
let merged_rules = self_pattern_rules.union(other_pattern_rules);
*self_pattern_rules = merged_rules;
self.map.entry(*event_name).or_default().union_in_place(other_pattern_rules);
}
// Directly merge two masks.
self.total_events_mask |= other_map.total_events_mask;
@@ -81,11 +83,14 @@ impl RulesMap {
/// Remove another rule defined in the RulesMap from the current RulesMap.
///
/// After the rule is removed, `total_events_mask` is recalculated to ensure its accuracy.
///
/// # Parameters
/// * `other_map` - The other RulesMap containing rules to be removed from the current one.
pub fn remove_map(&mut self, other_map: &Self) {
let mut events_to_remove = Vec::new();
for (event_name, self_pattern_rules) in &mut self.map {
if let Some(other_pattern_rules) = other_map.map.get(event_name) {
*self_pattern_rules = self_pattern_rules.difference(other_pattern_rules);
self_pattern_rules.difference_in_place(other_pattern_rules);
if self_pattern_rules.is_empty() {
events_to_remove.push(*event_name);
}
@@ -102,6 +107,9 @@ impl RulesMap {
///
/// This method uses a bitmask for a quick check of O(1) complexity.
/// `event_name` can be a compound type, such as `ObjectCreatedAll`.
///
/// # Parameters
/// * `event_name` - The event name to check for subscribers.
pub fn has_subscriber(&self, event_name: &EventName) -> bool {
// event_name.mask() will handle compound events correctly
(self.total_events_mask & event_name.mask()) != 0
@@ -112,39 +120,54 @@ impl RulesMap {
/// # Notice
/// The `event_name` parameter should be a specific, non-compound event type.
/// Because this is taken from the `Event` object that actually occurs.
///
/// # Parameters
/// * `event_name` - The specific event name to match against.
/// * `object_key` - The object key to match against the patterns in the rules.
///
/// # Returns
/// * A set of TargetIDs that match the given event and object key.
pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet {
// Use bitmask to quickly determine whether there is a matching rule
if (self.total_events_mask & event_name.mask()) == 0 {
return TargetIdSet::new(); // No matching rules
}
// First try to directly match the event name
if let Some(pattern_rules) = self.map.get(&event_name) {
let targets = pattern_rules.match_targets(object_key);
if !targets.is_empty() {
return targets;
}
}
// Go's RulesMap[eventName] is directly retrieved, and if it does not exist, it is empty Rules.
// Rust's HashMap::get returns Option. If the event name does not exist, there is no rule.
// Compound events (such as ObjectCreatedAll) have been expanded as a single event when add_rule_config.
// Therefore, a single event name should be used when querying.
// If event_name itself is a single type, look it up directly.
// If event_name is a compound type, Go's logic is expanded when added.
// Here match_rules should receive events that may already be single.
// If the caller passes in a compound event, it should expand itself or handle this function first.
// Assume that event_name is already a specific event that can be used for searching.
// In Go, RulesMap[eventName] returns empty rules if the key doesn't exist.
// Rust's HashMap::get returns Option, so missing key means no rules.
// Compound events like ObjectCreatedAll are expanded into specific events during add_rule_config.
// Thus, queries should use specific event names.
// If event_name is compound, expansion happens at addition time.
// match_rules assumes event_name is already a specific event for lookup.
// Callers should expand compound events before calling this method.
self.map
.get(&event_name)
.map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key))
}
/// Check if RulesMap is empty.
///
/// # Returns
/// * `true` if there are no rules in the map; `false` otherwise
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
/// Determine whether the current RulesMap contains the specified TargetID (referenced by any event / pattern).
///
/// # Parameters
/// * `target_id` - The TargetID to check for existence within the RulesMap
///
/// # Returns
/// * `true` if the TargetID exists in any of the PatternRules; `false` otherwise.
pub fn contains_target_id(&self, target_id: &TargetID) -> bool {
self.map.values().any(|pr| pr.contains_target_id(target_id))
}
/// Returns a clone of internal rules for use in scenarios such as BucketNotificationConfig::validate.
///
/// # Returns
/// A reference to the internal HashMap of EventName to PatternRules.
pub fn inner(&self) -> &HashMap<EventName, PatternRules> {
&self.map
}
@@ -160,18 +183,32 @@ impl RulesMap {
}
/// Remove rules and optimize performance
///
/// # Parameters
/// * `event_name` - The EventName from which to remove the rule.
/// * `pattern` - The pattern of the rule to be removed.
#[allow(dead_code)]
pub fn remove_rule(&mut self, event_name: &EventName, pattern: &str) {
let mut remove_event = false;
if let Some(pattern_rules) = self.map.get_mut(event_name) {
pattern_rules.rules.remove(pattern);
pattern_rules.remove_pattern(pattern);
if pattern_rules.is_empty() {
self.map.remove(event_name);
remove_event = true;
}
}
if remove_event {
self.map.remove(event_name);
}
self.recalculate_mask(); // Delay calculation mask
}
/// Batch Delete Rules
/// Batch Delete Rules and Optimize Performance
///
/// # Parameters
/// * `event_names` - A slice of EventNames to be removed.
#[allow(dead_code)]
pub fn remove_rules(&mut self, event_names: &[EventName]) {
for event_name in event_names {
@@ -181,9 +218,27 @@ impl RulesMap {
}
/// Update rules and optimize performance
///
/// # Parameters
/// * `event_name` - The EventName to update.
/// * `pattern` - The pattern of the rule to be updated.
/// * `target_id` - The TargetID to be added.
#[allow(dead_code)]
pub fn update_rule(&mut self, event_name: EventName, pattern: String, target_id: TargetID) {
self.map.entry(event_name).or_default().add(pattern, target_id);
self.total_events_mask |= event_name.mask(); // Update only the relevant bitmask
}
/// Iterate all EventName keys contained in this RulesMap.
///
/// Used by snapshot compilation to compute bucket event_mask.
///
/// # Returns
/// An iterator over all EventName keys in the RulesMap.
#[inline]
pub fn iter_events(&self) -> impl Iterator<Item = EventName> + '_ {
// `inner()` is already used by config.rs, so we reuse it here.
// If the key type is `EventName`, `.copied()` is the cheapest way to return values.
self.inner().keys().copied()
}
}

View File

@@ -0,0 +1,131 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::rules::{BucketRulesSnapshot, BucketSnapshotRef, DynRulesContainer};
use arc_swap::ArcSwap;
use rustfs_targets::EventName;
use starshard::ShardedHashMap;
use std::fmt;
use std::sync::Arc;
/// A global bucket -> snapshot index.
///
/// Read path: lock-free load (ArcSwap)
/// Write path: atomic replacement after building a new snapshot
pub struct SubscriberIndex {
// Use starshard for sharding to reduce lock competition when the number of buckets is large
inner: ShardedHashMap<String, Arc<ArcSwap<BucketRulesSnapshot<DynRulesContainer>>>>,
// Cache an "empty rule container" for empty snapshots (avoids building every time)
empty_rules: Arc<DynRulesContainer>,
}
/// Avoid deriving fields that do not support Debug
impl fmt::Debug for SubscriberIndex {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SubscriberIndex").finish_non_exhaustive()
}
}
impl SubscriberIndex {
/// Create a new SubscriberIndex.
///
/// # Arguments
/// * `empty_rules` - An Arc to an empty rules container used for empty snapshots
///
/// Returns a new instance of SubscriberIndex.
pub fn new(empty_rules: Arc<DynRulesContainer>) -> Self {
Self {
inner: ShardedHashMap::new(64),
empty_rules,
}
}
/// Get the current snapshot of a bucket.
/// If it does not exist, return empty snapshot.
///
/// # Arguments
/// * `bucket` - The name of the bucket to load.
///
/// Returns the snapshot reference for the specified bucket.
pub fn load_snapshot(&self, bucket: &str) -> BucketSnapshotRef {
match self.inner.get(&bucket.to_string()) {
Some(cell) => cell.load_full(),
None => Arc::new(BucketRulesSnapshot::empty(self.empty_rules.clone())),
}
}
/// Quickly determine whether the bucket has a subscription to an event.
/// This judgment can be consistent with subsequent rule matching when reading the same snapshot.
///
/// # Arguments
/// * `bucket` - The name of the bucket to check.
/// * `event` - The event name to check for subscriptions.
///
/// Returns `true` if there are subscribers for the event, `false` otherwise.
#[inline]
pub fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool {
let snap = self.load_snapshot(bucket);
if snap.event_mask == 0 {
return false;
}
snap.has_event(event)
}
/// Atomically update a bucket's snapshot (whole package replacement).
///
/// - The caller first builds the complete `BucketRulesSnapshot` (including event\_mask and rules).
/// - This method ensures that the read path will not observe intermediate states.
///
/// # Arguments
/// * `bucket` - The name of the bucket to update.
/// * `new_snapshot` - The new snapshot to store for the bucket.
pub fn store_snapshot(&self, bucket: &str, new_snapshot: BucketRulesSnapshot<DynRulesContainer>) {
let key = bucket.to_string();
let cell = self.inner.get(&key).unwrap_or_else(|| {
// Insert a default cell (empty snapshot)
let init = Arc::new(ArcSwap::from_pointee(BucketRulesSnapshot::empty(self.empty_rules.clone())));
self.inner.insert(key.clone(), init.clone());
init
});
cell.store(Arc::new(new_snapshot));
}
/// Delete the bucket's subscription view (make it empty).
///
/// # Arguments
/// * `bucket` - The name of the bucket to clear.
pub fn clear_bucket(&self, bucket: &str) {
if let Some(cell) = self.inner.get(&bucket.to_string()) {
cell.store(Arc::new(BucketRulesSnapshot::empty(self.empty_rules.clone())));
}
}
}
impl Default for SubscriberIndex {
fn default() -> Self {
// An available empty rule container is required; here it is implemented using minimal empty
#[derive(Debug)]
struct EmptyRules;
impl crate::rules::subscriber_snapshot::RulesContainer for EmptyRules {
type Rule = dyn crate::rules::subscriber_snapshot::RuleEvents;
fn iter_rules<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Self::Rule> + 'a> {
Box::new(std::iter::empty())
}
}
Self::new(Arc::new(EmptyRules) as Arc<DynRulesContainer>)
}
}

View File

@@ -0,0 +1,117 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_targets::EventName;
use std::sync::Arc;
/// Let the rules structure provide "what events it is subscribed to".
/// This way BucketRulesSnapshot does not need to know the internal shape of rules.
pub trait RuleEvents {
fn subscribed_events(&self) -> &[EventName];
}
/// Let the rules container provide the ability to iterate over all rules (abstracting only to the minimum necessary).
pub trait RulesContainer {
type Rule: RuleEvents + ?Sized;
fn iter_rules<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Self::Rule> + 'a>;
/// Fast empty judgment for snapshots (fix missing `rules.is_empty()`)
fn is_empty(&self) -> bool {
self.iter_rules().next().is_none()
}
}
/// Represents a bucket's notification subscription view snapshot (immutable).
///
/// - `event_mask`: Quickly determine whether there is a subscription to a certain type of event (bitset/flags).
/// - `rules`: precise rule mapping (prefix/suffix/pattern -> targets).
///
/// The read path only reads this snapshot to ensure consistency.
#[derive(Debug, Clone)]
pub struct BucketRulesSnapshot<R>
where
R: RulesContainer + ?Sized,
{
pub event_mask: u64,
pub rules: Arc<R>,
}
impl<R> BucketRulesSnapshot<R>
where
R: RulesContainer + ?Sized,
{
/// Create an empty snapshot with no subscribed events and no rules.
///
/// # Arguments
/// * `rules` - An Arc to a rules container (can be an empty container).
///
/// # Returns
/// An instance of `BucketRulesSnapshot` with an empty event mask.
#[inline]
pub fn empty(rules: Arc<R>) -> Self {
Self { event_mask: 0, rules }
}
/// Check if the snapshot has any subscribers for the specified event.
///
/// # Arguments
/// * `event` - The event name to check for subscriptions.
///
/// # Returns
/// `true` if there are subscribers for the event, `false` otherwise.
#[inline]
pub fn has_event(&self, event: &EventName) -> bool {
(self.event_mask & event.mask()) != 0
}
/// Check if the snapshot is empty (no subscribed events or rules).
///
/// # Returns
/// `true` if the snapshot is empty, `false` otherwise.
#[inline]
pub fn is_empty(&self) -> bool {
self.event_mask == 0 || self.rules.is_empty()
}
/// [debug] Assert that `event_mask` is consistent with the event declared in `rules`.
///
/// Constraints:
/// - only runs in debug builds (release incurs no cost).
/// - If the rule contains compound events (\*All / Everything), rely on `EventName::mask()` to automatically expand.
#[inline]
pub fn debug_assert_mask_consistent(&self) {
#[cfg(debug_assertions)]
{
let mut recomputed = 0u64;
for rule in self.rules.iter_rules() {
for ev in rule.subscribed_events() {
recomputed |= ev.mask();
}
}
debug_assert!(
recomputed == self.event_mask,
"BucketRulesSnapshot.event_mask inconsistent: stored={:#x}, recomputed={:#x}",
self.event_mask,
recomputed
);
}
}
}
/// Unify trait-object snapshot types (fix Sized / missing generic arguments)
pub type DynRulesContainer = dyn RulesContainer<Rule = dyn RuleEvents> + Send + Sync;
/// Expose Arc form to facilitate sharing.
pub type BucketSnapshotRef = Arc<BucketRulesSnapshot<DynRulesContainer>>;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use super::pattern;
use crate::rules::pattern;
use hashbrown::HashSet;
use rustfs_targets::EventName;
use rustfs_targets::arn::{ARN, ArnError, TargetIDError};

View File

@@ -13,18 +13,23 @@
// limitations under the License.
use crate::{Event, integration::NotificationMetrics};
use rustfs_targets::StoreError;
use rustfs_targets::Target;
use rustfs_targets::TargetError;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{
StoreError, Target, TargetError,
store::{Key, Store},
target::EntityTarget,
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, mpsc};
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
/// Streams events from the store to the target
/// Streams events from the store to the target with retry logic
///
/// # Arguments
/// - `store`: The event store
/// - `target`: The target to send events to
/// - `cancel_rx`: Receiver to listen for cancellation signals
pub async fn stream_events(
store: &mut (dyn Store<Event, Error = StoreError, Key = Key> + Send),
target: &dyn Target<Event>,
@@ -67,6 +72,7 @@ pub async fn stream_events(
match target.send_from_store(key.clone()).await {
Ok(_) => {
info!("Successfully sent event for target: {}", target.name());
// send_from_store deletes the event from store on success
success = true;
}
Err(e) => {
@@ -104,6 +110,13 @@ pub async fn stream_events(
}
/// Starts the event streaming process for a target
///
/// # Arguments
/// - `store`: The event store
/// - `target`: The target to send events to
///
/// # Returns
/// A sender to signal cancellation of the event stream
pub fn start_event_stream(
mut store: Box<dyn Store<Event, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target<Event> + Send + Sync>,
@@ -119,6 +132,15 @@ pub fn start_event_stream(
}
/// Start event stream with batch processing
///
/// # Arguments
/// - `store`: The event store
/// - `target`: The target to send events to clients
/// - `metrics`: Metrics for monitoring
/// - `semaphore`: Semaphore to limit concurrency
///
/// # Returns
/// A sender to signal cancellation of the event stream
pub fn start_event_stream_with_batching(
mut store: Box<dyn Store<EntityTarget<Event>, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target<Event> + Send + Sync>,
@@ -136,6 +158,16 @@ pub fn start_event_stream_with_batching(
}
/// Event stream processing with batch processing
///
/// # Arguments
/// - `store`: The event store
/// - `target`: The target to send events to clients
/// - `cancel_rx`: Receiver to listen for cancellation signals
/// - `metrics`: Metrics for monitoring
/// - `semaphore`: Semaphore to limit concurrency
///
/// # Notes
/// This function processes events in batches to improve efficiency.
pub async fn stream_events_with_batching(
store: &mut (dyn Store<EntityTarget<Event>, Error = StoreError, Key = Key> + Send),
target: &dyn Target<Event>,
@@ -231,7 +263,17 @@ pub async fn stream_events_with_batching(
}
}
/// Processing event batches
/// Processing event batches for targets
/// # Arguments
/// - `batch`: The batch of events to process
/// - `batch_keys`: The corresponding keys of the events in the batch
/// - `target`: The target to send events to clients
/// - `max_retries`: Maximum number of retries for sending an event
/// - `base_delay`: Base delay duration for retries
/// - `metrics`: Metrics for monitoring
/// - `semaphore`: Semaphore to limit concurrency
/// # Notes
/// This function processes a batch of events, sending each event to the target with retry
async fn process_batch(
batch: &mut Vec<EntityTarget<Event>>,
batch_keys: &mut Vec<Key>,
@@ -262,6 +304,7 @@ async fn process_batch(
// Retry logic
while retry_count < max_retries && !success {
// After sending successfully, the event in the storage is deleted synchronously.
match target.send_from_store(key.clone()).await {
Ok(_) => {
info!("Successfully sent event for target: {}, Key: {}", target.name(), key.to_string());

View File

@@ -39,9 +39,9 @@ use rustfs_config::{
ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA,
},
};
use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default};
use rustfs_utils::{get_env_opt_str, get_env_u64, get_env_usize, get_local_ip_with_default};
use smallvec::SmallVec;
use std::{borrow::Cow, env, fs, io::IsTerminal, time::Duration};
use std::{borrow::Cow, fs, io::IsTerminal, time::Duration};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
@@ -574,8 +574,8 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> Result<OtelGuard, Telemetry
}
// Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set)
let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok();
if user_set_log_dir {
let user_set_log_dir = get_env_opt_str(ENV_OBS_LOG_DIRECTORY);
if user_set_log_dir.filter(|d| !d.is_empty()).is_some() {
return init_file_logging(config, logger_level, is_production);
}

View File

@@ -119,16 +119,26 @@ pub async fn start_http_server(
// The listening address and port are obtained from the parameters
let listener = {
let mut server_addr = server_addr;
let mut socket = socket2::Socket::new(
// Try to create a socket for the address family; if that fails, fallback to IPv4.
let mut socket = match socket2::Socket::new(
socket2::Domain::for_address(server_addr),
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
) {
Ok(s) => s,
Err(e) => {
warn!("Failed to create socket for {:?}: {}, falling back to IPv4", server_addr, e);
let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port());
server_addr = ipv4_addr;
socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?
}
};
// If address is IPv6 try to enable dual-stack; on failure, switch to IPv4 socket.
if server_addr.is_ipv6() {
if let Err(e) = socket.set_only_v6(false) {
warn!("Failed to set IPV6_V6ONLY=false, falling back to IPv4-only: {}", e);
// Fallback to a new IPv4 socket if setting dual-stack fails.
warn!("Failed to set IPV6_V6ONLY=false, attempting IPv4 fallback: {}", e);
let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port());
server_addr = ipv4_addr;
socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
@@ -140,8 +150,27 @@ pub async fn start_http_server(
socket.set_reuse_address(true)?;
// Set the socket to non-blocking before passing it to Tokio.
socket.set_nonblocking(true)?;
socket.bind(&server_addr.into())?;
socket.listen(backlog)?;
// Attempt bind; if bind fails for IPv6, try IPv4 fallback once more.
if let Err(bind_err) = socket.bind(&server_addr.into()) {
warn!("Failed to bind to {}: {}.", server_addr, bind_err);
if server_addr.is_ipv6() {
// Try IPv4 fallback
let ipv4_addr = SocketAddr::new(std::net::Ipv4Addr::UNSPECIFIED.into(), server_addr.port());
server_addr = ipv4_addr;
socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&server_addr.into())?;
// [FIX] Ensure fallback socket is moved to listening state as well.
socket.listen(backlog)?;
} else {
return Err(bind_err);
}
} else {
// Listen on the socket when initial bind succeeded
socket.listen(backlog)?;
}
TcpListener::from_std(socket.into())?
};