From bb4fbf5ae2692bc2cbdef509d337206fc982aaae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=AD=A3=E8=B6=85?= Date: Wed, 11 Mar 2026 00:37:30 +0800 Subject: [PATCH] fix(notify): ignore disabled targets when sending events (#2117) Co-authored-by: houseme --- crates/notify/src/integration.rs | 2 +- crates/notify/src/notifier.rs | 115 +++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 9a9dd09d..70e8a49d 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -355,7 +355,7 @@ impl NotificationSystem { changed = true; } if targets.is_empty() { - config.0.remove(target_type); + config.0.remove(&ttype); } } if !changed { diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index e3cb7a30..57c1febb 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -202,6 +202,10 @@ impl EventNotifier { // Clone an Arc> (which is where target_list is stored) to move into an asynchronous task // target_arc is already Arc, clone it for the async task let target_for_task = target_arc.clone(); + if !target_for_task.is_enabled() { + debug!("Skipping disabled target: {}", target_for_task.name()); + continue; + } let limiter = self.send_limiter.clone(); let event_clone = event.clone(); let target_name_for_task = target_for_task.name(); // Get the name before generating the task @@ -385,3 +389,114 @@ impl TargetList { self.targets.is_empty() } } + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use rustfs_s3_common::EventName; + use rustfs_targets::StoreError; + use rustfs_targets::{ + TargetError, + store::{Key, Store}, + target::EntityTarget, + }; + use serde::{Serialize, de::DeserializeOwned}; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + #[derive(Clone)] + struct TestTarget { + id: TargetID, + enabled: bool, + save_calls: Arc, + } + + impl TestTarget { + fn new(id: &str, name: &str, enabled: bool) -> Self { + Self { + id: TargetID::new(id.to_string(), name.to_string()), + enabled, + save_calls: Arc::new(AtomicUsize::new(0)), + } + } + } + + #[async_trait] + impl Target for TestTarget + where + E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned, + { + fn id(&self) -> TargetID { + self.id.clone() + } + + async fn is_active(&self) -> Result { + Ok(self.enabled) + } + + async fn save(&self, _event: Arc>) -> Result<(), TargetError> { + self.save_calls.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + + async fn send_from_store(&self, _key: Key) -> Result<(), TargetError> { + Ok(()) + } + + async fn close(&self) -> Result<(), TargetError> { + Ok(()) + } + + fn store(&self) -> Option<&(dyn Store, Error = StoreError, Key = Key> + Send + Sync)> { + None + } + + fn clone_dyn(&self) -> Box + Send + Sync> { + let cloned = self.clone(); + Box::new(cloned) + } + + async fn init(&self) -> Result<(), TargetError> { + Ok(()) + } + + fn is_enabled(&self) -> bool { + self.enabled + } + } + + #[tokio::test] + async fn test_send_event_skips_disabled_target() { + let notifier = EventNotifier::new(); + + let enabled_target = TestTarget::new("enabled-target", "webhook", true); + let disabled_target = TestTarget::new("disabled-target", "webhook", false); + + let mut rules_map = RulesMap::new(); + rules_map.add_rule_config(&[EventName::ObjectCreatedPut], "*".to_string(), enabled_target.id.clone()); + rules_map.add_rule_config(&[EventName::ObjectCreatedPut], "*".to_string(), disabled_target.id.clone()); + + notifier.add_rules_map("bucket", rules_map).await; + notifier + .target_list() + .write() + .await + .add(Arc::new(enabled_target.clone())) + .unwrap(); + notifier + .target_list() + .write() + .await + .add(Arc::new(disabled_target.clone())) + .unwrap(); + + let event = Arc::new(Event::new_test_event("bucket", "object", EventName::ObjectCreatedPut)); + notifier.send(event).await; + + assert_eq!(enabled_target.save_calls.load(Ordering::SeqCst), 1); + assert_eq!(disabled_target.save_calls.load(Ordering::SeqCst), 0); + } +}