From a12a3bedc3096d5dabd0ee73ea2b4d50d61e4e24 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 16 Sep 2025 08:25:37 +0800 Subject: [PATCH] feat(obs): optimize WriteMode selection logic in init_telemetry (#546) - Refactor WriteMode selection to ensure all variables moved into thread closures are owned types, preventing lifetime issues. - Simplify and clarify WriteMode assignment for production and non-production environments. - Improve code readability and maintainability for logger initialization. --- Cargo.lock | 1 + Cargo.toml | 2 +- crates/config/src/observability/config.rs | 21 ++++++++- crates/obs/Cargo.toml | 3 +- crates/obs/src/telemetry.rs | 55 +++++++++++++++++++---- rustfs/src/server/console.rs | 12 ++--- rustfs/src/server/http.rs | 23 +++------- scripts/run.sh | 3 ++ 8 files changed, 85 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9eccb17f..e374f706 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2942,6 +2942,7 @@ dependencies = [ "chrono", "crossbeam-channel", "crossbeam-queue", + "flate2", "log", "notify-debouncer-mini", "nu-ansi-term", diff --git a/Cargo.toml b/Cargo.toml index fe8c8eee..37ccd795 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,7 +123,7 @@ derive_builder = "0.20.2" enumset = "1.1.10" flatbuffers = "25.2.10" flate2 = "1.1.2" -flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] } +flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] } form_urlencoded = "1.2.2" futures = "0.3.31" futures-core = "0.3.31" diff --git a/crates/config/src/observability/config.rs b/crates/config/src/observability/config.rs index 32515aef..0032c286 100644 --- a/crates/config/src/observability/config.rs +++ b/crates/config/src/observability/config.rs @@ -29,9 +29,28 @@ pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB" pub const ENV_OBS_LOG_ROTATION_TIME: &str = "RUSTFS_OBS_LOG_ROTATION_TIME"; pub const ENV_OBS_LOG_KEEP_FILES: &str = "RUSTFS_OBS_LOG_KEEP_FILES"; +/// Log pool capacity for async logging +pub const ENV_OBS_LOG_POOL_CAPA: &str = "RUSTFS_OBS_LOG_POOL_CAPA"; + +/// Log message capacity for async logging +pub const ENV_OBS_LOG_MESSAGE_CAPA: &str = "RUSTFS_OBS_LOG_MESSAGE_CAPA"; + +/// Log flush interval in milliseconds for async logging +pub const ENV_OBS_LOG_FLUSH_MS: &str = "RUSTFS_OBS_LOG_FLUSH_MS"; + +/// Default values for log pool +pub const DEFAULT_OBS_LOG_POOL_CAPA: usize = 10240; + +/// Default values for message capacity +pub const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768; + +/// Default values for flush interval in milliseconds +pub const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200; + +/// Audit logger queue capacity environment variable key pub const ENV_AUDIT_LOGGER_QUEUE_CAPACITY: &str = "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY"; -// Default values for observability configuration +/// Default values for observability configuration pub const DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY: usize = 10000; /// Default values for observability configuration diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index d87a86f0..9838561a 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -40,7 +40,7 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] } rustfs-utils = { workspace = true, features = ["ip", "path"] } async-trait = { workspace = true } chrono = { workspace = true } -flexi_logger = { workspace = true, features = ["trc", "kv"] } +flexi_logger = { workspace = true } nu-ansi-term = { workspace = true } nvml-wrapper = { workspace = true, optional = true } opentelemetry = { workspace = true } @@ -62,6 +62,7 @@ serde_json = { workspace = true } sysinfo = { workspace = true } thiserror = { workspace = true } + # Only enable kafka features and related dependencies on Linux [target.'cfg(target_os = "linux")'.dependencies] rdkafka = { workspace = true, features = ["tokio"], optional = true } diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 69cde686..97511876 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -13,15 +13,19 @@ // limitations under the License. use crate::OtelConfig; -use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style}; +use flexi_logger::{ + Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, + WriteMode::{AsyncWith, BufferAndFlush}, + style, +}; use nu_ansi_term::Color; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::{ Resource, + logs::SdkLoggerProvider, metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; @@ -29,15 +33,19 @@ use opentelemetry_semantic_conventions::{ SCHEMA_URL, attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, }; -use rustfs_config::observability::{DEFAULT_OBS_ENVIRONMENT_PRODUCTION, ENV_OBS_LOG_DIRECTORY}; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT, + observability::{ + DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA, + ENV_OBS_LOG_DIRECTORY, + }, }; use rustfs_utils::get_local_ip_with_default; use smallvec::SmallVec; use std::borrow::Cow; -use std::fs; use std::io::IsTerminal; +use std::time::Duration; +use std::{env, fs}; use tracing::info; use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; @@ -121,7 +129,7 @@ fn resource(config: &OtelConfig) -> Resource { /// Creates a periodic reader for stdout metrics fn create_periodic_reader(interval: u64) -> PeriodicReader { PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()) - .with_interval(std::time::Duration::from_secs(interval)) + .with_interval(Duration::from_secs(interval)) .build() } @@ -209,7 +217,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { builder = builder.with_reader( PeriodicReader::builder(exporter) - .with_interval(std::time::Duration::from_secs(meter_interval)) + .with_interval(Duration::from_secs(meter_interval)) .build(), ); @@ -291,7 +299,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}", endpoint, logger_level, - std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string()) + env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string()) ); } } @@ -393,9 +401,18 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { // Choose write mode based on environment let write_mode = if is_production { - WriteMode::Async + get_env_async_with().unwrap_or_else(|| { + eprintln!( + "Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables." + ); + AsyncWith { + pool_capa: DEFAULT_OBS_LOG_POOL_CAPA, + message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA, + flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS), + } + }) } else { - WriteMode::BufferAndFlush + BufferAndFlush }; // Configure the flexi_logger with enhanced error handling @@ -470,6 +487,26 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard { } } +// Read the AsyncWith parameter from the environment variable +fn get_env_async_with() -> Option { + let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA") + .ok() + .and_then(|v| v.parse::().ok()); + let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA") + .ok() + .and_then(|v| v.parse::().ok()); + let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::().ok()); + + match (pool_capa, message_capa, flush_ms) { + (Some(pool), Some(msg), Some(flush)) => Some(AsyncWith { + pool_capa: pool, + message_capa: msg, + flush_interval: Duration::from_millis(flush), + }), + _ => None, + } +} + fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter { let level = default_level.unwrap_or(logger_level); let mut filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level)); diff --git a/rustfs/src/server/console.rs b/rustfs/src/server/console.rs index 290eb561..ba4d2b4c 100644 --- a/rustfs/src/server/console.rs +++ b/rustfs/src/server/console.rs @@ -16,7 +16,7 @@ use crate::admin::console::static_handler; use crate::config::Opt; use axum::{Router, extract::Request, middleware, response::Json, routing::get}; use axum_server::tls_rustls::RustlsConfig; -use http::{HeaderValue, Method, header}; +use http::{HeaderValue, Method}; use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use rustfs_utils::net::parse_and_resolve_address; use serde_json::json; @@ -230,15 +230,15 @@ async fn health_check() -> Json { pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer { let cors_layer = CorsLayer::new() .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS]) - .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::ACCEPT, header::ORIGIN]); + .allow_headers(Any); match origins { - Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any), + Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any), Some(origins_str) => { let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect(); if origins.is_empty() { warn!("Empty CORS origins provided, using permissive CORS"); - cors_layer.allow_origin(Any) + cors_layer.allow_origin(Any).expose_headers(Any) } else { // Parse origins with proper error handling let mut valid_origins = Vec::new(); @@ -255,10 +255,10 @@ pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer { if valid_origins.is_empty() { warn!("No valid CORS origins found, using permissive CORS"); - cors_layer.allow_origin(Any) + cors_layer.allow_origin(Any).expose_headers(Any) } else { info!("Console CORS origins configured: {:?}", valid_origins); - cors_layer.allow_origin(AllowOrigin::list(valid_origins)) + cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any) } } } diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index 1759ccaa..e5cdd968 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -65,26 +65,15 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer { Method::HEAD, Method::OPTIONS, ]) - .allow_headers([ - http::header::CONTENT_TYPE, - http::header::AUTHORIZATION, - http::header::ACCEPT, - http::header::ORIGIN, - // Note: X_AMZ_* headers are custom and may need to be defined - // http::header::X_AMZ_CONTENT_SHA256, - // http::header::X_AMZ_DATE, - // http::header::X_AMZ_SECURITY_TOKEN, - // http::header::X_AMZ_USER_AGENT, - http::header::RANGE, - ]); + .allow_headers(Any); match origins { - Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any), + Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any), Some(origins_str) => { let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect(); if origins.is_empty() { warn!("Empty CORS origins provided, using permissive CORS"); - cors_layer.allow_origin(Any) + cors_layer.allow_origin(Any).expose_headers(Any) } else { // Parse origins with proper error handling let mut valid_origins = Vec::new(); @@ -101,16 +90,16 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer { if valid_origins.is_empty() { warn!("No valid CORS origins found, using permissive CORS"); - cors_layer.allow_origin(Any) + cors_layer.allow_origin(Any).expose_headers(Any) } else { info!("Endpoint CORS origins configured: {:?}", valid_origins); - cors_layer.allow_origin(AllowOrigin::list(valid_origins)) + cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any) } } } None => { debug!("No CORS origins configured for endpoint, using permissive CORS"); - cors_layer.allow_origin(Any) + cors_layer.allow_origin(Any).expose_headers(Any) } } } diff --git a/scripts/run.sh b/scripts/run.sh index 735cd973..e02a0668 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -64,6 +64,9 @@ export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day" export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB +export RUSTFS_OBS_LOG_POOL_CAPA=10240 +export RUSTFS_OBS_LOG_MESSAGE_CAPA=32768 +export RUSTFS_OBS_LOG_FLUSH_MS=300 export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs" export RUSTFS_SINKS_FILE_BUFFER_SIZE=12