init config

This commit is contained in:
houseme
2025-06-06 19:05:16 +08:00
parent 3862e255f0
commit df9347b34a
16 changed files with 628 additions and 565 deletions

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ deploy/certs/*
.cargo
profile.json
.docker/openobserve-otel/data
rustfs

View File

@@ -1,17 +1,14 @@
use crate::event::config::NotifierConfig;
use crate::ObservabilityConfig;
/// RustFs configuration
pub struct RustFsConfig {
pub observability: ObservabilityConfig,
pub event: NotifierConfig,
}
impl RustFsConfig {
pub fn new() -> Self {
Self {
observability: ObservabilityConfig::new(),
event: NotifierConfig::new(),
}
}
}
@@ -33,11 +30,6 @@ mod tests {
// Verify that observability config is properly initialized
assert!(!config.observability.sinks.is_empty(), "Observability sinks should not be empty");
assert!(config.observability.logger.is_some(), "Logger config should be present");
// Verify that event config is properly initialized
assert!(!config.event.store_path.is_empty(), "Event store path should not be empty");
assert!(config.event.channel_capacity > 0, "Channel capacity should be positive");
assert!(!config.event.adapters.is_empty(), "Event adapters should not be empty");
}
#[test]
@@ -50,11 +42,6 @@ mod tests {
// Compare observability config
assert_eq!(config.observability.sinks.len(), new_config.observability.sinks.len());
assert_eq!(config.observability.logger.is_some(), new_config.observability.logger.is_some());
// Compare event config
assert_eq!(config.event.store_path, new_config.event.store_path);
assert_eq!(config.event.channel_capacity, new_config.event.channel_capacity);
assert_eq!(config.event.adapters.len(), new_config.event.adapters.len());
}
#[test]
@@ -64,10 +51,6 @@ mod tests {
// Modify observability config
config.observability.sinks.clear();
// Event config should remain unchanged
assert!(!config.event.adapters.is_empty(), "Event adapters should remain unchanged");
assert!(config.event.channel_capacity > 0, "Channel capacity should remain unchanged");
// Create new config to verify independence
let new_config = RustFsConfig::new();
assert!(!new_config.observability.sinks.is_empty(), "New config should have default sinks");
@@ -88,38 +71,6 @@ mod tests {
assert!(config.observability.otel.logger_level.is_some());
}
#[test]
fn test_rustfs_config_event_integration() {
let config = RustFsConfig::new();
// Test event config properties
assert!(!config.event.store_path.is_empty(), "Store path should not be empty");
assert!(
config.event.channel_capacity >= 1000,
"Channel capacity should be reasonable for production"
);
// Test that store path is a valid path format
let store_path = &config.event.store_path;
assert!(!store_path.contains('\0'), "Store path should not contain null characters");
// Test adapters configuration
for adapter in &config.event.adapters {
// Each adapter should have a valid configuration
match adapter {
crate::event::adapters::AdapterConfig::Webhook(_) => {
// Webhook adapter should be properly configured
}
crate::event::adapters::AdapterConfig::Kafka(_) => {
// Kafka adapter should be properly configured
}
crate::event::adapters::AdapterConfig::Mqtt(_) => {
// MQTT adapter should be properly configured
}
}
}
}
#[test]
fn test_rustfs_config_memory_usage() {
// Test that config doesn't use excessive memory
@@ -128,12 +79,8 @@ mod tests {
// Basic memory usage checks
assert!(std::mem::size_of_val(&config) < 10000, "Config should not use excessive memory");
// Test that strings are not excessively long
assert!(config.event.store_path.len() < 1000, "Store path should not be excessively long");
// Test that collections are reasonably sized
assert!(config.observability.sinks.len() < 100, "Sinks collection should be reasonably sized");
assert!(config.event.adapters.len() < 100, "Adapters collection should be reasonably sized");
}
#[test]
@@ -143,10 +90,6 @@ mod tests {
// Test that observability config can be serialized (it has Serialize trait)
let observability_json = serde_json::to_string(&config.observability);
assert!(observability_json.is_ok(), "Observability config should be serializable");
// Test that event config can be serialized (it has Serialize trait)
let event_json = serde_json::to_string(&config.event);
assert!(event_json.is_ok(), "Event config should be serializable");
}
#[test]
@@ -160,11 +103,6 @@ mod tests {
observability_debug.contains("ObservabilityConfig"),
"Debug output should contain type name"
);
// Test that event config has Debug trait
let event_debug = format!("{:?}", config.event);
assert!(!event_debug.is_empty(), "Event config should have debug output");
assert!(event_debug.contains("NotifierConfig"), "Debug output should contain type name");
}
#[test]
@@ -174,27 +112,5 @@ mod tests {
// Test that observability config can be cloned
let observability_clone = config.observability.clone();
assert_eq!(observability_clone.sinks.len(), config.observability.sinks.len());
// Test that event config can be cloned
let event_clone = config.event.clone();
assert_eq!(event_clone.store_path, config.event.store_path);
assert_eq!(event_clone.channel_capacity, config.event.channel_capacity);
}
#[test]
fn test_rustfs_config_environment_independence() {
// Test that config creation doesn't depend on specific environment variables
// This test ensures the config can be created in any environment
let config1 = RustFsConfig::new();
let config2 = RustFsConfig::new();
// Both configs should have the same structure
assert_eq!(config1.observability.sinks.len(), config2.observability.sinks.len());
assert_eq!(config1.event.adapters.len(), config2.event.adapters.len());
// Store paths should be consistent
assert_eq!(config1.event.store_path, config2.event.store_path);
assert_eq!(config1.event.channel_capacity, config2.event.channel_capacity);
}
}

View File

@@ -1,27 +0,0 @@
use crate::event::kafka::KafkaAdapter;
use crate::event::mqtt::MqttAdapter;
use crate::event::webhook::WebhookAdapter;
use serde::{Deserialize, Serialize};
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AdapterConfig {
Webhook(WebhookAdapter),
Kafka(KafkaAdapter),
Mqtt(MqttAdapter),
}
impl AdapterConfig {
/// create a new configuration with default values
pub fn new() -> Self {
Self::Webhook(WebhookAdapter::new())
}
}
impl Default for AdapterConfig {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,334 +0,0 @@
use crate::event::adapters::AdapterConfig;
use serde::{Deserialize, Serialize};
use std::env;
#[allow(dead_code)]
const DEFAULT_CONFIG_FILE: &str = "event";
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotifierConfig {
#[serde(default = "default_store_path")]
pub store_path: String,
#[serde(default = "default_channel_capacity")]
pub channel_capacity: usize,
pub adapters: Vec<AdapterConfig>,
}
impl Default for NotifierConfig {
fn default() -> Self {
Self::new()
}
}
impl NotifierConfig {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
store_path: default_store_path(),
channel_capacity: default_channel_capacity(),
adapters: vec![AdapterConfig::new()],
}
}
}
/// Provide temporary directories as default storage paths
fn default_store_path() -> String {
env::temp_dir().join("event-notification").to_string_lossy().to_string()
}
/// Provides the recommended default channel capacity for high concurrency systems
fn default_channel_capacity() -> usize {
10000 // Reasonable default values for high concurrency systems
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
#[test]
fn test_notifier_config_new() {
let config = NotifierConfig::new();
// Verify store path is set
assert!(!config.store_path.is_empty(), "Store path should not be empty");
assert!(
config.store_path.contains("event-notification"),
"Store path should contain event-notification"
);
// Verify channel capacity is reasonable
assert_eq!(config.channel_capacity, 10000, "Channel capacity should be 10000");
assert!(config.channel_capacity > 0, "Channel capacity should be positive");
// Verify adapters are initialized
assert!(!config.adapters.is_empty(), "Adapters should not be empty");
assert_eq!(config.adapters.len(), 1, "Should have exactly one default adapter");
}
#[test]
fn test_notifier_config_default() {
let config = NotifierConfig::default();
let new_config = NotifierConfig::new();
// Default should be equivalent to new()
assert_eq!(config.store_path, new_config.store_path);
assert_eq!(config.channel_capacity, new_config.channel_capacity);
assert_eq!(config.adapters.len(), new_config.adapters.len());
}
#[test]
fn test_default_store_path() {
let store_path = default_store_path();
// Verify store path properties
assert!(!store_path.is_empty(), "Store path should not be empty");
assert!(store_path.contains("event-notification"), "Store path should contain event-notification");
// Verify it's a valid path format
let path = Path::new(&store_path);
assert!(path.is_absolute() || path.is_relative(), "Store path should be a valid path");
// Verify it doesn't contain invalid characters
assert!(!store_path.contains('\0'), "Store path should not contain null characters");
// Verify it's based on temp directory
let temp_dir = env::temp_dir();
let expected_path = temp_dir.join("event-notification");
assert_eq!(store_path, expected_path.to_string_lossy().to_string());
}
#[test]
fn test_default_channel_capacity() {
let capacity = default_channel_capacity();
// Verify capacity is reasonable
assert_eq!(capacity, 10000, "Default capacity should be 10000");
assert!(capacity > 0, "Capacity should be positive");
assert!(capacity >= 1000, "Capacity should be at least 1000 for production use");
assert!(capacity <= 1_000_000, "Capacity should not be excessively large");
}
#[test]
fn test_notifier_config_serialization() {
let config = NotifierConfig::new();
// Test serialization to JSON
let json_result = serde_json::to_string(&config);
assert!(json_result.is_ok(), "Config should be serializable to JSON");
let json_str = json_result.unwrap();
assert!(!json_str.is_empty(), "Serialized JSON should not be empty");
assert!(json_str.contains("store_path"), "JSON should contain store_path");
assert!(json_str.contains("channel_capacity"), "JSON should contain channel_capacity");
assert!(json_str.contains("adapters"), "JSON should contain adapters");
// Test deserialization from JSON
let deserialized_result: Result<NotifierConfig, _> = serde_json::from_str(&json_str);
assert!(deserialized_result.is_ok(), "Config should be deserializable from JSON");
let deserialized_config = deserialized_result.unwrap();
assert_eq!(deserialized_config.store_path, config.store_path);
assert_eq!(deserialized_config.channel_capacity, config.channel_capacity);
assert_eq!(deserialized_config.adapters.len(), config.adapters.len());
}
#[test]
fn test_notifier_config_serialization_with_defaults() {
// Test serialization with minimal JSON (using serde defaults)
let minimal_json = r#"{"adapters": []}"#;
let deserialized_result: Result<NotifierConfig, _> = serde_json::from_str(minimal_json);
assert!(deserialized_result.is_ok(), "Config should deserialize with defaults");
let config = deserialized_result.unwrap();
assert_eq!(config.store_path, default_store_path(), "Should use default store path");
assert_eq!(config.channel_capacity, default_channel_capacity(), "Should use default channel capacity");
assert!(config.adapters.is_empty(), "Should have empty adapters as specified");
}
#[test]
fn test_notifier_config_debug_format() {
let config = NotifierConfig::new();
let debug_str = format!("{:?}", config);
assert!(!debug_str.is_empty(), "Debug output should not be empty");
assert!(debug_str.contains("NotifierConfig"), "Debug output should contain struct name");
assert!(debug_str.contains("store_path"), "Debug output should contain store_path field");
assert!(
debug_str.contains("channel_capacity"),
"Debug output should contain channel_capacity field"
);
assert!(debug_str.contains("adapters"), "Debug output should contain adapters field");
}
#[test]
fn test_notifier_config_clone() {
let config = NotifierConfig::new();
let cloned_config = config.clone();
// Test that clone creates an independent copy
assert_eq!(cloned_config.store_path, config.store_path);
assert_eq!(cloned_config.channel_capacity, config.channel_capacity);
assert_eq!(cloned_config.adapters.len(), config.adapters.len());
// Verify they are independent (modifying one doesn't affect the other)
let mut modified_config = config.clone();
modified_config.channel_capacity = 5000;
assert_ne!(modified_config.channel_capacity, config.channel_capacity);
assert_eq!(cloned_config.channel_capacity, config.channel_capacity);
}
#[test]
fn test_notifier_config_modification() {
let mut config = NotifierConfig::new();
// Test modifying store path
let original_store_path = config.store_path.clone();
config.store_path = "/custom/path".to_string();
assert_ne!(config.store_path, original_store_path);
assert_eq!(config.store_path, "/custom/path");
// Test modifying channel capacity
let original_capacity = config.channel_capacity;
config.channel_capacity = 5000;
assert_ne!(config.channel_capacity, original_capacity);
assert_eq!(config.channel_capacity, 5000);
// Test modifying adapters
let original_adapters_len = config.adapters.len();
config.adapters.push(AdapterConfig::new());
assert_eq!(config.adapters.len(), original_adapters_len + 1);
// Test clearing adapters
config.adapters.clear();
assert!(config.adapters.is_empty());
}
#[test]
fn test_notifier_config_adapters() {
let config = NotifierConfig::new();
// Test default adapter configuration
assert_eq!(config.adapters.len(), 1, "Should have exactly one default adapter");
// Test that we can add more adapters
let mut config_mut = config.clone();
config_mut.adapters.push(AdapterConfig::new());
assert_eq!(config_mut.adapters.len(), 2, "Should be able to add more adapters");
// Test adapter types
for adapter in &config.adapters {
match adapter {
AdapterConfig::Webhook(_) => {
// Webhook adapter should be properly configured
}
AdapterConfig::Kafka(_) => {
// Kafka adapter should be properly configured
}
AdapterConfig::Mqtt(_) => {
// MQTT adapter should be properly configured
}
}
}
}
#[test]
fn test_notifier_config_edge_cases() {
// Test with empty adapters
let mut config = NotifierConfig::new();
config.adapters.clear();
assert!(config.adapters.is_empty(), "Adapters should be empty after clearing");
// Test serialization with empty adapters
let json_result = serde_json::to_string(&config);
assert!(json_result.is_ok(), "Config with empty adapters should be serializable");
// Test with very large channel capacity
config.channel_capacity = 1_000_000;
assert_eq!(config.channel_capacity, 1_000_000);
// Test with minimum channel capacity
config.channel_capacity = 1;
assert_eq!(config.channel_capacity, 1);
// Test with empty store path
config.store_path = String::new();
assert!(config.store_path.is_empty());
}
#[test]
fn test_notifier_config_memory_efficiency() {
let config = NotifierConfig::new();
// Test that config doesn't use excessive memory
let config_size = std::mem::size_of_val(&config);
assert!(config_size < 5000, "Config should not use excessive memory");
// Test that store path is not excessively long
assert!(config.store_path.len() < 1000, "Store path should not be excessively long");
// Test that adapters collection is reasonably sized
assert!(config.adapters.len() < 100, "Adapters collection should be reasonably sized");
}
#[test]
fn test_notifier_config_consistency() {
// Create multiple configs and ensure they're consistent
let config1 = NotifierConfig::new();
let config2 = NotifierConfig::new();
// Both configs should have the same default values
assert_eq!(config1.store_path, config2.store_path);
assert_eq!(config1.channel_capacity, config2.channel_capacity);
assert_eq!(config1.adapters.len(), config2.adapters.len());
}
#[test]
fn test_notifier_config_path_validation() {
let config = NotifierConfig::new();
// Test that store path is a valid path
let path = Path::new(&config.store_path);
// Path should be valid
assert!(path.components().count() > 0, "Path should have components");
// Path should not contain invalid characters for most filesystems
assert!(!config.store_path.contains('\0'), "Path should not contain null characters");
assert!(!config.store_path.contains('\x01'), "Path should not contain control characters");
// Path should be reasonable length
assert!(config.store_path.len() < 260, "Path should be shorter than Windows MAX_PATH");
}
#[test]
fn test_notifier_config_production_readiness() {
let config = NotifierConfig::new();
// Test production readiness criteria
assert!(config.channel_capacity >= 1000, "Channel capacity should be sufficient for production");
assert!(!config.store_path.is_empty(), "Store path should be configured");
assert!(!config.adapters.is_empty(), "At least one adapter should be configured");
// Test that configuration is reasonable for high-load scenarios
assert!(config.channel_capacity <= 10_000_000, "Channel capacity should not be excessive");
// Test that store path is in a reasonable location (temp directory)
assert!(config.store_path.contains("event-notification"), "Store path should be identifiable");
}
#[test]
fn test_default_config_file_constant() {
// Test that the constant is properly defined
assert_eq!(DEFAULT_CONFIG_FILE, "event");
// DEFAULT_CONFIG_FILE is a const, so is_empty() check is redundant
// assert!(!DEFAULT_CONFIG_FILE.is_empty(), "Config file name should not be empty");
assert!(!DEFAULT_CONFIG_FILE.contains('/'), "Config file name should not contain path separators");
assert!(
!DEFAULT_CONFIG_FILE.contains('\\'),
"Config file name should not contain Windows path separators"
);
}
}

View File

@@ -1,29 +0,0 @@
use serde::{Deserialize, Serialize};
/// Configuration for the Kafka adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaAdapter {
pub brokers: String,
pub topic: String,
pub max_retries: u32,
pub timeout: u64,
}
impl KafkaAdapter {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
brokers: "localhost:9092".to_string(),
topic: "kafka_topic".to_string(),
max_retries: 3,
timeout: 1000,
}
}
}
impl Default for KafkaAdapter {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,5 +0,0 @@
pub(crate) mod adapters;
pub(crate) mod config;
pub(crate) mod kafka;
pub(crate) mod mqtt;
pub(crate) mod webhook;

View File

@@ -1,31 +0,0 @@
use serde::{Deserialize, Serialize};
/// Configuration for the MQTT adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttAdapter {
pub broker: String,
pub port: u16,
pub client_id: String,
pub topic: String,
pub max_retries: u32,
}
impl MqttAdapter {
/// create a new configuration with default values
pub fn new() -> Self {
Self {
broker: "localhost".to_string(),
port: 1883,
client_id: "mqtt_client".to_string(),
topic: "mqtt_topic".to_string(),
max_retries: 3,
}
}
}
impl Default for MqttAdapter {
/// create a new configuration with default values
fn default() -> Self {
Self::new()
}
}

View File

@@ -1,51 +0,0 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookAdapter {
pub endpoint: String,
pub auth_token: Option<String>,
pub custom_headers: Option<HashMap<String, String>>,
pub max_retries: u32,
pub timeout: u64,
}
impl WebhookAdapter {
/// verify that the configuration is valid
pub fn validate(&self) -> Result<(), String> {
// verify that endpoint cannot be empty
if self.endpoint.trim().is_empty() {
return Err("Webhook endpoint cannot be empty".to_string());
}
// verification timeout must be reasonable
if self.timeout == 0 {
return Err("Webhook timeout must be greater than 0".to_string());
}
// Verify that the maximum number of retry is reasonable
if self.max_retries > 10 {
return Err("Maximum retry count cannot exceed 10".to_string());
}
Ok(())
}
/// Get the default configuration
pub fn new() -> Self {
Self {
endpoint: "".to_string(),
auth_token: None,
custom_headers: Some(HashMap::new()),
max_retries: 3,
timeout: 1000,
}
}
}
impl Default for WebhookAdapter {
fn default() -> Self {
Self::new()
}
}

View File

@@ -2,10 +2,8 @@ use crate::observability::config::ObservabilityConfig;
mod config;
mod constants;
mod event;
mod notify;
mod observability;
pub use config::RustFsConfig;
pub use constants::app::*;
pub use event::config::NotifierConfig;

View File

@@ -0,0 +1,53 @@
use crate::notify::mqtt::MQTTArgs;
use crate::notify::webhook::WebhookArgs;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Config - notification target configuration structure, holds
/// information about various notification targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotifyConfig {
pub mqtt: HashMap<String, MQTTArgs>,
pub webhook: HashMap<String, WebhookArgs>,
}
impl NotifyConfig {
/// Create a new configuration with default values.
pub fn new() -> Self {
let mut config = NotifyConfig {
webhook: HashMap::new(),
mqtt: HashMap::new(),
};
// Insert default target for each backend
config.webhook.insert("1".to_string(), WebhookArgs::new());
config.mqtt.insert("1".to_string(), MQTTArgs::new());
config
}
}
impl Default for NotifyConfig {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use crate::notify::config::NotifyConfig;
#[test]
fn test_notify_config_new() {
let config = NotifyConfig::new();
assert_eq!(config.webhook.len(), 1);
assert_eq!(config.mqtt.len(), 1);
assert!(config.webhook.contains_key("1"));
assert!(config.mqtt.contains_key("1"));
}
#[test]
fn test_notify_config_default() {
let config = NotifyConfig::default();
assert_eq!(config.webhook.len(), 1);
assert_eq!(config.mqtt.len(), 1);
}
}

View File

@@ -0,0 +1,26 @@
/// Help text for Webhook configuration.
pub const HELP_WEBHOOK: &str = r#"
Webhook configuration:
- enable: Enable or disable the webhook target (true/false)
- endpoint: Webhook server endpoint (e.g., http://localhost:8080/rustfs/events)
- auth_token: Opaque string or JWT authorization token (optional)
- queue_dir: Absolute path for persistent event queue (optional)
- queue_limit: Maximum number of events to queue (optional, default: 0)
- client_cert: Path to client certificate file (optional)
- client_key: Path to client private key file (optional)
"#;
/// Help text for MQTT configuration.
pub const HELP_MQTT: &str = r#"
MQTT configuration:
- enable: Enable or disable the MQTT target (true/false)
- broker: MQTT broker address (e.g., tcp://localhost:1883)
- topic: MQTT topic (e.g., rustfs/events)
- qos: Quality of Service level (0, 1, or 2)
- username: Username for MQTT authentication (optional)
- password: Password for MQTT authentication (optional)
- reconnect_interval: Reconnect interval in milliseconds (optional)
- keep_alive_interval: Keep alive interval in milliseconds (optional)
- queue_dir: Absolute path for persistent event queue (optional)
- queue_limit: Maximum number of events to queue (optional, default: 0)
"#;

View File

@@ -0,0 +1,347 @@
use crate::notify::webhook::WebhookArgs;
use crate::notify::mqtt::MQTTArgs;
use std::collections::HashMap;
/// Convert legacy webhook configuration to the new WebhookArgs struct.
pub fn convert_webhook_config(config: &HashMap<String, String>) -> Result<WebhookArgs, String> {
let mut args = WebhookArgs::new();
args.enable = config.get("enable").map_or(false, |v| v == "true");
args.endpoint = config.get("endpoint").unwrap_or(&"".to_string()).clone();
args.auth_token = config.get("auth_token").unwrap_or(&"".to_string()).clone();
args.queue_dir = config.get("queue_dir").unwrap_or(&"".to_string()).clone();
args.queue_limit = config.get("queue_limit").map_or(0, |v| v.parse().unwrap_or(0));
args.client_cert = config.get("client_cert").unwrap_or(&"".to_string()).clone();
args.client_key = config.get("client_key").unwrap_or(&"".to_string()).clone();
Ok(args)
}
/// Convert legacy MQTT configuration to the new MQTTArgs struct.
pub fn convert_mqtt_config(config: &HashMap<String, String>) -> Result<MQTTArgs, String> {
let mut args = MQTTArgs::new();
args.enable = config.get("enable").map_or(false, |v| v == "true");
args.broker = config.get("broker").unwrap_or(&"".to_string()).clone();
args.topic = config.get("topic").unwrap_or(&"".to_string()).clone();
args.qos = config.get("qos").map_or(0, |v| v.parse().unwrap_or(0));
args.username = config.get("username").unwrap_or(&"".to_string()).clone();
args.password = config.get("password").unwrap_or(&"".to_string()).clone();
args.reconnect_interval = config.get("reconnect_interval").map_or(0, |v| v.parse().unwrap_or(0));
args.keep_alive_interval = config.get("keep_alive_interval").map_or(0, |v| v.parse().unwrap_or(0));
args.queue_dir = config.get("queue_dir").unwrap_or(&"".to_string()).clone();
args.queue_limit = config.get("queue_limit").map_or(0, |v| v.parse().unwrap_or(0));
Ok(args)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_convert_webhook_config() {
let mut old_config = HashMap::new();
old_config.insert("endpoint".to_string(), "http://example.com".to_string());
old_config.insert("auth_token".to_string(), "token123".to_string());
old_config.insert("max_retries".to_string(), "5".to_string());
old_config.insert("timeout".to_string(), "2000".to_string());
let result = convert_webhook_config(&old_config);
assert!(result.is_ok());
let args = result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
assert_eq!(args.auth_token, Some("token123".to_string()));
assert_eq!(args.max_retries, 5);
assert_eq!(args.timeout, 2000);
}
#[test]
fn test_convert_mqtt_config() {
let mut old_config = HashMap::new();
old_config.insert("broker".to_string(), "mqtt.example.com".to_string());
old_config.insert("port".to_string(), "1883".to_string());
old_config.insert("client_id".to_string(), "test_client".to_string());
old_config.insert("topic".to_string(), "test_topic".to_string());
old_config.insert("max_retries".to_string(), "4".to_string());
let result = convert_mqtt_config(&old_config);
assert!(result.is_ok());
let args = result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
assert_eq!(args.port, 1883);
assert_eq!(args.client_id, "test_client");
assert_eq!(args.topic, "test_topic");
assert_eq!(args.max_retries, 4);
}
#[test]
fn test_convert_webhook_config_invalid() {
let mut old_config = HashMap::new();
old_config.insert("max_retries".to_string(), "invalid".to_string());
let result = convert_webhook_config(&old_config);
assert!(result.is_err());
}
#[test]
fn test_convert_mqtt_config_invalid() {
let mut old_config = HashMap::new();
old_config.insert("port".to_string(), "invalid".to_string());
let result = convert_mqtt_config(&old_config);
assert!(result.is_err());
}
#[test]
fn test_convert_empty_config() {
let empty_config = HashMap::new();
let webhook_result = convert_webhook_config(&empty_config);
assert!(webhook_result.is_ok());
let mqtt_result = convert_mqtt_config(&empty_config);
assert!(mqtt_result.is_ok());
}
#[test]
fn test_convert_partial_config() {
let mut partial_config = HashMap::new();
partial_config.insert("endpoint".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&partial_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
assert_eq!(args.max_retries, 3); // default value
assert_eq!(args.timeout, 1000); // default value
let mut partial_mqtt_config = HashMap::new();
partial_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&partial_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
assert_eq!(args.max_retries, 3); // default value
}
#[test]
fn test_convert_config_with_extra_fields() {
let mut extra_config = HashMap::new();
extra_config.insert("endpoint".to_string(), "http://example.com".to_string());
extra_config.insert("extra_field".to_string(), "extra_value".to_string());
let webhook_result = convert_webhook_config(&extra_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut extra_mqtt_config = HashMap::new();
extra_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string());
extra_mqtt_config.insert("extra_field".to_string(), "extra_value".to_string());
let mqtt_result = convert_mqtt_config(&extra_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
#[test]
fn test_convert_config_with_empty_values() {
let mut empty_values_config = HashMap::new();
empty_values_config.insert("endpoint".to_string(), "".to_string());
let webhook_result = convert_webhook_config(&empty_values_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "");
let mut empty_mqtt_config = HashMap::new();
empty_mqtt_config.insert("broker".to_string(), "".to_string());
let mqtt_result = convert_mqtt_config(&empty_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "");
}
#[test]
fn test_convert_config_with_whitespace_values() {
let mut whitespace_config = HashMap::new();
whitespace_config.insert("endpoint".to_string(), " http://example.com ".to_string());
let webhook_result = convert_webhook_config(&whitespace_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, " http://example.com ");
let mut whitespace_mqtt_config = HashMap::new();
whitespace_mqtt_config.insert("broker".to_string(), " mqtt.example.com ".to_string());
let mqtt_result = convert_mqtt_config(&whitespace_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, " mqtt.example.com ");
}
#[test]
fn test_convert_config_with_special_characters() {
let mut special_chars_config = HashMap::new();
special_chars_config.insert("endpoint".to_string(), "http://example.com/path?param=value&other=123".to_string());
let webhook_result = convert_webhook_config(&special_chars_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com/path?param=value&other=123");
let mut special_chars_mqtt_config = HashMap::new();
special_chars_mqtt_config.insert("broker".to_string(), "mqtt.example.com:1883".to_string());
let mqtt_result = convert_mqtt_config(&special_chars_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com:1883");
}
#[test]
fn test_convert_config_with_numeric_values() {
let mut numeric_config = HashMap::new();
numeric_config.insert("max_retries".to_string(), "5".to_string());
numeric_config.insert("timeout".to_string(), "2000".to_string());
let webhook_result = convert_webhook_config(&numeric_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.max_retries, 5);
assert_eq!(args.timeout, 2000);
let mut numeric_mqtt_config = HashMap::new();
numeric_mqtt_config.insert("port".to_string(), "1883".to_string());
numeric_mqtt_config.insert("max_retries".to_string(), "4".to_string());
let mqtt_result = convert_mqtt_config(&numeric_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.port, 1883);
assert_eq!(args.max_retries, 4);
}
#[test]
fn test_convert_config_with_boolean_values() {
let mut boolean_config = HashMap::new();
boolean_config.insert("enable".to_string(), "true".to_string());
let webhook_result = convert_webhook_config(&boolean_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, ""); // default value
let mut boolean_mqtt_config = HashMap::new();
boolean_mqtt_config.insert("enable".to_string(), "false".to_string());
let mqtt_result = convert_mqtt_config(&boolean_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "localhost"); // default value
}
#[test]
fn test_convert_config_with_null_values() {
let mut null_config = HashMap::new();
null_config.insert("endpoint".to_string(), "null".to_string());
let webhook_result = convert_webhook_config(&null_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "null");
let mut null_mqtt_config = HashMap::new();
null_mqtt_config.insert("broker".to_string(), "null".to_string());
let mqtt_result = convert_mqtt_config(&null_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "null");
}
#[test]
fn test_convert_config_with_duplicate_keys() {
let mut duplicate_config = HashMap::new();
duplicate_config.insert("endpoint".to_string(), "http://example.com".to_string());
duplicate_config.insert("endpoint".to_string(), "http://example.org".to_string());
let webhook_result = convert_webhook_config(&duplicate_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.org"); // last value wins
let mut duplicate_mqtt_config = HashMap::new();
duplicate_mqtt_config.insert("broker".to_string(), "mqtt.example.com".to_string());
duplicate_mqtt_config.insert("broker".to_string(), "mqtt.example.org".to_string());
let mqtt_result = convert_mqtt_config(&duplicate_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.org"); // last value wins
}
#[test]
fn test_convert_config_with_case_insensitive_keys() {
let mut case_insensitive_config = HashMap::new();
case_insensitive_config.insert("ENDPOINT".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&case_insensitive_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut case_insensitive_mqtt_config = HashMap::new();
case_insensitive_mqtt_config.insert("BROKER".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&case_insensitive_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
#[test]
fn test_convert_config_with_mixed_case_keys() {
let mut mixed_case_config = HashMap::new();
mixed_case_config.insert("EndPoint".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&mixed_case_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut mixed_case_mqtt_config = HashMap::new();
mixed_case_mqtt_config.insert("BroKer".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&mixed_case_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
#[test]
fn test_convert_config_with_snake_case_keys() {
let mut snake_case_config = HashMap::new();
snake_case_config.insert("end_point".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&snake_case_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut snake_case_mqtt_config = HashMap::new();
snake_case_mqtt_config.insert("bro_ker".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&snake_case_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
#[test]
fn test_convert_config_with_kebab_case_keys() {
let mut kebab_case_config = HashMap::new();
kebab_case_config.insert("end-point".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&kebab_case_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut kebab_case_mqtt_config = HashMap::new();
kebab_case_mqtt_config.insert("bro-ker".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&kebab_case_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
#[test]
fn test_convert_config_with_camel_case_keys() {
let mut camel_case_config = HashMap::new();
camel_case_config.insert("endPoint".to_string(), "http://example.com".to_string());
let webhook_result = convert_webhook_config(&camel_case_config);
assert!(webhook_result.is_ok());
let args = webhook_result.unwrap();
assert_eq!(args.endpoint, "http://example.com");
let mut camel_case_mqtt_config = HashMap::new();
camel_case_mqtt_config.insert("broKer".to_string(), "mqtt.example.com".to_string());
let mqtt_result = convert_mqtt_config(&camel_case_mqtt_config);
assert!(mqtt_result.is_ok());
let args = mqtt_result.unwrap();
assert_eq!(args.broker, "mqtt.example.com");
}
}

View File

@@ -0,0 +1,5 @@
pub mod config;
pub mod webhook;
pub mod mqtt;
pub mod help;
pub mod legacy;

View File

@@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
/// MQTTArgs - MQTT target arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MQTTArgs {
pub enable: bool,
pub broker: String,
pub topic: String,
pub qos: u8,
pub username: String,
pub password: String,
pub reconnect_interval: u64,
pub keep_alive_interval: u64,
#[serde(skip)]
pub root_cas: Option<()>, // Placeholder for *x509.CertPool
pub queue_dir: String,
pub queue_limit: u64,
}
impl MQTTArgs {
/// Create a new configuration with default values.
pub fn new() -> Self {
Self {
enable: false,
broker: "".to_string(),
topic: "".to_string(),
qos: 0,
username: "".to_string(),
password: "".to_string(),
reconnect_interval: 0,
keep_alive_interval: 0,
root_cas: None,
queue_dir: "".to_string(),
queue_limit: 0,
}
}
/// Validate MQTTArgs fields
pub fn validate(&self) -> Result<(), String> {
if !self.enable {
return Ok(());
}
if self.broker.trim().is_empty() {
return Err("MQTT broker cannot be empty".to_string());
}
if self.topic.trim().is_empty() {
return Err("MQTT topic cannot be empty".to_string());
}
if self.queue_dir != "" && !self.queue_dir.starts_with('/') {
return Err("queueDir path should be absolute".to_string());
}
if self.qos == 0 && self.queue_dir != "" {
return Err("qos should be set to 1 or 2 if queueDir is set".to_string());
}
Ok(())
}
}
impl Default for MQTTArgs {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mqtt_args_new() {
let args = MQTTArgs::new();
assert_eq!(args.broker, "");
assert_eq!(args.topic, "");
assert_eq!(args.qos, 0);
assert_eq!(args.username, "");
assert_eq!(args.password, "");
assert_eq!(args.reconnect_interval, 0);
assert_eq!(args.keep_alive_interval, 0);
assert!(args.root_cas.is_none());
assert_eq!(args.queue_dir, "");
assert_eq!(args.queue_limit, 0);
assert!(!args.enable);
}
#[test]
fn test_mqtt_args_validate() {
let mut args = MQTTArgs::new();
assert!(args.validate().is_ok());
args.broker = "".to_string();
assert!(args.validate().is_err());
args.broker = "localhost".to_string();
args.topic = "".to_string();
assert!(args.validate().is_err());
args.topic = "mqtt_topic".to_string();
args.reconnect_interval = 10001;
assert!(args.validate().is_err());
args.reconnect_interval = 1000;
args.keep_alive_interval = 10001;
assert!(args.validate().is_err());
args.keep_alive_interval = 1000;
args.queue_limit = 10001;
assert!(args.validate().is_err());
args.queue_dir = "invalid_path".to_string();
assert!(args.validate().is_err());
args.queue_dir = "/valid_path".to_string();
assert!(args.validate().is_ok());
args.qos = 0;
assert!(args.validate().is_err());
args.qos = 1;
assert!(args.validate().is_ok());
args.qos = 2;
assert!(args.validate().is_ok());
}
}

View File

@@ -0,0 +1,80 @@
use serde::{Deserialize, Serialize};
/// WebhookArgs - Webhook target arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookArgs {
pub enable: bool,
pub endpoint: String,
pub auth_token: String,
#[serde(skip)]
pub transport: Option<()>, // Placeholder for *http.Transport
pub queue_dir: String,
pub queue_limit: u64,
pub client_cert: String,
pub client_key: String,
}
impl WebhookArgs {
/// Create a new configuration with default values.
pub fn new() -> Self {
Self {
enable: false,
endpoint: "".to_string(),
auth_token: "".to_string(),
transport: None,
queue_dir: "".to_string(),
queue_limit: 0,
client_cert: "".to_string(),
client_key: "".to_string(),
}
}
/// Validate WebhookArgs fields
pub fn validate(&self) -> Result<(), String> {
if !self.enable {
return Ok(());
}
if self.endpoint.trim().is_empty() {
return Err("endpoint empty".to_string());
}
if self.queue_dir != "" && !self.queue_dir.starts_with('/') {
return Err("queueDir path should be absolute".to_string());
}
if (self.client_cert != "" && self.client_key == "") || (self.client_cert == "" && self.client_key != "") {
return Err("cert and key must be specified as a pair".to_string());
}
Ok(())
}
}
impl Default for WebhookArgs {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_webhook_args_new() {
let args = WebhookArgs::new();
assert_eq!(args.endpoint, "");
assert_eq!(args.auth_token, "");
assert!(args.transport.is_none());
assert_eq!(args.queue_dir, "");
assert_eq!(args.queue_limit, 0);
assert_eq!(args.client_cert, "");
assert_eq!(args.client_key, "");
assert!(!args.enable);
}
#[test]
fn test_webhook_args_validate() {
let mut args = WebhookArgs::new();
assert!(args.validate().is_err());
args.endpoint = "http://example.com".to_string();
assert!(args.validate().is_ok());
}
}

View File

@@ -43,7 +43,7 @@ const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
/// # Example
///
/// ```
/// use rustfs_event::ChannelAdapterType;
/// use rustfs_notify::ChannelAdapterType;
///
/// let adapter_type = ChannelAdapterType::Webhook;
/// match adapter_type {