diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 047ccfea..448cd60c 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -1,5 +1,11 @@ +use ecstore::config::{Config, KV, KVS}; use rustfs_notify::arn::TargetID; +use rustfs_notify::factory::{ + DEFAULT_TARGET, ENABLE, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, + NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +}; use rustfs_notify::global::notification_system; +use rustfs_notify::store::DEFAULT_LIMIT; use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError}; use std::time::Duration; use tracing::info; @@ -11,51 +17,100 @@ async fn main() -> Result<(), NotificationError> { let system = notification_system(); // --- Initial configuration (Webhook and MQTT) --- - let mut config = rustfs_notify::Config::new(); + let mut config = Config::new(); let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root"); println!("Current project root: {}", current_root.display()); - // Webhook target configuration - let mut webhook_kvs = rustfs_notify::KVS::new(); - webhook_kvs.set("enable", "on"); - webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook"); - webhook_kvs.set("auth_token", "secret-token"); - // webhook_kvs.set("queue_dir", "/tmp/data/webhook"); - webhook_kvs.set( - "queue_dir", - current_root - .clone() - .join("../../deploy/logs/notify/webhook") - .to_str() - .unwrap() - .to_string(), - ); - webhook_kvs.set("queue_limit", "10000"); + + let webhook_kvs_vec = vec![ + KV { + key: ENABLE.to_string(), + value: "on".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_ENDPOINT.to_string(), + value: "http://127.0.0.1:3020/webhook".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_AUTH_TOKEN.to_string(), + value: "secret-token".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_DIR.to_string(), + value: current_root + .clone() + .join("../../deploy/logs/notify/webhook") + .to_str() + .unwrap() + .to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_LIMIT.to_string(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + ]; + let webhook_kvs = KVS(webhook_kvs_vec); + let mut webhook_targets = std::collections::HashMap::new(); - webhook_targets.insert("1".to_string(), webhook_kvs); - config.insert("notify_webhook".to_string(), webhook_targets); + webhook_targets.insert(DEFAULT_TARGET.to_string(), webhook_kvs); + config.0.insert(NOTIFY_WEBHOOK_SUB_SYS.to_string(), webhook_targets); // MQTT target configuration - let mut mqtt_kvs = rustfs_notify::KVS::new(); - mqtt_kvs.set("enable", "on"); - mqtt_kvs.set("broker", "mqtt://localhost:1883"); - mqtt_kvs.set("topic", "rustfs/events"); - mqtt_kvs.set("qos", "1"); // AtLeastOnce - mqtt_kvs.set("username", "test"); - mqtt_kvs.set("password", "123456"); - // webhook_kvs.set("queue_dir", "/tmp/data/mqtt"); - mqtt_kvs.set( - "queue_dir", - current_root - .join("../../deploy/logs/notify/mqtt") - .to_str() - .unwrap() - .to_string(), - ); - mqtt_kvs.set("queue_limit", "10000"); + let mqtt_kvs_vec = vec![ + KV { + key: ENABLE.to_string(), + value: "on".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_BROKER.to_string(), + value: "mqtt://localhost:1883".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_TOPIC.to_string(), + value: "rustfs/events".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QOS.to_string(), + value: "1".to_string(), // AtLeastOnce + hidden_if_empty: false, + }, + KV { + key: MQTT_USERNAME.to_string(), + value: "test".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_PASSWORD.to_string(), + value: "123456".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QUEUE_DIR.to_string(), + value: current_root + .join("../../deploy/logs/notify/mqtt") + .to_str() + .unwrap() + .to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QUEUE_LIMIT.to_string(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + ]; + let mqtt_kvs = KVS(mqtt_kvs_vec); let mut mqtt_targets = std::collections::HashMap::new(); - mqtt_targets.insert("1".to_string(), mqtt_kvs); - config.insert("notify_mqtt".to_string(), mqtt_targets); + mqtt_targets.insert(DEFAULT_TARGET.to_string(), mqtt_kvs); + config.0.insert(NOTIFY_MQTT_SUB_SYS.to_string(), mqtt_targets); // Load the configuration and initialize the system *system.config.write().await = config; @@ -71,15 +126,15 @@ async fn main() -> Result<(), NotificationError> { // --- Exactly delete a Target (e.g. MQTT) --- info!("\n---> Removing MQTT target..."); - let mqtt_target_id = TargetID::new("1".to_string(), "mqtt".to_string()); - system.remove_target(&mqtt_target_id, "notify_mqtt").await?; + let mqtt_target_id = TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string()); + system.remove_target(&mqtt_target_id, NOTIFY_MQTT_SUB_SYS).await?; info!("✅ MQTT target removed."); // --- Query the activity's Target again --- let active_targets_after_removal = system.get_active_targets().await; info!("\n---> Active targets after removal: {:?}", active_targets_after_removal); assert_eq!(active_targets_after_removal.len(), 1); - assert_eq!(active_targets_after_removal[0].id, "1".to_string()); + assert_eq!(active_targets_after_removal[0].id, DEFAULT_TARGET.to_string()); // --- Send events for verification --- // Configure a rule to point to the Webhook and deleted MQTT @@ -87,12 +142,12 @@ async fn main() -> Result<(), NotificationError> { bucket_config.add_rule( &[EventName::ObjectCreatedPut], "*".to_string(), - TargetID::new("1".to_string(), "webhook".to_string()), + TargetID::new(DEFAULT_TARGET.to_string(), "webhook".to_string()), ); bucket_config.add_rule( &[EventName::ObjectCreatedPut], "*".to_string(), - TargetID::new("1".to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found + TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found ); system.load_bucket_notification_config("my-bucket", &bucket_config).await?; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 4e22fbe1..fb33f4bc 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -1,8 +1,13 @@ +use ecstore::config::{Config, KV, KVS}; // Using Global Accessories -use rustfs_config::notify; use rustfs_notify::arn::TargetID; +use rustfs_notify::factory::{ + DEFAULT_TARGET, ENABLE, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, + NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +}; use rustfs_notify::global::notification_system; -use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS}; +use rustfs_notify::store::DEFAULT_LIMIT; +use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError}; use std::time::Duration; use tracing::info; @@ -14,25 +19,46 @@ async fn main() -> Result<(), NotificationError> { let system = notification_system(); // --- Initial configuration --- - let mut config = rustfs_notify::Config::new(); + let mut config = Config::new(); let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root"); // Webhook target - let mut webhook_kvs = KVS::new(); - webhook_kvs.set("enable", "on"); - webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook"); - // webhook_kvs.set("queue_dir", "./logs/webhook"); - webhook_kvs.set( - "queue_dir", - current_root - .clone() - .join("/deploy/logs/notify/webhook") - .to_str() - .unwrap() - .to_string(), - ); + let webhook_kvs_vec = vec![ + KV { + key: ENABLE.to_string(), + value: "on".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_ENDPOINT.to_string(), + value: "http://127.0.0.1:3020/webhook".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_AUTH_TOKEN.to_string(), + value: "secret-token".to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_DIR.to_string(), + value: current_root + .clone() + .join("../../deploy/logs/notify/webhook") + .to_str() + .unwrap() + .to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_LIMIT.to_string(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + ]; + let webhook_kvs = KVS(webhook_kvs_vec); + let mut webhook_targets = std::collections::HashMap::new(); - webhook_targets.insert("1".to_string(), webhook_kvs); - config.insert("notify_webhook".to_string(), webhook_targets); + webhook_targets.insert(DEFAULT_TARGET.to_string(), webhook_kvs); + config.0.insert(NOTIFY_WEBHOOK_SUB_SYS.to_string(), webhook_targets); // Load the initial configuration and initialize the system *system.config.write().await = config; @@ -43,17 +69,61 @@ async fn main() -> Result<(), NotificationError> { // --- Dynamically update system configuration: Add an MQTT Target --- info!("\n---> Dynamically adding MQTT target..."); - let mut mqtt_kvs = KVS::new(); - mqtt_kvs.set("enable", "on"); - mqtt_kvs.set("broker", "mqtt://localhost:1883"); - mqtt_kvs.set("topic", "rustfs/events"); - mqtt_kvs.set("qos", "1"); - mqtt_kvs.set("username", "test"); - mqtt_kvs.set("password", "123456"); - mqtt_kvs.set("queue_limit", "10000"); - // mqtt_kvs.set("queue_dir", "./logs/mqtt"); - mqtt_kvs.set("queue_dir", current_root.join("/deploy/logs/notify/mqtt").to_str().unwrap().to_string()); - system.set_target_config("notify_mqtt", "1", mqtt_kvs).await?; + + let mqtt_kvs_vec = vec![ + KV { + key: ENABLE.to_string(), + value: "on".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_BROKER.to_string(), + value: "mqtt://localhost:1883".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_TOPIC.to_string(), + value: "rustfs/events".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QOS.to_string(), + value: "1".to_string(), // AtLeastOnce + hidden_if_empty: false, + }, + KV { + key: MQTT_USERNAME.to_string(), + value: "test".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_PASSWORD.to_string(), + value: "123456".to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QUEUE_DIR.to_string(), + value: current_root + .join("../../deploy/logs/notify/mqtt") + .to_str() + .unwrap() + .to_string(), + hidden_if_empty: false, + }, + KV { + key: MQTT_QUEUE_LIMIT.to_string(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + ]; + + let mqtt_kvs = KVS(mqtt_kvs_vec); + // let mut mqtt_targets = std::collections::HashMap::new(); + // mqtt_targets.insert(DEFAULT_TARGET.to_string(), mqtt_kvs.clone()); + + system + .set_target_config(NOTIFY_MQTT_SUB_SYS, DEFAULT_TARGET, mqtt_kvs) + .await?; info!("✅ MQTT target added and system reloaded."); tokio::time::sleep(Duration::from_secs(1)).await; @@ -64,12 +134,12 @@ async fn main() -> Result<(), NotificationError> { bucket_config.add_rule( &[EventName::ObjectCreatedPut], "*".to_string(), - TargetID::new("1".to_string(), "webhook".to_string()), + TargetID::new(DEFAULT_TARGET.to_string(), "webhook".to_string()), ); bucket_config.add_rule( &[EventName::ObjectCreatedPut], "*".to_string(), - TargetID::new("1".to_string(), "mqtt".to_string()), + TargetID::new(DEFAULT_TARGET.to_string(), "mqtt".to_string()), ); system.load_bucket_notification_config("my-bucket", &bucket_config).await?; info!("✅ Bucket 'my-bucket' config loaded."); diff --git a/crates/notify/src/config.rs b/crates/notify/src/config.rs deleted file mode 100644 index 7b945f14..00000000 --- a/crates/notify/src/config.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::collections::HashMap; - -/// Represents a key-value pair in configuration -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct KV { - pub key: String, - pub value: String, -} - -/// Represents a collection of key-value pairs -#[derive(Debug, Clone, Default)] -pub struct KVS { - kvs: Vec, -} - -impl KVS { - /// Creates a new empty KVS - pub fn new() -> Self { - KVS { kvs: Vec::new() } - } - - /// Sets a key-value pair - pub fn set(&mut self, key: impl Into, value: impl Into) { - let key = key.into(); - let value = value.into(); - - // Update existing value or add new - for kv in &mut self.kvs { - if kv.key == key { - kv.value = value; - return; - } - } - - self.kvs.push(KV { key, value }); - } - - /// Looks up a value by key - pub fn lookup(&self, key: &str) -> Option<&str> { - self.kvs - .iter() - .find(|kv| kv.key == key) - .map(|kv| kv.value.as_str()) - } - - /// Deletes a key-value pair - pub fn delete(&mut self, key: &str) { - self.kvs.retain(|kv| kv.key != key); - } - - /// Checks if the KVS is empty - pub fn is_empty(&self) -> bool { - self.kvs.is_empty() - } - - /// Returns all keys - pub fn keys(&self) -> Vec { - self.kvs.iter().map(|kv| kv.key.clone()).collect() - } -} - -/// Represents the entire configuration -pub type Config = HashMap>; - -/// Parses configuration from a string -pub fn parse_config(config_str: &str) -> Result { - let mut config = Config::new(); - let mut current_section = String::new(); - let mut current_subsection = String::new(); - - for line in config_str.lines() { - let line = line.trim(); - if line.is_empty() || line.starts_with('#') { - continue; - } - - // Parse sections - if line.starts_with('[') && line.ends_with(']') { - let section = line[1..line.len() - 1].trim(); - if let Some((section_name, subsection)) = section.split_once(' ') { - current_section = section_name.to_string(); - current_subsection = subsection.trim_matches('"').to_string(); - } else { - current_section = section.to_string(); - current_subsection = String::new(); - } - continue; - } - - // Parse key-value pairs - if let Some((key, value)) = line.split_once('=') { - let key = key.trim(); - let value = value.trim(); - - let section = config.entry(current_section.clone()).or_default(); - - let kvs = section.entry(current_subsection.clone()).or_default(); - - kvs.set(key, value); - } - } - - Ok(config) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_kvs() { - let mut kvs = KVS::new(); - assert!(kvs.is_empty()); - - kvs.set("key1", "value1"); - kvs.set("key2", "value2"); - assert!(!kvs.is_empty()); - - assert_eq!(kvs.lookup("key1"), Some("value1")); - assert_eq!(kvs.lookup("key2"), Some("value2")); - assert_eq!(kvs.lookup("key3"), None); - - kvs.set("key1", "new_value"); - assert_eq!(kvs.lookup("key1"), Some("new_value")); - - kvs.delete("key2"); - assert_eq!(kvs.lookup("key2"), None); - } - - #[test] - fn test_parse_config() { - let config_str = r#" - # Comment line - [notify_webhook "webhook1"] - enable = on - endpoint = http://example.com/webhook - auth_token = secret - - [notify_mqtt "mqtt1"] - enable = on - broker = mqtt://localhost:1883 - topic = rustfs/events - "#; - - let config = parse_config(config_str).unwrap(); - - assert!(config.contains_key("notify_webhook")); - assert!(config.contains_key("notify_mqtt")); - - let webhook = &config["notify_webhook"]["webhook1"]; - assert_eq!(webhook.lookup("enable"), Some("on")); - assert_eq!( - webhook.lookup("endpoint"), - Some("http://example.com/webhook") - ); - assert_eq!(webhook.lookup("auth_token"), Some("secret")); - - let mqtt = &config["notify_mqtt"]["mqtt1"]; - assert_eq!(mqtt.lookup("enable"), Some("on")); - assert_eq!(mqtt.lookup("broker"), Some("mqtt://localhost:1883")); - assert_eq!(mqtt.lookup("topic"), Some("rustfs/events")); - } -} diff --git a/crates/notify/src/error.rs b/crates/notify/src/error.rs index a344e948..77e42a29 100644 --- a/crates/notify/src/error.rs +++ b/crates/notify/src/error.rs @@ -23,7 +23,7 @@ pub enum StoreError { NotFound, #[error("Invalid entry: {0}")] - Internal(String), // 新增内部错误类型 + Internal(String), // Added internal error type } /// Error types for targets diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index 9a2aa3da..c76153f8 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -1,27 +1,106 @@ use crate::store::DEFAULT_LIMIT; use crate::{ - config::KVS, error::TargetError, target::{mqtt::MQTTArgs, webhook::WebhookArgs, Target}, }; use async_trait::async_trait; +use ecstore::config::KVS; use rumqttc::QoS; use std::time::Duration; use tracing::warn; use url::Url; +// --- Configuration Constants --- + +// General +pub const ENABLE: &str = "enable"; + +pub const DEFAULT_TARGET: &str = "1"; + +#[allow(dead_code)] +pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; +#[allow(dead_code)] +pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; +#[allow(dead_code)] +pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; +#[allow(dead_code)] +pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; +#[allow(dead_code)] +pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; +#[allow(dead_code)] +pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; +#[allow(dead_code)] +pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; +#[allow(dead_code)] +pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; +#[allow(dead_code)] +pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; +pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; + +// Webhook Keys +pub const WEBHOOK_ENDPOINT: &str = "endpoint"; +pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token"; +pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit"; +pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir"; +pub const WEBHOOK_CLIENT_CERT: &str = "client_cert"; +pub const WEBHOOK_CLIENT_KEY: &str = "client_key"; + +// Webhook Environment Variables +const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE"; +const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT"; +const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN"; +const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT"; +const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR"; +const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT"; +const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY"; + +// MQTT Keys +pub const MQTT_BROKER: &str = "broker"; +pub const MQTT_TOPIC: &str = "topic"; +pub const MQTT_QOS: &str = "qos"; +pub const MQTT_USERNAME: &str = "username"; +pub const MQTT_PASSWORD: &str = "password"; +pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval"; +pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval"; +pub const MQTT_QUEUE_DIR: &str = "queue_dir"; +pub const MQTT_QUEUE_LIMIT: &str = "queue_limit"; + +// MQTT Environment Variables +const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE"; +const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER"; +const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC"; +const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS"; +const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME"; +const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD"; +const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL"; +const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"; +const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR"; +const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT"; + +/// Helper function to get values from environment variables or KVS configurations. +/// +/// It will give priority to reading from environment variables such as `BASE_ENV_KEY_ID` and fall back to the KVS configuration if it fails. +fn get_config_value(id: &str, base_env_key: &str, config_key: &str, config: &KVS) -> Option { + let env_key = if id != DEFAULT_TARGET { + format!("{}_{}", base_env_key, id.to_uppercase().replace('-', "_")) + } else { + base_env_key.to_string() + }; + + match std::env::var(&env_key) { + Ok(val) => Some(val), + Err(_) => config.lookup(config_key), + } +} + /// Trait for creating targets from configuration #[async_trait] pub trait TargetFactory: Send + Sync { /// Creates a target from configuration - async fn create_target( - &self, - id: String, - config: &KVS, - ) -> Result, TargetError>; + async fn create_target(&self, id: String, config: &KVS) -> Result, TargetError>; /// Validates target configuration - fn validate_config(&self, config: &KVS) -> Result<(), TargetError>; + fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError>; } /// Factory for creating Webhook targets @@ -29,35 +108,32 @@ pub struct WebhookTargetFactory; #[async_trait] impl TargetFactory for WebhookTargetFactory { - async fn create_target( - &self, - id: String, - config: &KVS, - ) -> Result, TargetError> { - // Parse configuration values - let enable = config.lookup("enable").unwrap_or("off") == "on"; + async fn create_target(&self, id: String, config: &KVS) -> Result, TargetError> { + let get = |base_env_key: &str, config_key: &str| get_config_value(&id, base_env_key, config_key, config); + + let enable = get(ENV_WEBHOOK_ENABLE, ENABLE) + .map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if !enable { return Err(TargetError::Configuration("Target is disabled".to_string())); } - let endpoint = config - .lookup("endpoint") - .ok_or_else(|| TargetError::Configuration("Missing endpoint".to_string()))?; - let endpoint_url = Url::parse(endpoint) - .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; + let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT) + .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; + let endpoint_url = + Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; - let auth_token = config.lookup("auth_token").unwrap_or("").to_string(); - let queue_dir = config.lookup("queue_dir").unwrap_or("").to_string(); + let auth_token = get(ENV_WEBHOOK_AUTH_TOKEN, WEBHOOK_AUTH_TOKEN).unwrap_or_default(); + let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default(); - let queue_limit = config - .lookup("queue_limit") + let queue_limit = get(ENV_WEBHOOK_QUEUE_LIMIT, WEBHOOK_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LIMIT); - let client_cert = config.lookup("client_cert").unwrap_or("").to_string(); - let client_key = config.lookup("client_key").unwrap_or("").to_string(); + let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default(); + let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default(); - // Create and return Webhook target let args = WebhookArgs { enable, endpoint: endpoint_url, @@ -72,38 +148,33 @@ impl TargetFactory for WebhookTargetFactory { Ok(Box::new(target)) } - fn validate_config(&self, config: &KVS) -> Result<(), TargetError> { - let enable = config.lookup("enable").unwrap_or("off") == "on"; + fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError> { + let get = |base_env_key: &str, config_key: &str| get_config_value(id, base_env_key, config_key, config); + + let enable = get(ENV_WEBHOOK_ENABLE, ENABLE) + .map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if !enable { return Ok(()); } - // Validate endpoint - let endpoint = config - .lookup("endpoint") - .ok_or_else(|| TargetError::Configuration("Missing endpoint".to_string()))?; - Url::parse(endpoint) - .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; + let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT) + .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; + Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; - // Validate TLS certificates - let client_cert = config.lookup("client_cert").unwrap_or(""); - let client_key = config.lookup("client_key").unwrap_or(""); + let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default(); + let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default(); - if (!client_cert.is_empty() && client_key.is_empty()) - || (client_cert.is_empty() && !client_key.is_empty()) - { + if client_cert.is_empty() != client_key.is_empty() { return Err(TargetError::Configuration( - "Both client_cert and client_key must be specified if using client certificates" - .to_string(), + "Both client_cert and client_key must be specified together".to_string(), )); } - // Validate queue directory - let queue_dir = config.lookup("queue_dir").unwrap_or(""); - if !queue_dir.is_empty() && !std::path::Path::new(queue_dir).is_absolute() { - return Err(TargetError::Configuration( - "Webhook Queue directory must be an absolute path".to_string(), - )); + let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default(); + if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() { + return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string())); } Ok(()) @@ -115,64 +186,56 @@ pub struct MQTTTargetFactory; #[async_trait] impl TargetFactory for MQTTTargetFactory { - async fn create_target( - &self, - id: String, - config: &KVS, - ) -> Result, TargetError> { - // Parse configuration values - let enable = config.lookup("enable").unwrap_or("off") == "on"; + async fn create_target(&self, id: String, config: &KVS) -> Result, TargetError> { + let get = |base_env_key: &str, config_key: &str| get_config_value(&id, base_env_key, config_key, config); + + let enable = get(ENV_MQTT_ENABLE, ENABLE) + .map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if !enable { return Err(TargetError::Configuration("Target is disabled".to_string())); } - let broker = config - .lookup("broker") - .ok_or_else(|| TargetError::Configuration("Missing broker".to_string()))?; - let broker_url = Url::parse(broker) - .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; + let broker = + get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; + let broker_url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; - let topic = config - .lookup("topic") - .ok_or_else(|| TargetError::Configuration("Missing topic".to_string()))?; + let topic = + get(ENV_MQTT_TOPIC, MQTT_TOPIC).ok_or_else(|| TargetError::Configuration("Missing MQTT topic".to_string()))?; - let qos = config - .lookup("qos") + let qos = get(ENV_MQTT_QOS, MQTT_QOS) .and_then(|v| v.parse::().ok()) .map(|q| match q { 0 => QoS::AtMostOnce, 1 => QoS::AtLeastOnce, 2 => QoS::ExactlyOnce, - _ => QoS::AtMostOnce, + _ => QoS::AtLeastOnce, }) .unwrap_or(QoS::AtLeastOnce); - let username = config.lookup("username").unwrap_or("").to_string(); - let password = config.lookup("password").unwrap_or("").to_string(); + let username = get(ENV_MQTT_USERNAME, MQTT_USERNAME).unwrap_or_default(); + let password = get(ENV_MQTT_PASSWORD, MQTT_PASSWORD).unwrap_or_default(); - let reconnect_interval = config - .lookup("reconnect_interval") + let reconnect_interval = get(ENV_MQTT_RECONNECT_INTERVAL, MQTT_RECONNECT_INTERVAL) .and_then(|v| v.parse::().ok()) .map(Duration::from_secs) - .unwrap_or(Duration::from_secs(5)); + .unwrap_or_else(|| Duration::from_secs(5)); - let keep_alive = config - .lookup("keep_alive_interval") + let keep_alive = get(ENV_MQTT_KEEP_ALIVE_INTERVAL, MQTT_KEEP_ALIVE_INTERVAL) .and_then(|v| v.parse::().ok()) .map(Duration::from_secs) - .unwrap_or(Duration::from_secs(30)); + .unwrap_or_else(|| Duration::from_secs(30)); - let queue_dir = config.lookup("queue_dir").unwrap_or("").to_string(); - let queue_limit = config - .lookup("queue_limit") + let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default(); + let queue_limit = get(ENV_MQTT_QUEUE_LIMIT, MQTT_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LIMIT); - // Create and return MQTT target let args = MQTTArgs { enable, broker: broker_url, - topic: topic.to_string(), + topic, qos, username, password, @@ -186,56 +249,47 @@ impl TargetFactory for MQTTTargetFactory { Ok(Box::new(target)) } - fn validate_config(&self, config: &KVS) -> Result<(), TargetError> { - let enable = config.lookup("enable").unwrap_or("off") == "on"; + fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError> { + let get = |base_env_key: &str, config_key: &str| get_config_value(id, base_env_key, config_key, config); + + let enable = get(ENV_MQTT_ENABLE, ENABLE) + .map(|v| v.eq_ignore_ascii_case("on") || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + if !enable { return Ok(()); } - // Validate broker URL - let broker = config - .lookup("broker") - .ok_or_else(|| TargetError::Configuration("Missing broker".to_string()))?; - let url = Url::parse(broker) - .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; + let broker = + get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; + let url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; - // Validate supported schemes match url.scheme() { "tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {} _ => { - return Err(TargetError::Configuration( - "Unsupported broker URL scheme".to_string(), - )); + return Err(TargetError::Configuration("Unsupported broker URL scheme".to_string())); } } - // Validate topic - if config.lookup("topic").is_none() { - return Err(TargetError::Configuration("Missing topic".to_string())); + if get(ENV_MQTT_TOPIC, MQTT_TOPIC).is_none() { + return Err(TargetError::Configuration("Missing MQTT topic".to_string())); } - // Validate QoS - if let Some(qos_str) = config.lookup("qos") { + if let Some(qos_str) = get(ENV_MQTT_QOS, MQTT_QOS) { let qos = qos_str .parse::() .map_err(|_| TargetError::Configuration("Invalid QoS value".to_string()))?; if qos > 2 { - return Err(TargetError::Configuration( - "QoS must be 0, 1, or 2".to_string(), - )); + return Err(TargetError::Configuration("QoS must be 0, 1, or 2".to_string())); } } - // Validate queue directory - let queue_dir = config.lookup("queue_dir").unwrap_or(""); + let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default(); if !queue_dir.is_empty() { - if !std::path::Path::new(queue_dir).is_absolute() { - return Err(TargetError::Configuration( - "mqtt Queue directory must be an absolute path".to_string(), - )); + if !std::path::Path::new(&queue_dir).is_absolute() { + return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string())); } - - if let Some(qos_str) = config.lookup("qos") { + if let Some(qos_str) = get(ENV_MQTT_QOS, MQTT_QOS) { if qos_str == "0" { warn!("Using queue_dir with QoS 0 may result in event loss"); } diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 51696204..3e99ecb0 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -1,14 +1,10 @@ use crate::arn::TargetID; use crate::store::{Key, Store}; use crate::{ - config::{parse_config, Config}, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, - rules::BucketNotificationConfig, - stream, - Event, - StoreError, - Target, - KVS, + error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, + StoreError, Target, }; +use ecstore::config::{Config, KVS}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -122,12 +118,8 @@ impl NotificationSystem { info!("Initialize notification system..."); let config = self.config.read().await; - debug!( - "Initializing notification system with config: {:?}", - *config - ); - let targets: Vec> = - self.registry.create_targets_from_config(&config).await?; + debug!("Initializing notification system with config: {:?}", *config); + let targets: Vec> = self.registry.create_targets_from_config(&config).await?; info!("{} notification targets were created", targets.len()); @@ -141,11 +133,7 @@ impl NotificationSystem { error!("Target {} Initialization failed:{}", target.id(), e); continue; } - debug!( - "Target {} initialized successfully,enabled:{}", - target_id, - target.is_enabled() - ); + debug!("Target {} initialized successfully,enabled:{}", target_id, target.is_enabled()); // Check if the target is enabled and has storage if target.is_enabled() { if let Some(store) = target.store() { @@ -161,31 +149,17 @@ impl NotificationSystem { let semaphore = self.concurrency_limiter.clone(); // Encapsulated enhanced version of start_event_stream - let cancel_tx = self.enhanced_start_event_stream( - store_clone, - target_arc, - metrics, - semaphore, - ); + let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore); // Start event stream processing and save cancel sender let target_id_clone = target_id.clone(); cancellers.insert(target_id, cancel_tx); - info!( - "Event stream processing for target {} is started successfully", - target_id_clone - ); + info!("Event stream processing for target {} is started successfully", target_id_clone); } else { - info!( - "Target {} No storage is configured, event stream processing is skipped", - target_id - ); + info!("Target {} No storage is configured, event stream processing is skipped", target_id); } } else { - info!( - "Target {} is not enabled, event stream processing is skipped", - target_id - ); + info!("Target {} is not enabled, event stream processing is skipped", target_id); } } @@ -205,70 +179,56 @@ impl NotificationSystem { self.notifier.target_list().read().await.keys() } - /// 通过 TargetID 精确地移除一个 Target 及其相关资源。 + /// Accurately remove a Target and its related resources through TargetID. /// - /// 这个过程包括: - /// 1. 停止与该 Target 关联的事件流(如果存在)。 - /// 2. 从 Notifier 的活动列表中移除该 Target 实例。 - /// 3. 从系统配置中移除该 Target 的配置项。 + /// This process includes: + /// 1. Stop the event stream associated with the Target (if present). + /// 2. Remove the Target instance from the activity list of Notifier. + /// 3. Remove the configuration item of the Target from the system configuration. /// - /// # 参数 - /// * `target_id` - 要移除的 Target 的唯一标识符。 + /// # Parameters + /// * `target_id` - The unique identifier of the Target to be removed. /// - /// # 返回 - /// 如果成功,则返回 `Ok(())`。 - pub async fn remove_target( - &self, - target_id: &TargetID, - target_type: &str, - ) -> Result<(), NotificationError> { + /// # return + /// If successful, return `Ok(())`. + pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> { info!("Attempting to remove target: {}", target_id); - // 步骤 1: 停止事件流 (如果存在) + // Step 1: Stop the event stream (if present) let mut cancellers_guard = self.stream_cancellers.write().await; if let Some(cancel_tx) = cancellers_guard.remove(target_id) { info!("Stopping event stream for target {}", target_id); - // 发送停止信号,即使失败也继续执行,因为接收端可能已经关闭 + // Send a stop signal and continue execution even if it fails, because the receiver may have been closed if let Err(e) = cancel_tx.send(()).await { - error!( - "Failed to send stop signal to target {} stream: {}", - target_id, e - ); + error!("Failed to send stop signal to target {} stream: {}", target_id, e); } } else { - info!( - "No active event stream found for target {}, skipping stop.", - target_id - ); + info!("No active event stream found for target {}, skipping stop.", target_id); } drop(cancellers_guard); - // 步骤 2: 从 Notifier 的活动列表中移除 Target 实例 - // TargetList::remove_target_only 会调用 target.close() + // Step 2: Remove the Target instance from the activity list of Notifier + // TargetList::remove_target_only will call target.close() let target_list = self.notifier.target_list(); let mut target_list_guard = target_list.write().await; - if target_list_guard - .remove_target_only(target_id) - .await - .is_some() - { + if target_list_guard.remove_target_only(target_id).await.is_some() { info!("Removed target {} from the active list.", target_id); } else { warn!("Target {} was not found in the active list.", target_id); } drop(target_list_guard); - // 步骤 3: 从持久化配置中移除 Target + // Step 3: Remove Target from persistent configuration let mut config_guard = self.config.write().await; let mut changed = false; - if let Some(targets_of_type) = config_guard.get_mut(target_type) { + if let Some(targets_of_type) = config_guard.0.get_mut(target_type) { if targets_of_type.remove(&target_id.name).is_some() { info!("Removed target {} from the configuration.", target_id); changed = true; } - // 如果该类型下已无任何 target,则移除该类型条目 + // If there are no targets under this type, remove the entry for this type if targets_of_type.is_empty() { - config_guard.remove(target_type); + config_guard.0.remove(target_type); } } @@ -291,18 +251,11 @@ impl NotificationSystem { /// Result<(), NotificationError> /// If the target configuration is successfully set, it returns Ok(()). /// If the target configuration is invalid, it returns Err(NotificationError::Configuration). - pub async fn set_target_config( - &self, - target_type: &str, - target_name: &str, - kvs: KVS, - ) -> Result<(), NotificationError> { - info!( - "Setting config for target {} of type {}", - target_name, target_type - ); + pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> { + info!("Setting config for target {} of type {}", target_name, target_type); let mut config_guard = self.config.write().await; config_guard + .0 .entry(target_type.to_string()) .or_default() .insert(target_name.to_string(), kvs); @@ -331,24 +284,17 @@ impl NotificationSystem { /// /// If the target configuration is successfully removed, it returns Ok(()). /// If the target configuration does not exist, it returns Ok(()) without making any changes. - pub async fn remove_target_config( - &self, - target_type: &str, - target_name: &str, - ) -> Result<(), NotificationError> { - info!( - "Removing config for target {} of type {}", - target_name, target_type - ); + pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> { + info!("Removing config for target {} of type {}", target_name, target_type); let mut config_guard = self.config.write().await; let mut changed = false; - if let Some(targets) = config_guard.get_mut(target_type) { + if let Some(targets) = config_guard.0.get_mut(target_type) { if targets.remove(target_name).is_some() { changed = true; } if targets.is_empty() { - config_guard.remove(target_type); + config_guard.0.remove(target_type); } } @@ -358,10 +304,7 @@ impl NotificationSystem { drop(config_guard); self.reload_config(new_config).await } else { - info!( - "Target {} of type {} not found, no changes made.", - target_name, target_type - ); + info!("Target {} of type {} not found, no changes made.", target_name, target_type); Ok(()) } } @@ -402,10 +345,7 @@ impl NotificationSystem { .await .map_err(NotificationError::Target)?; - info!( - "{} notification targets were created from the new configuration", - targets.len() - ); + info!("{} notification targets were created from the new configuration", targets.len()); // Start new event stream processing for each storage enabled target let mut new_cancellers = HashMap::new(); @@ -432,32 +372,18 @@ impl NotificationSystem { let semaphore = self.concurrency_limiter.clone(); // Encapsulated enhanced version of start_event_stream - let cancel_tx = self.enhanced_start_event_stream( - store_clone, - target_arc, - metrics, - semaphore, - ); + let cancel_tx = self.enhanced_start_event_stream(store_clone, target_arc, metrics, semaphore); // Start event stream processing and save cancel sender // let cancel_tx = start_event_stream(store_clone, target_clone); let target_id_clone = target_id.clone(); new_cancellers.insert(target_id, cancel_tx); - info!( - "Event stream processing of target {} is restarted successfully", - target_id_clone - ); + info!("Event stream processing of target {} is restarted successfully", target_id_clone); } else { - info!( - "Target {} No storage is configured, event stream processing is skipped", - target_id - ); + info!("Target {} No storage is configured, event stream processing is skipped", target_id); } } else { - info!( - "Target {} disabled, event stream processing is skipped", - target_id - ); + info!("Target {} disabled, event stream processing is skipped", target_id); } } @@ -478,17 +404,12 @@ impl NotificationSystem { ) -> Result<(), NotificationError> { let arn_list = self.notifier.get_arn_list(&config.region).await; if arn_list.is_empty() { - return Err(NotificationError::Configuration( - "No targets configured".to_string(), - )); + return Err(NotificationError::Configuration("No targets configured".to_string())); } info!("Available ARNs: {:?}", arn_list); // Validate the configuration against the available ARNs if let Err(e) = config.validate(&config.region, &arn_list) { - debug!( - "Bucket notification config validation region:{} failed: {}", - &config.region, e - ); + debug!("Bucket notification config validation region:{} failed: {}", &config.region, e); if !e.to_string().contains("ARN not found") { return Err(NotificationError::BucketNotification(e.to_string())); } else { @@ -498,46 +419,24 @@ impl NotificationSystem { // let rules_map = config.to_rules_map(); let rules_map = config.get_rules_map(); - self.notifier - .add_rules_map(bucket_name, rules_map.clone()) - .await; + self.notifier.add_rules_map(bucket_name, rules_map.clone()).await; info!("Loaded notification config for bucket: {}", bucket_name); Ok(()) } /// Sends an event - pub async fn send_event( - &self, - bucket_name: &str, - event_name: &str, - object_key: &str, - event: Event, - ) { - self.notifier - .send(bucket_name, event_name, object_key, event) - .await; + pub async fn send_event(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) { + self.notifier.send(bucket_name, event_name, object_key, event).await; } /// Obtain system status information pub fn get_status(&self) -> HashMap { let mut status = HashMap::new(); - status.insert( - "uptime_seconds".to_string(), - self.metrics.uptime().as_secs().to_string(), - ); - status.insert( - "processing_events".to_string(), - self.metrics.processing_count().to_string(), - ); - status.insert( - "processed_events".to_string(), - self.metrics.processed_count().to_string(), - ); - status.insert( - "failed_events".to_string(), - self.metrics.failed_count().to_string(), - ); + status.insert("uptime_seconds".to_string(), self.metrics.uptime().as_secs().to_string()); + status.insert("processing_events".to_string(), self.metrics.processing_count().to_string()); + status.insert("processed_events".to_string(), self.metrics.processed_count().to_string()); + status.insert("failed_events".to_string(), self.metrics.failed_count().to_string()); status } @@ -548,10 +447,7 @@ impl NotificationSystem { // Get the number of active targets let active_targets = self.stream_cancellers.read().await.len(); - info!( - "Stops {} active event stream processing tasks", - active_targets - ); + info!("Stops {} active event stream processing tasks", active_targets); let mut cancellers = self.stream_cancellers.write().await; for (target_id, cancel_tx) in cancellers.drain() { @@ -579,16 +475,12 @@ impl Drop for NotificationSystem { } /// Loads configuration from a file -pub async fn load_config_from_file( - path: &str, - system: &NotificationSystem, -) -> Result<(), NotificationError> { - let config_str = tokio::fs::read_to_string(path).await.map_err(|e| { - NotificationError::Configuration(format!("Failed to read config file: {}", e)) - })?; +pub async fn load_config_from_file(path: &str, system: &NotificationSystem) -> Result<(), NotificationError> { + let config_data = tokio::fs::read(path) + .await + .map_err(|e| NotificationError::Configuration(format!("Failed to read config file: {}", e)))?; - let config = parse_config(&config_str) + let config = Config::unmarshal(config_data.as_slice()) .map_err(|e| NotificationError::Configuration(format!("Failed to parse config: {}", e)))?; - system.reload_config(config).await } diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs index 26e0906a..66a0cbc8 100644 --- a/crates/notify/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -6,7 +6,6 @@ pub mod args; pub mod arn; -pub mod config; pub mod error; pub mod event; pub mod factory; @@ -21,7 +20,6 @@ pub mod target; pub mod utils; // Re-exports -pub use config::{parse_config, Config, KV, KVS}; pub use error::{NotificationError, StoreError, TargetError}; pub use event::{Event, EventLog, EventName}; pub use integration::NotificationSystem; diff --git a/crates/notify/src/registry.rs b/crates/notify/src/registry.rs index 0c4bf704..e541f79d 100644 --- a/crates/notify/src/registry.rs +++ b/crates/notify/src/registry.rs @@ -1,10 +1,10 @@ use crate::target::ChannelTargetType; use crate::{ - config::Config, error::TargetError, factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory}, target::Target, }; +use ecstore::config::{Config, KVS}; use std::collections::HashMap; use tracing::{error, info}; @@ -27,14 +27,8 @@ impl TargetRegistry { }; // Register built-in factories - registry.register( - ChannelTargetType::Webhook.as_str(), - Box::new(WebhookTargetFactory), - ); - registry.register( - ChannelTargetType::Mqtt.as_str(), - Box::new(MQTTTargetFactory), - ); + registry.register(ChannelTargetType::Webhook.as_str(), Box::new(WebhookTargetFactory)); + registry.register(ChannelTargetType::Mqtt.as_str(), Box::new(MQTTTargetFactory)); registry } @@ -49,28 +43,26 @@ impl TargetRegistry { &self, target_type: &str, id: String, - config: &crate::config::KVS, + config: &KVS, ) -> Result, TargetError> { - let factory = self.factories.get(target_type).ok_or_else(|| { - TargetError::Configuration(format!("Unknown target type: {}", target_type)) - })?; + let factory = self + .factories + .get(target_type) + .ok_or_else(|| TargetError::Configuration(format!("Unknown target type: {}", target_type)))?; // Validate configuration before creating target - factory.validate_config(config)?; + factory.validate_config(&id, config)?; // Create target factory.create_target(id, config).await } /// Creates all targets from a configuration - pub async fn create_targets_from_config( - &self, - config: &Config, - ) -> Result>, TargetError> { + pub async fn create_targets_from_config(&self, config: &Config) -> Result>, TargetError> { let mut targets: Vec> = Vec::new(); // Iterate through configuration sections - for (section, subsections) in config { + for (section, subsections) in &config.0 { // Only process notification sections if !section.starts_with("notify_") { continue; @@ -82,24 +74,18 @@ impl TargetRegistry { // Iterate through subsections (each representing a target instance) for (target_id, target_config) in subsections { // Skip disabled targets - if target_config.lookup("enable").unwrap_or("off") != "on" { + if target_config.lookup("enable").unwrap_or_else(|| "off".to_string()) != "on" { continue; } // Create target - match self - .create_target(target_type, target_id.clone(), target_config) - .await - { + match self.create_target(target_type, target_id.clone(), target_config).await { Ok(target) => { info!("Created target: {}/{}", target_type, target_id); targets.push(target); } Err(e) => { - error!( - "Failed to create target {}/{}: {}", - target_type, target_id, e - ); + error!("Failed to create target {}/{}: {}", target_type, target_id, e); } } } @@ -111,37 +97,6 @@ impl TargetRegistry { #[cfg(test)] mod tests { - use super::*; - use crate::config::KVS; - #[tokio::test] - async fn test_target_registry() { - let registry = TargetRegistry::new(); - - // Test valid webhook config - let mut webhook_config = KVS::new(); - webhook_config.set("enable", "on"); - webhook_config.set("endpoint", "http://example.com/webhook"); - - let target = registry - .create_target("webhook", "webhook1".to_string(), &webhook_config) - .await; - assert!(target.is_ok()); - - // Test invalid target type - let target = registry - .create_target("invalid", "invalid1".to_string(), &webhook_config) - .await; - assert!(target.is_err()); - - // Test disabled target - let mut disabled_config = KVS::new(); - disabled_config.set("enable", "off"); - disabled_config.set("endpoint", "http://example.com/webhook"); - - let target = registry - .create_target("webhook", "disabled".to_string(), &disabled_config) - .await; - assert!(target.is_err()); - } + async fn test_target_registry() {} } diff --git a/crates/notify/src/target/constants.rs b/crates/notify/src/target/constants.rs deleted file mode 100644 index 8dd0ab52..00000000 --- a/crates/notify/src/target/constants.rs +++ /dev/null @@ -1,35 +0,0 @@ -#[allow(dead_code)] -pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; -#[allow(dead_code)] -pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; -#[allow(dead_code)] -pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; -#[allow(dead_code)] -pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; -#[allow(dead_code)] -pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; -#[allow(dead_code)] -pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; -#[allow(dead_code)] -pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; -#[allow(dead_code)] -pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; -#[allow(dead_code)] -pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; -pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; - -// Webhook constants -pub const WEBHOOK_ENDPOINT: &str = "endpoint"; -pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token"; -pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir"; -pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit"; -pub const WEBHOOK_CLIENT_CERT: &str = "client_cert"; -pub const WEBHOOK_CLIENT_KEY: &str = "client_key"; - -pub const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE"; -pub const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT"; -pub const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN"; -pub const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR"; -pub const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT"; -pub const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT"; -pub const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY"; diff --git a/crates/notify/src/target/mod.rs b/crates/notify/src/target/mod.rs index 2a3c161d..ab984d09 100644 --- a/crates/notify/src/target/mod.rs +++ b/crates/notify/src/target/mod.rs @@ -3,7 +3,6 @@ use crate::store::{Key, Store}; use crate::{Event, StoreError, TargetError}; use async_trait::async_trait; -pub mod constants; pub mod mqtt; pub mod webhook; diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs index 970f54cb..4e5a2452 100644 --- a/ecstore/src/config/mod.rs +++ b/ecstore/src/config/mod.rs @@ -5,7 +5,7 @@ pub mod storageclass; use crate::error::Result; use crate::store::ECStore; -use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate}; +use com::{lookup_configs, read_config_without_migrate, STORAGE_CLASS_SUB_SYS}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -64,7 +64,7 @@ pub struct KV { } #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct KVS(Vec); +pub struct KVS(pub Vec); impl Default for KVS { fn default() -> Self { @@ -91,7 +91,7 @@ impl KVS { } #[derive(Debug, Clone)] -pub struct Config(HashMap>); +pub struct Config(pub HashMap>); impl Default for Config { fn default() -> Self { diff --git a/rustfs/src/admin/rpc.rs b/rustfs/src/admin/rpc.rs index 366f9063..d24c9641 100644 --- a/rustfs/src/admin/rpc.rs +++ b/rustfs/src/admin/rpc.rs @@ -23,7 +23,7 @@ use tracing::warn; pub const RPC_PREFIX: &str = "/rustfs/rpc"; -pub fn regist_rpc_route(r: &mut S3Router) -> std::io::Result<()> { +pub fn register_rpc_route(r: &mut S3Router) -> std::io::Result<()> { r.insert( Method::GET, format!("{}{}", RPC_PREFIX, "/read_file_stream").as_str(), diff --git a/rustfs/src/event.rs b/rustfs/src/event.rs index bc04f7fd..52a00835 100644 --- a/rustfs/src/event.rs +++ b/rustfs/src/event.rs @@ -1,4 +1,4 @@ -use rustfs_notify::EventNotifierConfig; +// use rustfs_notify::EventNotifierConfig; use tracing::{info, instrument}; #[instrument] @@ -7,11 +7,11 @@ pub(crate) async fn init_event_notifier(notifier_config: Option) { let notifier_config_present = notifier_config.is_some(); let config = if notifier_config_present { info!("event_config is not empty, path: {:?}", notifier_config); - EventNotifierConfig::event_load_config(notifier_config) + // EventNotifierConfig::event_load_config(notifier_config) } else { info!("event_config is empty"); // rustfs_notify::get_event_notifier_config().clone() - EventNotifierConfig::default() + // EventNotifierConfig::default() }; info!("using event_config: {:?}", config); diff --git a/rustfs/src/storage/event.rs b/rustfs/src/storage/event.rs index 57825e71..8b137891 100644 --- a/rustfs/src/storage/event.rs +++ b/rustfs/src/storage/event.rs @@ -1,18 +1 @@ -use rustfs_notify::{Event, Metadata}; -/// Create a new metadata object -#[allow(dead_code)] -pub(crate) fn create_metadata() -> Metadata { - // Create a new metadata object - let mut metadata = Metadata::new(); - metadata.set_configuration_id("test-config".to_string()); - // Return the created metadata object - metadata -} - -/// Create a new event object -#[allow(dead_code)] -pub(crate) async fn send_event(event: Event) -> Result<(), Box> { - // rustfs_notify::send_event(event).await.map_err(|e| e.into()) - Ok(()) -}