mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
improve code for obs crate
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use opentelemetry::global;
|
||||
use rustfs_obs::{get_logger, init_obs, init_process_observer, load_config, log_info, BaseLogEntry, ServerLogEntry};
|
||||
use rustfs_obs::{get_logger, init_obs, log_info, BaseLogEntry, ServerLogEntry, SystemObserver};
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tracing::{error, info, instrument};
|
||||
@@ -8,8 +8,7 @@ use tracing_core::Level;
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let obs_conf = Some("crates/obs/examples/config.toml".to_string());
|
||||
let config = load_config(obs_conf);
|
||||
let (_logger, _guard) = init_obs(config.clone()).await;
|
||||
let (_logger, _guard) = init_obs(obs_conf).await;
|
||||
let span = tracing::span!(Level::INFO, "main");
|
||||
let _enter = span.enter();
|
||||
info!("Program starts");
|
||||
@@ -38,7 +37,7 @@ async fn run(bucket: String, object: String, user: String, service_name: String)
|
||||
&[opentelemetry::KeyValue::new("operation", "run")],
|
||||
);
|
||||
|
||||
match init_process_observer(meter).await {
|
||||
match SystemObserver::init_process_observer(meter).await {
|
||||
Ok(_) => info!("Process observer initialized successfully"),
|
||||
Err(e) => error!("Failed to initialize process observer: {:?}", e),
|
||||
}
|
||||
|
||||
@@ -24,12 +24,12 @@ pub struct OtelConfig {
|
||||
pub environment: Option<String>, // Environment
|
||||
pub logger_level: Option<String>, // Logger level
|
||||
pub local_logging_enabled: Option<bool>, // Local logging enabled
|
||||
// 新增 flexi_logger 相关配置
|
||||
pub log_directory: Option<String>, // 日志文件目录
|
||||
pub log_filename: Option<String>, // 日志文件名称
|
||||
pub log_rotation_size_mb: Option<u64>, // 日志文件大小切割阈值 (MB)
|
||||
pub log_rotation_time: Option<String>, // 日志按时间切割 (Hour, Day)
|
||||
pub log_keep_files: Option<u16>, // 保留日志文件数量
|
||||
// Added flexi_logger related configurations
|
||||
pub log_directory: Option<String>, // LOG FILE DIRECTORY
|
||||
pub log_filename: Option<String>, // The name of the log file
|
||||
pub log_rotation_size_mb: Option<u64>, // Log file size cut threshold (MB)
|
||||
pub log_rotation_time: Option<String>, // Logs are cut by time (Hour, Day,Minute, Second)
|
||||
pub log_keep_files: Option<u16>, // Number of log files to be retained
|
||||
}
|
||||
|
||||
impl OtelConfig {
|
||||
@@ -132,6 +132,12 @@ pub struct KafkaSinkConfig {
|
||||
|
||||
impl KafkaSinkConfig {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for KafkaSinkConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS")
|
||||
.ok()
|
||||
@@ -140,19 +146,13 @@ impl KafkaSinkConfig {
|
||||
topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC")
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default_topic".to_string()),
|
||||
.unwrap_or_else(|| "rustfs_sink".to_string()),
|
||||
batch_size: Some(100),
|
||||
batch_timeout_ms: Some(1000),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for KafkaSinkConfig {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Webhook Sink Configuration - Add Retry Parameters
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct WebhookSinkConfig {
|
||||
@@ -178,7 +178,7 @@ impl Default for WebhookSinkConfig {
|
||||
auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN")
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.unwrap_or_else(|| "default_token".to_string()),
|
||||
.unwrap_or_else(|| "rustfs_webhook_token".to_string()),
|
||||
max_retries: Some(3),
|
||||
retry_delay_ms: Some(100),
|
||||
}
|
||||
@@ -209,6 +209,12 @@ impl FileSinkConfig {
|
||||
.to_string()
|
||||
}
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FileSinkConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
path: env::var("RUSTFS_SINKS_FILE_PATH")
|
||||
.ok()
|
||||
@@ -230,12 +236,6 @@ impl FileSinkConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for FileSinkConfig {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sink configuration collection
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -287,9 +287,8 @@ impl Default for LoggerConfig {
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use rustfs_obs::AppConfig;
|
||||
/// use rustfs_obs::load_config;
|
||||
///
|
||||
/// let config = load_config(None);
|
||||
/// let config = AppConfig::new_with_endpoint(None);
|
||||
/// ```
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct AppConfig {
|
||||
@@ -326,23 +325,3 @@ impl Default for AppConfig {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Loading the configuration file
|
||||
/// Supports TOML, YAML and .env formats, read in order by priority
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `config_dir`: Configuration file path
|
||||
///
|
||||
/// # Returns
|
||||
/// Configuration information
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use rustfs_obs::AppConfig;
|
||||
/// use rustfs_obs::load_config;
|
||||
///
|
||||
/// let config = load_config(None);
|
||||
/// ```
|
||||
pub fn load_config(config: Option<String>) -> AppConfig {
|
||||
AppConfig::new_with_endpoint(config)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use crate::telemetry::OtelGuard;
|
||||
use crate::logger::InitLogStatus;
|
||||
use crate::telemetry::{init_telemetry, OtelGuard};
|
||||
use crate::{get_global_logger, init_global_logger, AppConfig, Logger};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::{OnceCell, SetError};
|
||||
use tracing::{error, info};
|
||||
@@ -31,6 +33,62 @@ pub enum GlobalError {
|
||||
Timeout(&'static str),
|
||||
}
|
||||
|
||||
/// Initialize the observability module
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `config`: Configuration information
|
||||
///
|
||||
/// # Returns
|
||||
/// A tuple containing the logger and the telemetry guard
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::init_obs;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// let (logger, guard) = init_obs(None).await;
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn init_obs(endpoint: Option<String>) -> (Arc<tokio::sync::Mutex<Logger>>, OtelGuard) {
|
||||
// Load the configuration file
|
||||
let config = AppConfig::new_with_endpoint(endpoint);
|
||||
|
||||
let guard = init_telemetry(&config.observability);
|
||||
|
||||
let logger = init_global_logger(&config).await;
|
||||
let obs_config = config.observability.clone();
|
||||
tokio::spawn(async move {
|
||||
let result = InitLogStatus::init_start_log(&obs_config).await;
|
||||
match result {
|
||||
Ok(_) => {
|
||||
info!("Logger initialized successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to initialize logger: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(logger, guard)
|
||||
}
|
||||
|
||||
/// Get the global logger instance
|
||||
/// This function returns a reference to the global logger instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// A reference to the global logger instance
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::get_logger;
|
||||
///
|
||||
/// let logger = get_logger();
|
||||
/// ```
|
||||
pub fn get_logger() -> &'static Arc<tokio::sync::Mutex<Logger>> {
|
||||
get_global_logger()
|
||||
}
|
||||
|
||||
/// Set the global guard for OpenTelemetry
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -42,11 +100,10 @@ pub enum GlobalError {
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use rustfs_obs::{init_telemetry, load_config, set_global_guard};
|
||||
/// use rustfs_obs::{ init_obs, set_global_guard};
|
||||
///
|
||||
/// fn init() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let config = load_config(None);
|
||||
/// let guard = init_telemetry(&config.observability);
|
||||
/// let guard = init_obs(None);
|
||||
/// set_global_guard(guard)?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
|
||||
@@ -23,12 +23,11 @@
|
||||
/// ## Usage
|
||||
///
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::{AppConfig, init_obs};
|
||||
/// use rustfs_obs::init_obs;
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// let config = AppConfig::default();
|
||||
/// let (logger, guard) = init_obs(config).await;
|
||||
/// let (logger, guard) = init_obs(None).await;
|
||||
/// # }
|
||||
/// ```
|
||||
mod config;
|
||||
@@ -40,74 +39,14 @@ mod system;
|
||||
mod telemetry;
|
||||
mod worker;
|
||||
|
||||
use crate::logger::InitLogStatus;
|
||||
pub use config::load_config;
|
||||
pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig};
|
||||
pub use entry::args::Args;
|
||||
pub use entry::audit::{ApiDetails, AuditLogEntry};
|
||||
pub use entry::base::BaseLogEntry;
|
||||
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
|
||||
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
|
||||
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError};
|
||||
pub use global::*;
|
||||
pub use logger::Logger;
|
||||
pub use logger::{get_global_logger, init_global_logger, start_logger};
|
||||
pub use logger::{log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
|
||||
use std::sync::Arc;
|
||||
pub use system::{init_process_observer, init_process_observer_for_pid};
|
||||
pub use telemetry::init_telemetry;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Initialize the observability module
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `config`: Configuration information
|
||||
///
|
||||
/// # Returns
|
||||
/// A tuple containing the logger and the telemetry guard
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::{AppConfig, init_obs};
|
||||
///
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// let config = AppConfig::default();
|
||||
/// let (logger, guard) = init_obs(config).await;
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn init_obs(config: AppConfig) -> (Arc<Mutex<Logger>>, telemetry::OtelGuard) {
|
||||
let guard = init_telemetry(&config.observability);
|
||||
let sinks = sinks::create_sinks(&config).await;
|
||||
let logger = init_global_logger(&config, sinks).await;
|
||||
let obs_config = config.observability.clone();
|
||||
tokio::spawn(async move {
|
||||
let result = InitLogStatus::init_start_log(&obs_config).await;
|
||||
match result {
|
||||
Ok(_) => {
|
||||
info!("Logger initialized successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to initialize logger: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(logger, guard)
|
||||
}
|
||||
|
||||
/// Get the global logger instance
|
||||
/// This function returns a reference to the global logger instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// A reference to the global logger instance
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use rustfs_obs::get_logger;
|
||||
///
|
||||
/// let logger = get_logger();
|
||||
/// ```
|
||||
pub fn get_logger() -> &'static Arc<Mutex<Logger>> {
|
||||
get_global_logger()
|
||||
}
|
||||
pub use system::SystemObserver;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::sinks::Sink;
|
||||
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry};
|
||||
use crate::{
|
||||
sinks, AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry,
|
||||
};
|
||||
use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
@@ -253,10 +255,10 @@ pub fn start_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Logger {
|
||||
/// use rustfs_obs::{AppConfig,init_global_logger};
|
||||
///
|
||||
/// let config = AppConfig::default();
|
||||
/// let sinks = vec![];
|
||||
/// let logger = init_global_logger(&config, sinks);
|
||||
/// let logger = init_global_logger(&config);
|
||||
/// ```
|
||||
pub async fn init_global_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Arc<Mutex<Logger>> {
|
||||
pub async fn init_global_logger(config: &AppConfig) -> Arc<Mutex<Logger>> {
|
||||
let sinks = sinks::create_sinks(config).await;
|
||||
let logger = Arc::new(Mutex::new(start_logger(config, sinks)));
|
||||
GLOBAL_LOGGER.set(logger.clone()).expect("Logger already initialized");
|
||||
logger
|
||||
|
||||
@@ -5,20 +5,24 @@ mod collector;
|
||||
pub(crate) mod gpu;
|
||||
pub(crate) mod metrics;
|
||||
|
||||
/// Initialize the indicator collector for the current process
|
||||
/// This function will create a new `Collector` instance and start collecting metrics.
|
||||
/// It will run indefinitely until the process is terminated.
|
||||
pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> {
|
||||
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
|
||||
let mut collector = collector::Collector::new(pid, meter, 30000)?;
|
||||
collector.run().await
|
||||
}
|
||||
pub struct SystemObserver {}
|
||||
|
||||
/// Initialize the metric collector for the specified PID process
|
||||
/// This function will create a new `Collector` instance and start collecting metrics.
|
||||
/// It will run indefinitely until the process is terminated.
|
||||
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
|
||||
let pid = sysinfo::Pid::from_u32(pid);
|
||||
let mut collector = collector::Collector::new(pid, meter, 30000)?;
|
||||
collector.run().await
|
||||
impl SystemObserver {
|
||||
/// Initialize the indicator collector for the current process
|
||||
/// This function will create a new `Collector` instance and start collecting metrics.
|
||||
/// It will run indefinitely until the process is terminated.
|
||||
pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> {
|
||||
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
|
||||
let mut collector = collector::Collector::new(pid, meter, 30000)?;
|
||||
collector.run().await
|
||||
}
|
||||
|
||||
/// Initialize the metric collector for the specified PID process
|
||||
/// This function will create a new `Collector` instance and start collecting metrics.
|
||||
/// It will run indefinitely until the process is terminated.
|
||||
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
|
||||
let pid = sysinfo::Pid::from_u32(pid);
|
||||
let mut collector = collector::Collector::new(pid, meter, 30000)?;
|
||||
collector.run().await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
// Added flexi_logger related dependencies
|
||||
use crate::OtelConfig;
|
||||
use flexi_logger::{style, Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode};
|
||||
use nu_ansi_term::Color;
|
||||
@@ -17,7 +16,8 @@ use opentelemetry_semantic_conventions::{
|
||||
SCHEMA_URL,
|
||||
};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
|
||||
APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO,
|
||||
SERVICE_VERSION, USE_STDOUT,
|
||||
};
|
||||
use rustfs_utils::get_local_ip_with_default;
|
||||
use smallvec::SmallVec;
|
||||
@@ -39,19 +39,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
|
||||
/// - The tracer provider (for distributed tracing)
|
||||
/// - The meter provider (for metrics collection)
|
||||
/// - The logger provider (for structured logging)
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use rustfs_obs::{init_telemetry, OtelConfig};
|
||||
///
|
||||
/// let config = OtelConfig::default();
|
||||
/// let otel_guard = init_telemetry(&config);
|
||||
///
|
||||
/// // The guard is kept alive for the duration of the application
|
||||
/// // When it's dropped, all telemetry components are properly shut down
|
||||
/// drop(otel_guard);
|
||||
/// ```
|
||||
// Implement Debug trait correctly, rather than using derive, as some fields may not have implemented Debug
|
||||
pub struct OtelGuard {
|
||||
tracer_provider: Option<SdkTracerProvider>,
|
||||
@@ -123,7 +110,7 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
|
||||
}
|
||||
|
||||
/// Initialize Telemetry
|
||||
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
// avoid repeated access to configuration fields
|
||||
let endpoint = &config.endpoint;
|
||||
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
|
||||
@@ -289,7 +276,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
}
|
||||
} else {
|
||||
// Obtain the log directory and file name configuration
|
||||
let log_directory = config.log_directory.as_deref().unwrap_or("logs");
|
||||
let log_directory = config.log_directory.as_deref().unwrap_or(DEFAULT_LOG_DIR);
|
||||
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
|
||||
|
||||
// Build log cutting conditions
|
||||
@@ -349,7 +336,7 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
.basename(log_filename)
|
||||
.suffix("log"),
|
||||
)
|
||||
.rotate(rotation_criterion, Naming::Timestamps, Cleanup::KeepLogFiles(keep_files.into()))
|
||||
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
|
||||
.format_for_files(format_for_file) // Add a custom formatting function for file output
|
||||
.duplicate_to_stdout(level_filter) // Use dynamic levels
|
||||
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
|
||||
|
||||
@@ -48,7 +48,7 @@ use iam::init_iam_sys;
|
||||
use license::init_license;
|
||||
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
|
||||
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
|
||||
use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard};
|
||||
use rustfs_obs::{init_obs, set_global_guard, SystemObserver};
|
||||
use rustls::ServerConfig;
|
||||
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
|
||||
use service::hybrid;
|
||||
@@ -102,11 +102,8 @@ async fn main() -> Result<()> {
|
||||
// Initialize the configuration
|
||||
init_license(opt.license.clone());
|
||||
|
||||
// Load the configuration file
|
||||
let config = load_config(Some(opt.clone().obs_endpoint));
|
||||
|
||||
// Initialize Observability
|
||||
let (_logger, guard) = init_obs(config.clone()).await;
|
||||
let (_logger, guard) = init_obs(Some(opt.clone().obs_endpoint)).await;
|
||||
|
||||
// Store in global storage
|
||||
set_global_guard(guard)?;
|
||||
@@ -233,7 +230,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
tokio::spawn(async move {
|
||||
// Record the PID-related metrics of the current process
|
||||
let meter = opentelemetry::global::meter("system");
|
||||
let obs_result = init_process_observer(meter).await;
|
||||
let obs_result = SystemObserver::init_process_observer(meter).await;
|
||||
match obs_result {
|
||||
Ok(_) => {
|
||||
info!("Process observer initialized successfully");
|
||||
|
||||
Reference in New Issue
Block a user