From 541b812bb4e1c1ca640a4d025a23194dcde07e1f Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 30 May 2025 13:04:24 +0800 Subject: [PATCH] improve code for `obs` crate --- crates/obs/examples/server.rs | 7 ++-- crates/obs/src/config.rs | 63 +++++++++++--------------------- crates/obs/src/global.rs | 65 +++++++++++++++++++++++++++++++-- crates/obs/src/lib.rs | 69 ++--------------------------------- crates/obs/src/logger.rs | 10 +++-- crates/obs/src/system/mod.rs | 34 +++++++++-------- crates/obs/src/telemetry.rs | 23 +++--------- rustfs/src/main.rs | 9 ++--- 8 files changed, 122 insertions(+), 158 deletions(-) diff --git a/crates/obs/examples/server.rs b/crates/obs/examples/server.rs index 0310c158..f010581d 100644 --- a/crates/obs/examples/server.rs +++ b/crates/obs/examples/server.rs @@ -1,5 +1,5 @@ use opentelemetry::global; -use rustfs_obs::{get_logger, init_obs, init_process_observer, load_config, log_info, BaseLogEntry, ServerLogEntry}; +use rustfs_obs::{get_logger, init_obs, log_info, BaseLogEntry, ServerLogEntry, SystemObserver}; use std::collections::HashMap; use std::time::{Duration, SystemTime}; use tracing::{error, info, instrument}; @@ -8,8 +8,7 @@ use tracing_core::Level; #[tokio::main] async fn main() { let obs_conf = Some("crates/obs/examples/config.toml".to_string()); - let config = load_config(obs_conf); - let (_logger, _guard) = init_obs(config.clone()).await; + let (_logger, _guard) = init_obs(obs_conf).await; let span = tracing::span!(Level::INFO, "main"); let _enter = span.enter(); info!("Program starts"); @@ -38,7 +37,7 @@ async fn run(bucket: String, object: String, user: String, service_name: String) &[opentelemetry::KeyValue::new("operation", "run")], ); - match init_process_observer(meter).await { + match SystemObserver::init_process_observer(meter).await { Ok(_) => info!("Process observer initialized successfully"), Err(e) => error!("Failed to initialize process observer: {:?}", e), } diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 5284e301..9e8562ab 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -24,12 +24,12 @@ pub struct OtelConfig { pub environment: Option, // Environment pub logger_level: Option, // Logger level pub local_logging_enabled: Option, // Local logging enabled - // 新增 flexi_logger 相关配置 - pub log_directory: Option, // 日志文件目录 - pub log_filename: Option, // 日志文件名称 - pub log_rotation_size_mb: Option, // 日志文件大小切割阈值 (MB) - pub log_rotation_time: Option, // 日志按时间切割 (Hour, Day) - pub log_keep_files: Option, // 保留日志文件数量 + // Added flexi_logger related configurations + pub log_directory: Option, // LOG FILE DIRECTORY + pub log_filename: Option, // The name of the log file + pub log_rotation_size_mb: Option, // Log file size cut threshold (MB) + pub log_rotation_time: Option, // Logs are cut by time (Hour, Day,Minute, Second) + pub log_keep_files: Option, // Number of log files to be retained } impl OtelConfig { @@ -132,6 +132,12 @@ pub struct KafkaSinkConfig { impl KafkaSinkConfig { pub fn new() -> Self { + Self::default() + } +} + +impl Default for KafkaSinkConfig { + fn default() -> Self { Self { brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS") .ok() @@ -140,19 +146,13 @@ impl KafkaSinkConfig { topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC") .ok() .filter(|s| !s.trim().is_empty()) - .unwrap_or_else(|| "default_topic".to_string()), + .unwrap_or_else(|| "rustfs_sink".to_string()), batch_size: Some(100), batch_timeout_ms: Some(1000), } } } -impl Default for KafkaSinkConfig { - fn default() -> Self { - Self::new() - } -} - /// Webhook Sink Configuration - Add Retry Parameters #[derive(Debug, Deserialize, Serialize, Clone)] pub struct WebhookSinkConfig { @@ -178,7 +178,7 @@ impl Default for WebhookSinkConfig { auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN") .ok() .filter(|s| !s.trim().is_empty()) - .unwrap_or_else(|| "default_token".to_string()), + .unwrap_or_else(|| "rustfs_webhook_token".to_string()), max_retries: Some(3), retry_delay_ms: Some(100), } @@ -209,6 +209,12 @@ impl FileSinkConfig { .to_string() } pub fn new() -> Self { + Self::default() + } +} + +impl Default for FileSinkConfig { + fn default() -> Self { Self { path: env::var("RUSTFS_SINKS_FILE_PATH") .ok() @@ -230,12 +236,6 @@ impl FileSinkConfig { } } -impl Default for FileSinkConfig { - fn default() -> Self { - Self::new() - } -} - /// Sink configuration collection #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] @@ -287,9 +287,8 @@ impl Default for LoggerConfig { /// # Example /// ``` /// use rustfs_obs::AppConfig; -/// use rustfs_obs::load_config; /// -/// let config = load_config(None); +/// let config = AppConfig::new_with_endpoint(None); /// ``` #[derive(Debug, Deserialize, Clone)] pub struct AppConfig { @@ -326,23 +325,3 @@ impl Default for AppConfig { Self::new() } } - -/// Loading the configuration file -/// Supports TOML, YAML and .env formats, read in order by priority -/// -/// # Parameters -/// - `config_dir`: Configuration file path -/// -/// # Returns -/// Configuration information -/// -/// # Example -/// ``` -/// use rustfs_obs::AppConfig; -/// use rustfs_obs::load_config; -/// -/// let config = load_config(None); -/// ``` -pub fn load_config(config: Option) -> AppConfig { - AppConfig::new_with_endpoint(config) -} diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 0593e0a8..7d15290a 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -1,4 +1,6 @@ -use crate::telemetry::OtelGuard; +use crate::logger::InitLogStatus; +use crate::telemetry::{init_telemetry, OtelGuard}; +use crate::{get_global_logger, init_global_logger, AppConfig, Logger}; use std::sync::{Arc, Mutex}; use tokio::sync::{OnceCell, SetError}; use tracing::{error, info}; @@ -31,6 +33,62 @@ pub enum GlobalError { Timeout(&'static str), } +/// Initialize the observability module +/// +/// # Parameters +/// - `config`: Configuration information +/// +/// # Returns +/// A tuple containing the logger and the telemetry guard +/// +/// # Example +/// ```no_run +/// use rustfs_obs::init_obs; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let (logger, guard) = init_obs(None).await; +/// # } +/// ``` +pub async fn init_obs(endpoint: Option) -> (Arc>, OtelGuard) { + // Load the configuration file + let config = AppConfig::new_with_endpoint(endpoint); + + let guard = init_telemetry(&config.observability); + + let logger = init_global_logger(&config).await; + let obs_config = config.observability.clone(); + tokio::spawn(async move { + let result = InitLogStatus::init_start_log(&obs_config).await; + match result { + Ok(_) => { + info!("Logger initialized successfully"); + } + Err(e) => { + error!("Failed to initialize logger: {}", e); + } + } + }); + + (logger, guard) +} + +/// Get the global logger instance +/// This function returns a reference to the global logger instance. +/// +/// # Returns +/// A reference to the global logger instance +/// +/// # Example +/// ```no_run +/// use rustfs_obs::get_logger; +/// +/// let logger = get_logger(); +/// ``` +pub fn get_logger() -> &'static Arc> { + get_global_logger() +} + /// Set the global guard for OpenTelemetry /// /// # Arguments @@ -42,11 +100,10 @@ pub enum GlobalError { /// /// # Example /// ```rust -/// use rustfs_obs::{init_telemetry, load_config, set_global_guard}; +/// use rustfs_obs::{ init_obs, set_global_guard}; /// /// fn init() -> Result<(), Box> { -/// let config = load_config(None); -/// let guard = init_telemetry(&config.observability); +/// let guard = init_obs(None); /// set_global_guard(guard)?; /// Ok(()) /// } diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index 4a25f1b0..68d2e272 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ -23,12 +23,11 @@ /// ## Usage /// /// ```no_run -/// use rustfs_obs::{AppConfig, init_obs}; +/// use rustfs_obs::init_obs; /// /// # #[tokio::main] /// # async fn main() { -/// let config = AppConfig::default(); -/// let (logger, guard) = init_obs(config).await; +/// let (logger, guard) = init_obs(None).await; /// # } /// ``` mod config; @@ -40,74 +39,14 @@ mod system; mod telemetry; mod worker; -use crate::logger::InitLogStatus; -pub use config::load_config; pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig}; pub use entry::args::Args; pub use entry::audit::{ApiDetails, AuditLogEntry}; pub use entry::base::BaseLogEntry; pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry}; pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel}; -pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError}; +pub use global::*; pub use logger::Logger; pub use logger::{get_global_logger, init_global_logger, start_logger}; pub use logger::{log_debug, log_error, log_info, log_trace, log_warn, log_with_context}; -use std::sync::Arc; -pub use system::{init_process_observer, init_process_observer_for_pid}; -pub use telemetry::init_telemetry; -use tokio::sync::Mutex; -use tracing::{error, info}; - -/// Initialize the observability module -/// -/// # Parameters -/// - `config`: Configuration information -/// -/// # Returns -/// A tuple containing the logger and the telemetry guard -/// -/// # Example -/// ```no_run -/// use rustfs_obs::{AppConfig, init_obs}; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let config = AppConfig::default(); -/// let (logger, guard) = init_obs(config).await; -/// # } -/// ``` -pub async fn init_obs(config: AppConfig) -> (Arc>, telemetry::OtelGuard) { - let guard = init_telemetry(&config.observability); - let sinks = sinks::create_sinks(&config).await; - let logger = init_global_logger(&config, sinks).await; - let obs_config = config.observability.clone(); - tokio::spawn(async move { - let result = InitLogStatus::init_start_log(&obs_config).await; - match result { - Ok(_) => { - info!("Logger initialized successfully"); - } - Err(e) => { - error!("Failed to initialize logger: {}", e); - } - } - }); - - (logger, guard) -} - -/// Get the global logger instance -/// This function returns a reference to the global logger instance. -/// -/// # Returns -/// A reference to the global logger instance -/// -/// # Example -/// ```no_run -/// use rustfs_obs::get_logger; -/// -/// let logger = get_logger(); -/// ``` -pub fn get_logger() -> &'static Arc> { - get_global_logger() -} +pub use system::SystemObserver; diff --git a/crates/obs/src/logger.rs b/crates/obs/src/logger.rs index 462c2c4c..2ba498b7 100644 --- a/crates/obs/src/logger.rs +++ b/crates/obs/src/logger.rs @@ -1,5 +1,7 @@ use crate::sinks::Sink; -use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry}; +use crate::{ + sinks, AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry, +}; use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION}; use std::sync::Arc; use std::time::SystemTime; @@ -253,10 +255,10 @@ pub fn start_logger(config: &AppConfig, sinks: Vec>) -> Logger { /// use rustfs_obs::{AppConfig,init_global_logger}; /// /// let config = AppConfig::default(); -/// let sinks = vec![]; -/// let logger = init_global_logger(&config, sinks); +/// let logger = init_global_logger(&config); /// ``` -pub async fn init_global_logger(config: &AppConfig, sinks: Vec>) -> Arc> { +pub async fn init_global_logger(config: &AppConfig) -> Arc> { + let sinks = sinks::create_sinks(config).await; let logger = Arc::new(Mutex::new(start_logger(config, sinks))); GLOBAL_LOGGER.set(logger.clone()).expect("Logger already initialized"); logger diff --git a/crates/obs/src/system/mod.rs b/crates/obs/src/system/mod.rs index a69bc343..c8920612 100644 --- a/crates/obs/src/system/mod.rs +++ b/crates/obs/src/system/mod.rs @@ -5,20 +5,24 @@ mod collector; pub(crate) mod gpu; pub(crate) mod metrics; -/// Initialize the indicator collector for the current process -/// This function will create a new `Collector` instance and start collecting metrics. -/// It will run indefinitely until the process is terminated. -pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> { - let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?; - let mut collector = collector::Collector::new(pid, meter, 30000)?; - collector.run().await -} +pub struct SystemObserver {} -/// Initialize the metric collector for the specified PID process -/// This function will create a new `Collector` instance and start collecting metrics. -/// It will run indefinitely until the process is terminated. -pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> { - let pid = sysinfo::Pid::from_u32(pid); - let mut collector = collector::Collector::new(pid, meter, 30000)?; - collector.run().await +impl SystemObserver { + /// Initialize the indicator collector for the current process + /// This function will create a new `Collector` instance and start collecting metrics. + /// It will run indefinitely until the process is terminated. + pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> { + let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?; + let mut collector = collector::Collector::new(pid, meter, 30000)?; + collector.run().await + } + + /// Initialize the metric collector for the specified PID process + /// This function will create a new `Collector` instance and start collecting metrics. + /// It will run indefinitely until the process is terminated. + pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> { + let pid = sysinfo::Pid::from_u32(pid); + let mut collector = collector::Collector::new(pid, meter, 30000)?; + collector.run().await + } } diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 69966e19..5cb86f09 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -1,4 +1,3 @@ -// Added flexi_logger related dependencies use crate::OtelConfig; use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode}; use nu_ansi_term::Color; @@ -17,7 +16,8 @@ use opentelemetry_semantic_conventions::{ SCHEMA_URL, }; use rustfs_config::{ - APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT, + APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, + SERVICE_VERSION, USE_STDOUT, }; use rustfs_utils::get_local_ip_with_default; use smallvec::SmallVec; @@ -39,19 +39,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte /// - The tracer provider (for distributed tracing) /// - The meter provider (for metrics collection) /// - The logger provider (for structured logging) -/// -/// # Example -/// -/// ``` -/// use rustfs_obs::{init_telemetry, OtelConfig}; -/// -/// let config = OtelConfig::default(); -/// let otel_guard = init_telemetry(&config); -/// -/// // The guard is kept alive for the duration of the application -/// // When it's dropped, all telemetry components are properly shut down -/// drop(otel_guard); -/// ``` // Implement Debug trait correctly, rather than using derive, as some fields may not have implemented Debug pub struct OtelGuard { tracer_provider: Option, @@ -123,7 +110,7 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader OtelGuard { +pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { // avoid repeated access to configuration fields let endpoint = &config.endpoint; let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT); @@ -289,7 +276,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { } } else { // Obtain the log directory and file name configuration - let log_directory = config.log_directory.as_deref().unwrap_or("logs"); + let log_directory = config.log_directory.as_deref().unwrap_or(DEFAULT_LOG_DIR); let log_filename = config.log_filename.as_deref().unwrap_or(service_name); // Build log cutting conditions @@ -349,7 +336,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { .basename(log_filename) .suffix("log"), ) - .rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into())) + .rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into())) .format_for_files(format_for_file) // Add a custom formatting function for file output .duplicate_to_stdout(level_filter) // Use dynamic levels .format_for_stdout(format_with_color) // Add a custom formatting function for terminal output diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index b3192be3..8b53801f 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -48,7 +48,7 @@ use iam::init_iam_sys; use license::init_license; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard}; +use rustfs_obs::{init_obs, set_global_guard, SystemObserver}; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; @@ -102,11 +102,8 @@ async fn main() -> Result<()> { // Initialize the configuration init_license(opt.license.clone()); - // Load the configuration file - let config = load_config(Some(opt.clone().obs_endpoint)); - // Initialize Observability - let (_logger, guard) = init_obs(config.clone()).await; + let (_logger, guard) = init_obs(Some(opt.clone().obs_endpoint)).await; // Store in global storage set_global_guard(guard)?; @@ -233,7 +230,7 @@ async fn run(opt: config::Opt) -> Result<()> { tokio::spawn(async move { // Record the PID-related metrics of the current process let meter = opentelemetry::global::meter("system"); - let obs_result = init_process_observer(meter).await; + let obs_result = SystemObserver::init_process_observer(meter).await; match obs_result { Ok(_) => { info!("Process observer initialized successfully");