improve code for notify

This commit is contained in:
houseme
2025-06-21 10:35:07 +08:00
parent 48d530cb13
commit e0f65e5e24
14 changed files with 442 additions and 634 deletions

View File

@@ -1,5 +1,11 @@
use ecstore::config::{Config, KV, KVS};
use rustfs_notify::arn::TargetID;
use rustfs_notify::factory::{
DEFAULT_TARGET, ENABLE, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::global::notification_system;
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use std::time::Duration;
use tracing::info;
@@ -11,51 +17,100 @@ async fn main() -> Result<(), NotificationError> {
let system = notification_system();
// --- Initial configuration (Webhook and MQTT) ---
let mut config = rustfs_notify::Config::new();
let mut config = Config::new();
let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root");
println!("Current project root: {}", current_root.display());
// Webhook target configuration
let mut webhook_kvs = rustfs_notify::KVS::new();
webhook_kvs.set("enable", "on");
webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook");
webhook_kvs.set("auth_token", "secret-token");
// webhook_kvs.set("queue_dir", "/tmp/data/webhook");
webhook_kvs.set(
"queue_dir",
current_root
.clone()
.join("../../deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
);
webhook_kvs.set("queue_limit", "10000");
let webhook_kvs_vec = vec![
KV {
key: ENABLE.to_string(),
value: "on".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_ENDPOINT.to_string(),
value: "http://127.0.0.1:3020/webhook".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_AUTH_TOKEN.to_string(),
value: "secret-token".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_DIR.to_string(),
value: current_root
.clone()
.join("../../deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_LIMIT.to_string(),
value: DEFAULT_LIMIT.to_string(),
hidden_if_empty: false,
},
];
let webhook_kvs = KVS(webhook_kvs_vec);
let mut webhook_targets = std::collections::HashMap::new();
webhook_targets.insert("1".to_string(), webhook_kvs);
config.insert("notify_webhook".to_string(), webhook_targets);
webhook_targets.insert(DEFAULT_TARGET.to_string(), webhook_kvs);
config.0.insert(NOTIFY_WEBHOOK_SUB_SYS.to_string(), webhook_targets);
// MQTT target configuration
let mut mqtt_kvs = rustfs_notify::KVS::new();
mqtt_kvs.set("enable", "on");
mqtt_kvs.set("broker", "mqtt://localhost:1883");
mqtt_kvs.set("topic", "rustfs/events");
mqtt_kvs.set("qos", "1"); // AtLeastOnce
mqtt_kvs.set("username", "test");
mqtt_kvs.set("password", "123456");
// webhook_kvs.set("queue_dir", "/tmp/data/mqtt");
mqtt_kvs.set(
"queue_dir",
current_root
.join("../../deploy/logs/notify/mqtt")
.to_str()
.unwrap()
.to_string(),
);
mqtt_kvs.set("queue_limit", "10000");
let mqtt_kvs_vec = vec![
KV {
key: ENABLE.to_string(),
value: "on".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_BROKER.to_string(),
value: "mqtt://localhost:1883".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_TOPIC.to_string(),
value: "rustfs/events".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QOS.to_string(),
value: "1".to_string(), // AtLeastOnce
hidden_if_empty: false,
},
KV {
key: MQTT_USERNAME.to_string(),
value: "test".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_PASSWORD.to_string(),
value: "123456".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QUEUE_DIR.to_string(),
value: current_root
.join("../../deploy/logs/notify/mqtt")
.to_str()
.unwrap()
.to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QUEUE_LIMIT.to_string(),
value: DEFAULT_LIMIT.to_string(),
hidden_if_empty: false,
},
];
let mqtt_kvs = KVS(mqtt_kvs_vec);
let mut mqtt_targets = std::collections::HashMap::new();
mqtt_targets.insert("1".to_string(), mqtt_kvs);
config.insert("notify_mqtt".to_string(), mqtt_targets);
mqtt_targets.insert(DEFAULT_TARGET.to_string(), mqtt_kvs);
config.0.insert(NOTIFY_MQTT_SUB_SYS.to_string(), mqtt_targets);
// Load the configuration and initialize the system
*system.config.write().await = config;
@@ -71,15 +126,15 @@ async fn main() -> Result<(), NotificationError> {
// --- Exactly delete a Target (e.g. MQTT) ---
info!("\n---> Removing MQTT target...");
let mqtt_target_id = TargetID::new("1".to_string(), "mqtt".to_string());
system.remove_target(&mqtt_target_id, "notify_mqtt").await?;
let mqtt_target_id = TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string());
system.remove_target(&mqtt_target_id, NOTIFY_MQTT_SUB_SYS).await?;
info!("✅ MQTT target removed.");
// --- Query the activity's Target again ---
let active_targets_after_removal = system.get_active_targets().await;
info!("\n---> Active targets after removal: {:?}", active_targets_after_removal);
assert_eq!(active_targets_after_removal.len(), 1);
assert_eq!(active_targets_after_removal[0].id, "1".to_string());
assert_eq!(active_targets_after_removal[0].id, DEFAULT_TARGET.to_string());
// --- Send events for verification ---
// Configure a rule to point to the Webhook and deleted MQTT
@@ -87,12 +142,12 @@ async fn main() -> Result<(), NotificationError> {
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
"*".to_string(),
TargetID::new("1".to_string(), "webhook".to_string()),
TargetID::new(DEFAULT_TARGET.to_string(), "webhook".to_string()),
);
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
"*".to_string(),
TargetID::new("1".to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found
TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found
);
system.load_bucket_notification_config("my-bucket", &bucket_config).await?;

View File

@@ -1,8 +1,13 @@
use ecstore::config::{Config, KV, KVS};
// Using Global Accessories
use rustfs_config::notify;
use rustfs_notify::arn::TargetID;
use rustfs_notify::factory::{
DEFAULT_TARGET, ENABLE, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::global::notification_system;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS};
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use std::time::Duration;
use tracing::info;
@@ -14,25 +19,46 @@ async fn main() -> Result<(), NotificationError> {
let system = notification_system();
// --- Initial configuration ---
let mut config = rustfs_notify::Config::new();
let mut config = Config::new();
let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root");
// Webhook target
let mut webhook_kvs = KVS::new();
webhook_kvs.set("enable", "on");
webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook");
// webhook_kvs.set("queue_dir", "./logs/webhook");
webhook_kvs.set(
"queue_dir",
current_root
.clone()
.join("/deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
);
let webhook_kvs_vec = vec![
KV {
key: ENABLE.to_string(),
value: "on".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_ENDPOINT.to_string(),
value: "http://127.0.0.1:3020/webhook".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_AUTH_TOKEN.to_string(),
value: "secret-token".to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_DIR.to_string(),
value: current_root
.clone()
.join("../../deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_LIMIT.to_string(),
value: DEFAULT_LIMIT.to_string(),
hidden_if_empty: false,
},
];
let webhook_kvs = KVS(webhook_kvs_vec);
let mut webhook_targets = std::collections::HashMap::new();
webhook_targets.insert("1".to_string(), webhook_kvs);
config.insert("notify_webhook".to_string(), webhook_targets);
webhook_targets.insert(DEFAULT_TARGET.to_string(), webhook_kvs);
config.0.insert(NOTIFY_WEBHOOK_SUB_SYS.to_string(), webhook_targets);
// Load the initial configuration and initialize the system
*system.config.write().await = config;
@@ -43,17 +69,61 @@ async fn main() -> Result<(), NotificationError> {
// --- Dynamically update system configuration: Add an MQTT Target ---
info!("\n---> Dynamically adding MQTT target...");
let mut mqtt_kvs = KVS::new();
mqtt_kvs.set("enable", "on");
mqtt_kvs.set("broker", "mqtt://localhost:1883");
mqtt_kvs.set("topic", "rustfs/events");
mqtt_kvs.set("qos", "1");
mqtt_kvs.set("username", "test");
mqtt_kvs.set("password", "123456");
mqtt_kvs.set("queue_limit", "10000");
// mqtt_kvs.set("queue_dir", "./logs/mqtt");
mqtt_kvs.set("queue_dir", current_root.join("/deploy/logs/notify/mqtt").to_str().unwrap().to_string());
system.set_target_config("notify_mqtt", "1", mqtt_kvs).await?;
let mqtt_kvs_vec = vec![
KV {
key: ENABLE.to_string(),
value: "on".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_BROKER.to_string(),
value: "mqtt://localhost:1883".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_TOPIC.to_string(),
value: "rustfs/events".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QOS.to_string(),
value: "1".to_string(), // AtLeastOnce
hidden_if_empty: false,
},
KV {
key: MQTT_USERNAME.to_string(),
value: "test".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_PASSWORD.to_string(),
value: "123456".to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QUEUE_DIR.to_string(),
value: current_root
.join("../../deploy/logs/notify/mqtt")
.to_str()
.unwrap()
.to_string(),
hidden_if_empty: false,
},
KV {
key: MQTT_QUEUE_LIMIT.to_string(),
value: DEFAULT_LIMIT.to_string(),
hidden_if_empty: false,
},
];
let mqtt_kvs = KVS(mqtt_kvs_vec);
// let mut mqtt_targets = std::collections::HashMap::new();
// mqtt_targets.insert(DEFAULT_TARGET.to_string(), mqtt_kvs.clone());
system
.set_target_config(NOTIFY_MQTT_SUB_SYS, DEFAULT_TARGET, mqtt_kvs)
.await?;
info!("✅ MQTT target added and system reloaded.");
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -64,12 +134,12 @@ async fn main() -> Result<(), NotificationError> {
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
"*".to_string(),
TargetID::new("1".to_string(), "webhook".to_string()),
TargetID::new(DEFAULT_TARGET.to_string(), "webhook".to_string()),
);
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
"*".to_string(),
TargetID::new("1".to_string(), "mqtt".to_string()),
TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string()),
);
system.load_bucket_notification_config("my-bucket", &bucket_config).await?;
info!("✅ Bucket 'my-bucket' config loaded.");

View File

@@ -1,163 +0,0 @@
use std::collections::HashMap;
/// Represents a key-value pair in configuration
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KV {
pub key: String,
pub value: String,
}
/// Represents a collection of key-value pairs
#[derive(Debug, Clone, Default)]
pub struct KVS {
kvs: Vec<KV>,
}
impl KVS {
/// Creates a new empty KVS
pub fn new() -> Self {
KVS { kvs: Vec::new() }
}
/// Sets a key-value pair
pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
let key = key.into();
let value = value.into();
// Update existing value or add new
for kv in &mut self.kvs {
if kv.key == key {
kv.value = value;
return;
}
}
self.kvs.push(KV { key, value });
}
/// Looks up a value by key
pub fn lookup(&self, key: &str) -> Option<&str> {
self.kvs
.iter()
.find(|kv| kv.key == key)
.map(|kv| kv.value.as_str())
}
/// Deletes a key-value pair
pub fn delete(&mut self, key: &str) {
self.kvs.retain(|kv| kv.key != key);
}
/// Checks if the KVS is empty
pub fn is_empty(&self) -> bool {
self.kvs.is_empty()
}
/// Returns all keys
pub fn keys(&self) -> Vec<String> {
self.kvs.iter().map(|kv| kv.key.clone()).collect()
}
}
/// Represents the entire configuration
pub type Config = HashMap<String, HashMap<String, KVS>>;
/// Parses configuration from a string
pub fn parse_config(config_str: &str) -> Result<Config, String> {
let mut config = Config::new();
let mut current_section = String::new();
let mut current_subsection = String::new();
for line in config_str.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
// Parse sections
if line.starts_with('[') && line.ends_with(']') {
let section = line[1..line.len() - 1].trim();
if let Some((section_name, subsection)) = section.split_once(' ') {
current_section = section_name.to_string();
current_subsection = subsection.trim_matches('"').to_string();
} else {
current_section = section.to_string();
current_subsection = String::new();
}
continue;
}
// Parse key-value pairs
if let Some((key, value)) = line.split_once('=') {
let key = key.trim();
let value = value.trim();
let section = config.entry(current_section.clone()).or_default();
let kvs = section.entry(current_subsection.clone()).or_default();
kvs.set(key, value);
}
}
Ok(config)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_kvs() {
let mut kvs = KVS::new();
assert!(kvs.is_empty());
kvs.set("key1", "value1");
kvs.set("key2", "value2");
assert!(!kvs.is_empty());
assert_eq!(kvs.lookup("key1"), Some("value1"));
assert_eq!(kvs.lookup("key2"), Some("value2"));
assert_eq!(kvs.lookup("key3"), None);
kvs.set("key1", "new_value");
assert_eq!(kvs.lookup("key1"), Some("new_value"));
kvs.delete("key2");
assert_eq!(kvs.lookup("key2"), None);
}
#[test]
fn test_parse_config() {
let config_str = r#"
# Comment line
[notify_webhook "webhook1"]
enable = on
endpoint = http://example.com/webhook
auth_token = secret
[notify_mqtt "mqtt1"]
enable = on
broker = mqtt://localhost:1883
topic = rustfs/events
"#;
let config = parse_config(config_str).unwrap();
assert!(config.contains_key("notify_webhook"));
assert!(config.contains_key("notify_mqtt"));
let webhook = &config["notify_webhook"]["webhook1"];
assert_eq!(webhook.lookup("enable"), Some("on"));
assert_eq!(
webhook.lookup("endpoint"),
Some("http://example.com/webhook")
);
assert_eq!(webhook.lookup("auth_token"), Some("secret"));
let mqtt = &config["notify_mqtt"]["mqtt1"];
assert_eq!(mqtt.lookup("enable"), Some("on"));
assert_eq!(mqtt.lookup("broker"), Some("mqtt://localhost:1883"));
assert_eq!(mqtt.lookup("topic"), Some("rustfs/events"));
}
}

View File

@@ -23,7 +23,7 @@ pub enum StoreError {
NotFound,
#[error("Invalid entry: {0}")]
Internal(String), // 新增内部错误类型
Internal(String), // Added internal error type
}
/// Error types for targets

View File

@@ -1,27 +1,106 @@
use crate::store::DEFAULT_LIMIT;
use crate::{
config::KVS,
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target},
};
use async_trait::async_trait;
use ecstore::config::KVS;
use rumqttc::QoS;
use std::time::Duration;
use tracing::warn;
use url::Url;
// --- Configuration Constants ---
// General
pub const ENABLE: &str = "enable";
pub const DEFAULT_TARGET: &str = "1";
#[allow(dead_code)]
pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
#[allow(dead_code)]
pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
#[allow(dead_code)]
pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
#[allow(dead_code)]
pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
#[allow(dead_code)]
pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
#[allow(dead_code)]
pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
#[allow(dead_code)]
pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
#[allow(dead_code)]
pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
#[allow(dead_code)]
pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
// Webhook Keys
pub const WEBHOOK_ENDPOINT: &str = "endpoint";
pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token";
pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit";
pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir";
pub const WEBHOOK_CLIENT_CERT: &str = "client_cert";
pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
// Webhook Environment Variables
const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE";
const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT";
const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN";
const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT";
const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR";
const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT";
const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY";
// MQTT Keys
pub const MQTT_BROKER: &str = "broker";
pub const MQTT_TOPIC: &str = "topic";
pub const MQTT_QOS: &str = "qos";
pub const MQTT_USERNAME: &str = "username";
pub const MQTT_PASSWORD: &str = "password";
pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval";
pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval";
pub const MQTT_QUEUE_DIR: &str = "queue_dir";
pub const MQTT_QUEUE_LIMIT: &str = "queue_limit";
// MQTT Environment Variables
const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE";
const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER";
const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC";
const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS";
const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME";
const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD";
const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL";
const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL";
const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR";
const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT";
/// Helper function to get values from environment variables or KVS configurations.
///
/// It will give priority to reading from environment variables such as `BASE_ENV_KEY_ID` and fall back to the KVS configuration if it fails.
fn get_config_value(id: &str, base_env_key: &str, config_key: &str, config: &KVS) -> Option<String> {
let env_key = if id != DEFAULT_TARGET {
format!("{}_{}", base_env_key, id.to_uppercase().replace('-', "_"))
} else {
base_env_key.to_string()
};
match std::env::var(&env_key) {
Ok(val) => Some(val),
Err(_) => config.lookup(config_key),
}
}
/// Trait for creating targets from configuration
#[async_trait]
pub trait TargetFactory: Send + Sync {
/// Creates a target from configuration
async fn create_target(
&self,
id: String,
config: &KVS,
) -> Result<Box<dyn Target + Send + Sync>, TargetError>;
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError>;
/// Validates target configuration
fn validate_config(&self, config: &KVS) -> Result<(), TargetError>;
fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError>;
}
/// Factory for creating Webhook targets
@@ -29,35 +108,32 @@ pub struct WebhookTargetFactory;
#[async_trait]
impl TargetFactory for WebhookTargetFactory {
async fn create_target(
&self,
id: String,
config: &KVS,
) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
// Parse configuration values
let enable = config.lookup("enable").unwrap_or("off") == "on";
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
let get = |base_env_key: &str, config_key: &str| get_config_value(&id, base_env_key, config_key, config);
let enable = get(ENV_WEBHOOK_ENABLE, ENABLE)
.map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if !enable {
return Err(TargetError::Configuration("Target is disabled".to_string()));
}
let endpoint = config
.lookup("endpoint")
.ok_or_else(|| TargetError::Configuration("Missing endpoint".to_string()))?;
let endpoint_url = Url::parse(endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT)
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
let endpoint_url =
Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
let auth_token = config.lookup("auth_token").unwrap_or("").to_string();
let queue_dir = config.lookup("queue_dir").unwrap_or("").to_string();
let auth_token = get(ENV_WEBHOOK_AUTH_TOKEN, WEBHOOK_AUTH_TOKEN).unwrap_or_default();
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default();
let queue_limit = config
.lookup("queue_limit")
let queue_limit = get(ENV_WEBHOOK_QUEUE_LIMIT, WEBHOOK_QUEUE_LIMIT)
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_LIMIT);
let client_cert = config.lookup("client_cert").unwrap_or("").to_string();
let client_key = config.lookup("client_key").unwrap_or("").to_string();
let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default();
let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default();
// Create and return Webhook target
let args = WebhookArgs {
enable,
endpoint: endpoint_url,
@@ -72,38 +148,33 @@ impl TargetFactory for WebhookTargetFactory {
Ok(Box::new(target))
}
fn validate_config(&self, config: &KVS) -> Result<(), TargetError> {
let enable = config.lookup("enable").unwrap_or("off") == "on";
fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError> {
let get = |base_env_key: &str, config_key: &str| get_config_value(id, base_env_key, config_key, config);
let enable = get(ENV_WEBHOOK_ENABLE, ENABLE)
.map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if !enable {
return Ok(());
}
// Validate endpoint
let endpoint = config
.lookup("endpoint")
.ok_or_else(|| TargetError::Configuration("Missing endpoint".to_string()))?;
Url::parse(endpoint)
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT)
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
// Validate TLS certificates
let client_cert = config.lookup("client_cert").unwrap_or("");
let client_key = config.lookup("client_key").unwrap_or("");
let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default();
let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default();
if (!client_cert.is_empty() && client_key.is_empty())
|| (client_cert.is_empty() && !client_key.is_empty())
{
if client_cert.is_empty() != client_key.is_empty() {
return Err(TargetError::Configuration(
"Both client_cert and client_key must be specified if using client certificates"
.to_string(),
"Both client_cert and client_key must be specified together".to_string(),
));
}
// Validate queue directory
let queue_dir = config.lookup("queue_dir").unwrap_or("");
if !queue_dir.is_empty() && !std::path::Path::new(queue_dir).is_absolute() {
return Err(TargetError::Configuration(
"Webhook Queue directory must be an absolute path".to_string(),
));
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default();
if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() {
return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string()));
}
Ok(())
@@ -115,64 +186,56 @@ pub struct MQTTTargetFactory;
#[async_trait]
impl TargetFactory for MQTTTargetFactory {
async fn create_target(
&self,
id: String,
config: &KVS,
) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
// Parse configuration values
let enable = config.lookup("enable").unwrap_or("off") == "on";
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
let get = |base_env_key: &str, config_key: &str| get_config_value(&id, base_env_key, config_key, config);
let enable = get(ENV_MQTT_ENABLE, ENABLE)
.map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if !enable {
return Err(TargetError::Configuration("Target is disabled".to_string()));
}
let broker = config
.lookup("broker")
.ok_or_else(|| TargetError::Configuration("Missing broker".to_string()))?;
let broker_url = Url::parse(broker)
.map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
let broker =
get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?;
let broker_url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
let topic = config
.lookup("topic")
.ok_or_else(|| TargetError::Configuration("Missing topic".to_string()))?;
let topic =
get(ENV_MQTT_TOPIC, MQTT_TOPIC).ok_or_else(|| TargetError::Configuration("Missing MQTT topic".to_string()))?;
let qos = config
.lookup("qos")
let qos = get(ENV_MQTT_QOS, MQTT_QOS)
.and_then(|v| v.parse::<u8>().ok())
.map(|q| match q {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => QoS::AtMostOnce,
_ => QoS::AtLeastOnce,
})
.unwrap_or(QoS::AtLeastOnce);
let username = config.lookup("username").unwrap_or("").to_string();
let password = config.lookup("password").unwrap_or("").to_string();
let username = get(ENV_MQTT_USERNAME, MQTT_USERNAME).unwrap_or_default();
let password = get(ENV_MQTT_PASSWORD, MQTT_PASSWORD).unwrap_or_default();
let reconnect_interval = config
.lookup("reconnect_interval")
let reconnect_interval = get(ENV_MQTT_RECONNECT_INTERVAL, MQTT_RECONNECT_INTERVAL)
.and_then(|v| v.parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(5));
.unwrap_or_else(|| Duration::from_secs(5));
let keep_alive = config
.lookup("keep_alive_interval")
let keep_alive = get(ENV_MQTT_KEEP_ALIVE_INTERVAL, MQTT_KEEP_ALIVE_INTERVAL)
.and_then(|v| v.parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(30));
.unwrap_or_else(|| Duration::from_secs(30));
let queue_dir = config.lookup("queue_dir").unwrap_or("").to_string();
let queue_limit = config
.lookup("queue_limit")
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default();
let queue_limit = get(ENV_MQTT_QUEUE_LIMIT, MQTT_QUEUE_LIMIT)
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_LIMIT);
// Create and return MQTT target
let args = MQTTArgs {
enable,
broker: broker_url,
topic: topic.to_string(),
topic,
qos,
username,
password,
@@ -186,56 +249,47 @@ impl TargetFactory for MQTTTargetFactory {
Ok(Box::new(target))
}
fn validate_config(&self, config: &KVS) -> Result<(), TargetError> {
let enable = config.lookup("enable").unwrap_or("off") == "on";
fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError> {
let get = |base_env_key: &str, config_key: &str| get_config_value(id, base_env_key, config_key, config);
let enable = get(ENV_MQTT_ENABLE, ENABLE)
.map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
if !enable {
return Ok(());
}
// Validate broker URL
let broker = config
.lookup("broker")
.ok_or_else(|| TargetError::Configuration("Missing broker".to_string()))?;
let url = Url::parse(broker)
.map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
let broker =
get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?;
let url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
// Validate supported schemes
match url.scheme() {
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
_ => {
return Err(TargetError::Configuration(
"Unsupported broker URL scheme".to_string(),
));
return Err(TargetError::Configuration("Unsupported broker URL scheme".to_string()));
}
}
// Validate topic
if config.lookup("topic").is_none() {
return Err(TargetError::Configuration("Missing topic".to_string()));
if get(ENV_MQTT_TOPIC, MQTT_TOPIC).is_none() {
return Err(TargetError::Configuration("Missing MQTT topic".to_string()));
}
// Validate QoS
if let Some(qos_str) = config.lookup("qos") {
if let Some(qos_str) = get(ENV_MQTT_QOS, MQTT_QOS) {
let qos = qos_str
.parse::<u8>()
.map_err(|_| TargetError::Configuration("Invalid QoS value".to_string()))?;
if qos > 2 {
return Err(TargetError::Configuration(
"QoS must be 0, 1, or 2".to_string(),
));
return Err(TargetError::Configuration("QoS must be 0, 1, or 2".to_string()));
}
}
// Validate queue directory
let queue_dir = config.lookup("queue_dir").unwrap_or("");
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default();
if !queue_dir.is_empty() {
if !std::path::Path::new(queue_dir).is_absolute() {
return Err(TargetError::Configuration(
"mqtt Queue directory must be an absolute path".to_string(),
));
if !std::path::Path::new(&queue_dir).is_absolute() {
return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string()));
}
if let Some(qos_str) = config.lookup("qos") {
if let Some(qos_str) = get(ENV_MQTT_QOS, MQTT_QOS) {
if qos_str == "0" {
warn!("Using queue_dir with QoS 0 may result in event loss");
}

View File

@@ -1,14 +1,10 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{
config::{parse_config, Config}, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry,
rules::BucketNotificationConfig,
stream,
Event,
StoreError,
Target,
KVS,
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event,
StoreError, Target,
};
use ecstore::config::{Config, KVS};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -122,12 +118,8 @@ impl NotificationSystem {
info!("Initialize notification system...");
let config = self.config.read().await;
debug!(
"Initializing notification system with config: {:?}",
*config
);
let targets: Vec<Box<dyn Target + Send + Sync>> =
self.registry.create_targets_from_config(&config).await?;
debug!("Initializing notification system with config: {:?}", *config);
let targets: Vec<Box<dyn Target + Send + Sync>> = self.registry.create_targets_from_config(&config).await?;
info!("{} notification targets were created", targets.len());
@@ -141,11 +133,7 @@ impl NotificationSystem {
error!("Target {} Initialization failed:{}", target.id(), e);
continue;
}
debug!(
"Target {} initialized successfully,enabled:{}",
target_id,
target.is_enabled()
);
debug!("Target {} initialized successfully,enabled:{}", target_id, target.is_enabled());
// Check if the target is enabled and has storage
if target.is_enabled() {
if let Some(store) = target.store() {
@@ -161,31 +149,17 @@ impl NotificationSystem {
let semaphore = self.concurrency_limiter.clone();
// Encapsulated enhanced version of start_event_stream
let cancel_tx = self.enhanced_start_event_stream(
store_clone,
target_arc,
metrics,
semaphore,
);
let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore);
// Start event stream processing and save cancel sender
let target_id_clone = target_id.clone();
cancellers.insert(target_id, cancel_tx);
info!(
"Event stream processing for target {} is started successfully",
target_id_clone
);
info!("Event stream processing for target {} is started successfully", target_id_clone);
} else {
info!(
"Target {} No storage is configured, event stream processing is skipped",
target_id
);
info!("Target {} No storage is configured, event stream processing is skipped", target_id);
}
} else {
info!(
"Target {} is not enabled, event stream processing is skipped",
target_id
);
info!("Target {} is not enabled, event stream processing is skipped", target_id);
}
}
@@ -205,70 +179,56 @@ impl NotificationSystem {
self.notifier.target_list().read().await.keys()
}
/// 通过 TargetID 精确地移除一个 Target 及其相关资源。
/// Accurately remove a Target and its related resources through TargetID.
///
/// 这个过程包括:
/// 1. 停止与该 Target 关联的事件流(如果存在)。
/// 2. 从 Notifier 的活动列表中移除该 Target 实例。
/// 3. 从系统配置中移除该 Target 的配置项。
/// This process includes:
/// 1. Stop the event stream associated with the Target (if present).
/// 2. Remove the Target instance from the activity list of Notifier.
/// 3. Remove the configuration item of the Target from the system configuration.
///
/// # 参数
/// * `target_id` - 要移除的 Target 的唯一标识符。
/// # Parameters
/// * `target_id` - The unique identifier of the Target to be removed.
///
/// # 返回
/// 如果成功,则返回 `Ok(())`
pub async fn remove_target(
&self,
target_id: &TargetID,
target_type: &str,
) -> Result<(), NotificationError> {
/// # return
/// If successful, return `Ok(())`.
pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> {
info!("Attempting to remove target: {}", target_id);
// 步骤 1: 停止事件流 (如果存在)
// Step 1: Stop the event stream (if present)
let mut cancellers_guard = self.stream_cancellers.write().await;
if let Some(cancel_tx) = cancellers_guard.remove(target_id) {
info!("Stopping event stream for target {}", target_id);
// 发送停止信号,即使失败也继续执行,因为接收端可能已经关闭
// Send a stop signal and continue execution even if it fails, because the receiver may have been closed
if let Err(e) = cancel_tx.send(()).await {
error!(
"Failed to send stop signal to target {} stream: {}",
target_id, e
);
error!("Failed to send stop signal to target {} stream: {}", target_id, e);
}
} else {
info!(
"No active event stream found for target {}, skipping stop.",
target_id
);
info!("No active event stream found for target {}, skipping stop.", target_id);
}
drop(cancellers_guard);
// 步骤 2: 从 Notifier 的活动列表中移除 Target 实例
// TargetList::remove_target_only 会调用 target.close()
// Step 2: Remove the Target instance from the activity list of Notifier
// TargetList::remove_target_only will call target.close()
let target_list = self.notifier.target_list();
let mut target_list_guard = target_list.write().await;
if target_list_guard
.remove_target_only(target_id)
.await
.is_some()
{
if target_list_guard.remove_target_only(target_id).await.is_some() {
info!("Removed target {} from the active list.", target_id);
} else {
warn!("Target {} was not found in the active list.", target_id);
}
drop(target_list_guard);
// 步骤 3: 从持久化配置中移除 Target
// Step 3: Remove Target from persistent configuration
let mut config_guard = self.config.write().await;
let mut changed = false;
if let Some(targets_of_type) = config_guard.get_mut(target_type) {
if let Some(targets_of_type) = config_guard.0.get_mut(target_type) {
if targets_of_type.remove(&target_id.name).is_some() {
info!("Removed target {} from the configuration.", target_id);
changed = true;
}
// 如果该类型下已无任何 target则移除该类型条目
// If there are no targets under this type, remove the entry for this type
if targets_of_type.is_empty() {
config_guard.remove(target_type);
config_guard.0.remove(target_type);
}
}
@@ -291,18 +251,11 @@ impl NotificationSystem {
/// Result<(), NotificationError>
/// If the target configuration is successfully set, it returns Ok(()).
/// If the target configuration is invalid, it returns Err(NotificationError::Configuration).
pub async fn set_target_config(
&self,
target_type: &str,
target_name: &str,
kvs: KVS,
) -> Result<(), NotificationError> {
info!(
"Setting config for target {} of type {}",
target_name, target_type
);
pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> {
info!("Setting config for target {} of type {}", target_name, target_type);
let mut config_guard = self.config.write().await;
config_guard
.0
.entry(target_type.to_string())
.or_default()
.insert(target_name.to_string(), kvs);
@@ -331,24 +284,17 @@ impl NotificationSystem {
///
/// If the target configuration is successfully removed, it returns Ok(()).
/// If the target configuration does not exist, it returns Ok(()) without making any changes.
pub async fn remove_target_config(
&self,
target_type: &str,
target_name: &str,
) -> Result<(), NotificationError> {
info!(
"Removing config for target {} of type {}",
target_name, target_type
);
pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> {
info!("Removing config for target {} of type {}", target_name, target_type);
let mut config_guard = self.config.write().await;
let mut changed = false;
if let Some(targets) = config_guard.get_mut(target_type) {
if let Some(targets) = config_guard.0.get_mut(target_type) {
if targets.remove(target_name).is_some() {
changed = true;
}
if targets.is_empty() {
config_guard.remove(target_type);
config_guard.0.remove(target_type);
}
}
@@ -358,10 +304,7 @@ impl NotificationSystem {
drop(config_guard);
self.reload_config(new_config).await
} else {
info!(
"Target {} of type {} not found, no changes made.",
target_name, target_type
);
info!("Target {} of type {} not found, no changes made.", target_name, target_type);
Ok(())
}
}
@@ -402,10 +345,7 @@ impl NotificationSystem {
.await
.map_err(NotificationError::Target)?;
info!(
"{} notification targets were created from the new configuration",
targets.len()
);
info!("{} notification targets were created from the new configuration", targets.len());
// Start new event stream processing for each storage enabled target
let mut new_cancellers = HashMap::new();
@@ -432,32 +372,18 @@ impl NotificationSystem {
let semaphore = self.concurrency_limiter.clone();
// Encapsulated enhanced version of start_event_stream
let cancel_tx = self.enhanced_start_event_stream(
store_clone,
target_arc,
metrics,
semaphore,
);
let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore);
// Start event stream processing and save cancel sender
// let cancel_tx = start_event_stream(store_clone, target_clone);
let target_id_clone = target_id.clone();
new_cancellers.insert(target_id, cancel_tx);
info!(
"Event stream processing of target {} is restarted successfully",
target_id_clone
);
info!("Event stream processing of target {} is restarted successfully", target_id_clone);
} else {
info!(
"Target {} No storage is configured, event stream processing is skipped",
target_id
);
info!("Target {} No storage is configured, event stream processing is skipped", target_id);
}
} else {
info!(
"Target {} disabled, event stream processing is skipped",
target_id
);
info!("Target {} disabled, event stream processing is skipped", target_id);
}
}
@@ -478,17 +404,12 @@ impl NotificationSystem {
) -> Result<(), NotificationError> {
let arn_list = self.notifier.get_arn_list(&config.region).await;
if arn_list.is_empty() {
return Err(NotificationError::Configuration(
"No targets configured".to_string(),
));
return Err(NotificationError::Configuration("No targets configured".to_string()));
}
info!("Available ARNs: {:?}", arn_list);
// Validate the configuration against the available ARNs
if let Err(e) = config.validate(&config.region, &arn_list) {
debug!(
"Bucket notification config validation region:{} failed: {}",
&config.region, e
);
debug!("Bucket notification config validation region:{} failed: {}", &config.region, e);
if !e.to_string().contains("ARN not found") {
return Err(NotificationError::BucketNotification(e.to_string()));
} else {
@@ -498,46 +419,24 @@ impl NotificationSystem {
// let rules_map = config.to_rules_map();
let rules_map = config.get_rules_map();
self.notifier
.add_rules_map(bucket_name, rules_map.clone())
.await;
self.notifier.add_rules_map(bucket_name, rules_map.clone()).await;
info!("Loaded notification config for bucket: {}", bucket_name);
Ok(())
}
/// Sends an event
pub async fn send_event(
&self,
bucket_name: &str,
event_name: &str,
object_key: &str,
event: Event,
) {
self.notifier
.send(bucket_name, event_name, object_key, event)
.await;
pub async fn send_event(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) {
self.notifier.send(bucket_name, event_name, object_key, event).await;
}
/// Obtain system status information
pub fn get_status(&self) -> HashMap<String, String> {
let mut status = HashMap::new();
status.insert(
"uptime_seconds".to_string(),
self.metrics.uptime().as_secs().to_string(),
);
status.insert(
"processing_events".to_string(),
self.metrics.processing_count().to_string(),
);
status.insert(
"processed_events".to_string(),
self.metrics.processed_count().to_string(),
);
status.insert(
"failed_events".to_string(),
self.metrics.failed_count().to_string(),
);
status.insert("uptime_seconds".to_string(), self.metrics.uptime().as_secs().to_string());
status.insert("processing_events".to_string(), self.metrics.processing_count().to_string());
status.insert("processed_events".to_string(), self.metrics.processed_count().to_string());
status.insert("failed_events".to_string(), self.metrics.failed_count().to_string());
status
}
@@ -548,10 +447,7 @@ impl NotificationSystem {
// Get the number of active targets
let active_targets = self.stream_cancellers.read().await.len();
info!(
"Stops {} active event stream processing tasks",
active_targets
);
info!("Stops {} active event stream processing tasks", active_targets);
let mut cancellers = self.stream_cancellers.write().await;
for (target_id, cancel_tx) in cancellers.drain() {
@@ -579,16 +475,12 @@ impl Drop for NotificationSystem {
}
/// Loads configuration from a file
pub async fn load_config_from_file(
path: &str,
system: &NotificationSystem,
) -> Result<(), NotificationError> {
let config_str = tokio::fs::read_to_string(path).await.map_err(|e| {
NotificationError::Configuration(format!("Failed to read config file: {}", e))
})?;
pub async fn load_config_from_file(path: &str, system: &NotificationSystem) -> Result<(), NotificationError> {
let config_data = tokio::fs::read(path)
.await
.map_err(|e| NotificationError::Configuration(format!("Failed to read config file: {}", e)))?;
let config = parse_config(&config_str)
let config = Config::unmarshal(config_data.as_slice())
.map_err(|e| NotificationError::Configuration(format!("Failed to parse config: {}", e)))?;
system.reload_config(config).await
}

View File

@@ -6,7 +6,6 @@
pub mod args;
pub mod arn;
pub mod config;
pub mod error;
pub mod event;
pub mod factory;
@@ -21,7 +20,6 @@ pub mod target;
pub mod utils;
// Re-exports
pub use config::{parse_config, Config, KV, KVS};
pub use error::{NotificationError, StoreError, TargetError};
pub use event::{Event, EventLog, EventName};
pub use integration::NotificationSystem;

View File

@@ -1,10 +1,10 @@
use crate::target::ChannelTargetType;
use crate::{
config::Config,
error::TargetError,
factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory},
target::Target,
};
use ecstore::config::{Config, KVS};
use std::collections::HashMap;
use tracing::{error, info};
@@ -27,14 +27,8 @@ impl TargetRegistry {
};
// Register built-in factories
registry.register(
ChannelTargetType::Webhook.as_str(),
Box::new(WebhookTargetFactory),
);
registry.register(
ChannelTargetType::Mqtt.as_str(),
Box::new(MQTTTargetFactory),
);
registry.register(ChannelTargetType::Webhook.as_str(), Box::new(WebhookTargetFactory));
registry.register(ChannelTargetType::Mqtt.as_str(), Box::new(MQTTTargetFactory));
registry
}
@@ -49,28 +43,26 @@ impl TargetRegistry {
&self,
target_type: &str,
id: String,
config: &crate::config::KVS,
config: &KVS,
) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
let factory = self.factories.get(target_type).ok_or_else(|| {
TargetError::Configuration(format!("Unknown target type: {}", target_type))
})?;
let factory = self
.factories
.get(target_type)
.ok_or_else(|| TargetError::Configuration(format!("Unknown target type: {}", target_type)))?;
// Validate configuration before creating target
factory.validate_config(config)?;
factory.validate_config(&id, config)?;
// Create target
factory.create_target(id, config).await
}
/// Creates all targets from a configuration
pub async fn create_targets_from_config(
&self,
config: &Config,
) -> Result<Vec<Box<dyn Target + Send + Sync>>, TargetError> {
pub async fn create_targets_from_config(&self, config: &Config) -> Result<Vec<Box<dyn Target + Send + Sync>>, TargetError> {
let mut targets: Vec<Box<dyn Target + Send + Sync>> = Vec::new();
// Iterate through configuration sections
for (section, subsections) in config {
for (section, subsections) in &config.0 {
// Only process notification sections
if !section.starts_with("notify_") {
continue;
@@ -82,24 +74,18 @@ impl TargetRegistry {
// Iterate through subsections (each representing a target instance)
for (target_id, target_config) in subsections {
// Skip disabled targets
if target_config.lookup("enable").unwrap_or("off") != "on" {
if target_config.lookup("enable").unwrap_or_else(|| "off".to_string()) != "on" {
continue;
}
// Create target
match self
.create_target(target_type, target_id.clone(), target_config)
.await
{
match self.create_target(target_type, target_id.clone(), target_config).await {
Ok(target) => {
info!("Created target: {}/{}", target_type, target_id);
targets.push(target);
}
Err(e) => {
error!(
"Failed to create target {}/{}: {}",
target_type, target_id, e
);
error!("Failed to create target {}/{}: {}", target_type, target_id, e);
}
}
}
@@ -111,37 +97,6 @@ impl TargetRegistry {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::KVS;
#[tokio::test]
async fn test_target_registry() {
let registry = TargetRegistry::new();
// Test valid webhook config
let mut webhook_config = KVS::new();
webhook_config.set("enable", "on");
webhook_config.set("endpoint", "http://example.com/webhook");
let target = registry
.create_target("webhook", "webhook1".to_string(), &webhook_config)
.await;
assert!(target.is_ok());
// Test invalid target type
let target = registry
.create_target("invalid", "invalid1".to_string(), &webhook_config)
.await;
assert!(target.is_err());
// Test disabled target
let mut disabled_config = KVS::new();
disabled_config.set("enable", "off");
disabled_config.set("endpoint", "http://example.com/webhook");
let target = registry
.create_target("webhook", "disabled".to_string(), &disabled_config)
.await;
assert!(target.is_err());
}
async fn test_target_registry() {}
}

View File

@@ -1,35 +0,0 @@
#[allow(dead_code)]
pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
#[allow(dead_code)]
pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
#[allow(dead_code)]
pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
#[allow(dead_code)]
pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
#[allow(dead_code)]
pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
#[allow(dead_code)]
pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
#[allow(dead_code)]
pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
#[allow(dead_code)]
pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
#[allow(dead_code)]
pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
// Webhook constants
pub const WEBHOOK_ENDPOINT: &str = "endpoint";
pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token";
pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir";
pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit";
pub const WEBHOOK_CLIENT_CERT: &str = "client_cert";
pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
pub const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE";
pub const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT";
pub const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN";
pub const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR";
pub const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT";
pub const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT";
pub const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY";

View File

@@ -3,7 +3,6 @@ use crate::store::{Key, Store};
use crate::{Event, StoreError, TargetError};
use async_trait::async_trait;
pub mod constants;
pub mod mqtt;
pub mod webhook;

View File

@@ -5,7 +5,7 @@ pub mod storageclass;
use crate::error::Result;
use crate::store::ECStore;
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -64,7 +64,7 @@ pub struct KV {
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct KVS(Vec<KV>);
pub struct KVS(pub Vec<KV>);
impl Default for KVS {
fn default() -> Self {
@@ -91,7 +91,7 @@ impl KVS {
}
#[derive(Debug, Clone)]
pub struct Config(HashMap<String, HashMap<String, KVS>>);
pub struct Config(pub HashMap<String, HashMap<String, KVS>>);
impl Default for Config {
fn default() -> Self {

View File

@@ -23,7 +23,7 @@ use tracing::warn;
pub const RPC_PREFIX: &str = "/rustfs/rpc";
pub fn regist_rpc_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
pub fn register_rpc_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()> {
r.insert(
Method::GET,
format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(),

View File

@@ -1,4 +1,4 @@
use rustfs_notify::EventNotifierConfig;
// use rustfs_notify::EventNotifierConfig;
use tracing::{info, instrument};
#[instrument]
@@ -7,11 +7,11 @@ pub(crate) async fn init_event_notifier(notifier_config: Option<String>) {
let notifier_config_present = notifier_config.is_some();
let config = if notifier_config_present {
info!("event_config is not empty, path: {:?}", notifier_config);
EventNotifierConfig::event_load_config(notifier_config)
// EventNotifierConfig::event_load_config(notifier_config)
} else {
info!("event_config is empty");
// rustfs_notify::get_event_notifier_config().clone()
EventNotifierConfig::default()
// EventNotifierConfig::default()
};
info!("using event_config: {:?}", config);

View File

@@ -1,18 +1 @@
use rustfs_notify::{Event, Metadata};
/// Create a new metadata object
#[allow(dead_code)]
pub(crate) fn create_metadata() -> Metadata {
// Create a new metadata object
let mut metadata = Metadata::new();
metadata.set_configuration_id("test-config".to_string());
// Return the created metadata object
metadata
}
/// Create a new event object
#[allow(dead_code)]
pub(crate) async fn send_event(event: Event) -> Result<(), Box<dyn std::error::Error>> {
// rustfs_notify::send_event(event).await.map_err(|e| e.into())
Ok(())
}