diff --git a/.docker/observability/config/obs-multi.toml b/.docker/observability/config/obs-multi.toml index e4ea037b..2637a401 100644 --- a/.docker/observability/config/obs-multi.toml +++ b/.docker/observability/config/obs-multi.toml @@ -9,26 +9,26 @@ environments = "production" logger_level = "debug" local_logging_enabled = true -[sinks] -[sinks.kafka] # Kafka sink is disabled by default -enabled = false -bootstrap_servers = "localhost:9092" -topic = "logs" -batch_size = 100 # Default is 100 if not specified -batch_timeout_ms = 1000 # Default is 1000ms if not specified +#[[sinks]] +#type = "Kafka" +#brokers = "localhost:9092" +#topic = "logs" +#batch_size = 100 # Default is 100 if not specified +#batch_timeout_ms = 1000 # Default is 1000ms if not specified +# +#[[sinks]] +#type = "Webhook" +#endpoint = "http://localhost:8080/webhook" +#auth_token = "" +#batch_size = 100 # Default is 3 if not specified +#batch_timeout_ms = 1000 # Default is 100ms if not specified -[sinks.webhook] -enabled = false -endpoint = "http://localhost:8080/webhook" -auth_token = "" -batch_size = 100 # Default is 3 if not specified -batch_timeout_ms = 1000 # Default is 100ms if not specified - -[sinks.file] -enabled = true -path = "/root/data/logs/app.log" -batch_size = 10 -batch_timeout_ms = 1000 # Default is 8192 bytes if not specified +[[sinks]] +type = "File" +path = "/root/data/logs/rustfs.log" +buffer_size = 100 # Default is 8192 bytes if not specified +flush_interval_ms = 1000 +flush_threshold = 100 [logger] queue_capacity = 10 \ No newline at end of file diff --git a/.docker/observability/config/obs.toml b/.docker/observability/config/obs.toml index f77c25d8..58069fc5 100644 --- a/.docker/observability/config/obs.toml +++ b/.docker/observability/config/obs.toml @@ -9,26 +9,26 @@ environments = "production" logger_level = "debug" local_logging_enabled = true -[sinks] -[sinks.kafka] # Kafka sink is disabled by default -enabled = false -bootstrap_servers = "localhost:9092" -topic = "logs" -batch_size = 100 # Default is 100 if not specified -batch_timeout_ms = 1000 # Default is 1000ms if not specified +#[[sinks]] +#type = "Kafka" +#brokers = "localhost:9092" +#topic = "logs" +#batch_size = 100 # Default is 100 if not specified +#batch_timeout_ms = 1000 # Default is 1000ms if not specified +# +#[[sinks]] +#type = "Webhook" +#endpoint = "http://localhost:8080/webhook" +#auth_token = "" +#batch_size = 100 # Default is 3 if not specified +#batch_timeout_ms = 1000 # Default is 100ms if not specified -[sinks.webhook] -enabled = false -endpoint = "http://localhost:8080/webhook" -auth_token = "" -batch_size = 100 # Default is 3 if not specified -batch_timeout_ms = 1000 # Default is 100ms if not specified - -[sinks.file] -enabled = true -path = "/root/data/logs/app.log" -batch_size = 10 -batch_timeout_ms = 1000 # Default is 8192 bytes if not specified +[[sinks]] +type = "File" +path = "/root/data/logs/rustfs.log" +buffer_size = 100 # Default is 8192 bytes if not specified +flush_interval_ms = 1000 +flush_threshold = 100 [logger] queue_capacity = 10 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 8f19d5fc..7ccca205 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ cli/rustfs-gui/embedded-rustfs/rustfs deploy/config/obs.toml *.log deploy/certs/* -*jsonl \ No newline at end of file +*jsonl +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index cce1d3bc..01ab7a99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1746,7 +1746,7 @@ dependencies = [ "pbkdf2", "rand 0.8.5", "serde_json", - "sha2 0.10.8", + "sha2 0.10.9", "test-case", "thiserror 2.0.12", "time", @@ -2141,7 +2141,7 @@ dependencies = [ "md-5", "rand 0.8.5", "regex", - "sha2 0.10.8", + "sha2 0.10.9", "unicode-segmentation", "uuid", ] @@ -3020,6 +3020,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dpi" version = "0.1.1" @@ -4807,7 +4813,7 @@ dependencies = [ "nom 8.0.0", "once_cell", "serde", - "sha2 0.10.8", + "sha2 0.10.9", "thiserror 2.0.12", "uuid", ] @@ -6120,7 +6126,7 @@ checksum = "7f9f832470494906d1fca5329f8ab5791cc60beb230c74815dff541cbd2b5ca0" dependencies = [ "once_cell", "pest", - "sha2 0.10.8", + "sha2 0.10.9", ] [[package]] @@ -7209,7 +7215,7 @@ version = "8.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d55b95147fe01265d06b3955db798bdaed52e60e2211c41137701b3aba8e21" dependencies = [ - "sha2 0.10.8", + "sha2 0.10.9", "walkdir", ] @@ -7340,6 +7346,7 @@ dependencies = [ "async-trait", "axum", "config", + "dotenvy", "http", "rdkafka", "reqwest", @@ -7371,7 +7378,7 @@ dependencies = [ "rust-embed", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.10.9", "tokio", "tracing-appender", "tracing-subscriber", @@ -7394,6 +7401,7 @@ dependencies = [ "opentelemetry_sdk", "rdkafka", "reqwest", + "rustfs-config", "serde", "serde_json", "smallvec", @@ -7919,9 +7927,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.8" +version = "0.10.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", "cpufeatures", @@ -10218,7 +10226,7 @@ dependencies = [ "once_cell", "percent-encoding", "raw-window-handle 0.6.2", - "sha2 0.10.8", + "sha2 0.10.9", "soup3", "tao-macros", "thiserror 1.0.69", diff --git a/Cargo.toml b/Cargo.toml index 2adf24d7..a18ac9d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_urlencoded = "0.7.1" serde_with = "3.12.0" -sha2 = "0.10.8" +sha2 = "0.10.9" smallvec = { version = "1.15.0", features = ["serde"] } snafu = "0.8.5" socket2 = "0.5.9" diff --git a/README.md b/README.md index e4818d4d..9bbb8be9 100644 --- a/README.md +++ b/README.md @@ -59,30 +59,12 @@ export RUSTFS_ADDRESS="0.0.0.0:9000" export RUSTFS_CONSOLE_ENABLE=true export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001" -# Observability config (option 1: config file) +# Observability config export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml" -# Observability config (option 2: environment variables) -export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 -export RUSTFS__OBSERVABILITY__USE_STDOUT=true -export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 -export RUSTFS__OBSERVABILITY__METER_INTERVAL=30 -export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs -export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 -export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop -export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info -export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true +# Event message configuration +#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml" -# Logging sinks -export RUSTFS__SINKS__FILE__ENABLED=true -export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log" -export RUSTFS__SINKS__WEBHOOK__ENABLED=false -export RUSTFS__SINKS__WEBHOOK__ENDPOINT="" -export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN="" -export RUSTFS__SINKS__KAFKA__ENABLED=false -export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS="" -export RUSTFS__SINKS__KAFKA__TOPIC="" -export RUSTFS__LOGGER__QUEUE_CAPACITY=10 ``` #### Start the service diff --git a/README_ZH.md b/README_ZH.md index 10f05bbe..89fe320a 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -59,30 +59,11 @@ export RUSTFS_ADDRESS="0.0.0.0:9000" export RUSTFS_CONSOLE_ENABLE=true export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9001" -# 可观测性配置(方式一:配置文件) +# 可观测性配置 export RUSTFS_OBS_CONFIG="./deploy/config/obs.toml" -# 可观测性配置(方式二:环境变量) -export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 -export RUSTFS__OBSERVABILITY__USE_STDOUT=true -export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 -export RUSTFS__OBSERVABILITY__METER_INTERVAL=30 -export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs -export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 -export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop -export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info -export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true - -# 日志接收器 -export RUSTFS__SINKS__FILE__ENABLED=true -export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log" -export RUSTFS__SINKS__WEBHOOK__ENABLED=false -export RUSTFS__SINKS__WEBHOOK__ENDPOINT="" -export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN="" -export RUSTFS__SINKS__KAFKA__ENABLED=false -export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS="" -export RUSTFS__SINKS__KAFKA__TOPIC="" -export RUSTFS__LOGGER__QUEUE_CAPACITY=10 +# 事件消息配置 +#export RUSTFS_EVENT_CONFIG="./deploy/config/event.toml" ``` #### 启动服务 diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index bebcbfd0..3d427f51 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -1,17 +1,17 @@ -use crate::event::config::EventConfig; +use crate::event::config::NotifierConfig; use crate::ObservabilityConfig; /// RustFs configuration pub struct RustFsConfig { pub observability: ObservabilityConfig, - pub event: EventConfig, + pub event: NotifierConfig, } impl RustFsConfig { pub fn new() -> Self { Self { observability: ObservabilityConfig::new(), - event: EventConfig::new(), + event: NotifierConfig::new(), } } } diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index 294c7677..467905f7 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -14,6 +14,25 @@ pub const VERSION: &str = "0.0.1"; /// Environment variable: RUSTFS_LOG_LEVEL pub const DEFAULT_LOG_LEVEL: &str = "info"; +/// Default configuration use stdout +/// Default value: true +pub(crate) const USE_STDOUT: bool = true; + +/// Default configuration sample ratio +/// Default value: 1.0 +pub(crate) const SAMPLE_RATIO: f64 = 1.0; +/// Default configuration meter interval +/// Default value: 30 +pub(crate) const METER_INTERVAL: u64 = 30; + +/// Default configuration service version +/// Default value: 0.0.1 +pub(crate) const SERVICE_VERSION: &str = "0.0.1"; + +/// Default configuration environment +/// Default value: production +pub(crate) const ENVIRONMENT: &str = "production"; + /// maximum number of connections /// This is the maximum number of connections that the server will accept. /// This is used to limit the number of connections to the server. diff --git a/crates/config/src/event/adapters.rs b/crates/config/src/event/adapters.rs new file mode 100644 index 00000000..d66bf19e --- /dev/null +++ b/crates/config/src/event/adapters.rs @@ -0,0 +1,27 @@ +use crate::event::kafka::KafkaAdapter; +use crate::event::mqtt::MqttAdapter; +use crate::event::webhook::WebhookAdapter; +use serde::{Deserialize, Serialize}; + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum AdapterConfig { + Webhook(WebhookAdapter), + Kafka(KafkaAdapter), + Mqtt(MqttAdapter), +} + +impl AdapterConfig { + /// create a new configuration with default values + pub fn new() -> Self { + Self::Webhook(WebhookAdapter::new()) + } +} + +impl Default for AdapterConfig { + /// create a new configuration with default values + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/event/config.rs b/crates/config/src/event/config.rs index a8a43068..ea364c9a 100644 --- a/crates/config/src/event/config.rs +++ b/crates/config/src/event/config.rs @@ -1,23 +1,43 @@ -/// Event configuration module -pub struct EventConfig { - pub event_type: String, - pub event_source: String, - pub event_destination: String, +use crate::event::adapters::AdapterConfig; +use serde::{Deserialize, Serialize}; +use std::env; + +#[allow(dead_code)] +const DEFAULT_CONFIG_FILE: &str = "event"; + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NotifierConfig { + #[serde(default = "default_store_path")] + pub store_path: String, + #[serde(default = "default_channel_capacity")] + pub channel_capacity: usize, + pub adapters: Vec, } -impl EventConfig { - /// Creates a new instance of `EventConfig` with default values. - pub fn new() -> Self { - Self { - event_type: "default".to_string(), - event_source: "default".to_string(), - event_destination: "default".to_string(), - } - } -} - -impl Default for EventConfig { +impl Default for NotifierConfig { fn default() -> Self { Self::new() } } + +impl NotifierConfig { + /// create a new configuration with default values + pub fn new() -> Self { + Self { + store_path: default_store_path(), + channel_capacity: default_channel_capacity(), + adapters: vec![AdapterConfig::new()], + } + } +} + +/// Provide temporary directories as default storage paths +fn default_store_path() -> String { + env::temp_dir().join("event-notification").to_string_lossy().to_string() +} + +/// Provides the recommended default channel capacity for high concurrency systems +fn default_channel_capacity() -> usize { + 10000 // Reasonable default values for high concurrency systems +} diff --git a/crates/config/src/event/event.rs b/crates/config/src/event/event.rs deleted file mode 100644 index 70a10369..00000000 --- a/crates/config/src/event/event.rs +++ /dev/null @@ -1,17 +0,0 @@ -/// Event configuration module -pub struct EventConfig { - pub event_type: String, - pub event_source: String, - pub event_destination: String, -} - -impl EventConfig { - /// Creates a new instance of `EventConfig` with default values. - pub fn new() -> Self { - Self { - event_type: "default".to_string(), - event_source: "default".to_string(), - event_destination: "default".to_string(), - } - } -} diff --git a/crates/config/src/event/kafka.rs b/crates/config/src/event/kafka.rs new file mode 100644 index 00000000..16411374 --- /dev/null +++ b/crates/config/src/event/kafka.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +/// Configuration for the Kafka adapter. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KafkaAdapter { + pub brokers: String, + pub topic: String, + pub max_retries: u32, + pub timeout: u64, +} + +impl KafkaAdapter { + /// create a new configuration with default values + pub fn new() -> Self { + Self { + brokers: "localhost:9092".to_string(), + topic: "kafka_topic".to_string(), + max_retries: 3, + timeout: 1000, + } + } +} + +impl Default for KafkaAdapter { + /// create a new configuration with default values + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/event/mod.rs b/crates/config/src/event/mod.rs index 60280924..80dfd45e 100644 --- a/crates/config/src/event/mod.rs +++ b/crates/config/src/event/mod.rs @@ -1,2 +1,5 @@ +pub(crate) mod adapters; pub(crate) mod config; -pub(crate) mod event; +pub(crate) mod kafka; +pub(crate) mod mqtt; +pub(crate) mod webhook; diff --git a/crates/config/src/event/mqtt.rs b/crates/config/src/event/mqtt.rs new file mode 100644 index 00000000..ee983532 --- /dev/null +++ b/crates/config/src/event/mqtt.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; + +/// Configuration for the MQTT adapter. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MqttAdapter { + pub broker: String, + pub port: u16, + pub client_id: String, + pub topic: String, + pub max_retries: u32, +} + +impl MqttAdapter { + /// create a new configuration with default values + pub fn new() -> Self { + Self { + broker: "localhost".to_string(), + port: 1883, + client_id: "mqtt_client".to_string(), + topic: "mqtt_topic".to_string(), + max_retries: 3, + } + } +} + +impl Default for MqttAdapter { + /// create a new configuration with default values + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/event/webhook.rs b/crates/config/src/event/webhook.rs new file mode 100644 index 00000000..95b3adad --- /dev/null +++ b/crates/config/src/event/webhook.rs @@ -0,0 +1,51 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookAdapter { + pub endpoint: String, + pub auth_token: Option, + pub custom_headers: Option>, + pub max_retries: u32, + pub timeout: u64, +} + +impl WebhookAdapter { + /// verify that the configuration is valid + pub fn validate(&self) -> Result<(), String> { + // verify that endpoint cannot be empty + if self.endpoint.trim().is_empty() { + return Err("Webhook endpoint cannot be empty".to_string()); + } + + // verification timeout must be reasonable + if self.timeout == 0 { + return Err("Webhook timeout must be greater than 0".to_string()); + } + + // Verify that the maximum number of retry is reasonable + if self.max_retries > 10 { + return Err("Maximum retry count cannot exceed 10".to_string()); + } + + Ok(()) + } + + /// Get the default configuration + pub fn new() -> Self { + Self { + endpoint: "".to_string(), + auth_token: None, + custom_headers: Some(HashMap::new()), + max_retries: 3, + timeout: 1000, + } + } +} + +impl Default for WebhookAdapter { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index fd7b8bec..44a9fe3c 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -7,3 +7,5 @@ mod observability; pub use config::RustFsConfig; pub use constants::app::*; + +pub use event::config::NotifierConfig; diff --git a/crates/config/src/observability/config.rs b/crates/config/src/observability/config.rs index 361f9a6c..b43f3646 100644 --- a/crates/config/src/observability/config.rs +++ b/crates/config/src/observability/config.rs @@ -1,13 +1,13 @@ use crate::observability::logger::LoggerConfig; use crate::observability::otel::OtelConfig; use crate::observability::sink::SinkConfig; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// Observability configuration -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct ObservabilityConfig { pub otel: OtelConfig, - pub sinks: SinkConfig, + pub sinks: Vec, pub logger: Option, } @@ -15,7 +15,7 @@ impl ObservabilityConfig { pub fn new() -> Self { Self { otel: OtelConfig::new(), - sinks: SinkConfig::new(), + sinks: vec![SinkConfig::new()], logger: Some(LoggerConfig::new()), } } diff --git a/crates/config/src/observability/file.rs b/crates/config/src/observability/file.rs new file mode 100644 index 00000000..6ec74c8b --- /dev/null +++ b/crates/config/src/observability/file.rs @@ -0,0 +1,59 @@ +use serde::{Deserialize, Serialize}; +use std::env; + +/// File sink configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileSink { + pub path: String, + #[serde(default = "default_buffer_size")] + pub buffer_size: Option, + #[serde(default = "default_flush_interval_ms")] + pub flush_interval_ms: Option, + #[serde(default = "default_flush_threshold")] + pub flush_threshold: Option, +} + +impl FileSink { + pub fn new() -> Self { + Self { + path: env::var("RUSTFS_SINKS_FILE_PATH") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(default_path), + buffer_size: default_buffer_size(), + flush_interval_ms: default_flush_interval_ms(), + flush_threshold: default_flush_threshold(), + } + } +} + +impl Default for FileSink { + fn default() -> Self { + Self::new() + } +} + +fn default_buffer_size() -> Option { + Some(8192) +} +fn default_flush_interval_ms() -> Option { + Some(1000) +} +fn default_flush_threshold() -> Option { + Some(100) +} + +fn default_path() -> String { + let temp_dir = env::temp_dir().join("rustfs"); + + if let Err(e) = std::fs::create_dir_all(&temp_dir) { + eprintln!("Failed to create log directory: {}", e); + return "rustfs/rustfs.log".to_string(); + } + + temp_dir + .join("rustfs.log") + .to_str() + .unwrap_or("rustfs/rustfs.log") + .to_string() +} diff --git a/crates/config/src/observability/file_sink.rs b/crates/config/src/observability/file_sink.rs deleted file mode 100644 index d475376e..00000000 --- a/crates/config/src/observability/file_sink.rs +++ /dev/null @@ -1,25 +0,0 @@ -use serde::Deserialize; - -/// File sink configuration -#[derive(Debug, Deserialize, Clone)] -pub struct FileSinkConfig { - pub path: String, - pub max_size: u64, - pub max_backups: u64, -} - -impl FileSinkConfig { - pub fn new() -> Self { - Self { - path: "".to_string(), - max_size: 0, - max_backups: 0, - } - } -} - -impl Default for FileSinkConfig { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/observability/kafka.rs b/crates/config/src/observability/kafka.rs new file mode 100644 index 00000000..104ab1da --- /dev/null +++ b/crates/config/src/observability/kafka.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; + +/// Kafka sink configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KafkaSink { + pub brokers: String, + pub topic: String, + #[serde(default = "default_batch_size")] + pub batch_size: Option, + #[serde(default = "default_batch_timeout_ms")] + pub batch_timeout_ms: Option, +} + +impl KafkaSink { + pub fn new() -> Self { + Self { + brokers: "localhost:9092".to_string(), + topic: "rustfs".to_string(), + batch_size: default_batch_size(), + batch_timeout_ms: default_batch_timeout_ms(), + } + } +} + +impl Default for KafkaSink { + fn default() -> Self { + Self::new() + } +} + +fn default_batch_size() -> Option { + Some(100) +} +fn default_batch_timeout_ms() -> Option { + Some(1000) +} diff --git a/crates/config/src/observability/kafka_sink.rs b/crates/config/src/observability/kafka_sink.rs deleted file mode 100644 index f40a979b..00000000 --- a/crates/config/src/observability/kafka_sink.rs +++ /dev/null @@ -1,23 +0,0 @@ -use serde::Deserialize; - -/// Kafka sink configuration -#[derive(Debug, Deserialize, Clone)] -pub struct KafkaSinkConfig { - pub brokers: Vec, - pub topic: String, -} - -impl KafkaSinkConfig { - pub fn new() -> Self { - Self { - brokers: vec!["localhost:9092".to_string()], - topic: "rustfs".to_string(), - } - } -} - -impl Default for KafkaSinkConfig { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/config/src/observability/logger.rs b/crates/config/src/observability/logger.rs index f6c70682..68a0bff4 100644 --- a/crates/config/src/observability/logger.rs +++ b/crates/config/src/observability/logger.rs @@ -1,7 +1,7 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; /// Logger configuration -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct LoggerConfig { pub queue_capacity: Option, } diff --git a/crates/config/src/observability/mod.rs b/crates/config/src/observability/mod.rs index 65d9933b..216c074e 100644 --- a/crates/config/src/observability/mod.rs +++ b/crates/config/src/observability/mod.rs @@ -1,8 +1,7 @@ pub(crate) mod config; -pub(crate) mod file_sink; -pub(crate) mod kafka_sink; +pub(crate) mod file; +pub(crate) mod kafka; pub(crate) mod logger; -pub(crate) mod observability; pub(crate) mod otel; pub(crate) mod sink; -pub(crate) mod webhook_sink; +pub(crate) mod webhook; diff --git a/crates/config/src/observability/observability.rs b/crates/config/src/observability/observability.rs deleted file mode 100644 index 17b4e070..00000000 --- a/crates/config/src/observability/observability.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::observability::logger::LoggerConfig; -use crate::observability::otel::OtelConfig; -use crate::observability::sink::SinkConfig; -use serde::Deserialize; - -/// Observability configuration -#[derive(Debug, Deserialize, Clone)] -pub struct ObservabilityConfig { - pub otel: OtelConfig, - pub sinks: SinkConfig, - pub logger: Option, -} - -impl ObservabilityConfig { - pub fn new() -> Self { - Self { - otel: OtelConfig::new(), - sinks: SinkConfig::new(), - logger: Some(LoggerConfig::new()), - } - } -} diff --git a/crates/config/src/observability/otel.rs b/crates/config/src/observability/otel.rs index 4ac6618b..77785c68 100644 --- a/crates/config/src/observability/otel.rs +++ b/crates/config/src/observability/otel.rs @@ -1,22 +1,25 @@ -use serde::Deserialize; +use crate::constants::app::{ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT}; +use crate::{APP_NAME, DEFAULT_LOG_LEVEL}; +use serde::{Deserialize, Serialize}; +use std::env; /// OpenTelemetry configuration -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct OtelConfig { - pub endpoint: String, - pub service_name: String, - pub service_version: String, - pub resource_attributes: Vec, + pub endpoint: String, // Endpoint for metric collection + pub use_stdout: Option, // Output to stdout + pub sample_ratio: Option, // Trace sampling ratio + pub meter_interval: Option, // Metric collection interval + pub service_name: Option, // Service name + pub service_version: Option, // Service version + pub environment: Option, // Environment + pub logger_level: Option, // Logger level + pub local_logging_enabled: Option, // Local logging enabled } impl OtelConfig { pub fn new() -> Self { - Self { - endpoint: "http://localhost:4317".to_string(), - service_name: "rustfs".to_string(), - service_version: "0.1.0".to_string(), - resource_attributes: vec![], - } + extract_otel_config_from_env() } } @@ -25,3 +28,42 @@ impl Default for OtelConfig { Self::new() } } + +// Helper function: Extract observable configuration from environment variables +fn extract_otel_config_from_env() -> OtelConfig { + OtelConfig { + endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()), + use_stdout: env::var("RUSTFS_OBSERVABILITY_USE_STDOUT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(USE_STDOUT)), + sample_ratio: env::var("RUSTFS_OBSERVABILITY_SAMPLE_RATIO") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SAMPLE_RATIO)), + meter_interval: env::var("RUSTFS_OBSERVABILITY_METER_INTERVAL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(METER_INTERVAL)), + service_name: env::var("RUSTFS_OBSERVABILITY_SERVICE_NAME") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(APP_NAME.to_string())), + service_version: env::var("RUSTFS_OBSERVABILITY_SERVICE_VERSION") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(SERVICE_VERSION.to_string())), + environment: env::var("RUSTFS_OBSERVABILITY_ENVIRONMENT") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(ENVIRONMENT.to_string())), + logger_level: env::var("RUSTFS_OBSERVABILITY_LOGGER_LEVEL") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(DEFAULT_LOG_LEVEL.to_string())), + local_logging_enabled: env::var("RUSTFS_OBSERVABILITY_LOCAL_LOGGING_ENABLED") + .ok() + .and_then(|v| v.parse().ok()) + .or(Some(false)), + } +} diff --git a/crates/config/src/observability/sink.rs b/crates/config/src/observability/sink.rs index dcb37fa3..9339e06e 100644 --- a/crates/config/src/observability/sink.rs +++ b/crates/config/src/observability/sink.rs @@ -1,23 +1,20 @@ -use crate::observability::file_sink::FileSinkConfig; -use crate::observability::kafka_sink::KafkaSinkConfig; -use crate::observability::webhook_sink::WebhookSinkConfig; -use serde::Deserialize; +use crate::observability::file::FileSink; +use crate::observability::kafka::KafkaSink; +use crate::observability::webhook::WebhookSink; +use serde::{Deserialize, Serialize}; /// Sink configuration -#[derive(Debug, Deserialize, Clone)] -pub struct SinkConfig { - pub kafka: Option, - pub webhook: Option, - pub file: Option, +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum SinkConfig { + Kafka(KafkaSink), + Webhook(WebhookSink), + File(FileSink), } impl SinkConfig { pub fn new() -> Self { - Self { - kafka: None, - webhook: None, - file: Some(FileSinkConfig::new()), - } + Self::File(FileSink::new()) } } diff --git a/crates/config/src/observability/webhook.rs b/crates/config/src/observability/webhook.rs new file mode 100644 index 00000000..8e1d32f8 --- /dev/null +++ b/crates/config/src/observability/webhook.rs @@ -0,0 +1,39 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Webhook sink configuration +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct WebhookSink { + pub endpoint: String, + pub auth_token: String, + pub headers: Option>, + #[serde(default = "default_max_retries")] + pub max_retries: Option, + #[serde(default = "default_retry_delay_ms")] + pub retry_delay_ms: Option, +} + +impl WebhookSink { + pub fn new() -> Self { + Self { + endpoint: "".to_string(), + auth_token: "".to_string(), + headers: Some(HashMap::new()), + max_retries: default_max_retries(), + retry_delay_ms: default_retry_delay_ms(), + } + } +} + +impl Default for WebhookSink { + fn default() -> Self { + Self::new() + } +} + +fn default_max_retries() -> Option { + Some(3) +} +fn default_retry_delay_ms() -> Option { + Some(100) +} diff --git a/crates/config/src/observability/webhook_sink.rs b/crates/config/src/observability/webhook_sink.rs deleted file mode 100644 index 49429203..00000000 --- a/crates/config/src/observability/webhook_sink.rs +++ /dev/null @@ -1,25 +0,0 @@ -use serde::Deserialize; - -/// Webhook sink configuration -#[derive(Debug, Deserialize, Clone)] -pub struct WebhookSinkConfig { - pub url: String, - pub method: String, - pub headers: Vec<(String, String)>, -} - -impl WebhookSinkConfig { - pub fn new() -> Self { - Self { - url: "http://localhost:8080/webhook".to_string(), - method: "POST".to_string(), - headers: vec![], - } - } -} - -impl Default for WebhookSinkConfig { - fn default() -> Self { - Self::new() - } -} diff --git a/crates/event-notifier/Cargo.toml b/crates/event-notifier/Cargo.toml index 5d1d3390..8d9acd9a 100644 --- a/crates/event-notifier/Cargo.toml +++ b/crates/event-notifier/Cargo.toml @@ -9,13 +9,12 @@ version.workspace = true [features] default = ["webhook"] webhook = ["dep:reqwest"] -kafka = ["rdkafka"] mqtt = ["rumqttc"] +kafka = ["dep:rdkafka"] [dependencies] async-trait = { workspace = true } config = { workspace = true } -rdkafka = { workspace = true, features = ["tokio"], optional = true } reqwest = { workspace = true, optional = true } rumqttc = { workspace = true, optional = true } serde = { workspace = true } @@ -29,12 +28,16 @@ tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-m tokio-util = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } +# Only enable kafka features and related dependencies on Linux +[target.'cfg(target_os = "linux")'.dependencies] +rdkafka = { workspace = true, features = ["tokio"], optional = true } [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } tracing-subscriber = { workspace = true } http = { workspace = true } axum = { workspace = true } +dotenvy = "0.15.7" [lints] workspace = true diff --git a/crates/event-notifier/examples/.env-zh.example b/crates/event-notifier/examples/.env-zh.example deleted file mode 100644 index 00228e16..00000000 --- a/crates/event-notifier/examples/.env-zh.example +++ /dev/null @@ -1,28 +0,0 @@ -# ===== 全局配置 ===== -NOTIFIER__STORE_PATH=/var/log/event-notification -NOTIFIER__CHANNEL_CAPACITY=5000 - -# ===== 适配器配置(数组格式) ===== -# Webhook 适配器(索引 0) -NOTIFIER__ADAPTERS_0__type=Webhook -NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook -NOTIFIER__ADAPTERS_0__auth_token=your-auth-token -NOTIFIER__ADAPTERS_0__max_retries=3 -NOTIFIER__ADAPTERS_0__timeout=50 -NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=value -NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=value - -# Kafka 适配器(索引 1) -NOTIFIER__ADAPTERS_1__type=Kafka -NOTIFIER__ADAPTERS_1__brokers=localhost:9092 -NOTIFIER__ADAPTERS_1__topic=notifications -NOTIFIER__ADAPTERS_1__max_retries=3 -NOTIFIER__ADAPTERS_1__timeout=60 - -# MQTT 适配器(索引 2) -NOTIFIER__ADAPTERS_2__type=Mqtt -NOTIFIER__ADAPTERS_2__broker=mqtt.example.com -NOTIFIER__ADAPTERS_2__port=1883 -NOTIFIER__ADAPTERS_2__client_id=event-notifier -NOTIFIER__ADAPTERS_2__topic=events -NOTIFIER__ADAPTERS_2__max_retries=3 \ No newline at end of file diff --git a/crates/event-notifier/examples/.env.example b/crates/event-notifier/examples/.env.example index af343863..c6d37142 100644 --- a/crates/event-notifier/examples/.env.example +++ b/crates/event-notifier/examples/.env.example @@ -1,28 +1,28 @@ -# ===== global configuration ===== -NOTIFIER__STORE_PATH=/var/log/event-notification -NOTIFIER__CHANNEL_CAPACITY=5000 - -# ===== adapter configuration array format ===== -# webhook adapter index 0 -NOTIFIER__ADAPTERS_0__type=Webhook -NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook -NOTIFIER__ADAPTERS_0__auth_token=your-auth-token -NOTIFIER__ADAPTERS_0__max_retries=3 -NOTIFIER__ADAPTERS_0__timeout=50 -NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=server-value -NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=client-value - -# kafka adapter index 1 -NOTIFIER__ADAPTERS_1__type=Kafka -NOTIFIER__ADAPTERS_1__brokers=localhost:9092 -NOTIFIER__ADAPTERS_1__topic=notifications -NOTIFIER__ADAPTERS_1__max_retries=3 -NOTIFIER__ADAPTERS_1__timeout=60 - -# mqtt adapter index 2 -NOTIFIER__ADAPTERS_2__type=Mqtt -NOTIFIER__ADAPTERS_2__broker=mqtt.example.com -NOTIFIER__ADAPTERS_2__port=1883 -NOTIFIER__ADAPTERS_2__client_id=event-notifier -NOTIFIER__ADAPTERS_2__topic=events -NOTIFIER__ADAPTERS_2__max_retries=3 \ No newline at end of file +## ===== global configuration ===== +#NOTIFIER__STORE_PATH=/var/log/event-notification +#NOTIFIER__CHANNEL_CAPACITY=5000 +# +## ===== adapter configuration array format ===== +## webhook adapter index 0 +#NOTIFIER__ADAPTERS_0__type=Webhook +#NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook +#NOTIFIER__ADAPTERS_0__auth_token=your-auth-token +#NOTIFIER__ADAPTERS_0__max_retries=3 +#NOTIFIER__ADAPTERS_0__timeout=50 +#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=server-value +#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=client-value +# +## kafka adapter index 1 +#NOTIFIER__ADAPTERS_1__type=Kafka +#NOTIFIER__ADAPTERS_1__brokers=localhost:9092 +#NOTIFIER__ADAPTERS_1__topic=notifications +#NOTIFIER__ADAPTERS_1__max_retries=3 +#NOTIFIER__ADAPTERS_1__timeout=60 +# +## mqtt adapter index 2 +#NOTIFIER__ADAPTERS_2__type=Mqtt +#NOTIFIER__ADAPTERS_2__broker=mqtt.example.com +#NOTIFIER__ADAPTERS_2__port=1883 +#NOTIFIER__ADAPTERS_2__client_id=event-notifier +#NOTIFIER__ADAPTERS_2__topic=events +#NOTIFIER__ADAPTERS_2__max_retries=3 \ No newline at end of file diff --git a/crates/event-notifier/examples/.env.zh.example b/crates/event-notifier/examples/.env.zh.example new file mode 100644 index 00000000..47f54308 --- /dev/null +++ b/crates/event-notifier/examples/.env.zh.example @@ -0,0 +1,28 @@ +## ===== 全局配置 ===== +#NOTIFIER__STORE_PATH=/var/log/event-notification +#NOTIFIER__CHANNEL_CAPACITY=5000 +# +## ===== 适配器配置(数组格式) ===== +## Webhook 适配器(索引 0) +#NOTIFIER__ADAPTERS_0__type=Webhook +#NOTIFIER__ADAPTERS_0__endpoint=http://127.0.0.1:3020/webhook +#NOTIFIER__ADAPTERS_0__auth_token=your-auth-token +#NOTIFIER__ADAPTERS_0__max_retries=3 +#NOTIFIER__ADAPTERS_0__timeout=50 +#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_server=value +#NOTIFIER__ADAPTERS_0__custom_headers__x_custom_client=value +# +## Kafka 适配器(索引 1) +#NOTIFIER__ADAPTERS_1__type=Kafka +#NOTIFIER__ADAPTERS_1__brokers=localhost:9092 +#NOTIFIER__ADAPTERS_1__topic=notifications +#NOTIFIER__ADAPTERS_1__max_retries=3 +#NOTIFIER__ADAPTERS_1__timeout=60 +# +## MQTT 适配器(索引 2) +#NOTIFIER__ADAPTERS_2__type=Mqtt +#NOTIFIER__ADAPTERS_2__broker=mqtt.example.com +#NOTIFIER__ADAPTERS_2__port=1883 +#NOTIFIER__ADAPTERS_2__client_id=event-notifier +#NOTIFIER__ADAPTERS_2__topic=events +#NOTIFIER__ADAPTERS_2__max_retries=3 \ No newline at end of file diff --git a/crates/event-notifier/examples/simple.rs b/crates/event-notifier/examples/simple.rs index 93005f0a..27d422b0 100644 --- a/crates/event-notifier/examples/simple.rs +++ b/crates/event-notifier/examples/simple.rs @@ -33,7 +33,9 @@ async fn main() -> Result<(), Box> { // loading configuration from environment variables let _config = NotifierConfig::event_load_config(Some("./crates/event-notifier/examples/event.toml".to_string())); tracing::info!("event_load_config config: {:?} \n", _config); - + dotenvy::dotenv()?; + let _config = NotifierConfig::event_load_config(None); + tracing::info!("event_load_config config: {:?} \n", _config); let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await?)); let adapters = create_adapters(&config.adapters)?; diff --git a/crates/event-notifier/src/adapter/mod.rs b/crates/event-notifier/src/adapter/mod.rs index fa12aa97..426fd2d8 100644 --- a/crates/event-notifier/src/adapter/mod.rs +++ b/crates/event-notifier/src/adapter/mod.rs @@ -4,7 +4,7 @@ use crate::Event; use async_trait::async_trait; use std::sync::Arc; -#[cfg(feature = "kafka")] +#[cfg(all(feature = "kafka", target_os = "linux"))] pub(crate) mod kafka; #[cfg(feature = "mqtt")] pub(crate) mod mqtt; @@ -31,7 +31,7 @@ pub fn create_adapters(configs: &[AdapterConfig]) -> Result { adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?)); } @@ -43,7 +43,7 @@ pub fn create_adapters(configs: &[AdapterConfig]) -> Result return Err(Error::FeatureDisabled("webhook")), - #[cfg(not(feature = "kafka"))] + #[cfg(any(not(feature = "kafka"), not(target_os = "linux")))] AdapterConfig::Kafka(_) => return Err(Error::FeatureDisabled("kafka")), #[cfg(not(feature = "mqtt"))] AdapterConfig::Mqtt(_) => return Err(Error::FeatureDisabled("mqtt")), diff --git a/crates/event-notifier/src/config.rs b/crates/event-notifier/src/config.rs index 10429c34..3414f3fa 100644 --- a/crates/event-notifier/src/config.rs +++ b/crates/event-notifier/src/config.rs @@ -1,4 +1,4 @@ -use config::{Config, Environment, File, FileFormat}; +use config::{Config, File, FileFormat}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; @@ -138,15 +138,6 @@ impl NotifierConfig { let app_config = Config::builder() .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false)) .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false)) - .add_source( - Environment::default() - .prefix("NOTIFIER") - .prefix_separator("__") - .separator("__") - .list_separator("_") - .with_list_parse_key("adapters") - .try_parsing(true), - ) .build() .unwrap_or_default(); match app_config.try_deserialize::() { diff --git a/crates/event-notifier/src/error.rs b/crates/event-notifier/src/error.rs index e6c061de..ebdaf899 100644 --- a/crates/event-notifier/src/error.rs +++ b/crates/event-notifier/src/error.rs @@ -15,7 +15,7 @@ pub enum Error { Serde(#[from] serde_json::Error), #[error("HTTP error: {0}")] Http(#[from] reqwest::Error), - #[cfg(feature = "kafka")] + #[cfg(all(feature = "kafka", target_os = "linux"))] #[error("Kafka error: {0}")] Kafka(#[from] rdkafka::error::KafkaError), #[cfg(feature = "mqtt")] diff --git a/crates/event-notifier/src/lib.rs b/crates/event-notifier/src/lib.rs index 20ef935d..fe2e5e3d 100644 --- a/crates/event-notifier/src/lib.rs +++ b/crates/event-notifier/src/lib.rs @@ -8,7 +8,7 @@ mod notifier; mod store; pub use adapter::create_adapters; -#[cfg(feature = "kafka")] +#[cfg(all(feature = "kafka", target_os = "linux"))] pub use adapter::kafka::KafkaAdapter; #[cfg(feature = "mqtt")] pub use adapter::mqtt::MqttAdapter; @@ -16,7 +16,7 @@ pub use adapter::mqtt::MqttAdapter; pub use adapter::webhook::WebhookAdapter; pub use adapter::ChannelAdapter; pub use bus::event_bus; -#[cfg(feature = "kafka")] +#[cfg(all(feature = "kafka", target_os = "linux"))] pub use config::KafkaConfig; #[cfg(feature = "mqtt")] pub use config::MqttConfig; diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 26c26433..65968d51 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -13,11 +13,11 @@ workspace = true default = ["file"] file = [] gpu = ["dep:nvml-wrapper"] -kafka = ["dep:rdkafka"] webhook = ["dep:reqwest"] -full = ["file", "gpu", "kafka", "webhook"] +kafka = ["dep:rdkafka"] [dependencies] +rustfs-config = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } config = { workspace = true } @@ -37,12 +37,14 @@ tracing-error = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] } tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] } -rdkafka = { workspace = true, features = ["tokio"], optional = true } reqwest = { workspace = true, optional = true, default-features = false } serde_json = { workspace = true } sysinfo = { workspace = true } thiserror = { workspace = true } +# Only enable kafka features and related dependencies on Linux +[target.'cfg(target_os = "linux")'.dependencies] +rdkafka = { workspace = true, features = ["tokio"], optional = true } [dev-dependencies] diff --git a/crates/obs/examples/config.toml b/crates/obs/examples/config.toml index c1b3df14..b649f806 100644 --- a/crates/obs/examples/config.toml +++ b/crates/obs/examples/config.toml @@ -7,27 +7,28 @@ service_name = "rustfs_obs" service_version = "0.1.0" environments = "develop" logger_level = "debug" +local_logging_enabled = true -[sinks] -[sinks.kafka] -enabled = false -bootstrap_servers = "localhost:9092" -topic = "logs" -batch_size = 100 # Default is 100 if not specified -batch_timeout_ms = 1000 # Default is 1000ms if not specified +#[[sinks]] +#type = "Kafka" +#bootstrap_servers = "localhost:9092" +#topic = "logs" +#batch_size = 100 # Default is 100 if not specified +#batch_timeout_ms = 100 # Default is 1000ms if not specified +# +#[[sinks]] +#type = "Webhook" +#endpoint = "http://localhost:8080/webhook" +#auth_token = "" +#batch_size = 100 # Default is 3 if not specified +#batch_timeout_ms = 100 # Default is 100ms if not specified -[sinks.webhook] -enabled = false -endpoint = "http://localhost:8080/webhook" -auth_token = "" -batch_size = 100 # Default is 3 if not specified -batch_timeout_ms = 1000 # Default is 100ms if not specified - -[sinks.file] -enabled = true -path = "deploy/logs/app.log" -batch_size = 100 -batch_timeout_ms = 1000 # Default is 8192 bytes if not specified +[[sinks]] +type = "File" +path = "deploy/logs/rustfs.log" +buffer_size = 102 # Default is 8192 bytes if not specified +flush_interval_ms = 1000 +flush_threshold = 100 [logger] queue_capacity = 10000 \ No newline at end of file diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 737d2c0d..98929470 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -1,6 +1,6 @@ use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT}; -use config::{Config, Environment, File, FileFormat}; -use serde::Deserialize; +use config::{Config, File, FileFormat}; +use serde::{Deserialize, Serialize}; use std::env; /// OpenTelemetry Configuration @@ -11,7 +11,7 @@ use std::env; /// Add use_stdout for output to stdout /// Add logger level for log level /// Add local_logging_enabled for local logging enabled -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct OtelConfig { pub endpoint: String, // Endpoint for metric collection pub use_stdout: Option, // Output to stdout @@ -24,7 +24,7 @@ pub struct OtelConfig { pub local_logging_enabled: Option, // Local logging enabled } -// Helper function: Extract observable configuration from environment variables +/// Helper function: Extract observable configuration from environment variables fn extract_otel_config_from_env() -> OtelConfig { OtelConfig { endpoint: env::var("RUSTFS_OBSERVABILITY_ENDPOINT").unwrap_or_else(|_| "".to_string()), @@ -63,36 +63,89 @@ fn extract_otel_config_from_env() -> OtelConfig { } } -impl Default for OtelConfig { - fn default() -> Self { +impl OtelConfig { + /// Create a new instance of OtelConfig with default values + /// + /// # Returns + /// A new instance of OtelConfig + pub fn new() -> Self { extract_otel_config_from_env() } } +impl Default for OtelConfig { + fn default() -> Self { + Self::new() + } +} + /// Kafka Sink Configuration - Add batch parameters -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct KafkaSinkConfig { - pub enabled: bool, - pub bootstrap_servers: String, + pub brokers: String, pub topic: String, pub batch_size: Option, // Batch size, default 100 pub batch_timeout_ms: Option, // Batch timeout time, default 1000ms } +impl KafkaSinkConfig { + pub fn new() -> Self { + Self { + brokers: env::var("RUSTFS__SINKS_0_KAFKA_BROKERS") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| "localhost:9092".to_string()), + topic: env::var("RUSTFS__SINKS_0_KAFKA_TOPIC") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| "default_topic".to_string()), + batch_size: Some(100), + batch_timeout_ms: Some(1000), + } + } +} + +impl Default for KafkaSinkConfig { + fn default() -> Self { + Self::new() + } +} + /// Webhook Sink Configuration - Add Retry Parameters -#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct WebhookSinkConfig { - pub enabled: bool, pub endpoint: String, pub auth_token: String, pub max_retries: Option, // Maximum number of retry times, default 3 pub retry_delay_ms: Option, // Retry the delay cardinality, default 100ms } +impl WebhookSinkConfig { + pub fn new() -> Self { + Self { + endpoint: env::var("RUSTFS__SINKS_0_WEBHOOK_ENDPOINT") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| "http://localhost:8080".to_string()), + auth_token: env::var("RUSTFS__SINKS_0_WEBHOOK_AUTH_TOKEN") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| "default_token".to_string()), + max_retries: Some(3), + retry_delay_ms: Some(100), + } + } +} + +impl Default for WebhookSinkConfig { + fn default() -> Self { + Self::new() + } +} + /// File Sink Configuration - Add buffering parameters -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct FileSinkConfig { - pub enabled: bool, pub path: String, pub buffer_size: Option, // Write buffer size, default 8192 pub flush_interval_ms: Option, // Refresh interval time, default 1000ms @@ -114,13 +167,9 @@ impl FileSinkConfig { .unwrap_or("rustfs/rustfs.log") .to_string() } -} - -impl Default for FileSinkConfig { - fn default() -> Self { - FileSinkConfig { - enabled: true, - path: env::var("RUSTFS_SINKS_FILE_PATH") + pub fn new() -> Self { + Self { + path: env::var("RUSTFS__SINKS_0_FILE_PATH") .ok() .filter(|s| !s.trim().is_empty()) .unwrap_or_else(Self::get_default_log_path), @@ -131,38 +180,53 @@ impl Default for FileSinkConfig { } } +impl Default for FileSinkConfig { + fn default() -> Self { + Self::new() + } +} + /// Sink configuration collection -#[derive(Debug, Deserialize, Clone)] -pub struct SinkConfig { - pub kafka: Option, - pub webhook: Option, - pub file: Option, +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum SinkConfig { + File(FileSinkConfig), + Kafka(KafkaSinkConfig), + Webhook(WebhookSinkConfig), +} + +impl SinkConfig { + pub fn new() -> Self { + Self::File(FileSinkConfig::new()) + } } impl Default for SinkConfig { fn default() -> Self { - SinkConfig { - kafka: None, - webhook: None, - file: Some(FileSinkConfig::default()), - } + Self::new() } } ///Logger Configuration -#[derive(Debug, Deserialize, Clone)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct LoggerConfig { pub queue_capacity: Option, } -impl Default for LoggerConfig { - fn default() -> Self { - LoggerConfig { +impl LoggerConfig { + pub fn new() -> Self { + Self { queue_capacity: Some(10000), } } } +impl Default for LoggerConfig { + fn default() -> Self { + Self::new() + } +} + /// Overall application configuration /// Add observability, sinks, and logger configuration /// @@ -180,7 +244,7 @@ impl Default for LoggerConfig { #[derive(Debug, Deserialize, Clone)] pub struct AppConfig { pub observability: OtelConfig, - pub sinks: SinkConfig, + pub sinks: Vec, pub logger: Option, } @@ -192,7 +256,7 @@ impl AppConfig { pub fn new() -> Self { Self { observability: OtelConfig::default(), - sinks: SinkConfig::default(), + sinks: vec![SinkConfig::default()], logger: Some(LoggerConfig::default()), } } @@ -258,14 +322,6 @@ pub fn load_config(config_dir: Option) -> AppConfig { let app_config = Config::builder() .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Toml).required(false)) .add_source(File::with_name(config_dir.as_str()).format(FileFormat::Yaml).required(false)) - .add_source( - Environment::default() - .prefix("RUSTFS") - .prefix_separator("__") - .separator("__") - .with_list_parse_key("volumes") - .try_parsing(true), - ) .build() .unwrap_or_default(); diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index d41d0e66..5d181e31 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ -32,7 +32,7 @@ mod config; mod entry; mod global; mod logger; -mod sink; +mod sinks; mod system; mod telemetry; mod utils; @@ -40,12 +40,6 @@ mod worker; use crate::logger::InitLogStatus; pub use config::load_config; -#[cfg(feature = "file")] -pub use config::FileSinkConfig; -#[cfg(feature = "kafka")] -pub use config::KafkaSinkConfig; -#[cfg(feature = "webhook")] -pub use config::WebhookSinkConfig; pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig}; pub use entry::args::Args; pub use entry::audit::{ApiDetails, AuditLogEntry}; @@ -79,7 +73,7 @@ use tracing::{error, info}; /// ``` pub async fn init_obs(config: AppConfig) -> (Arc>, telemetry::OtelGuard) { let guard = init_telemetry(&config.observability); - let sinks = sink::create_sinks(&config).await; + let sinks = sinks::create_sinks(&config).await; let logger = init_global_logger(&config, sinks).await; let obs_config = config.observability.clone(); tokio::spawn(async move { diff --git a/crates/obs/src/logger.rs b/crates/obs/src/logger.rs index 6329ab8f..92ff5365 100644 --- a/crates/obs/src/logger.rs +++ b/crates/obs/src/logger.rs @@ -1,5 +1,5 @@ use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION}; -use crate::sink::Sink; +use crate::sinks::Sink; use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry}; use std::sync::Arc; use std::time::SystemTime; diff --git a/crates/obs/src/sink.rs b/crates/obs/src/sink.rs deleted file mode 100644 index 4df212b3..00000000 --- a/crates/obs/src/sink.rs +++ /dev/null @@ -1,497 +0,0 @@ -use crate::{AppConfig, LogRecord, UnifiedLogEntry}; -use async_trait::async_trait; -use std::sync::Arc; -use tokio::fs::OpenOptions; -use tokio::io; -use tokio::io::AsyncWriteExt; - -/// Sink Trait definition, asynchronously write logs -#[async_trait] -pub trait Sink: Send + Sync { - async fn write(&self, entry: &UnifiedLogEntry); -} - -#[cfg(feature = "kafka")] -/// Kafka Sink Implementation -pub struct KafkaSink { - producer: rdkafka::producer::FutureProducer, - topic: String, - batch_size: usize, - batch_timeout_ms: u64, - entries: Arc>>, - last_flush: Arc, -} - -#[cfg(feature = "kafka")] -impl KafkaSink { - /// Create a new KafkaSink instance - pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self { - // Create Arc-wrapped values first - let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size))); - let last_flush = Arc::new(std::sync::atomic::AtomicU64::new( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - )); - let sink = KafkaSink { - producer: producer.clone(), - topic: topic.clone(), - batch_size, - batch_timeout_ms, - entries: entries.clone(), - last_flush: last_flush.clone(), - }; - - // Start background flusher - tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms)); - - sink - } - - /// Add a getter method to read the batch_timeout_ms field - #[allow(dead_code)] - pub fn batch_timeout(&self) -> u64 { - self.batch_timeout_ms - } - - /// Add a method to dynamically adjust the timeout if needed - #[allow(dead_code)] - pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) { - self.batch_timeout_ms = new_timeout_ms; - } - - async fn periodic_flush( - producer: rdkafka::producer::FutureProducer, - topic: String, - entries: Arc>>, - last_flush: Arc, - timeout_ms: u64, - ) { - loop { - tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await; - - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let last = last_flush.load(std::sync::atomic::Ordering::Relaxed); - - if now - last >= timeout_ms { - let mut batch = entries.lock().await; - if !batch.is_empty() { - Self::send_batch(&producer, &topic, batch.drain(..).collect()).await; - last_flush.store(now, std::sync::atomic::Ordering::Relaxed); - } - } - } - } - - async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec) { - for entry in entries { - let payload = match serde_json::to_string(&entry) { - Ok(p) => p, - Err(e) => { - eprintln!("Failed to serialize log entry: {}", e); - continue; - } - }; - - let span_id = entry.get_timestamp().to_rfc3339(); - - let _ = producer - .send( - rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&span_id), - std::time::Duration::from_secs(5), - ) - .await; - } - } -} - -#[cfg(feature = "kafka")] -#[async_trait] -impl Sink for KafkaSink { - async fn write(&self, entry: &UnifiedLogEntry) { - let mut batch = self.entries.lock().await; - batch.push(entry.clone()); - - let should_flush_by_size = batch.len() >= self.batch_size; - let should_flush_by_time = { - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed); - now - last >= self.batch_timeout_ms - }; - - if should_flush_by_size || should_flush_by_time { - // Existing flush logic - let entries_to_send: Vec = batch.drain(..).collect(); - let producer = self.producer.clone(); - let topic = self.topic.clone(); - - self.last_flush.store( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - std::sync::atomic::Ordering::Relaxed, - ); - - tokio::spawn(async move { - KafkaSink::send_batch(&producer, &topic, entries_to_send).await; - }); - } - } -} - -#[cfg(feature = "kafka")] -impl Drop for KafkaSink { - fn drop(&mut self) { - // Perform any necessary cleanup here - // For example, you might want to flush any remaining entries - let producer = self.producer.clone(); - let topic = self.topic.clone(); - let entries = self.entries.clone(); - let last_flush = self.last_flush.clone(); - - tokio::spawn(async move { - let mut batch = entries.lock().await; - if !batch.is_empty() { - KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await; - last_flush.store( - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64, - std::sync::atomic::Ordering::Relaxed, - ); - } - }); - - eprintln!("Dropping KafkaSink with topic: {}", self.topic); - } -} - -#[cfg(feature = "webhook")] -/// Webhook Sink Implementation -pub struct WebhookSink { - endpoint: String, - auth_token: String, - client: reqwest::Client, - max_retries: usize, - retry_delay_ms: u64, -} - -#[cfg(feature = "webhook")] -impl WebhookSink { - pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self { - WebhookSink { - endpoint, - auth_token, - client: reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(10)) - .build() - .unwrap_or_else(|_| reqwest::Client::new()), - max_retries, - retry_delay_ms, - } - } -} - -#[cfg(feature = "webhook")] -#[async_trait] -impl Sink for WebhookSink { - async fn write(&self, entry: &UnifiedLogEntry) { - let mut retries = 0; - let url = self.endpoint.clone(); - let entry_clone = entry.clone(); - let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap(); - while retries < self.max_retries { - match self - .client - .post(&url) - .header(reqwest::header::AUTHORIZATION, auth_value.clone()) - .json(&entry_clone) - .send() - .await - { - Ok(response) if response.status().is_success() => { - return; - } - _ => { - retries += 1; - if retries < self.max_retries { - tokio::time::sleep(tokio::time::Duration::from_millis( - self.retry_delay_ms * (1 << retries), // Exponential backoff - )) - .await; - } - } - } - } - - eprintln!("Failed to send log to webhook after {} retries", self.max_retries); - } -} - -#[cfg(feature = "webhook")] -impl Drop for WebhookSink { - fn drop(&mut self) { - // Perform any necessary cleanup here - // For example, you might want to log that the sink is being dropped - eprintln!("Dropping WebhookSink with URL: {}", self.endpoint); - } -} - -#[cfg(feature = "file")] -/// File Sink Implementation -pub struct FileSink { - path: String, - buffer_size: usize, - writer: Arc>>, - entry_count: std::sync::atomic::AtomicUsize, - last_flush: std::sync::atomic::AtomicU64, - flush_interval_ms: u64, // Time between flushes - flush_threshold: usize, // Number of entries before flush -} - -#[cfg(feature = "file")] -impl FileSink { - /// Create a new FileSink instance - pub async fn new( - path: String, - buffer_size: usize, - flush_interval_ms: u64, - flush_threshold: usize, - ) -> Result { - // check if the file exists - let file_exists = tokio::fs::metadata(&path).await.is_ok(); - // if the file not exists, create it - if !file_exists { - tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?; - tracing::debug!("File does not exist, creating it. Path: {:?}", path) - } - let file = if file_exists { - // If the file exists, open it in append mode - tracing::debug!("FileSink: File exists, opening in append mode."); - OpenOptions::new().append(true).create(true).open(&path).await? - } else { - // If the file does not exist, create it - tracing::debug!("FileSink: File does not exist, creating a new file."); - // Create the file and write a header or initial content if needed - OpenOptions::new().create(true).truncate(true).write(true).open(&path).await? - }; - let writer = io::BufWriter::with_capacity(buffer_size, file); - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - Ok(FileSink { - path, - buffer_size, - writer: Arc::new(tokio::sync::Mutex::new(writer)), - entry_count: std::sync::atomic::AtomicUsize::new(0), - last_flush: std::sync::atomic::AtomicU64::new(now), - flush_interval_ms, - flush_threshold, - }) - } - - #[allow(dead_code)] - async fn initialize_writer(&mut self) -> io::Result<()> { - let file = tokio::fs::File::create(&self.path).await?; - - // Use buffer_size to create a buffer writer with a specified capacity - let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file); - - // Replace the original writer with the new Mutex - self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer)); - Ok(()) - } - - // Get the current buffer size - #[allow(dead_code)] - pub fn buffer_size(&self) -> usize { - self.buffer_size - } - - // How to dynamically adjust the buffer size - #[allow(dead_code)] - pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> { - if self.buffer_size != new_size { - self.buffer_size = new_size; - // Reinitialize the writer directly, without checking is_some() - self.initialize_writer().await?; - } - Ok(()) - } - - // Check if flushing is needed based on count or time - fn should_flush(&self) -> bool { - // Check entry count threshold - if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold { - return true; - } - - // Check time threshold - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed); - now - last >= self.flush_interval_ms - } -} - -#[cfg(feature = "file")] -#[async_trait] -impl Sink for FileSink { - async fn write(&self, entry: &UnifiedLogEntry) { - let line = format!("{:?}\n", entry); - let mut writer = self.writer.lock().await; - - if let Err(e) = writer.write_all(line.as_bytes()).await { - eprintln!( - "Failed to write log to file {}: {},entry timestamp:{:?}", - self.path, - e, - entry.get_timestamp() - ); - return; - } - - // Only flush periodically to improve performance - // Logic to determine when to flush could be added here - // Increment the entry count - self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - - // Check if we should flush - if self.should_flush() { - if let Err(e) = writer.flush().await { - eprintln!("Failed to flush log file {}: {}", self.path, e); - return; - } - - // Reset counters - self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed); - - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed); - } - } -} - -#[cfg(feature = "file")] -impl Drop for FileSink { - fn drop(&mut self) { - let writer = self.writer.clone(); - let path = self.path.clone(); - - tokio::task::spawn_blocking(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - let mut writer = writer.lock().await; - if let Err(e) = writer.flush().await { - eprintln!("Failed to flush log file {}: {}", path, e); - } - }); - }); - } -} - -/// Create a list of Sink instances -pub async fn create_sinks(config: &AppConfig) -> Vec> { - let mut sinks: Vec> = Vec::new(); - - #[cfg(feature = "kafka")] - { - match &config.sinks.kafka { - Some(sink_kafka) => { - if sink_kafka.enabled { - match rdkafka::config::ClientConfig::new() - .set("bootstrap.servers", &sink_kafka.bootstrap_servers) - .set("message.timeout.ms", "5000") - .create() - { - Ok(producer) => { - sinks.push(Arc::new(KafkaSink::new( - producer, - sink_kafka.topic.clone(), - sink_kafka.batch_size.unwrap_or(100), - sink_kafka.batch_timeout_ms.unwrap_or(1000), - ))); - } - Err(e) => { - tracing::error!("Failed to create Kafka producer: {}", e); - } - } - } else { - tracing::info!("Kafka sink is disabled in the configuration"); - } - } - _ => { - tracing::info!("Kafka sink is not configured or disabled"); - } - } - } - #[cfg(feature = "webhook")] - { - match &config.sinks.webhook { - Some(sink_webhook) => { - if sink_webhook.enabled { - sinks.push(Arc::new(WebhookSink::new( - sink_webhook.endpoint.clone(), - sink_webhook.auth_token.clone(), - sink_webhook.max_retries.unwrap_or(3), - sink_webhook.retry_delay_ms.unwrap_or(100), - ))); - } else { - tracing::info!("Webhook sink is disabled in the configuration"); - } - } - _ => { - tracing::info!("Webhook sink is not configured or disabled"); - } - } - } - - #[cfg(feature = "file")] - { - // let config = config.clone(); - match &config.sinks.file { - Some(sink_file) => { - tracing::info!("File sink is enabled in the configuration"); - let path = if sink_file.enabled { - sink_file.path.clone() - } else { - "rustfs.log".to_string() - }; - tracing::debug!("FileSink: Using path: {}", path); - sinks.push(Arc::new( - FileSink::new( - path.clone(), - sink_file.buffer_size.unwrap_or(8192), - sink_file.flush_interval_ms.unwrap_or(1000), - sink_file.flush_threshold.unwrap_or(100), - ) - .await - .unwrap(), - )); - } - _ => { - tracing::info!("File sink is not configured or disabled"); - } - } - } - - sinks -} diff --git a/crates/obs/src/sinks/file.rs b/crates/obs/src/sinks/file.rs new file mode 100644 index 00000000..3e2b0db4 --- /dev/null +++ b/crates/obs/src/sinks/file.rs @@ -0,0 +1,164 @@ +use crate::sinks::Sink; +use crate::{LogRecord, UnifiedLogEntry}; +use async_trait::async_trait; +use std::sync::Arc; +use tokio::fs::OpenOptions; +use tokio::io; +use tokio::io::AsyncWriteExt; + +/// File Sink Implementation +pub struct FileSink { + path: String, + buffer_size: usize, + writer: Arc>>, + entry_count: std::sync::atomic::AtomicUsize, + last_flush: std::sync::atomic::AtomicU64, + flush_interval_ms: u64, // Time between flushes + flush_threshold: usize, // Number of entries before flush +} + +impl FileSink { + /// Create a new FileSink instance + pub async fn new( + path: String, + buffer_size: usize, + flush_interval_ms: u64, + flush_threshold: usize, + ) -> Result { + // check if the file exists + let file_exists = tokio::fs::metadata(&path).await.is_ok(); + // if the file not exists, create it + if !file_exists { + tokio::fs::create_dir_all(std::path::Path::new(&path).parent().unwrap()).await?; + tracing::debug!("File does not exist, creating it. Path: {:?}", path) + } + let file = if file_exists { + // If the file exists, open it in append mode + tracing::debug!("FileSink: File exists, opening in append mode."); + OpenOptions::new().append(true).create(true).open(&path).await? + } else { + // If the file does not exist, create it + tracing::debug!("FileSink: File does not exist, creating a new file."); + // Create the file and write a header or initial content if needed + OpenOptions::new().create(true).truncate(true).write(true).open(&path).await? + }; + let writer = io::BufWriter::with_capacity(buffer_size, file); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Ok(FileSink { + path, + buffer_size, + writer: Arc::new(tokio::sync::Mutex::new(writer)), + entry_count: std::sync::atomic::AtomicUsize::new(0), + last_flush: std::sync::atomic::AtomicU64::new(now), + flush_interval_ms, + flush_threshold, + }) + } + + #[allow(dead_code)] + async fn initialize_writer(&mut self) -> io::Result<()> { + let file = tokio::fs::File::create(&self.path).await?; + + // Use buffer_size to create a buffer writer with a specified capacity + let buf_writer = io::BufWriter::with_capacity(self.buffer_size, file); + + // Replace the original writer with the new Mutex + self.writer = Arc::new(tokio::sync::Mutex::new(buf_writer)); + Ok(()) + } + + // Get the current buffer size + #[allow(dead_code)] + pub fn buffer_size(&self) -> usize { + self.buffer_size + } + + // How to dynamically adjust the buffer size + #[allow(dead_code)] + pub async fn set_buffer_size(&mut self, new_size: usize) -> io::Result<()> { + if self.buffer_size != new_size { + self.buffer_size = new_size; + // Reinitialize the writer directly, without checking is_some() + self.initialize_writer().await?; + } + Ok(()) + } + + // Check if flushing is needed based on count or time + fn should_flush(&self) -> bool { + // Check entry count threshold + if self.entry_count.load(std::sync::atomic::Ordering::Relaxed) >= self.flush_threshold { + return true; + } + + // Check time threshold + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed); + now - last >= self.flush_interval_ms + } +} + +#[async_trait] +impl Sink for FileSink { + async fn write(&self, entry: &UnifiedLogEntry) { + let line = format!("{:?}\n", entry); + let mut writer = self.writer.lock().await; + + if let Err(e) = writer.write_all(line.as_bytes()).await { + eprintln!( + "Failed to write log to file {}: {},entry timestamp:{:?}", + self.path, + e, + entry.get_timestamp() + ); + return; + } + + // Only flush periodically to improve performance + // Logic to determine when to flush could be added here + // Increment the entry count + self.entry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + // Check if we should flush + if self.should_flush() { + if let Err(e) = writer.flush().await { + eprintln!("Failed to flush log file {}: {}", self.path, e); + return; + } + + // Reset counters + self.entry_count.store(0, std::sync::atomic::Ordering::Relaxed); + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + self.last_flush.store(now, std::sync::atomic::Ordering::Relaxed); + } + } +} + +impl Drop for FileSink { + fn drop(&mut self) { + let writer = self.writer.clone(); + let path = self.path.clone(); + + tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let mut writer = writer.lock().await; + if let Err(e) = writer.flush().await { + eprintln!("Failed to flush log file {}: {}", path, e); + } + }); + }); + } +} diff --git a/crates/obs/src/sinks/kafka.rs b/crates/obs/src/sinks/kafka.rs new file mode 100644 index 00000000..e4ef3419 --- /dev/null +++ b/crates/obs/src/sinks/kafka.rs @@ -0,0 +1,165 @@ +use crate::sinks::Sink; +use crate::{LogRecord, UnifiedLogEntry}; +use async_trait::async_trait; +use std::sync::Arc; + +/// Kafka Sink Implementation +pub struct KafkaSink { + producer: rdkafka::producer::FutureProducer, + topic: String, + batch_size: usize, + batch_timeout_ms: u64, + entries: Arc>>, + last_flush: Arc, +} + +impl KafkaSink { + /// Create a new KafkaSink instance + pub fn new(producer: rdkafka::producer::FutureProducer, topic: String, batch_size: usize, batch_timeout_ms: u64) -> Self { + // Create Arc-wrapped values first + let entries = Arc::new(tokio::sync::Mutex::new(Vec::with_capacity(batch_size))); + let last_flush = Arc::new(std::sync::atomic::AtomicU64::new( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + )); + let sink = KafkaSink { + producer: producer.clone(), + topic: topic.clone(), + batch_size, + batch_timeout_ms, + entries: entries.clone(), + last_flush: last_flush.clone(), + }; + + // Start background flusher + tokio::spawn(Self::periodic_flush(producer, topic, entries, last_flush, batch_timeout_ms)); + + sink + } + + /// Add a getter method to read the batch_timeout_ms field + #[allow(dead_code)] + pub fn batch_timeout(&self) -> u64 { + self.batch_timeout_ms + } + + /// Add a method to dynamically adjust the timeout if needed + #[allow(dead_code)] + pub fn set_batch_timeout(&mut self, new_timeout_ms: u64) { + self.batch_timeout_ms = new_timeout_ms; + } + + async fn periodic_flush( + producer: rdkafka::producer::FutureProducer, + topic: String, + entries: Arc>>, + last_flush: Arc, + timeout_ms: u64, + ) { + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(timeout_ms / 2)).await; + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let last = last_flush.load(std::sync::atomic::Ordering::Relaxed); + + if now - last >= timeout_ms { + let mut batch = entries.lock().await; + if !batch.is_empty() { + Self::send_batch(&producer, &topic, batch.drain(..).collect()).await; + last_flush.store(now, std::sync::atomic::Ordering::Relaxed); + } + } + } + } + + async fn send_batch(producer: &rdkafka::producer::FutureProducer, topic: &str, entries: Vec) { + for entry in entries { + let payload = match serde_json::to_string(&entry) { + Ok(p) => p, + Err(e) => { + eprintln!("Failed to serialize log entry: {}", e); + continue; + } + }; + + let span_id = entry.get_timestamp().to_rfc3339(); + + let _ = producer + .send( + rdkafka::producer::FutureRecord::to(topic).payload(&payload).key(&span_id), + std::time::Duration::from_secs(5), + ) + .await; + } + } +} + +#[async_trait] +impl Sink for KafkaSink { + async fn write(&self, entry: &UnifiedLogEntry) { + let mut batch = self.entries.lock().await; + batch.push(entry.clone()); + + let should_flush_by_size = batch.len() >= self.batch_size; + let should_flush_by_time = { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let last = self.last_flush.load(std::sync::atomic::Ordering::Relaxed); + now - last >= self.batch_timeout_ms + }; + + if should_flush_by_size || should_flush_by_time { + // Existing flush logic + let entries_to_send: Vec = batch.drain(..).collect(); + let producer = self.producer.clone(); + let topic = self.topic.clone(); + + self.last_flush.store( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + std::sync::atomic::Ordering::Relaxed, + ); + + tokio::spawn(async move { + KafkaSink::send_batch(&producer, &topic, entries_to_send).await; + }); + } + } +} + +impl Drop for KafkaSink { + fn drop(&mut self) { + // Perform any necessary cleanup here + // For example, you might want to flush any remaining entries + let producer = self.producer.clone(); + let topic = self.topic.clone(); + let entries = self.entries.clone(); + let last_flush = self.last_flush.clone(); + + tokio::spawn(async move { + let mut batch = entries.lock().await; + if !batch.is_empty() { + KafkaSink::send_batch(&producer, &topic, batch.drain(..).collect()).await; + last_flush.store( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + std::sync::atomic::Ordering::Relaxed, + ); + } + }); + + eprintln!("Dropping KafkaSink with topic: {}", self.topic); + } +} diff --git a/crates/obs/src/sinks/mod.rs b/crates/obs/src/sinks/mod.rs new file mode 100644 index 00000000..3abafd3f --- /dev/null +++ b/crates/obs/src/sinks/mod.rs @@ -0,0 +1,92 @@ +use crate::{AppConfig, SinkConfig, UnifiedLogEntry}; +use async_trait::async_trait; +use std::sync::Arc; + +#[cfg(feature = "file")] +mod file; +#[cfg(all(feature = "kafka", target_os = "linux"))] +mod kafka; +#[cfg(feature = "webhook")] +mod webhook; + +/// Sink Trait definition, asynchronously write logs +#[async_trait] +pub trait Sink: Send + Sync { + async fn write(&self, entry: &UnifiedLogEntry); +} + +/// Create a list of Sink instances +pub async fn create_sinks(config: &AppConfig) -> Vec> { + let mut sinks: Vec> = Vec::new(); + + for sink_config in &config.sinks { + match sink_config { + #[cfg(all(feature = "kafka", target_os = "linux"))] + SinkConfig::Kafka(kafka_config) => { + match rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", &kafka_config.brokers) + .set("message.timeout.ms", "5000") + .create() + { + Ok(producer) => { + sinks.push(Arc::new(kafka::KafkaSink::new( + producer, + kafka_config.topic.clone(), + kafka_config.batch_size.unwrap_or(100), + kafka_config.batch_timeout_ms.unwrap_or(1000), + ))); + tracing::info!("Kafka sink created for topic: {}", kafka_config.topic); + } + Err(e) => { + tracing::error!("Failed to create Kafka producer: {}", e); + } + } + } + #[cfg(feature = "webhook")] + SinkConfig::Webhook(webhook_config) => { + sinks.push(Arc::new(webhook::WebhookSink::new( + webhook_config.endpoint.clone(), + webhook_config.auth_token.clone(), + webhook_config.max_retries.unwrap_or(3), + webhook_config.retry_delay_ms.unwrap_or(100), + ))); + tracing::info!("Webhook sink created for endpoint: {}", webhook_config.endpoint); + } + + #[cfg(feature = "file")] + SinkConfig::File(file_config) => { + tracing::debug!("FileSink: Using path: {}", file_config.path); + match file::FileSink::new( + file_config.path.clone(), + file_config.buffer_size.unwrap_or(8192), + file_config.flush_interval_ms.unwrap_or(1000), + file_config.flush_threshold.unwrap_or(100), + ) + .await + { + Ok(sink) => { + sinks.push(Arc::new(sink)); + tracing::info!("File sink created for path: {}", file_config.path); + } + Err(e) => { + tracing::error!("Failed to create File sink: {}", e); + } + } + } + #[cfg(any(not(feature = "kafka"), not(target_os = "linux")))] + SinkConfig::Kafka(_) => { + tracing::warn!("Kafka sink is configured but the 'kafka' feature is not enabled"); + } + #[cfg(not(feature = "webhook"))] + SinkConfig::Webhook(_) => { + tracing::warn!("Webhook sink is configured but the 'webhook' feature is not enabled"); + } + #[cfg(not(feature = "file"))] + SinkConfig::File(_) => { + tracing::warn!("File sink is configured but the 'file' feature is not enabled"); + } + } + } + + sinks +} diff --git a/crates/obs/src/sinks/webhook.rs b/crates/obs/src/sinks/webhook.rs new file mode 100644 index 00000000..77a874d9 --- /dev/null +++ b/crates/obs/src/sinks/webhook.rs @@ -0,0 +1,70 @@ +use crate::sinks::Sink; +use crate::UnifiedLogEntry; +use async_trait::async_trait; + +/// Webhook Sink Implementation +pub struct WebhookSink { + endpoint: String, + auth_token: String, + client: reqwest::Client, + max_retries: usize, + retry_delay_ms: u64, +} + +impl WebhookSink { + pub fn new(endpoint: String, auth_token: String, max_retries: usize, retry_delay_ms: u64) -> Self { + WebhookSink { + endpoint, + auth_token, + client: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap_or_else(|_| reqwest::Client::new()), + max_retries, + retry_delay_ms, + } + } +} + +#[async_trait] +impl Sink for WebhookSink { + async fn write(&self, entry: &UnifiedLogEntry) { + let mut retries = 0; + let url = self.endpoint.clone(); + let entry_clone = entry.clone(); + let auth_value = reqwest::header::HeaderValue::from_str(format!("Bearer {}", self.auth_token.clone()).as_str()).unwrap(); + while retries < self.max_retries { + match self + .client + .post(&url) + .header(reqwest::header::AUTHORIZATION, auth_value.clone()) + .json(&entry_clone) + .send() + .await + { + Ok(response) if response.status().is_success() => { + return; + } + _ => { + retries += 1; + if retries < self.max_retries { + tokio::time::sleep(tokio::time::Duration::from_millis( + self.retry_delay_ms * (1 << retries), // Exponential backoff + )) + .await; + } + } + } + } + + eprintln!("Failed to send log to webhook after {} retries", self.max_retries); + } +} + +impl Drop for WebhookSink { + fn drop(&mut self) { + // Perform any necessary cleanup here + // For example, you might want to log that the sink is being dropped + eprintln!("Dropping WebhookSink with URL: {}", self.endpoint); + } +} diff --git a/crates/obs/src/worker.rs b/crates/obs/src/worker.rs index 2d7ee2e1..aee1695d 100644 --- a/crates/obs/src/worker.rs +++ b/crates/obs/src/worker.rs @@ -1,4 +1,4 @@ -use crate::{sink::Sink, UnifiedLogEntry}; +use crate::{sinks::Sink, UnifiedLogEntry}; use std::sync::Arc; use tokio::sync::mpsc::Receiver; diff --git a/crates/utils/src/certs.rs b/crates/utils/src/certs.rs index 568fc6b6..021c5915 100644 --- a/crates/utils/src/certs.rs +++ b/crates/utils/src/certs.rs @@ -20,7 +20,7 @@ pub fn load_certs(filename: &str) -> io::Result>> { // Load and return certificate. let certs = certs(&mut reader) .collect::, _>>() - .map_err(|_| certs_error(format!("certificate file {} format error", filename)))?; + .map_err(|e| certs_error(format!("certificate file {} format error:{:?}", filename, e)))?; if certs.is_empty() { return Err(certs_error(format!( "No valid certificate was found in the certificate file {}", @@ -165,7 +165,7 @@ pub fn create_multi_cert_resolver( for (domain, (certs, key)) in cert_key_pairs { // create a signature let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key) - .map_err(|_| certs_error(format!("unsupported private key types:{}", domain)))?; + .map_err(|e| certs_error(format!("unsupported private key types:{}, err:{:?}", domain, e)))?; // create a CertifiedKey let certified_key = CertifiedKey::new(certs, signing_key); diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index 5b0f5896..7822ee5a 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -17,7 +17,7 @@ chacha20poly1305 = { version = "0.10.1", optional = true } jsonwebtoken = { workspace = true } pbkdf2 = { version = "0.12.2", optional = true } rand = { workspace = true, optional = true } -sha2 = { version = "0.10.8", optional = true } +sha2 = { workspace = true, optional = true } thiserror.workspace = true serde_json.workspace = true diff --git a/deploy/config/.example.obs.env b/deploy/config/.example.obs.env index a1ea67b4..edbf3e88 100644 --- a/deploy/config/.example.obs.env +++ b/deploy/config/.example.obs.env @@ -1,27 +1,28 @@ -OBSERVABILITY__ENDPOINT=http://localhost:4317 -OBSERVABILITY__USE_STDOUT=true -OBSERVABILITY__SAMPLE_RATIO=2.0 -OBSERVABILITY__METER_INTERVAL=30 -OBSERVABILITY__SERVICE_NAME=rustfs -OBSERVABILITY__SERVICE_VERSION=0.1.0 -OBSERVABILITY__ENVIRONMENT=develop -OBSERVABILITY__LOGGER_LEVEL=debug - -SINKS__KAFKA__ENABLED=false -SINKS__KAFKA__BOOTSTRAP_SERVERS=localhost:9092 -SINKS__KAFKA__TOPIC=logs -SINKS__KAFKA__BATCH_SIZE=100 -SINKS__KAFKA__BATCH_TIMEOUT_MS=1000 - -SINKS__WEBHOOK__ENABLED=false -SINKS__WEBHOOK__ENDPOINT=http://localhost:8080/webhook -SINKS__WEBHOOK__AUTH_TOKEN= -SINKS__WEBHOOK__BATCH_SIZE=100 -SINKS__WEBHOOK__BATCH_TIMEOUT_MS=1000 - -SINKS__FILE__ENABLED=true -SINKS__FILE__PATH=./deploy/logs/rustfs.log -SINKS__FILE__BATCH_SIZE=10 -SINKS__FILE__BATCH_TIMEOUT_MS=1000 - -LOGGER__QUEUE_CAPACITY=10 \ No newline at end of file +#RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 +#RUSTFS__OBSERVABILITY__USE_STDOUT=true +#RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 +#RUSTFS__OBSERVABILITY__METER_INTERVAL=30 +#RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs +#RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 +#RUSTFS__OBSERVABILITY__ENVIRONMENT=develop +#RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug +# +#RUSTFS__SINKS_0__type=Kakfa +#RUSTFS__SINKS_0__brokers=localhost:9092 +#RUSTFS__SINKS_0__topic=logs +#RUSTFS__SINKS_0__batch_size=100 +#RUSTFS__SINKS_0__batch_timeout_ms=1000 +# +#RUSTFS__SINKS_1__type=Webhook +#RUSTFS__SINKS_1__endpoint=http://localhost:8080/webhook +#RUSTFS__SINKS_1__auth_token=you-auth-token +#RUSTFS__SINKS_1__batch_size=100 +#RUSTFS__SINKS_1__batch_timeout_ms=1000 +# +#RUSTFS__SINKS_2__type=File +#RUSTFS__SINKS_2__path=./deploy/logs/rustfs.log +#RUSTFS__SINKS_2__buffer_size=10 +#RUSTFS__SINKS_2__flush_interval_ms=1000 +#RUSTFS__SINKS_2__flush_threshold=100 +# +#RUSTFS__LOGGER__QUEUE_CAPACITY=10 \ No newline at end of file diff --git a/deploy/config/obs-zh.example.toml b/deploy/config/obs-zh.example.toml index 712ae6ce..0474391b 100644 --- a/deploy/config/obs-zh.example.toml +++ b/deploy/config/obs-zh.example.toml @@ -9,26 +9,26 @@ environments = "develop" # 运行环境,如开发环境 (develop) logger_level = "debug" # 日志级别,可选 debug/info/warn/error 等 local_logging_enabled = true # 是否启用本地 stdout 日志记录,true 表示启用,false 表示禁用 -[sinks] -[sinks.kafka] # Kafka 接收器配置 -enabled = false # 是否启用 Kafka 接收器,默认禁用 -bootstrap_servers = "localhost:9092" # Kafka 服务器地址 -topic = "logs" # Kafka 主题名称 -batch_size = 100 # 批处理大小,每次发送的消息数量 -batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 +#[[sinks]] # Kafka 接收器配置 +#type = "Kafka" # 是否启用 Kafka 接收器,默认禁用 +#brokers = "localhost:9092" # Kafka 服务器地址 +#topic = "logs" # Kafka 主题名称 +#batch_size = 100 # 批处理大小,每次发送的消息数量 +#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 -[sinks.webhook] # Webhook 接收器配置 -enabled = false # 是否启用 Webhook 接收器 -endpoint = "http://localhost:8080/webhook" # Webhook 接收地址 -auth_token = "" # 认证令牌 -batch_size = 100 # 批处理大小 -batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 +#[[sinks]] # Webhook 接收器配置 +#type = "Webhook" # 是否启用 Webhook 接收器 +#endpoint = "http://localhost:8080/webhook" # Webhook 接收地址 +#auth_token = "" # 认证令牌 +#batch_size = 100 # 批处理大小 +#batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 -[sinks.file] # 文件接收器配置 -enabled = true # 是否启用文件接收器 +[[sinks]] # 文件接收器配置 +type = "File" # 是否启用文件接收器 path = "./deploy/logs/rustfs.log" # 日志文件路径 -batch_size = 10 # 批处理大小 -batch_timeout_ms = 1000 # 批处理超时时间,单位为毫秒 +buffer_size = 10 # 缓冲区大小,表示每次写入的字节数 +flush_interval_ms = 100 # 批处理超时时间,单位为毫秒 +flush_threshold = 100 # 刷新阈值,表示在达到该数量时刷新日志 [logger] # 日志器配置 queue_capacity = 10 # 日志队列容量,表示可以缓存的日志条数 \ No newline at end of file diff --git a/deploy/config/obs.example.toml b/deploy/config/obs.example.toml index e38e06f8..e6c89833 100644 --- a/deploy/config/obs.example.toml +++ b/deploy/config/obs.example.toml @@ -4,31 +4,31 @@ use_stdout = false # Output with stdout, true output, false no output sample_ratio = 2.0 meter_interval = 30 service_name = "rustfs" -service_version = "0.1.0" +service_version = "0.0.1" environment = "develop" -logger_level = "error" +logger_level = "info" local_logging_enabled = true -[sinks] -[sinks.kafka] # Kafka sink is disabled by default -enabled = false -bootstrap_servers = "localhost:9092" -topic = "logs" -batch_size = 100 # Default is 100 if not specified -batch_timeout_ms = 1000 # Default is 1000ms if not specified +#[[sinks]] +#type = "Kafka" +#brokers = "localhost:9092" +#topic = "logs" +#batch_size = 100 # Default is 100 if not specified +#batch_timeout_ms = 100 # Default is 1000ms if not specified +# +#[[sinks]] +#type = "Webhook" +#endpoint = "http://localhost:8080/webhook" +#auth_token = "" +#batch_size = 100 # Default is 3 if not specified +#batch_timeout_ms = 100 # Default is 100ms if not specified -[sinks.webhook] -enabled = false -endpoint = "http://localhost:8080/webhook" -auth_token = "" -batch_size = 100 # Default is 3 if not specified -batch_timeout_ms = 1000 # Default is 100ms if not specified - -[sinks.file] -enabled = true -path = "./deploy/logs/rustfs.log" -batch_size = 100 -batch_timeout_ms = 1000 # Default is 8192 bytes if not specified +[[sinks]] +type = "File" +path = "deploy/logs/rustfs.log" +buffer_size = 101 # Default is 8192 bytes if not specified +flush_interval_ms = 1000 +flush_threshold = 100 [logger] queue_capacity = 10000 diff --git a/deploy/config/rustfs-zh.env b/deploy/config/rustfs-zh.env index b9700adb..fd2a5178 100644 --- a/deploy/config/rustfs-zh.env +++ b/deploy/config/rustfs-zh.env @@ -23,4 +23,6 @@ RUSTFS_LICENSE="license content" # 可观测性配置文件路径:deploy/config/obs.example.toml RUSTFS_OBS_CONFIG=/etc/default/obs.toml # TLS 证书目录路径:deploy/certs -RUSTFS_TLS_PATH=/etc/default/tls \ No newline at end of file +RUSTFS_TLS_PATH=/etc/default/tls +# 事件通知配置文件路径:deploy/config/event.example.toml +RUSTFS_EVENT_CONFIG=/etc/default/event.toml \ No newline at end of file diff --git a/deploy/config/rustfs.env b/deploy/config/rustfs.env index ec7d31b4..3f50033f 100644 --- a/deploy/config/rustfs.env +++ b/deploy/config/rustfs.env @@ -23,4 +23,6 @@ RUSTFS_LICENSE="license content" # Observability configuration file path: deploy/config/obs.example.toml RUSTFS_OBS_CONFIG=/etc/default/obs.toml # TLS certificates directory path: deploy/certs -RUSTFS_TLS_PATH=/etc/default/tls \ No newline at end of file +RUSTFS_TLS_PATH=/etc/default/tls +# event notification configuration file path: deploy/config/event.example.toml +RUSTFS_EVENT_CONFIG=/etc/default/event.toml \ No newline at end of file diff --git a/docker-compose-obs.yaml b/docker-compose-obs.yaml index f6d85b44..a709587b 100644 --- a/docker-compose-obs.yaml +++ b/docker-compose-obs.yaml @@ -83,7 +83,7 @@ services: dockerfile: Dockerfile.obs container_name: node2 environment: - - RUSTFS_VOLUMES=/root/data/target/volume/test{1...4} + - RUSTFS_VOLUMES=http://node{1...4}:9000/root/data/target/volume/test{1...4} - RUSTFS_ADDRESS=:9000 - RUSTFS_CONSOLE_ENABLE=true - RUSTFS_CONSOLE_ADDRESS=:9002 diff --git a/ecstore/src/config/com.rs b/ecstore/src/config/com.rs index 383359ea..43e48c6d 100644 --- a/ecstore/src/config/com.rs +++ b/ecstore/src/config/com.rs @@ -204,7 +204,7 @@ async fn apply_dynamic_config_for_sub_sys(cfg: &mut Config, api: } } Err(err) => { - error!("init storageclass err:{:?}", &err); + error!("init storage class err:{:?}", &err); break; } } diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs index ffe00477..10a5d50f 100644 --- a/ecstore/src/config/mod.rs +++ b/ecstore/src/config/mod.rs @@ -141,7 +141,7 @@ impl Config { } pub fn merge(&self) -> Config { - // TODO: merge defauls + // TODO: merge default self.clone() } } @@ -158,6 +158,6 @@ pub fn register_default_kvs(kvs: HashMap) { pub fn init() { let mut kvs = HashMap::new(); kvs.insert(STORAGE_CLASS_SUB_SYS.to_owned(), storageclass::DefaultKVS.clone()); - // TODO: other defauls + // TODO: other default register_default_kvs(kvs) } diff --git a/ecstore/src/config/storageclass.rs b/ecstore/src/config/storageclass.rs index edb2095f..ef5199fc 100644 --- a/ecstore/src/config/storageclass.rs +++ b/ecstore/src/config/storageclass.rs @@ -8,8 +8,8 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use tracing::warn; -// default_partiy_count 默认配置,根据磁盘总数分配校验磁盘数量 -pub fn default_partiy_count(drive: usize) -> usize { +// default_parity_count 默认配置,根据磁盘总数分配校验磁盘数量 +pub fn default_parity_count(drive: usize) -> usize { match drive { 1 => 0, 2 | 3 => 1, @@ -158,7 +158,7 @@ pub fn lookup_config(kvs: &KVS, set_drive_count: usize) -> Result { parse_storage_class(&ssc_str)? } else { StorageClass { - parity: default_partiy_count(set_drive_count), + parity: default_parity_count(set_drive_count), } } }; diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index dfeeeb87..45ee477a 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -48,6 +48,7 @@ use tokio::{select, spawn}; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; +pub mod event; pub mod group; pub mod policys; pub mod pools; diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/rustfs/src/admin/handlers/event.rs @@ -0,0 +1 @@ + diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 57293d4e..f0229f20 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -25,7 +25,7 @@ pub fn make_admin_route() -> Result { r.insert(Method::POST, "/", AdminOperation(&sts::AssumeRoleHandle {}))?; regist_rpc_route(&mut r)?; - regist_user_route(&mut r)?; + register_user_route(&mut r)?; r.insert( Method::POST, @@ -124,7 +124,7 @@ pub fn make_admin_route() -> Result { Ok(r) } -fn regist_user_route(r: &mut S3Router) -> Result<()> { +fn register_user_route(r: &mut S3Router) -> Result<()> { // 1 r.insert( Method::GET, diff --git a/scripts/run.sh b/scripts/run.sh index 59703af9..58d0f264 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -39,24 +39,35 @@ export RUSTFS_CONSOLE_ADDRESS=":9002" export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml" # 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值 -export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 -export RUSTFS__OBSERVABILITY__USE_STDOUT=false -export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 -export RUSTFS__OBSERVABILITY__METER_INTERVAL=30 -export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs -export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 -export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop -export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug -export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true -export RUSTFS__SINKS__FILE__ENABLED=true -export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log" -export RUSTFS__SINKS__WEBHOOK__ENABLED=false -export RUSTFS__SINKS__WEBHOOK__ENDPOINT="" -export RUSTFS__SINKS__WEBHOOK__AUTH_TOKEN="" -export RUSTFS__SINKS__KAFKA__ENABLED=false -export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS="" -export RUSTFS__SINKS__KAFKA__TOPIC="" -export RUSTFS__LOGGER__QUEUE_CAPACITY=10 +#export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 +#export RUSTFS__OBSERVABILITY__USE_STDOUT=false +#export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 +#export RUSTFS__OBSERVABILITY__METER_INTERVAL=31 +#export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs +#export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 +#export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop +#export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug +#export RUSTFS__OBSERVABILITY__LOCAL_LOGGING_ENABLED=true +# +#export RUSTFS__SINKS_0__type=File +#export RUSTFS__SINKS_0__path=./deploy/logs/rustfs.log +#export RUSTFS__SINKS_0__buffer_size=12 +#export RUSTFS__SINKS_0__flush_interval_ms=1000 +#export RUSTFS__SINKS_0__flush_threshold=100 +# +#export RUSTFS__SINKS_1__type=Kakfa +#export RUSTFS__SINKS_1__brokers=localhost:9092 +#export RUSTFS__SINKS_1__topic=logs +#export RUSTFS__SINKS_1__batch_size=100 +#export RUSTFS__SINKS_1__batch_timeout_ms=1000 +# +#export RUSTFS__SINKS_2__type=Webhook +#export RUSTFS__SINKS_2__endpoint=http://localhost:8080/webhook +#export RUSTFS__SINKS_2__auth_token=you-auth-token +#export RUSTFS__SINKS_2__batch_size=100 +#export RUSTFS__SINKS_2__batch_timeout_ms=1000 +# +#export RUSTFS__LOGGER__QUEUE_CAPACITY=10 export OTEL_INSTRUMENTATION_NAME="rustfs" export OTEL_INSTRUMENTATION_VERSION="0.1.1" @@ -64,13 +75,13 @@ export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0" export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production" # 事件消息配置 -export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml" +#export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml" if [ -n "$1" ]; then export RUSTFS_VOLUMES="$1" fi # 启动 webhook 服务器 -cargo run --example webhook -p rustfs-event-notifier & +#cargo run --example webhook -p rustfs-event-notifier & # 启动主服务 cargo run --bin rustfs \ No newline at end of file