fix(notify): ignore disabled targets when sending events (#2117)

Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
安正超
2026-03-11 00:37:30 +08:00
committed by GitHub
parent 3df7105dae
commit bb4fbf5ae2
2 changed files with 116 additions and 1 deletions

View File

@@ -355,7 +355,7 @@ impl NotificationSystem {
changed = true;
}
if targets.is_empty() {
config.0.remove(target_type);
config.0.remove(&ttype);
}
}
if !changed {

View File

@@ -202,6 +202,10 @@ impl EventNotifier {
// Clone an Arc<Box<dyn Target>> (which is where target_list is stored) to move into an asynchronous task
// target_arc is already Arc, clone it for the async task
let target_for_task = target_arc.clone();
if !target_for_task.is_enabled() {
debug!("Skipping disabled target: {}", target_for_task.name());
continue;
}
let limiter = self.send_limiter.clone();
let event_clone = event.clone();
let target_name_for_task = target_for_task.name(); // Get the name before generating the task
@@ -385,3 +389,114 @@ impl TargetList {
self.targets.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use rustfs_s3_common::EventName;
use rustfs_targets::StoreError;
use rustfs_targets::{
TargetError,
store::{Key, Store},
target::EntityTarget,
};
use serde::{Serialize, de::DeserializeOwned};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
#[derive(Clone)]
struct TestTarget {
id: TargetID,
enabled: bool,
save_calls: Arc<AtomicUsize>,
}
impl TestTarget {
fn new(id: &str, name: &str, enabled: bool) -> Self {
Self {
id: TargetID::new(id.to_string(), name.to_string()),
enabled,
save_calls: Arc::new(AtomicUsize::new(0)),
}
}
}
#[async_trait]
impl<E> Target<E> for TestTarget
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
fn id(&self) -> TargetID {
self.id.clone()
}
async fn is_active(&self) -> Result<bool, TargetError> {
Ok(self.enabled)
}
async fn save(&self, _event: Arc<EntityTarget<E>>) -> Result<(), TargetError> {
self.save_calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
async fn send_from_store(&self, _key: Key) -> Result<(), TargetError> {
Ok(())
}
async fn close(&self) -> Result<(), TargetError> {
Ok(())
}
fn store(&self) -> Option<&(dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync)> {
None
}
fn clone_dyn(&self) -> Box<dyn Target<E> + Send + Sync> {
let cloned = self.clone();
Box::new(cloned)
}
async fn init(&self) -> Result<(), TargetError> {
Ok(())
}
fn is_enabled(&self) -> bool {
self.enabled
}
}
#[tokio::test]
async fn test_send_event_skips_disabled_target() {
let notifier = EventNotifier::new();
let enabled_target = TestTarget::new("enabled-target", "webhook", true);
let disabled_target = TestTarget::new("disabled-target", "webhook", false);
let mut rules_map = RulesMap::new();
rules_map.add_rule_config(&[EventName::ObjectCreatedPut], "*".to_string(), enabled_target.id.clone());
rules_map.add_rule_config(&[EventName::ObjectCreatedPut], "*".to_string(), disabled_target.id.clone());
notifier.add_rules_map("bucket", rules_map).await;
notifier
.target_list()
.write()
.await
.add(Arc::new(enabled_target.clone()))
.unwrap();
notifier
.target_list()
.write()
.await
.add(Arc::new(disabled_target.clone()))
.unwrap();
let event = Arc::new(Event::new_test_event("bucket", "object", EventName::ObjectCreatedPut));
notifier.send(event).await;
assert_eq!(enabled_target.save_calls.load(Ordering::SeqCst), 1);
assert_eq!(disabled_target.save_calls.load(Ordering::SeqCst), 0);
}
}