diff --git a/Cargo.lock b/Cargo.lock index 57daec18..546350f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8341,6 +8341,7 @@ dependencies = [ "quick-xml", "reqwest", "rumqttc", + "rustfs-config", "rustfs-utils", "serde", "serde_json", diff --git a/cli/rustfs-gui/src/utils/config.rs b/cli/rustfs-gui/src/utils/config.rs index ca125cdc..66b2ad86 100644 --- a/cli/rustfs-gui/src/utils/config.rs +++ b/cli/rustfs-gui/src/utils/config.rs @@ -201,7 +201,7 @@ impl RustFSConfig { /// Clear the stored configuration from the system keyring /// /// # Returns - /// Returns `Ok(())` if the configuration was successfully cleared, or an error if the operation failed. + /// `Ok(())` if the configuration was successfully cleared, or an error if the operation failed. /// /// # Example /// ``` @@ -525,14 +525,14 @@ mod tests { host: "127.0.0.1".to_string(), port: "9000".to_string(), access_key: "用户名".to_string(), - secret_key: "密码123".to_string(), + secret_key: "密码 123".to_string(), domain_name: "测试.com".to_string(), volume_name: "/数据/存储".to_string(), console_address: "127.0.0.1:9001".to_string(), }; assert_eq!(config.access_key, "用户名"); - assert_eq!(config.secret_key, "密码123"); + assert_eq!(config.secret_key, "密码 123"); assert_eq!(config.domain_name, "测试.com"); assert_eq!(config.volume_name, "/数据/存储"); } diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index 0256f30e..3069ba94 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -18,5 +18,6 @@ workspace = true [features] default = [] constants = ["dep:const-str"] +notify = [] observability = [] diff --git a/crates/config/src/constants/env.rs b/crates/config/src/constants/env.rs new file mode 100644 index 00000000..12efd1a5 --- /dev/null +++ b/crates/config/src/constants/env.rs @@ -0,0 +1,7 @@ +pub const DEFAULT_DELIMITER: &str = "_"; +pub const ENV_PREFIX: &str = "RUSTFS_"; +pub const ENV_WORD_DELIMITER: &str = "_"; + +/// Medium-drawn lines separator +/// This is used to separate words in environment variable names. +pub const ENV_WORD_DELIMITER_DASH: &str = "-"; diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs index 04023c88..2df1ffdc 100644 --- a/crates/config/src/constants/mod.rs +++ b/crates/config/src/constants/mod.rs @@ -1 +1,2 @@ pub(crate) mod app; +pub(crate) mod env; diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index da496971..e63a2221 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -2,6 +2,9 @@ pub mod constants; #[cfg(feature = "constants")] pub use constants::app::*; - +#[cfg(feature = "constants")] +pub use constants::env::*; +#[cfg(feature = "notify")] +pub mod notify; #[cfg(feature = "observability")] pub mod observability; diff --git a/crates/config/src/notify/arn.rs b/crates/config/src/notify/arn.rs new file mode 100644 index 00000000..ceaf0970 --- /dev/null +++ b/crates/config/src/notify/arn.rs @@ -0,0 +1,7 @@ +pub const DEFAULT_ARN_PARTITION: &str = "rustfs"; + +pub const DEFAULT_ARN_SERVICE: &str = "sqs"; + +/// Default ARN prefix for SQS +/// "arn:rustfs:sqs:" +pub const ARN_PREFIX: &str = const_str::concat!("arn:", DEFAULT_ARN_PARTITION, ":", DEFAULT_ARN_SERVICE, ":"); diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs new file mode 100644 index 00000000..fd7625f2 --- /dev/null +++ b/crates/config/src/notify/mod.rs @@ -0,0 +1,38 @@ +mod arn; +mod mqtt; +mod store; +mod webhook; + +pub use arn::*; +pub use mqtt::*; +pub use store::*; +pub use webhook::*; + +// --- Configuration Constants --- +pub const DEFAULT_TARGET: &str = "1"; + +pub const NOTIFY_PREFIX: &str = "notify"; + +pub const NOTIFY_ROUTE_PREFIX: &str = "notify_"; + +#[allow(dead_code)] +pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS]; + +#[allow(dead_code)] +pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; +pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; +#[allow(dead_code)] +pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; +#[allow(dead_code)] +pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; +#[allow(dead_code)] +pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; +#[allow(dead_code)] +pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; +#[allow(dead_code)] +pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; +#[allow(dead_code)] +pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; +#[allow(dead_code)] +pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; +pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; diff --git a/crates/config/src/notify/mqtt.rs b/crates/config/src/notify/mqtt.rs new file mode 100644 index 00000000..8177b3d5 --- /dev/null +++ b/crates/config/src/notify/mqtt.rs @@ -0,0 +1,22 @@ +// MQTT Keys +pub const MQTT_BROKER: &str = "broker"; +pub const MQTT_TOPIC: &str = "topic"; +pub const MQTT_QOS: &str = "qos"; +pub const MQTT_USERNAME: &str = "username"; +pub const MQTT_PASSWORD: &str = "password"; +pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval"; +pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval"; +pub const MQTT_QUEUE_DIR: &str = "queue_dir"; +pub const MQTT_QUEUE_LIMIT: &str = "queue_limit"; + +// MQTT Environment Variables +pub const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE"; +pub const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER"; +pub const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC"; +pub const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS"; +pub const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME"; +pub const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD"; +pub const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL"; +pub const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"; +pub const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR"; +pub const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT"; diff --git a/crates/config/src/notify/store.rs b/crates/config/src/notify/store.rs new file mode 100644 index 00000000..604987f1 --- /dev/null +++ b/crates/config/src/notify/store.rs @@ -0,0 +1,7 @@ +pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store +pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit +pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension +pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files + +/// STORE_EXTENSION - file extension of an event file in store +pub const STORE_EXTENSION: &str = ".event"; diff --git a/crates/config/src/notify/webhook.rs b/crates/config/src/notify/webhook.rs new file mode 100644 index 00000000..8b9d938b --- /dev/null +++ b/crates/config/src/notify/webhook.rs @@ -0,0 +1,16 @@ +// Webhook Keys +pub const WEBHOOK_ENDPOINT: &str = "endpoint"; +pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token"; +pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit"; +pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir"; +pub const WEBHOOK_CLIENT_CERT: &str = "client_cert"; +pub const WEBHOOK_CLIENT_KEY: &str = "client_key"; + +// Webhook Environment Variables +pub const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE"; +pub const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT"; +pub const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN"; +pub const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT"; +pub const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR"; +pub const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT"; +pub const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY"; diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 9737075b..cbd02707 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -7,6 +7,7 @@ rust-version.workspace = true version.workspace = true [dependencies] +rustfs-config = { workspace = true, features = ["notify"] } rustfs-utils = { workspace = true, features = ["path", "sys"] } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 181f850e..6723a4b5 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -1,10 +1,10 @@ use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS}; -use rustfs_notify::arn::TargetID; -use rustfs_notify::factory::{ - DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, - NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +use rustfs_config::notify::{ + DEFAULT_LIMIT, DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, + MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, + WEBHOOK_QUEUE_LIMIT, }; -use rustfs_notify::store::DEFAULT_LIMIT; +use rustfs_notify::arn::TargetID; use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; use rustfs_notify::{initialize, notification_system}; use std::sync::Arc; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 381287bc..26539860 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -1,14 +1,14 @@ use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS}; -use std::sync::Arc; // Using Global Accessories -use rustfs_notify::arn::TargetID; -use rustfs_notify::factory::{ - DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, - NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +use rustfs_config::notify::{ + DEFAULT_LIMIT, DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, + MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, + WEBHOOK_QUEUE_LIMIT, }; -use rustfs_notify::store::DEFAULT_LIMIT; +use rustfs_notify::arn::TargetID; use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; use rustfs_notify::{initialize, notification_system}; +use std::sync::Arc; use std::time::Duration; use tracing::info; diff --git a/crates/notify/examples/webhook.rs b/crates/notify/examples/webhook.rs index 0357b6cf..dcba7049 100644 --- a/crates/notify/examples/webhook.rs +++ b/crates/notify/examples/webhook.rs @@ -6,6 +6,7 @@ use axum::{ routing::post, }; use serde_json::Value; +use std::net::SocketAddr; use std::time::{SystemTime, UNIX_EPOCH}; use axum::extract::Query; @@ -17,7 +18,9 @@ struct ResetParams { } // Define a global variable and count the number of data received +use rustfs_utils::parse_and_resolve_address; use std::sync::atomic::{AtomicU64, Ordering}; +use tokio::net::TcpListener; static WEBHOOK_COUNT: AtomicU64 = AtomicU64::new(0); @@ -30,16 +33,23 @@ async fn main() { .route("/webhook/reset", get(reset_webhook_count)) .route("/webhook", get(receive_webhook)); // Start the server - let addr = "0.0.0.0:3020"; - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - println!("Server running on {}", addr); + // let addr = "[0.0.0.0.0.0.0.0]:3020"; + let server_addr = match parse_and_resolve_address(":3020") { + Ok(addr) => addr, + Err(e) => { + eprintln!("Failed to parse address: {}", e); + return; + } + }; + let listener = TcpListener::bind(server_addr).await.unwrap(); + println!("Server running on {}", server_addr); // Self-checking after the service is started tokio::spawn(async move { // Give the server some time to start tokio::time::sleep(std::time::Duration::from_secs(1)).await; - match is_service_active(addr).await { + match is_service_active(server_addr).await { Ok(true) => println!("Service health check: Successful - Service is running normally"), Ok(false) => eprintln!("Service Health Check: Failed - Service Not Responded"), Err(e) => eprintln!("Service health check errors:{}", e), @@ -106,7 +116,7 @@ async fn reset_webhook_count(Query(params): Query, headers: HeaderM .unwrap() } -async fn is_service_active(addr: &str) -> Result { +async fn is_service_active(addr: SocketAddr) -> Result { let socket_addr = tokio::net::lookup_host(addr) .await .map_err(|e| format!("Unable to resolve host:{}", e))? diff --git a/crates/notify/src/arn.rs b/crates/notify/src/arn.rs index 9be689b8..c84ff5bb 100644 --- a/crates/notify/src/arn.rs +++ b/crates/notify/src/arn.rs @@ -1,18 +1,10 @@ use crate::TargetError; -use const_str::concat; +use rustfs_config::notify::{ARN_PREFIX, DEFAULT_ARN_PARTITION, DEFAULT_ARN_SERVICE}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; use std::str::FromStr; use thiserror::Error; -pub(crate) const DEFAULT_ARN_PARTITION: &str = "rustfs"; - -pub(crate) const DEFAULT_ARN_SERVICE: &str = "sqs"; - -/// Default ARN prefix for SQS -/// "arn:rustfs:sqs:" -const ARN_PREFIX: &str = concat!("arn:", DEFAULT_ARN_PARTITION, ":", DEFAULT_ARN_SERVICE, ":"); - #[derive(Debug, Error)] pub enum TargetIDError { #[error("Invalid TargetID format '{0}', expect 'ID:Name'")] diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index b21ed419..b7f4a5c3 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -1,4 +1,3 @@ -use crate::store::DEFAULT_LIMIT; use crate::{ error::TargetError, target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs}, @@ -6,85 +5,30 @@ use crate::{ use async_trait::async_trait; use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS}; use rumqttc::QoS; +use rustfs_config::notify::{ + DEFAULT_DIR, DEFAULT_LIMIT, ENV_MQTT_BROKER, ENV_MQTT_ENABLE, ENV_MQTT_KEEP_ALIVE_INTERVAL, ENV_MQTT_PASSWORD, ENV_MQTT_QOS, + ENV_MQTT_QUEUE_DIR, ENV_MQTT_QUEUE_LIMIT, ENV_MQTT_RECONNECT_INTERVAL, ENV_MQTT_TOPIC, ENV_MQTT_USERNAME, + ENV_WEBHOOK_AUTH_TOKEN, ENV_WEBHOOK_CLIENT_CERT, ENV_WEBHOOK_CLIENT_KEY, ENV_WEBHOOK_ENABLE, ENV_WEBHOOK_ENDPOINT, + ENV_WEBHOOK_QUEUE_DIR, ENV_WEBHOOK_QUEUE_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, + MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, + WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +}; +use rustfs_config::{DEFAULT_DELIMITER, ENV_WORD_DELIMITER_DASH}; use std::time::Duration; -use tracing::warn; +use tracing::{debug, warn}; use url::Url; -// --- Configuration Constants --- - -// General - -pub const DEFAULT_TARGET: &str = "1"; - -#[allow(dead_code)] -pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; -#[allow(dead_code)] -pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; -#[allow(dead_code)] -pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql"; -#[allow(dead_code)] -pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats"; -#[allow(dead_code)] -pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq"; -#[allow(dead_code)] -pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch"; -#[allow(dead_code)] -pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp"; -#[allow(dead_code)] -pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres"; -#[allow(dead_code)] -pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis"; -pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook"; - -#[allow(dead_code)] -pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS]; - -// Webhook Keys -pub const WEBHOOK_ENDPOINT: &str = "endpoint"; -pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token"; -pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit"; -pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir"; -pub const WEBHOOK_CLIENT_CERT: &str = "client_cert"; -pub const WEBHOOK_CLIENT_KEY: &str = "client_key"; - -// Webhook Environment Variables -const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE"; -const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT"; -const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN"; -const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT"; -const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR"; -const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT"; -const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY"; - -// MQTT Keys -pub const MQTT_BROKER: &str = "broker"; -pub const MQTT_TOPIC: &str = "topic"; -pub const MQTT_QOS: &str = "qos"; -pub const MQTT_USERNAME: &str = "username"; -pub const MQTT_PASSWORD: &str = "password"; -pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval"; -pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval"; -pub const MQTT_QUEUE_DIR: &str = "queue_dir"; -pub const MQTT_QUEUE_LIMIT: &str = "queue_limit"; - -// MQTT Environment Variables -const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE"; -const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER"; -const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC"; -const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS"; -const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME"; -const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD"; -const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL"; -const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"; -const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR"; -const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT"; - /// Helper function to get values from environment variables or KVS configurations. /// /// It will give priority to reading from environment variables such as `BASE_ENV_KEY_ID` and fall back to the KVS configuration if it fails. fn get_config_value(id: &str, base_env_key: &str, config_key: &str, config: &KVS) -> Option { - let env_key = if id != DEFAULT_TARGET { - format!("{}_{}", base_env_key, id.to_uppercase().replace('-', "_")) + let env_key = if id != DEFAULT_DELIMITER { + format!( + "{}{}{}", + base_env_key, + DEFAULT_DELIMITER, + id.to_uppercase().replace(ENV_WORD_DELIMITER_DASH, DEFAULT_DELIMITER) + ) } else { base_env_key.to_string() }; @@ -123,11 +67,12 @@ impl TargetFactory for WebhookTargetFactory { let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT) .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; - let endpoint_url = - Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; + let endpoint_url = Url::parse(&endpoint) + .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {} (value: '{}')", e, endpoint)))?; let auth_token = get(ENV_WEBHOOK_AUTH_TOKEN, WEBHOOK_AUTH_TOKEN).unwrap_or_default(); - let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default(); + let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR) + .unwrap_or(DEFAULT_DIR.to_string()); let queue_limit = get(ENV_WEBHOOK_QUEUE_LIMIT, WEBHOOK_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) @@ -163,7 +108,10 @@ impl TargetFactory for WebhookTargetFactory { let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT) .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; - Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?; + debug!("endpoint: {}", endpoint); + let parsed_endpoint = endpoint.trim(); + Url::parse(parsed_endpoint) + .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {} (value: '{}')", e, parsed_endpoint)))?; let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default(); let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default(); @@ -174,7 +122,9 @@ impl TargetFactory for WebhookTargetFactory { )); } - let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default(); + let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_DIR.to_string()); if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() { return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string())); } @@ -201,7 +151,8 @@ impl TargetFactory for MQTTTargetFactory { let broker = get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; - let broker_url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; + let broker_url = Url::parse(&broker) + .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {} (value: '{}')", e, broker)))?; let topic = get(ENV_MQTT_TOPIC, MQTT_TOPIC).ok_or_else(|| TargetError::Configuration("Missing MQTT topic".to_string()))?; @@ -229,7 +180,9 @@ impl TargetFactory for MQTTTargetFactory { .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(30)); - let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default(); + let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_DIR.to_string()); let queue_limit = get(ENV_MQTT_QUEUE_LIMIT, MQTT_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) .unwrap_or(DEFAULT_LIMIT); @@ -264,7 +217,8 @@ impl TargetFactory for MQTTTargetFactory { let broker = get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; - let url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?; + let url = Url::parse(&broker) + .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {} (value: '{}')", e, broker)))?; match url.scheme() { "tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {} @@ -286,7 +240,9 @@ impl TargetFactory for MQTTTargetFactory { } } - let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default(); + let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_DIR.to_string()); if !queue_dir.is_empty() { if !std::path::Path::new(&queue_dir).is_absolute() { return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string())); diff --git a/crates/notify/src/global.rs b/crates/notify/src/global.rs index ebae7b84..3014a096 100644 --- a/crates/notify/src/global.rs +++ b/crates/notify/src/global.rs @@ -2,10 +2,11 @@ use crate::{Event, EventArgs, NotificationError, NotificationSystem}; use ecstore::config::Config; use once_cell::sync::Lazy; use std::sync::{Arc, OnceLock}; +use tracing::instrument; static NOTIFICATION_SYSTEM: OnceLock> = OnceLock::new(); // Create a globally unique Notifier instance -pub static GLOBAL_NOTIFIER: Lazy = Lazy::new(|| Notifier {}); +static GLOBAL_NOTIFIER: Lazy = Lazy::new(|| Notifier {}); /// Initialize the global notification system with the given configuration. /// This function should only be called once throughout the application life cycle. @@ -27,14 +28,22 @@ pub fn notification_system() -> Option> { NOTIFICATION_SYSTEM.get().cloned() } -pub struct Notifier { - // Notifier can hold state, but in this design we make it stateless, - // Rely on getting an instance of NotificationSystem from the outside. +/// Check if the notification system has been initialized. +pub fn is_notification_system_initialized() -> bool { + NOTIFICATION_SYSTEM.get().is_some() } +/// Returns a reference to the global Notifier instance. +pub fn notifier_instance() -> &'static Notifier { + &GLOBAL_NOTIFIER +} + +pub struct Notifier {} + impl Notifier { /// Notify an event asynchronously. /// This is the only entry point for all event notifications in the system. + #[instrument(skip(self, args))] pub async fn notify(&self, args: EventArgs) { // Dependency injection or service positioning mode obtain NotificationSystem instance let notification_sys = match notification_system() { @@ -51,6 +60,11 @@ impl Notifier { return; } + // Check if any subscribers are interested in the event + if !notification_sys.has_subscriber(&args.bucket_name, &args.event_name).await { + return; + } + // Create an event and send it let event = Arc::new(Event::new(args)); notification_sys.send_event(event).await; diff --git a/crates/notify/src/registry.rs b/crates/notify/src/registry.rs index 748b5356..d706ed63 100644 --- a/crates/notify/src/registry.rs +++ b/crates/notify/src/registry.rs @@ -5,8 +5,10 @@ use crate::{ target::Target, }; use ecstore::config::{Config, ENABLE_KEY, ENABLE_OFF, ENABLE_ON, KVS}; +use rustfs_config::notify::NOTIFY_ROUTE_PREFIX; +use rustfs_config::{DEFAULT_DELIMITER, ENV_PREFIX}; use std::collections::HashMap; -use tracing::{error, info}; +use tracing::{debug, error, info}; /// Registry for managing target factories pub struct TargetRegistry { @@ -64,20 +66,58 @@ impl TargetRegistry { // Iterate through configuration sections for (section, subsections) in &config.0 { // Only process notification sections - if !section.starts_with("notify_") { + if !section.starts_with(NOTIFY_ROUTE_PREFIX) { continue; } // Extract target type from section name - let target_type = section.trim_start_matches("notify_"); + let target_type = section.trim_start_matches(NOTIFY_ROUTE_PREFIX); // Iterate through subsections (each representing a target instance) for (target_id, target_config) in subsections { // Skip disabled targets - if target_config.lookup(ENABLE_KEY).unwrap_or_else(|| ENABLE_OFF.to_string()) != ENABLE_ON { + + let enable_from_config = target_config.lookup(ENABLE_KEY).unwrap_or_else(|| ENABLE_OFF.to_string()); + debug!("Target enablement from config: {}/{}: {}", target_type, target_id, enable_from_config); + // Check environment variable for target enablement example: RUSTFS_NOTIFY_WEBHOOK_ENABLE|RUSTFS_NOTIFY_WEBHOOK_ENABLE_[TARGET_ID] + let env_key = if target_id == DEFAULT_DELIMITER { + // If no specific target ID, use the base target type, example: RUSTFS_NOTIFY_WEBHOOK_ENABLE + format!( + "{}{}{}{}{}", + ENV_PREFIX, + NOTIFY_ROUTE_PREFIX, + target_type.to_uppercase(), + DEFAULT_DELIMITER, + ENABLE_KEY + ) + } else { + // If specific target ID, append it to the key, example: RUSTFS_NOTIFY_WEBHOOK_ENABLE_[TARGET_ID] + format!( + "{}{}{}{}{}{}{}", + ENV_PREFIX, + NOTIFY_ROUTE_PREFIX, + target_type.to_uppercase(), + DEFAULT_DELIMITER, + ENABLE_KEY, + DEFAULT_DELIMITER, + target_id.to_uppercase() + ) + } + .to_uppercase(); + debug!("Target env key: {},Target id: {}", env_key, target_id); + let enable_from_env = std::env::var(&env_key) + .map(|v| v.eq_ignore_ascii_case(ENABLE_ON) || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + debug!("Target env value: {},key: {},Target id: {}", enable_from_env, env_key, target_id); + debug!( + "Target enablement from env: {}/{}: result: {}", + target_type, target_id, enable_from_config + ); + if enable_from_config != ENABLE_ON && !enable_from_env { + info!("Skipping disabled target: {}/{}", target_type, target_id); continue; } - + debug!("create target: {}/{} start", target_type, target_id); // Create target match self.create_target(target_type, target_id.clone(), target_config).await { Ok(target) => { @@ -85,7 +125,7 @@ impl TargetRegistry { targets.push(target); } Err(e) => { - error!("Failed to create target {}/{}: {}", target_type, target_id, e); + error!("Failed to create target {}/{}: reason: {}", target_type, target_id, e); } } } diff --git a/crates/notify/src/store.rs b/crates/notify/src/store.rs index fde4776a..f54d48b9 100644 --- a/crates/notify/src/store.rs +++ b/crates/notify/src/store.rs @@ -1,4 +1,5 @@ use crate::error::StoreError; +use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT, DEFAULT_LIMIT}; use serde::{Serialize, de::DeserializeOwned}; use snap::raw::{Decoder, Encoder}; use std::sync::{Arc, RwLock}; @@ -11,13 +12,6 @@ use std::{ use tracing::{debug, warn}; use uuid::Uuid; -pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit -pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension -pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files - -/// STORE_EXTENSION - file extension of an event file in store -pub const STORE_EXTENSION: &str = ".event"; - /// Represents a key for an entry in the store #[derive(Debug, Clone)] pub struct Key { diff --git a/crates/notify/src/target/mqtt.rs b/crates/notify/src/target/mqtt.rs index f7b589ba..7a895d08 100644 --- a/crates/notify/src/target/mqtt.rs +++ b/crates/notify/src/target/mqtt.rs @@ -1,4 +1,4 @@ -use crate::store::{Key, STORE_EXTENSION}; +use crate::store::Key; use crate::target::ChannelTargetType; use crate::{ StoreError, Target, @@ -10,6 +10,7 @@ use crate::{ use async_trait::async_trait; use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS}; use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError}; +use rustfs_config::notify::STORE_EXTENSION; use std::sync::Arc; use std::{ path::PathBuf, diff --git a/crates/notify/src/target/webhook.rs b/crates/notify/src/target/webhook.rs index 91316026..b64fb915 100644 --- a/crates/notify/src/target/webhook.rs +++ b/crates/notify/src/target/webhook.rs @@ -1,4 +1,3 @@ -use crate::store::STORE_EXTENSION; use crate::target::ChannelTargetType; use crate::{ StoreError, Target, @@ -9,6 +8,7 @@ use crate::{ }; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; +use rustfs_config::notify::STORE_EXTENSION; use std::{ path::PathBuf, sync::{ @@ -41,8 +41,8 @@ pub struct WebhookArgs { pub client_key: String, } -// WebhookArgs 的验证方法 impl WebhookArgs { + /// WebhookArgs verification method pub fn validate(&self) -> Result<(), TargetError> { if !self.enable { return Ok(()); @@ -134,7 +134,7 @@ impl WebhookTarget { target_id.name, target_id.id )); - let store = super::super::store::QueueStore::::new(queue_dir, args.queue_limit, STORE_EXTENSION); + let store = crate::store::QueueStore::::new(queue_dir, args.queue_limit, STORE_EXTENSION); if let Err(e) = store.open() { error!("Failed to open store for Webhook target {}: {}", target_id.id, e); @@ -172,9 +172,9 @@ impl WebhookTarget { } async fn init(&self) -> Result<(), TargetError> { - // 使用 CAS 操作确保线程安全初始化 + // Use CAS operations to ensure thread-safe initialization if !self.initialized.load(Ordering::SeqCst) { - // 检查连接 + // Check the connection match self.is_active().await { Ok(true) => { info!("Webhook target {} is active", self.id); @@ -209,12 +209,12 @@ impl WebhookTarget { let data = serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?; - // Vec 转换为 String + // Vec Convert to String let data_string = String::from_utf8(data.clone()) .map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?; debug!("Sending event to webhook target: {}, event log: {}", self.id, data_string); - // 构建请求 + // build request let mut req_builder = self .http_client .post(self.args.endpoint.as_str()) diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index dfb42f3b..6333a998 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -14,7 +14,7 @@ workspace = true default = [] [dependencies] -rustfs-config = { workspace = true, features = ["constants"] } +rustfs-config = { workspace = true, features = ["constants", "notify"] } async-trait.workspace = true backon.workspace = true blake2 = { workspace = true } @@ -92,7 +92,7 @@ urlencoding = { workspace = true } smallvec = { workspace = true } shadow-rs.workspace = true rustfs-filemeta.workspace = true -rustfs-utils ={workspace = true, features=["full"]} +rustfs-utils = { workspace = true, features = ["full"] } rustfs-rio.workspace = true futures-util.workspace = true reader = { workspace = true } diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index 7ba9e512..0dfab586 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -4,6 +4,7 @@ use crate::error::{Error, Result}; use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}; use http::HeaderMap; use lazy_static::lazy_static; +use rustfs_config::DEFAULT_DELIMITER; use rustfs_utils::path::SLASH_SEPARATOR; use std::collections::HashSet; use std::sync::Arc; @@ -13,7 +14,6 @@ pub const CONFIG_PREFIX: &str = "config"; const CONFIG_FILE: &str = "config.json"; pub const STORAGE_CLASS_SUB_SYS: &str = "storage_class"; -pub const DEFAULT_KV_KEY: &str = "_"; lazy_static! { static ref CONFIG_BUCKET: String = format!("{}{}{}", RUSTFS_META_BUCKET, SLASH_SEPARATOR, CONFIG_PREFIX); @@ -193,7 +193,7 @@ async fn apply_dynamic_config(cfg: &mut Config, api: Arc) -> R async fn apply_dynamic_config_for_sub_sys(cfg: &mut Config, api: Arc, subsys: &str) -> Result<()> { let set_drive_counts = api.set_drive_counts(); if subsys == STORAGE_CLASS_SUB_SYS { - let kvs = cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_KV_KEY).unwrap_or_default(); + let kvs = cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_DELIMITER).unwrap_or_default(); for (i, count) in set_drive_counts.iter().enumerate() { match storageclass::lookup_config(&kvs, *count) { diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs index 24e8eb40..4f2ae678 100644 --- a/ecstore/src/config/mod.rs +++ b/ecstore/src/config/mod.rs @@ -1,12 +1,14 @@ pub mod com; #[allow(dead_code)] pub mod heal; +mod notify; pub mod storageclass; use crate::error::Result; use crate::store::ECStore; use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate}; use lazy_static::lazy_static; +use rustfs_config::DEFAULT_DELIMITER; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::{Arc, OnceLock}; @@ -120,11 +122,11 @@ impl Config { for (k, v) in defaults.iter() { if !self.0.contains_key(k) { let mut default = HashMap::new(); - default.insert("_".to_owned(), v.clone()); + default.insert(DEFAULT_DELIMITER.to_owned(), v.clone()); self.0.insert(k.clone(), default); - } else if !self.0[k].contains_key("_") { + } else if !self.0[k].contains_key(DEFAULT_DELIMITER) { if let Some(m) = self.0.get_mut(k) { - m.insert("_".to_owned(), v.clone()); + m.insert(DEFAULT_DELIMITER.to_owned(), v.clone()); } } } @@ -160,7 +162,16 @@ pub fn register_default_kvs(kvs: HashMap) { pub fn init() { let mut kvs = HashMap::new(); + // Load storageclass default configuration kvs.insert(STORAGE_CLASS_SUB_SYS.to_owned(), storageclass::DefaultKVS.clone()); - // TODO: other default + // New: Loading default configurations for notify_webhook and notify_mqtt + // Referring subsystem names through constants to improve the readability and maintainability of the code + kvs.insert( + rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS.to_owned(), + notify::DefaultWebhookKVS.clone(), + ); + kvs.insert(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS.to_owned(), notify::DefaultMqttKVS.clone()); + + // Register all default configurations register_default_kvs(kvs) } diff --git a/ecstore/src/config/notify.rs b/ecstore/src/config/notify.rs new file mode 100644 index 00000000..72f42123 --- /dev/null +++ b/ecstore/src/config/notify.rs @@ -0,0 +1,37 @@ +use crate::config::{ENABLE_KEY, ENABLE_OFF, KV, KVS}; +use lazy_static::lazy_static; +use rustfs_config::notify::{ + DEFAULT_DIR, DEFAULT_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, + MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, + WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +}; + +lazy_static! { + /// The default configuration collection of webhooks, + /// Use lazy_static! to ensure that these configurations are initialized only once during the program life cycle, enabling high-performance lazy loading. + pub static ref DefaultWebhookKVS: KVS = KVS(vec![ + KV { key: ENABLE_KEY.to_owned(), value: ENABLE_OFF.to_owned(), hidden_if_empty: false }, + KV { key: WEBHOOK_ENDPOINT.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + // Sensitive information such as authentication tokens is hidden when the value is empty, enhancing security + KV { key: WEBHOOK_AUTH_TOKEN.to_owned(), value: "".to_owned(), hidden_if_empty: true }, + KV { key: WEBHOOK_QUEUE_LIMIT.to_owned(), value: DEFAULT_LIMIT.to_string().to_owned(), hidden_if_empty: false }, + KV { key: WEBHOOK_QUEUE_DIR.to_owned(), value: DEFAULT_DIR.to_owned(), hidden_if_empty: false }, + KV { key: WEBHOOK_CLIENT_CERT.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + KV { key: WEBHOOK_CLIENT_KEY.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + ]); + + /// MQTT's default configuration collection + pub static ref DefaultMqttKVS: KVS = KVS(vec![ + KV { key: ENABLE_KEY.to_owned(), value: ENABLE_OFF.to_owned(), hidden_if_empty: false }, + KV { key: MQTT_BROKER.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + KV { key: MQTT_TOPIC.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + // Sensitive information such as passwords are hidden when the value is empty + KV { key: MQTT_PASSWORD.to_owned(), value: "".to_owned(), hidden_if_empty: true }, + KV { key: MQTT_USERNAME.to_owned(), value: "".to_owned(), hidden_if_empty: false }, + KV { key: MQTT_QOS.to_owned(), value: "0".to_owned(), hidden_if_empty: false }, + KV { key: MQTT_KEEP_ALIVE_INTERVAL.to_owned(), value: "0s".to_owned(), hidden_if_empty: false }, + KV { key: MQTT_RECONNECT_INTERVAL.to_owned(), value: "0s".to_owned(), hidden_if_empty: false }, + KV { key: MQTT_QUEUE_DIR.to_owned(), value: DEFAULT_DIR.to_owned(), hidden_if_empty: false }, + KV { key: MQTT_QUEUE_LIMIT.to_owned(), value: DEFAULT_LIMIT.to_string().to_owned(), hidden_if_empty: false }, + ]); +} diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 6e53df5d..5300e9de 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -57,7 +57,7 @@ protos.workspace = true query = { workspace = true } regex = { workspace = true } rmp-serde.workspace = true -rustfs-config = { workspace = true, features = ["constants"] } +rustfs-config = { workspace = true, features = ["constants", "notify"] } rustfs-notify = { workspace = true } rustfs-obs = { workspace = true } rustfs-utils = { workspace = true, features = ["full"] } diff --git a/rustfs/src/event.rs b/rustfs/src/event.rs index 2a855275..6d5712cd 100644 --- a/rustfs/src/event.rs +++ b/rustfs/src/event.rs @@ -1,4 +1,5 @@ use ecstore::config::GLOBAL_ServerConfig; +use rustfs_config::DEFAULT_DELIMITER; use tracing::{error, info, instrument}; #[instrument] @@ -14,8 +15,15 @@ pub(crate) async fn init_event_notifier() { } }; + info!("Global server configuration loaded successfully. config: {:?}", server_config); // 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't - if server_config.get_value("notify", "_").is_none() { + if server_config + .get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER) + .is_none() + || server_config + .get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER) + .is_none() + { info!("'notify' subsystem not configured, skipping event notifier initialization."); return; } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 6f6b1fc5..7deac4fa 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -634,7 +634,7 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) { } else { error!("Unknown hyper error:{:?}", hyper_err); } - } else if let Some(io_err) = err.downcast_ref::() { + } else if let Some(io_err) = err.downcast_ref::() { error!("Unknown connection IO error:{}", io_err); } else { error!("Unknown connection error type:{:?}", err); diff --git a/scripts/notify.sh b/scripts/notify.sh new file mode 100755 index 00000000..18fb70d1 --- /dev/null +++ b/scripts/notify.sh @@ -0,0 +1,17 @@ +#!/bin/bash -e + +current_dir=$(pwd) +echo "Current directory: $current_dir" + +if [ -z "$RUST_LOG" ]; then + export RUST_BACKTRACE=1 + export RUST_LOG="rustfs=info,ecstore=info,s3s=debug,iam=info" +fi + +# deploy/logs/notify directory +echo "Creating log directory if it does not exist..." +mkdir -p "$current_dir/deploy/logs/notify" + +# 启动 webhook 服务器 +echo "Starting webhook server..." +cargo run --example webhook -p rustfs-notify & \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh index 9f7ea053..5ec57595 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -12,6 +12,7 @@ if [ -z "$SKIP_BUILD" ]; then fi current_dir=$(pwd) +echo "Current directory: $current_dir" # mkdir -p ./target/volume/test mkdir -p ./target/volume/test{0..4} @@ -19,7 +20,7 @@ mkdir -p ./target/volume/test{0..4} if [ -z "$RUST_LOG" ]; then export RUST_BACKTRACE=1 - export RUST_LOG="rustfs=info,ecstore=info,s3s=debug,iam=info" + export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug,iam=info" fi # export RUSTFS_ERASURE_SET_DRIVE_COUNT=5 @@ -45,12 +46,12 @@ export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector 的 #export RUSTFS_OBS_ENVIRONMENT=develop # 环境名称 export RUSTFS_OBS_LOGGER_LEVEL=debug # 日志级别,支持 trace, debug, info, warn, error export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # 是否启用本地日志记录 -export RUSTFS_OBS_LOG_DIRECTORY="./deploy/logs" # Log directory +export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory export RUSTFS_OBS_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day" export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB # -export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log +export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs/rustfs.log" export RUSTFS_SINKS_FILE_BUFFER_SIZE=12 export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000 export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100 @@ -74,6 +75,12 @@ export OTEL_INSTRUMENTATION_VERSION="0.1.1" export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0" export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production" +# notify +export RUSTFS_NOTIFY_WEBHOOK_ENABLE="true" # 是否启用 webhook 通知 +export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # webhook 通知地址 +export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify" + + export RUSTFS_NS_SCANNER_INTERVAL=60 # 对象扫描间隔时间,单位为秒 # exportRUSTFS_SKIP_BACKGROUND_TASK=true @@ -87,6 +94,6 @@ if [ -n "$1" ]; then fi # 启动 webhook 服务器 -#cargo run --example webhook -p rustfs-event & +#cargo run --example webhook -p rustfs-notify & # 启动主服务 cargo run --bin rustfs \ No newline at end of file