improve code for obs crate

This commit is contained in:
houseme
2025-05-30 13:04:24 +08:00
parent d3e2c9cb86
commit 541b812bb4
8 changed files with 122 additions and 158 deletions

View File

@@ -1,5 +1,5 @@
use opentelemetry::global;
use rustfs_obs::{get_logger, init_obs, init_process_observer, load_config, log_info, BaseLogEntry, ServerLogEntry};
use rustfs_obs::{get_logger, init_obs, log_info, BaseLogEntry, ServerLogEntry, SystemObserver};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tracing::{error, info, instrument};
@@ -8,8 +8,7 @@ use tracing_core::Level;
#[tokio::main]
async fn main() {
let obs_conf = Some("crates/obs/examples/config.toml".to_string());
let config = load_config(obs_conf);
let (_logger, _guard) = init_obs(config.clone()).await;
let (_logger, _guard) = init_obs(obs_conf).await;
let span = tracing::span!(Level::INFO, "main");
let _enter = span.enter();
info!("Program starts");
@@ -38,7 +37,7 @@ async fn run(bucket: String, object: String, user: String, service_name: String)
&[opentelemetry::KeyValue::new("operation", "run")],
);
match init_process_observer(meter).await {
match SystemObserver::init_process_observer(meter).await {
Ok(_) => info!("Process observer initialized successfully"),
Err(e) => error!("Failed to initialize process observer: {:?}", e),
}

View File

@@ -24,12 +24,12 @@ pub struct OtelConfig {
pub environment: Option<String>, // Environment
pub logger_level: Option<String>, // Logger level
pub local_logging_enabled: Option<bool>, // Local logging enabled
// 新增 flexi_logger 相关配置
pub log_directory: Option<String>, // 日志文件目录
pub log_filename: Option<String>, // 日志文件名称
pub log_rotation_size_mb: Option<u64>, // 日志文件大小切割阈值 (MB)
pub log_rotation_time: Option<String>, // 日志按时间切割 (Hour, Day)
pub log_keep_files: Option<u16>, // 保留日志文件数量
// Added flexi_logger related configurations
pub log_directory: Option<String>, // LOG FILE DIRECTORY
pub log_filename: Option<String>, // The name of the log file
pub log_rotation_size_mb: Option<u64>, // Log file size cut threshold (MB)
pub log_rotation_time: Option<String>, // Logs are cut by time (Hour DayMinute Second)
pub log_keep_files: Option<u16>, // Number of log files to be retained
}
impl OtelConfig {
@@ -132,6 +132,12 @@ pub struct KafkaSinkConfig {
impl KafkaSinkConfig {
pub fn new() -> Self {
Self::default()
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self {
brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS")
.ok()
@@ -140,19 +146,13 @@ impl KafkaSinkConfig {
topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_topic".to_string()),
.unwrap_or_else(|| "rustfs_sink".to_string()),
batch_size: Some(100),
batch_timeout_ms: Some(1000),
}
}
}
impl Default for KafkaSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// Webhook Sink Configuration - Add Retry Parameters
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebhookSinkConfig {
@@ -178,7 +178,7 @@ impl Default for WebhookSinkConfig {
auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN")
.ok()
.filter(|s| !s.trim().is_empty())
.unwrap_or_else(|| "default_token".to_string()),
.unwrap_or_else(|| "rustfs_webhook_token".to_string()),
max_retries: Some(3),
retry_delay_ms: Some(100),
}
@@ -209,6 +209,12 @@ impl FileSinkConfig {
.to_string()
}
pub fn new() -> Self {
Self::default()
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self {
path: env::var("RUSTFS_SINKS_FILE_PATH")
.ok()
@@ -230,12 +236,6 @@ impl FileSinkConfig {
}
}
impl Default for FileSinkConfig {
fn default() -> Self {
Self::new()
}
}
/// Sink configuration collection
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
@@ -287,9 +287,8 @@ impl Default for LoggerConfig {
/// # Example
/// ```
/// use rustfs_obs::AppConfig;
/// use rustfs_obs::load_config;
///
/// let config = load_config(None);
/// let config = AppConfig::new_with_endpoint(None);
/// ```
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
@@ -326,23 +325,3 @@ impl Default for AppConfig {
Self::new()
}
}
/// Loading the configuration file
/// Supports TOML, YAML and .env formats, read in order by priority
///
/// # Parameters
/// - `config_dir`: Configuration file path
///
/// # Returns
/// Configuration information
///
/// # Example
/// ```
/// use rustfs_obs::AppConfig;
/// use rustfs_obs::load_config;
///
/// let config = load_config(None);
/// ```
pub fn load_config(config: Option<String>) -> AppConfig {
AppConfig::new_with_endpoint(config)
}

View File

@@ -1,4 +1,6 @@
use crate::telemetry::OtelGuard;
use crate::logger::InitLogStatus;
use crate::telemetry::{init_telemetry, OtelGuard};
use crate::{get_global_logger, init_global_logger, AppConfig, Logger};
use std::sync::{Arc, Mutex};
use tokio::sync::{OnceCell, SetError};
use tracing::{error, info};
@@ -31,6 +33,62 @@ pub enum GlobalError {
Timeout(&'static str),
}
/// Initialize the observability module
///
/// # Parameters
/// - `config`: Configuration information
///
/// # Returns
/// A tuple containing the logger and the telemetry guard
///
/// # Example
/// ```no_run
/// use rustfs_obs::init_obs;
///
/// # #[tokio::main]
/// # async fn main() {
/// let (logger, guard) = init_obs(None).await;
/// # }
/// ```
pub async fn init_obs(endpoint: Option<String>) -> (Arc<tokio::sync::Mutex<Logger>>, OtelGuard) {
// Load the configuration file
let config = AppConfig::new_with_endpoint(endpoint);
let guard = init_telemetry(&config.observability);
let logger = init_global_logger(&config).await;
let obs_config = config.observability.clone();
tokio::spawn(async move {
let result = InitLogStatus::init_start_log(&obs_config).await;
match result {
Ok(_) => {
info!("Logger initialized successfully");
}
Err(e) => {
error!("Failed to initialize logger: {}", e);
}
}
});
(logger, guard)
}
/// Get the global logger instance
/// This function returns a reference to the global logger instance.
///
/// # Returns
/// A reference to the global logger instance
///
/// # Example
/// ```no_run
/// use rustfs_obs::get_logger;
///
/// let logger = get_logger();
/// ```
pub fn get_logger() -> &'static Arc<tokio::sync::Mutex<Logger>> {
get_global_logger()
}
/// Set the global guard for OpenTelemetry
///
/// # Arguments
@@ -42,11 +100,10 @@ pub enum GlobalError {
///
/// # Example
/// ```rust
/// use rustfs_obs::{init_telemetry, load_config, set_global_guard};
/// use rustfs_obs::{ init_obs, set_global_guard};
///
/// fn init() -> Result<(), Box<dyn std::error::Error>> {
/// let config = load_config(None);
/// let guard = init_telemetry(&config.observability);
/// let guard = init_obs(None);
/// set_global_guard(guard)?;
/// Ok(())
/// }

View File

@@ -23,12 +23,11 @@
/// ## Usage
///
/// ```no_run
/// use rustfs_obs::{AppConfig, init_obs};
/// use rustfs_obs::init_obs;
///
/// # #[tokio::main]
/// # async fn main() {
/// let config = AppConfig::default();
/// let (logger, guard) = init_obs(config).await;
/// let (logger, guard) = init_obs(None).await;
/// # }
/// ```
mod config;
@@ -40,74 +39,14 @@ mod system;
mod telemetry;
mod worker;
use crate::logger::InitLogStatus;
pub use config::load_config;
pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig};
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError};
pub use global::*;
pub use logger::Logger;
pub use logger::{get_global_logger, init_global_logger, start_logger};
pub use logger::{log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
use std::sync::Arc;
pub use system::{init_process_observer, init_process_observer_for_pid};
pub use telemetry::init_telemetry;
use tokio::sync::Mutex;
use tracing::{error, info};
/// Initialize the observability module
///
/// # Parameters
/// - `config`: Configuration information
///
/// # Returns
/// A tuple containing the logger and the telemetry guard
///
/// # Example
/// ```no_run
/// use rustfs_obs::{AppConfig, init_obs};
///
/// # #[tokio::main]
/// # async fn main() {
/// let config = AppConfig::default();
/// let (logger, guard) = init_obs(config).await;
/// # }
/// ```
pub async fn init_obs(config: AppConfig) -> (Arc<Mutex<Logger>>, telemetry::OtelGuard) {
let guard = init_telemetry(&config.observability);
let sinks = sinks::create_sinks(&config).await;
let logger = init_global_logger(&config, sinks).await;
let obs_config = config.observability.clone();
tokio::spawn(async move {
let result = InitLogStatus::init_start_log(&obs_config).await;
match result {
Ok(_) => {
info!("Logger initialized successfully");
}
Err(e) => {
error!("Failed to initialize logger: {}", e);
}
}
});
(logger, guard)
}
/// Get the global logger instance
/// This function returns a reference to the global logger instance.
///
/// # Returns
/// A reference to the global logger instance
///
/// # Example
/// ```no_run
/// use rustfs_obs::get_logger;
///
/// let logger = get_logger();
/// ```
pub fn get_logger() -> &'static Arc<Mutex<Logger>> {
get_global_logger()
}
pub use system::SystemObserver;

View File

@@ -1,5 +1,7 @@
use crate::sinks::Sink;
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry};
use crate::{
sinks, AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry,
};
use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION};
use std::sync::Arc;
use std::time::SystemTime;
@@ -253,10 +255,10 @@ pub fn start_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Logger {
/// use rustfs_obs::{AppConfig,init_global_logger};
///
/// let config = AppConfig::default();
/// let sinks = vec![];
/// let logger = init_global_logger(&config, sinks);
/// let logger = init_global_logger(&config);
/// ```
pub async fn init_global_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Arc<Mutex<Logger>> {
pub async fn init_global_logger(config: &AppConfig) -> Arc<Mutex<Logger>> {
let sinks = sinks::create_sinks(config).await;
let logger = Arc::new(Mutex::new(start_logger(config, sinks)));
GLOBAL_LOGGER.set(logger.clone()).expect("Logger already initialized");
logger

View File

@@ -5,20 +5,24 @@ mod collector;
pub(crate) mod gpu;
pub(crate) mod metrics;
/// Initialize the indicator collector for the current process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> {
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
let mut collector = collector::Collector::new(pid, meter, 30000)?;
collector.run().await
}
pub struct SystemObserver {}
/// Initialize the metric collector for the specified PID process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
let pid = sysinfo::Pid::from_u32(pid);
let mut collector = collector::Collector::new(pid, meter, 30000)?;
collector.run().await
impl SystemObserver {
/// Initialize the indicator collector for the current process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> {
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
let mut collector = collector::Collector::new(pid, meter, 30000)?;
collector.run().await
}
/// Initialize the metric collector for the specified PID process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
let pid = sysinfo::Pid::from_u32(pid);
let mut collector = collector::Collector::new(pid, meter, 30000)?;
collector.run().await
}
}

View File

@@ -1,4 +1,3 @@
// Added flexi_logger related dependencies
use crate::OtelConfig;
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
use nu_ansi_term::Color;
@@ -17,7 +16,8 @@ use opentelemetry_semantic_conventions::{
SCHEMA_URL,
};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
SERVICE_VERSION, USE_STDOUT,
};
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
@@ -39,19 +39,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
/// - The tracer provider (for distributed tracing)
/// - The meter provider (for metrics collection)
/// - The logger provider (for structured logging)
///
/// # Example
///
/// ```
/// use rustfs_obs::{init_telemetry, OtelConfig};
///
/// let config = OtelConfig::default();
/// let otel_guard = init_telemetry(&config);
///
/// // The guard is kept alive for the duration of the application
/// // When it's dropped, all telemetry components are properly shut down
/// drop(otel_guard);
/// ```
// Implement Debug trait correctly, rather than using derive, as some fields may not have implemented Debug
pub struct OtelGuard {
tracer_provider: Option<SdkTracerProvider>,
@@ -123,7 +110,7 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
}
/// Initialize Telemetry
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
@@ -289,7 +276,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
}
} else {
// Obtain the log directory and file name configuration
let log_directory = config.log_directory.as_deref().unwrap_or("logs");
let log_directory = config.log_directory.as_deref().unwrap_or(DEFAULT_LOG_DIR);
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
// Build log cutting conditions
@@ -349,7 +336,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.basename(log_filename)
.suffix("log"),
)
.rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into()))
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.duplicate_to_stdout(level_filter) // Use dynamic levels
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output

View File

@@ -48,7 +48,7 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard};
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
@@ -102,11 +102,8 @@ async fn main() -> Result<()> {
// Initialize the configuration
init_license(opt.license.clone());
// Load the configuration file
let config = load_config(Some(opt.clone().obs_endpoint));
// Initialize Observability
let (_logger, guard) = init_obs(config.clone()).await;
let (_logger, guard) = init_obs(Some(opt.clone().obs_endpoint)).await;
// Store in global storage
set_global_guard(guard)?;
@@ -233,7 +230,7 @@ async fn run(opt: config::Opt) -> Result<()> {
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");
let obs_result = init_process_observer(meter).await;
let obs_result = SystemObserver::init_process_observer(meter).await;
match obs_result {
Ok(_) => {
info!("Process observer initialized successfully");