From df9347b34a6eb50916ba594450d03aba1c9cbde9 Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 6 Jun 2025 19:05:16 +0800 Subject: [PATCH] init config --- .gitignore | 1 + crates/config/src/config.rs | 84 ------- crates/config/src/event/adapters.rs | 27 --- crates/config/src/event/config.rs | 334 -------------------------- crates/config/src/event/kafka.rs | 29 --- crates/config/src/event/mod.rs | 5 - crates/config/src/event/mqtt.rs | 31 --- crates/config/src/event/webhook.rs | 51 ---- crates/config/src/lib.rs | 4 +- crates/config/src/notify/config.rs | 53 +++++ crates/config/src/notify/help.rs | 26 +++ crates/config/src/notify/legacy.rs | 347 ++++++++++++++++++++++++++++ crates/config/src/notify/mod.rs | 5 + crates/config/src/notify/mqtt.rs | 114 +++++++++ crates/config/src/notify/webhook.rs | 80 +++++++ crates/notify/src/adapter/mod.rs | 2 +- 16 files changed, 628 insertions(+), 565 deletions(-) delete mode 100644 crates/config/src/event/adapters.rs delete mode 100644 crates/config/src/event/config.rs delete mode 100644 crates/config/src/event/kafka.rs delete mode 100644 crates/config/src/event/mod.rs delete mode 100644 crates/config/src/event/mqtt.rs delete mode 100644 crates/config/src/event/webhook.rs create mode 100644 crates/config/src/notify/config.rs create mode 100644 crates/config/src/notify/help.rs create mode 100644 crates/config/src/notify/legacy.rs create mode 100644 crates/config/src/notify/mod.rs create mode 100644 crates/config/src/notify/mqtt.rs create mode 100644 crates/config/src/notify/webhook.rs diff --git a/.gitignore b/.gitignore index ef532b66..5ee8e2a0 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ deploy/certs/* .cargo profile.json .docker/openobserve-otel/data +rustfs \ No newline at end of file diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index b9d1f31e..e3fd7808 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -1,17 +1,14 @@ -use crate::event::config::NotifierConfig; use crate::ObservabilityConfig; /// RustFs configuration pub struct RustFsConfig { pub observability: ObservabilityConfig, - pub event: NotifierConfig, } impl RustFsConfig { pub fn new() -> Self { Self { observability: ObservabilityConfig::new(), - event: NotifierConfig::new(), } } } @@ -33,11 +30,6 @@ mod tests { // Verify that observability config is properly initialized assert!(!config.observability.sinks.is_empty(), "Observability sinks should not be empty"); assert!(config.observability.logger.is_some(), "Logger config should be present"); - - // Verify that event config is properly initialized - assert!(!config.event.store_path.is_empty(), "Event store path should not be empty"); - assert!(config.event.channel_capacity > 0, "Channel capacity should be positive"); - assert!(!config.event.adapters.is_empty(), "Event adapters should not be empty"); } #[test] @@ -50,11 +42,6 @@ mod tests { // Compare observability config assert_eq!(config.observability.sinks.len(), new_config.observability.sinks.len()); assert_eq!(config.observability.logger.is_some(), new_config.observability.logger.is_some()); - - // Compare event config - assert_eq!(config.event.store_path, new_config.event.store_path); - assert_eq!(config.event.channel_capacity, new_config.event.channel_capacity); - assert_eq!(config.event.adapters.len(), new_config.event.adapters.len()); } #[test] @@ -64,10 +51,6 @@ mod tests { // Modify observability config config.observability.sinks.clear(); - // Event config should remain unchanged - assert!(!config.event.adapters.is_empty(), "Event adapters should remain unchanged"); - assert!(config.event.channel_capacity > 0, "Channel capacity should remain unchanged"); - // Create new config to verify independence let new_config = RustFsConfig::new(); assert!(!new_config.observability.sinks.is_empty(), "New config should have default sinks"); @@ -88,38 +71,6 @@ mod tests { assert!(config.observability.otel.logger_level.is_some()); } - #[test] - fn test_rustfs_config_event_integration() { - let config = RustFsConfig::new(); - - // Test event config properties - assert!(!config.event.store_path.is_empty(), "Store path should not be empty"); - assert!( - config.event.channel_capacity >= 1000, - "Channel capacity should be reasonable for production" - ); - - // Test that store path is a valid path format - let store_path = &config.event.store_path; - assert!(!store_path.contains('\0'), "Store path should not contain null characters"); - - // Test adapters configuration - for adapter in &config.event.adapters { - // Each adapter should have a valid configuration - match adapter { - crate::event::adapters::AdapterConfig::Webhook(_) => { - // Webhook adapter should be properly configured - } - crate::event::adapters::AdapterConfig::Kafka(_) => { - // Kafka adapter should be properly configured - } - crate::event::adapters::AdapterConfig::Mqtt(_) => { - // MQTT adapter should be properly configured - } - } - } - } - #[test] fn test_rustfs_config_memory_usage() { // Test that config doesn't use excessive memory @@ -128,12 +79,8 @@ mod tests { // Basic memory usage checks assert!(std::mem::size_of_val(&config) < 10000, "Config should not use excessive memory"); - // Test that strings are not excessively long - assert!(config.event.store_path.len() < 1000, "Store path should not be excessively long"); - // Test that collections are reasonably sized assert!(config.observability.sinks.len() < 100, "Sinks collection should be reasonably sized"); - assert!(config.event.adapters.len() < 100, "Adapters collection should be reasonably sized"); } #[test] @@ -143,10 +90,6 @@ mod tests { // Test that observability config can be serialized (it has Serialize trait) let observability_json = serde_json::to_string(&config.observability); assert!(observability_json.is_ok(), "Observability config should be serializable"); - - // Test that event config can be serialized (it has Serialize trait) - let event_json = serde_json::to_string(&config.event); - assert!(event_json.is_ok(), "Event config should be serializable"); } #[test] @@ -160,11 +103,6 @@ mod tests { observability_debug.contains("ObservabilityConfig"), "Debug output should contain type name" ); - - // Test that event config has Debug trait - let event_debug = format!("{:?}", config.event); - assert!(!event_debug.is_empty(), "Event config should have debug output"); - assert!(event_debug.contains("NotifierConfig"), "Debug output should contain type name"); } #[test] @@ -174,27 +112,5 @@ mod tests { // Test that observability config can be cloned let observability_clone = config.observability.clone(); assert_eq!(observability_clone.sinks.len(), config.observability.sinks.len()); - - // Test that event config can be cloned - let event_clone = config.event.clone(); - assert_eq!(event_clone.store_path, config.event.store_path); - assert_eq!(event_clone.channel_capacity, config.event.channel_capacity); - } - - #[test] - fn test_rustfs_config_environment_independence() { - // Test that config creation doesn't depend on specific environment variables - // This test ensures the config can be created in any environment - - let config1 = RustFsConfig::new(); - let config2 = RustFsConfig::new(); - - // Both configs should have the same structure - assert_eq!(config1.observability.sinks.len(), config2.observability.sinks.len()); - assert_eq!(config1.event.adapters.len(), config2.event.adapters.len()); - - // Store paths should be consistent - assert_eq!(config1.event.store_path, config2.event.store_path); - assert_eq!(config1.event.channel_capacity, config2.event.channel_capacity); } } diff --git a/crates/config/src/event/adapters.rs b/crates/config/src/event/adapters.rs deleted file mode 100644 index d66bf19e..00000000 --- a/crates/config/src/event/adapters.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::event::kafka::KafkaAdapter; -use crate::event::mqtt::MqttAdapter; -use crate::event::webhook::WebhookAdapter; -use serde::{Deserialize, Serialize}; - -/// Configuration for the notification system. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum AdapterConfig { - Webhook(WebhookAdapter), - Kafka(KafkaAdapter), - Mqtt(MqttAdapter), -} - -impl AdapterConfig { - /// create a new configuration with default values - pub fn new() -> Self { - Self::Webhook(WebhookAdapter::new()) - } -} - -impl Default for AdapterConfig { - /// create a new configuration with default values - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/event/config.rs b/crates/config/src/event/config.rs deleted file mode 100644 index e72c4697..00000000 --- a/crates/config/src/event/config.rs +++ /dev/null @@ -1,334 +0,0 @@ -use crate::event::adapters::AdapterConfig; -use serde::{Deserialize, Serialize}; -use std::env; - -#[allow(dead_code)] -const DEFAULT_CONFIG_FILE: &str = "event"; - -/// Configuration for the notification system. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NotifierConfig { - #[serde(default = "default_store_path")] - pub store_path: String, - #[serde(default = "default_channel_capacity")] - pub channel_capacity: usize, - pub adapters: Vec, -} - -impl Default for NotifierConfig { - fn default() -> Self { - Self::new() - } -} - -impl NotifierConfig { - /// create a new configuration with default values - pub fn new() -> Self { - Self { - store_path: default_store_path(), - channel_capacity: default_channel_capacity(), - adapters: vec![AdapterConfig::new()], - } - } -} - -/// Provide temporary directories as default storage paths -fn default_store_path() -> String { - env::temp_dir().join("event-notification").to_string_lossy().to_string() -} - -/// Provides the recommended default channel capacity for high concurrency systems -fn default_channel_capacity() -> usize { - 10000 // Reasonable default values for high concurrency systems -} - -#[cfg(test)] -mod tests { - use super::*; - use std::path::Path; - - #[test] - fn test_notifier_config_new() { - let config = NotifierConfig::new(); - - // Verify store path is set - assert!(!config.store_path.is_empty(), "Store path should not be empty"); - assert!( - config.store_path.contains("event-notification"), - "Store path should contain event-notification" - ); - - // Verify channel capacity is reasonable - assert_eq!(config.channel_capacity, 10000, "Channel capacity should be 10000"); - assert!(config.channel_capacity > 0, "Channel capacity should be positive"); - - // Verify adapters are initialized - assert!(!config.adapters.is_empty(), "Adapters should not be empty"); - assert_eq!(config.adapters.len(), 1, "Should have exactly one default adapter"); - } - - #[test] - fn test_notifier_config_default() { - let config = NotifierConfig::default(); - let new_config = NotifierConfig::new(); - - // Default should be equivalent to new() - assert_eq!(config.store_path, new_config.store_path); - assert_eq!(config.channel_capacity, new_config.channel_capacity); - assert_eq!(config.adapters.len(), new_config.adapters.len()); - } - - #[test] - fn test_default_store_path() { - let store_path = default_store_path(); - - // Verify store path properties - assert!(!store_path.is_empty(), "Store path should not be empty"); - assert!(store_path.contains("event-notification"), "Store path should contain event-notification"); - - // Verify it's a valid path format - let path = Path::new(&store_path); - assert!(path.is_absolute() || path.is_relative(), "Store path should be a valid path"); - - // Verify it doesn't contain invalid characters - assert!(!store_path.contains('\0'), "Store path should not contain null characters"); - - // Verify it's based on temp directory - let temp_dir = env::temp_dir(); - let expected_path = temp_dir.join("event-notification"); - assert_eq!(store_path, expected_path.to_string_lossy().to_string()); - } - - #[test] - fn test_default_channel_capacity() { - let capacity = default_channel_capacity(); - - // Verify capacity is reasonable - assert_eq!(capacity, 10000, "Default capacity should be 10000"); - assert!(capacity > 0, "Capacity should be positive"); - assert!(capacity >= 1000, "Capacity should be at least 1000 for production use"); - assert!(capacity <= 1_000_000, "Capacity should not be excessively large"); - } - - #[test] - fn test_notifier_config_serialization() { - let config = NotifierConfig::new(); - - // Test serialization to JSON - let json_result = serde_json::to_string(&config); - assert!(json_result.is_ok(), "Config should be serializable to JSON"); - - let json_str = json_result.unwrap(); - assert!(!json_str.is_empty(), "Serialized JSON should not be empty"); - assert!(json_str.contains("store_path"), "JSON should contain store_path"); - assert!(json_str.contains("channel_capacity"), "JSON should contain channel_capacity"); - assert!(json_str.contains("adapters"), "JSON should contain adapters"); - - // Test deserialization from JSON - let deserialized_result: Result = serde_json::from_str(&json_str); - assert!(deserialized_result.is_ok(), "Config should be deserializable from JSON"); - - let deserialized_config = deserialized_result.unwrap(); - assert_eq!(deserialized_config.store_path, config.store_path); - assert_eq!(deserialized_config.channel_capacity, config.channel_capacity); - assert_eq!(deserialized_config.adapters.len(), config.adapters.len()); - } - - #[test] - fn test_notifier_config_serialization_with_defaults() { - // Test serialization with minimal JSON (using serde defaults) - let minimal_json = r#"{"adapters": []}"#; - - let deserialized_result: Result = serde_json::from_str(minimal_json); - assert!(deserialized_result.is_ok(), "Config should deserialize with defaults"); - - let config = deserialized_result.unwrap(); - assert_eq!(config.store_path, default_store_path(), "Should use default store path"); - assert_eq!(config.channel_capacity, default_channel_capacity(), "Should use default channel capacity"); - assert!(config.adapters.is_empty(), "Should have empty adapters as specified"); - } - - #[test] - fn test_notifier_config_debug_format() { - let config = NotifierConfig::new(); - - let debug_str = format!("{:?}", config); - assert!(!debug_str.is_empty(), "Debug output should not be empty"); - assert!(debug_str.contains("NotifierConfig"), "Debug output should contain struct name"); - assert!(debug_str.contains("store_path"), "Debug output should contain store_path field"); - assert!( - debug_str.contains("channel_capacity"), - "Debug output should contain channel_capacity field" - ); - assert!(debug_str.contains("adapters"), "Debug output should contain adapters field"); - } - - #[test] - fn test_notifier_config_clone() { - let config = NotifierConfig::new(); - let cloned_config = config.clone(); - - // Test that clone creates an independent copy - assert_eq!(cloned_config.store_path, config.store_path); - assert_eq!(cloned_config.channel_capacity, config.channel_capacity); - assert_eq!(cloned_config.adapters.len(), config.adapters.len()); - - // Verify they are independent (modifying one doesn't affect the other) - let mut modified_config = config.clone(); - modified_config.channel_capacity = 5000; - assert_ne!(modified_config.channel_capacity, config.channel_capacity); - assert_eq!(cloned_config.channel_capacity, config.channel_capacity); - } - - #[test] - fn test_notifier_config_modification() { - let mut config = NotifierConfig::new(); - - // Test modifying store path - let original_store_path = config.store_path.clone(); - config.store_path = "/custom/path".to_string(); - assert_ne!(config.store_path, original_store_path); - assert_eq!(config.store_path, "/custom/path"); - - // Test modifying channel capacity - let original_capacity = config.channel_capacity; - config.channel_capacity = 5000; - assert_ne!(config.channel_capacity, original_capacity); - assert_eq!(config.channel_capacity, 5000); - - // Test modifying adapters - let original_adapters_len = config.adapters.len(); - config.adapters.push(AdapterConfig::new()); - assert_eq!(config.adapters.len(), original_adapters_len + 1); - - // Test clearing adapters - config.adapters.clear(); - assert!(config.adapters.is_empty()); - } - - #[test] - fn test_notifier_config_adapters() { - let config = NotifierConfig::new(); - - // Test default adapter configuration - assert_eq!(config.adapters.len(), 1, "Should have exactly one default adapter"); - - // Test that we can add more adapters - let mut config_mut = config.clone(); - config_mut.adapters.push(AdapterConfig::new()); - assert_eq!(config_mut.adapters.len(), 2, "Should be able to add more adapters"); - - // Test adapter types - for adapter in &config.adapters { - match adapter { - AdapterConfig::Webhook(_) => { - // Webhook adapter should be properly configured - } - AdapterConfig::Kafka(_) => { - // Kafka adapter should be properly configured - } - AdapterConfig::Mqtt(_) => { - // MQTT adapter should be properly configured - } - } - } - } - - #[test] - fn test_notifier_config_edge_cases() { - // Test with empty adapters - let mut config = NotifierConfig::new(); - config.adapters.clear(); - assert!(config.adapters.is_empty(), "Adapters should be empty after clearing"); - - // Test serialization with empty adapters - let json_result = serde_json::to_string(&config); - assert!(json_result.is_ok(), "Config with empty adapters should be serializable"); - - // Test with very large channel capacity - config.channel_capacity = 1_000_000; - assert_eq!(config.channel_capacity, 1_000_000); - - // Test with minimum channel capacity - config.channel_capacity = 1; - assert_eq!(config.channel_capacity, 1); - - // Test with empty store path - config.store_path = String::new(); - assert!(config.store_path.is_empty()); - } - - #[test] - fn test_notifier_config_memory_efficiency() { - let config = NotifierConfig::new(); - - // Test that config doesn't use excessive memory - let config_size = std::mem::size_of_val(&config); - assert!(config_size < 5000, "Config should not use excessive memory"); - - // Test that store path is not excessively long - assert!(config.store_path.len() < 1000, "Store path should not be excessively long"); - - // Test that adapters collection is reasonably sized - assert!(config.adapters.len() < 100, "Adapters collection should be reasonably sized"); - } - - #[test] - fn test_notifier_config_consistency() { - // Create multiple configs and ensure they're consistent - let config1 = NotifierConfig::new(); - let config2 = NotifierConfig::new(); - - // Both configs should have the same default values - assert_eq!(config1.store_path, config2.store_path); - assert_eq!(config1.channel_capacity, config2.channel_capacity); - assert_eq!(config1.adapters.len(), config2.adapters.len()); - } - - #[test] - fn test_notifier_config_path_validation() { - let config = NotifierConfig::new(); - - // Test that store path is a valid path - let path = Path::new(&config.store_path); - - // Path should be valid - assert!(path.components().count() > 0, "Path should have components"); - - // Path should not contain invalid characters for most filesystems - assert!(!config.store_path.contains('\0'), "Path should not contain null characters"); - assert!(!config.store_path.contains('\x01'), "Path should not contain control characters"); - - // Path should be reasonable length - assert!(config.store_path.len() < 260, "Path should be shorter than Windows MAX_PATH"); - } - - #[test] - fn test_notifier_config_production_readiness() { - let config = NotifierConfig::new(); - - // Test production readiness criteria - assert!(config.channel_capacity >= 1000, "Channel capacity should be sufficient for production"); - assert!(!config.store_path.is_empty(), "Store path should be configured"); - assert!(!config.adapters.is_empty(), "At least one adapter should be configured"); - - // Test that configuration is reasonable for high-load scenarios - assert!(config.channel_capacity <= 10_000_000, "Channel capacity should not be excessive"); - - // Test that store path is in a reasonable location (temp directory) - assert!(config.store_path.contains("event-notification"), "Store path should be identifiable"); - } - - #[test] - fn test_default_config_file_constant() { - // Test that the constant is properly defined - assert_eq!(DEFAULT_CONFIG_FILE, "event"); - // DEFAULT_CONFIG_FILE is a const, so is_empty() check is redundant - // assert!(!DEFAULT_CONFIG_FILE.is_empty(), "Config file name should not be empty"); - assert!(!DEFAULT_CONFIG_FILE.contains('/'), "Config file name should not contain path separators"); - assert!( - !DEFAULT_CONFIG_FILE.contains('\\'), - "Config file name should not contain Windows path separators" - ); - } -} diff --git a/crates/config/src/event/kafka.rs b/crates/config/src/event/kafka.rs deleted file mode 100644 index 16411374..00000000 --- a/crates/config/src/event/kafka.rs +++ /dev/null @@ -1,29 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// Configuration for the Kafka adapter. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct KafkaAdapter { - pub brokers: String, - pub topic: String, - pub max_retries: u32, - pub timeout: u64, -} - -impl KafkaAdapter { - /// create a new configuration with default values - pub fn new() -> Self { - Self { - brokers: "localhost:9092".to_string(), - topic: "kafka_topic".to_string(), - max_retries: 3, - timeout: 1000, - } - } -} - -impl Default for KafkaAdapter { - /// create a new configuration with default values - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/event/mod.rs b/crates/config/src/event/mod.rs deleted file mode 100644 index 80dfd45e..00000000 --- a/crates/config/src/event/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub(crate) mod adapters; -pub(crate) mod config; -pub(crate) mod kafka; -pub(crate) mod mqtt; -pub(crate) mod webhook; diff --git a/crates/config/src/event/mqtt.rs b/crates/config/src/event/mqtt.rs deleted file mode 100644 index ee983532..00000000 --- a/crates/config/src/event/mqtt.rs +++ /dev/null @@ -1,31 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// Configuration for the MQTT adapter. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MqttAdapter { - pub broker: String, - pub port: u16, - pub client_id: String, - pub topic: String, - pub max_retries: u32, -} - -impl MqttAdapter { - /// create a new configuration with default values - pub fn new() -> Self { - Self { - broker: "localhost".to_string(), - port: 1883, - client_id: "mqtt_client".to_string(), - topic: "mqtt_topic".to_string(), - max_retries: 3, - } - } -} - -impl Default for MqttAdapter { - /// create a new configuration with default values - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/event/webhook.rs b/crates/config/src/event/webhook.rs deleted file mode 100644 index 95b3adad..00000000 --- a/crates/config/src/event/webhook.rs +++ /dev/null @@ -1,51 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; - -/// Configuration for the notification system. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WebhookAdapter { - pub endpoint: String, - pub auth_token: Option, - pub custom_headers: Option>, - pub max_retries: u32, - pub timeout: u64, -} - -impl WebhookAdapter { - /// verify that the configuration is valid - pub fn validate(&self) -> Result<(), String> { - // verify that endpoint cannot be empty - if self.endpoint.trim().is_empty() { - return Err("Webhook endpoint cannot be empty".to_string()); - } - - // verification timeout must be reasonable - if self.timeout == 0 { - return Err("Webhook timeout must be greater than 0".to_string()); - } - - // Verify that the maximum number of retry is reasonable - if self.max_retries > 10 { - return Err("Maximum retry count cannot exceed 10".to_string()); - } - - Ok(()) - } - - /// Get the default configuration - pub fn new() -> Self { - Self { - endpoint: "".to_string(), - auth_token: None, - custom_headers: Some(HashMap::new()), - max_retries: 3, - timeout: 1000, - } - } -} - -impl Default for WebhookAdapter { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 44a9fe3c..2b2746ab 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -2,10 +2,8 @@ use crate::observability::config::ObservabilityConfig; mod config; mod constants; -mod event; +mod notify; mod observability; pub use config::RustFsConfig; pub use constants::app::*; - -pub use event::config::NotifierConfig; diff --git a/crates/config/src/notify/config.rs b/crates/config/src/notify/config.rs new file mode 100644 index 00000000..a3394803 --- /dev/null +++ b/crates/config/src/notify/config.rs @@ -0,0 +1,53 @@ +use crate::notify::mqtt::MQTTArgs; +use crate::notify::webhook::WebhookArgs; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Config - notification target configuration structure, holds +/// information about various notification targets. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NotifyConfig { + pub mqtt: HashMap, + pub webhook: HashMap, +} + +impl NotifyConfig { + /// Create a new configuration with default values. + pub fn new() -> Self { + let mut config = NotifyConfig { + webhook: HashMap::new(), + mqtt: HashMap::new(), + }; + // Insert default target for each backend + config.webhook.insert("1".to_string(), WebhookArgs::new()); + config.mqtt.insert("1".to_string(), MQTTArgs::new()); + config + } +} + +impl Default for NotifyConfig { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use crate::notify::config::NotifyConfig; + + #[test] + fn test_notify_config_new() { + let config = NotifyConfig::new(); + assert_eq!(config.webhook.len(), 1); + assert_eq!(config.mqtt.len(), 1); + assert!(config.webhook.contains_key("1")); + assert!(config.mqtt.contains_key("1")); + } + + #[test] + fn test_notify_config_default() { + let config = NotifyConfig::default(); + assert_eq!(config.webhook.len(), 1); + assert_eq!(config.mqtt.len(), 1); + } +} diff --git a/crates/config/src/notify/help.rs b/crates/config/src/notify/help.rs new file mode 100644 index 00000000..8bbe5860 --- /dev/null +++ b/crates/config/src/notify/help.rs @@ -0,0 +1,26 @@ +/// Help text for Webhook configuration. +pub const HELP_WEBHOOK: &str = r#" +Webhook configuration: +- enable: Enable or disable the webhook target (true/false) +- endpoint: Webhook server endpoint (e.g., http://localhost:8080/rustfs/events) +- auth_token: Opaque string or JWT authorization token (optional) +- queue_dir: Absolute path for persistent event queue (optional) +- queue_limit: Maximum number of events to queue (optional, default: 0) +- client_cert: Path to client certificate file (optional) +- client_key: Path to client private key file (optional) +"#; + +/// Help text for MQTT configuration. +pub const HELP_MQTT: &str = r#" +MQTT configuration: +- enable: Enable or disable the MQTT target (true/false) +- broker: MQTT broker address (e.g., tcp://localhost:1883) +- topic: MQTT topic (e.g., rustfs/events) +- qos: Quality of Service level (0, 1, or 2) +- username: Username for MQTT authentication (optional) +- password: Password for MQTT authentication (optional) +- reconnect_interval: Reconnect interval in milliseconds (optional) +- keep_alive_interval: Keep alive interval in milliseconds (optional) +- queue_dir: Absolute path for persistent event queue (optional) +- queue_limit: Maximum number of events to queue (optional, default: 0) +"#; \ No newline at end of file diff --git a/crates/config/src/notify/legacy.rs b/crates/config/src/notify/legacy.rs new file mode 100644 index 00000000..2a2de304 --- /dev/null +++ b/crates/config/src/notify/legacy.rs @@ -0,0 +1,347 @@ +use crate::notify::webhook::WebhookArgs; +use crate::notify::mqtt::MQTTArgs; +use std::collections::HashMap; + +/// Convert legacy webhook configuration to the new WebhookArgs struct. +pub fn convert_webhook_config(config: &HashMap) -> Result { + let mut args = WebhookArgs::new(); + args.enable = config.get("enable").map_or(false, |v| v == "true"); + args.endpoint = config.get("endpoint").unwrap_or(&"".to_string()).clone(); + args.auth_token = config.get("auth_token").unwrap_or(&"".to_string()).clone(); + args.queue_dir = config.get("queue_dir").unwrap_or(&"".to_string()).clone(); + args.queue_limit = config.get("queue_limit").map_or(0, |v| v.parse().unwrap_or(0)); + args.client_cert = config.get("client_cert").unwrap_or(&"".to_string()).clone(); + args.client_key = config.get("client_key").unwrap_or(&"".to_string()).clone(); + Ok(args) +} + +/// Convert legacy MQTT configuration to the new MQTTArgs struct. +pub fn convert_mqtt_config(config: &HashMap) -> Result { + let mut args = MQTTArgs::new(); + args.enable = config.get("enable").map_or(false, |v| v == "true"); + args.broker = config.get("broker").unwrap_or(&"".to_string()).clone(); + args.topic = config.get("topic").unwrap_or(&"".to_string()).clone(); + args.qos = config.get("qos").map_or(0, |v| v.parse().unwrap_or(0)); + args.username = config.get("username").unwrap_or(&"".to_string()).clone(); + args.password = config.get("password").unwrap_or(&"".to_string()).clone(); + args.reconnect_interval = config.get("reconnect_interval").map_or(0, |v| v.parse().unwrap_or(0)); + args.keep_alive_interval = config.get("keep_alive_interval").map_or(0, |v| v.parse().unwrap_or(0)); + args.queue_dir = config.get("queue_dir").unwrap_or(&"".to_string()).clone(); + args.queue_limit = config.get("queue_limit").map_or(0, |v| v.parse().unwrap_or(0)); + Ok(args) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_convert_webhook_config() { + let mut old_config = HashMap::new(); + old_config.insert("endpoint".to_string(), "http://example.com".to_string()); + old_config.insert("auth_token".to_string(), "token123".to_string()); + old_config.insert("max_retries".to_string(), "5".to_string()); + old_config.insert("timeout".to_string(), "2000".to_string()); + + let result = convert_webhook_config(&old_config); + assert!(result.is_ok()); + let args = result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + assert_eq!(args.auth_token, Some("token123".to_string())); + assert_eq!(args.max_retries, 5); + assert_eq!(args.timeout, 2000); + } + + #[test] + fn test_convert_mqtt_config() { + let mut old_config = HashMap::new(); + old_config.insert("broker".to_string(), "mqtt.example.com".to_string()); + old_config.insert("port".to_string(), "1883".to_string()); + old_config.insert("client_id".to_string(), "test_client".to_string()); + old_config.insert("topic".to_string(), "test_topic".to_string()); + old_config.insert("max_retries".to_string(), "4".to_string()); + + let result = convert_mqtt_config(&old_config); + assert!(result.is_ok()); + let args = result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + assert_eq!(args.port, 1883); + assert_eq!(args.client_id, "test_client"); + assert_eq!(args.topic, "test_topic"); + assert_eq!(args.max_retries, 4); + } + + #[test] + fn test_convert_webhook_config_invalid() { + let mut old_config = HashMap::new(); + old_config.insert("max_retries".to_string(), "invalid".to_string()); + let result = convert_webhook_config(&old_config); + assert!(result.is_err()); + } + + #[test] + fn test_convert_mqtt_config_invalid() { + let mut old_config = HashMap::new(); + old_config.insert("port".to_string(), "invalid".to_string()); + let result = convert_mqtt_config(&old_config); + assert!(result.is_err()); + } + + #[test] + fn test_convert_empty_config() { + let empty_config = HashMap::new(); + let webhook_result = convert_webhook_config(&empty_config); + assert!(webhook_result.is_ok()); + let mqtt_result = convert_mqtt_config(&empty_config); + assert!(mqtt_result.is_ok()); + } + + #[test] + fn test_convert_partial_config() { + let mut partial_config = HashMap::new(); + partial_config.insert("endpoint".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&partial_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + assert_eq!(args.max_retries, 3); // default value + assert_eq!(args.timeout, 1000); // default value + + let mut partial_mqtt_config = HashMap::new(); + partial_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&partial_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + assert_eq!(args.max_retries, 3); // default value + } + + #[test] + fn test_convert_config_with_extra_fields() { + let mut extra_config = HashMap::new(); + extra_config.insert("endpoint".to_string(), "http://example.com".to_string()); + extra_config.insert("extra_field".to_string(), "extra_value".to_string()); + let webhook_result = convert_webhook_config(&extra_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut extra_mqtt_config = HashMap::new(); + extra_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string()); + extra_mqtt_config.insert("extra_field".to_string(), "extra_value".to_string()); + let mqtt_result = convert_mqtt_config(&extra_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } + + #[test] + fn test_convert_config_with_empty_values() { + let mut empty_values_config = HashMap::new(); + empty_values_config.insert("endpoint".to_string(), "".to_string()); + let webhook_result = convert_webhook_config(&empty_values_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, ""); + + let mut empty_mqtt_config = HashMap::new(); + empty_mqtt_config.insert("broker".to_string(), "".to_string()); + let mqtt_result = convert_mqtt_config(&empty_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, ""); + } + + #[test] + fn test_convert_config_with_whitespace_values() { + let mut whitespace_config = HashMap::new(); + whitespace_config.insert("endpoint".to_string(), " http://example.com ".to_string()); + let webhook_result = convert_webhook_config(&whitespace_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, " http://example.com "); + + let mut whitespace_mqtt_config = HashMap::new(); + whitespace_mqtt_config.insert("broker".to_string(), " mqtt.example.com ".to_string()); + let mqtt_result = convert_mqtt_config(&whitespace_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, " mqtt.example.com "); + } + + #[test] + fn test_convert_config_with_special_characters() { + let mut special_chars_config = HashMap::new(); + special_chars_config.insert("endpoint".to_string(), "http://example.com/path?param=value&other=123".to_string()); + let webhook_result = convert_webhook_config(&special_chars_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com/path?param=value&other=123"); + + let mut special_chars_mqtt_config = HashMap::new(); + special_chars_mqtt_config.insert("broker".to_string(), "mqtt.example.com:1883".to_string()); + let mqtt_result = convert_mqtt_config(&special_chars_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com:1883"); + } + + #[test] + fn test_convert_config_with_numeric_values() { + let mut numeric_config = HashMap::new(); + numeric_config.insert("max_retries".to_string(), "5".to_string()); + numeric_config.insert("timeout".to_string(), "2000".to_string()); + let webhook_result = convert_webhook_config(&numeric_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.max_retries, 5); + assert_eq!(args.timeout, 2000); + + let mut numeric_mqtt_config = HashMap::new(); + numeric_mqtt_config.insert("port".to_string(), "1883".to_string()); + numeric_mqtt_config.insert("max_retries".to_string(), "4".to_string()); + let mqtt_result = convert_mqtt_config(&numeric_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.port, 1883); + assert_eq!(args.max_retries, 4); + } + + #[test] + fn test_convert_config_with_boolean_values() { + let mut boolean_config = HashMap::new(); + boolean_config.insert("enable".to_string(), "true".to_string()); + let webhook_result = convert_webhook_config(&boolean_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, ""); // default value + + let mut boolean_mqtt_config = HashMap::new(); + boolean_mqtt_config.insert("enable".to_string(), "false".to_string()); + let mqtt_result = convert_mqtt_config(&boolean_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "localhost"); // default value + } + + #[test] + fn test_convert_config_with_null_values() { + let mut null_config = HashMap::new(); + null_config.insert("endpoint".to_string(), "null".to_string()); + let webhook_result = convert_webhook_config(&null_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "null"); + + let mut null_mqtt_config = HashMap::new(); + null_mqtt_config.insert("broker".to_string(), "null".to_string()); + let mqtt_result = convert_mqtt_config(&null_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "null"); + } + + #[test] + fn test_convert_config_with_duplicate_keys() { + let mut duplicate_config = HashMap::new(); + duplicate_config.insert("endpoint".to_string(), "http://example.com".to_string()); + duplicate_config.insert("endpoint".to_string(), "http://example.org".to_string()); + let webhook_result = convert_webhook_config(&duplicate_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.org"); // last value wins + + let mut duplicate_mqtt_config = HashMap::new(); + duplicate_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string()); + duplicate_mqtt_config.insert("broker".to_string(), "mqtt.example.org".to_string()); + let mqtt_result = convert_mqtt_config(&duplicate_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.org"); // last value wins + } + + #[test] + fn test_convert_config_with_case_insensitive_keys() { + let mut case_insensitive_config = HashMap::new(); + case_insensitive_config.insert("ENDPOINT".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&case_insensitive_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut case_insensitive_mqtt_config = HashMap::new(); + case_insensitive_mqtt_config.insert("BROKER".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&case_insensitive_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } + + #[test] + fn test_convert_config_with_mixed_case_keys() { + let mut mixed_case_config = HashMap::new(); + mixed_case_config.insert("EndPoint".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&mixed_case_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut mixed_case_mqtt_config = HashMap::new(); + mixed_case_mqtt_config.insert("BroKer".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&mixed_case_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } + + #[test] + fn test_convert_config_with_snake_case_keys() { + let mut snake_case_config = HashMap::new(); + snake_case_config.insert("end_point".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&snake_case_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut snake_case_mqtt_config = HashMap::new(); + snake_case_mqtt_config.insert("bro_ker".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&snake_case_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } + + #[test] + fn test_convert_config_with_kebab_case_keys() { + let mut kebab_case_config = HashMap::new(); + kebab_case_config.insert("end-point".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&kebab_case_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut kebab_case_mqtt_config = HashMap::new(); + kebab_case_mqtt_config.insert("bro-ker".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&kebab_case_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } + + #[test] + fn test_convert_config_with_camel_case_keys() { + let mut camel_case_config = HashMap::new(); + camel_case_config.insert("endPoint".to_string(), "http://example.com".to_string()); + let webhook_result = convert_webhook_config(&camel_case_config); + assert!(webhook_result.is_ok()); + let args = webhook_result.unwrap(); + assert_eq!(args.endpoint, "http://example.com"); + + let mut camel_case_mqtt_config = HashMap::new(); + camel_case_mqtt_config.insert("broKer".to_string(), "mqtt.example.com".to_string()); + let mqtt_result = convert_mqtt_config(&camel_case_mqtt_config); + assert!(mqtt_result.is_ok()); + let args = mqtt_result.unwrap(); + assert_eq!(args.broker, "mqtt.example.com"); + } +} diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs new file mode 100644 index 00000000..a7cd30f0 --- /dev/null +++ b/crates/config/src/notify/mod.rs @@ -0,0 +1,5 @@ +pub mod config; +pub mod webhook; +pub mod mqtt; +pub mod help; +pub mod legacy; diff --git a/crates/config/src/notify/mqtt.rs b/crates/config/src/notify/mqtt.rs new file mode 100644 index 00000000..8b400f3f --- /dev/null +++ b/crates/config/src/notify/mqtt.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; + +/// MQTTArgs - MQTT target arguments. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MQTTArgs { + pub enable: bool, + pub broker: String, + pub topic: String, + pub qos: u8, + pub username: String, + pub password: String, + pub reconnect_interval: u64, + pub keep_alive_interval: u64, + #[serde(skip)] + pub root_cas: Option<()>, // Placeholder for *x509.CertPool + pub queue_dir: String, + pub queue_limit: u64, +} + +impl MQTTArgs { + /// Create a new configuration with default values. + pub fn new() -> Self { + Self { + enable: false, + broker: "".to_string(), + topic: "".to_string(), + qos: 0, + username: "".to_string(), + password: "".to_string(), + reconnect_interval: 0, + keep_alive_interval: 0, + root_cas: None, + queue_dir: "".to_string(), + queue_limit: 0, + } + } + + /// Validate MQTTArgs fields + pub fn validate(&self) -> Result<(), String> { + if !self.enable { + return Ok(()); + } + if self.broker.trim().is_empty() { + return Err("MQTT broker cannot be empty".to_string()); + } + if self.topic.trim().is_empty() { + return Err("MQTT topic cannot be empty".to_string()); + } + if self.queue_dir != "" && !self.queue_dir.starts_with('/') { + return Err("queueDir path should be absolute".to_string()); + } + if self.qos == 0 && self.queue_dir != "" { + return Err("qos should be set to 1 or 2 if queueDir is set".to_string()); + } + Ok(()) + } +} + +impl Default for MQTTArgs { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mqtt_args_new() { + let args = MQTTArgs::new(); + assert_eq!(args.broker, ""); + assert_eq!(args.topic, ""); + assert_eq!(args.qos, 0); + assert_eq!(args.username, ""); + assert_eq!(args.password, ""); + assert_eq!(args.reconnect_interval, 0); + assert_eq!(args.keep_alive_interval, 0); + assert!(args.root_cas.is_none()); + assert_eq!(args.queue_dir, ""); + assert_eq!(args.queue_limit, 0); + assert!(!args.enable); + } + + #[test] + fn test_mqtt_args_validate() { + let mut args = MQTTArgs::new(); + assert!(args.validate().is_ok()); + args.broker = "".to_string(); + assert!(args.validate().is_err()); + args.broker = "localhost".to_string(); + args.topic = "".to_string(); + assert!(args.validate().is_err()); + args.topic = "mqtt_topic".to_string(); + args.reconnect_interval = 10001; + assert!(args.validate().is_err()); + args.reconnect_interval = 1000; + args.keep_alive_interval = 10001; + assert!(args.validate().is_err()); + args.keep_alive_interval = 1000; + args.queue_limit = 10001; + assert!(args.validate().is_err()); + args.queue_dir = "invalid_path".to_string(); + assert!(args.validate().is_err()); + args.queue_dir = "/valid_path".to_string(); + assert!(args.validate().is_ok()); + args.qos = 0; + assert!(args.validate().is_err()); + args.qos = 1; + assert!(args.validate().is_ok()); + args.qos = 2; + assert!(args.validate().is_ok()); + } +} diff --git a/crates/config/src/notify/webhook.rs b/crates/config/src/notify/webhook.rs new file mode 100644 index 00000000..eb664093 --- /dev/null +++ b/crates/config/src/notify/webhook.rs @@ -0,0 +1,80 @@ +use serde::{Deserialize, Serialize}; + +/// WebhookArgs - Webhook target arguments. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookArgs { + pub enable: bool, + pub endpoint: String, + pub auth_token: String, + #[serde(skip)] + pub transport: Option<()>, // Placeholder for *http.Transport + pub queue_dir: String, + pub queue_limit: u64, + pub client_cert: String, + pub client_key: String, +} + +impl WebhookArgs { + /// Create a new configuration with default values. + pub fn new() -> Self { + Self { + enable: false, + endpoint: "".to_string(), + auth_token: "".to_string(), + transport: None, + queue_dir: "".to_string(), + queue_limit: 0, + client_cert: "".to_string(), + client_key: "".to_string(), + } + } + + /// Validate WebhookArgs fields + pub fn validate(&self) -> Result<(), String> { + if !self.enable { + return Ok(()); + } + if self.endpoint.trim().is_empty() { + return Err("endpoint empty".to_string()); + } + if self.queue_dir != "" && !self.queue_dir.starts_with('/') { + return Err("queueDir path should be absolute".to_string()); + } + if (self.client_cert != "" && self.client_key == "") || (self.client_cert == "" && self.client_key != "") { + return Err("cert and key must be specified as a pair".to_string()); + } + Ok(()) + } +} + +impl Default for WebhookArgs { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_webhook_args_new() { + let args = WebhookArgs::new(); + assert_eq!(args.endpoint, ""); + assert_eq!(args.auth_token, ""); + assert!(args.transport.is_none()); + assert_eq!(args.queue_dir, ""); + assert_eq!(args.queue_limit, 0); + assert_eq!(args.client_cert, ""); + assert_eq!(args.client_key, ""); + assert!(!args.enable); + } + + #[test] + fn test_webhook_args_validate() { + let mut args = WebhookArgs::new(); + assert!(args.validate().is_err()); + args.endpoint = "http://example.com".to_string(); + assert!(args.validate().is_ok()); + } +} diff --git a/crates/notify/src/adapter/mod.rs b/crates/notify/src/adapter/mod.rs index 89c01a42..d3389f7a 100644 --- a/crates/notify/src/adapter/mod.rs +++ b/crates/notify/src/adapter/mod.rs @@ -43,7 +43,7 @@ const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; /// # Example /// /// ``` -/// use rustfs_event::ChannelAdapterType; +/// use rustfs_notify::ChannelAdapterType; /// /// let adapter_type = ChannelAdapterType::Webhook; /// match adapter_type {