feat(obs): optimize WriteMode selection logic in init_telemetry (#546)

- Refactor WriteMode selection so that every value moved into thread closures is an owned type, preventing lifetime issues.
- Clarify WriteMode assignment: production builds use an `AsyncWith` mode whose pool capacity, message capacity, and flush interval can be set via RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS (falling back to built-in defaults), while non-production builds keep `BufferAndFlush` (a condensed sketch follows the commit metadata below).
- Improve readability and maintainability of logger initialization.
Author: houseme
Date: 2025-09-16 08:25:37 +08:00
Committed by: GitHub
Parent: cafec06b7e
Commit: a12a3bedc3
8 changed files with 85 additions and 35 deletions
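
Condensed sketch of the resulting selection logic (editorial summary, not part of the commit): the names, field types, and default values are taken from the diff below; select_write_mode is a hypothetical wrapper around what init_telemetry does inline, and the eprintln! fallback hint is omitted.

use flexi_logger::WriteMode::{self, AsyncWith, BufferAndFlush};
use std::time::Duration;

// Defaults defined in rustfs_config::observability (see the constants diff below).
const DEFAULT_OBS_LOG_POOL_CAPA: usize = 10240;
const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768;
const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200;

// All three variables must be present and parseable; otherwise the override is skipped.
fn get_env_async_with() -> Option<WriteMode> {
    let pool_capa = std::env::var("RUSTFS_OBS_LOG_POOL_CAPA").ok()?.parse().ok()?;
    let message_capa = std::env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA").ok()?.parse().ok()?;
    let flush_ms: u64 = std::env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok()?.parse().ok()?;
    Some(AsyncWith { pool_capa, message_capa, flush_interval: Duration::from_millis(flush_ms) })
}

fn select_write_mode(is_production: bool) -> WriteMode {
    if is_production {
        // Every value here is owned (usize / u64 / Duration), so the WriteMode can be
        // moved into the logger's background writer thread without lifetime issues.
        get_env_async_with().unwrap_or(AsyncWith {
            pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
            message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
            flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
        })
    } else {
        BufferAndFlush
    }
}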

Cargo.lock (generated)

@@ -2942,6 +2942,7 @@ dependencies = [
  "chrono",
  "crossbeam-channel",
  "crossbeam-queue",
+ "flate2",
  "log",
  "notify-debouncer-mini",
  "nu-ansi-term",


@@ -123,7 +123,7 @@ derive_builder = "0.20.2"
 enumset = "1.1.10"
 flatbuffers = "25.2.10"
 flate2 = "1.1.2"
-flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
+flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
 form_urlencoded = "1.2.2"
 futures = "0.3.31"
 futures-core = "0.3.31"


@@ -29,9 +29,28 @@ pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB"
 pub const ENV_OBS_LOG_ROTATION_TIME: &str = "RUSTFS_OBS_LOG_ROTATION_TIME";
 pub const ENV_OBS_LOG_KEEP_FILES: &str = "RUSTFS_OBS_LOG_KEEP_FILES";
+/// Log pool capacity for async logging
+pub const ENV_OBS_LOG_POOL_CAPA: &str = "RUSTFS_OBS_LOG_POOL_CAPA";
+/// Log message capacity for async logging
+pub const ENV_OBS_LOG_MESSAGE_CAPA: &str = "RUSTFS_OBS_LOG_MESSAGE_CAPA";
+/// Log flush interval in milliseconds for async logging
+pub const ENV_OBS_LOG_FLUSH_MS: &str = "RUSTFS_OBS_LOG_FLUSH_MS";
+/// Default values for log pool
+pub const DEFAULT_OBS_LOG_POOL_CAPA: usize = 10240;
+/// Default values for message capacity
+pub const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768;
+/// Default values for flush interval in milliseconds
+pub const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200;
 /// Audit logger queue capacity environment variable key
 pub const ENV_AUDIT_LOGGER_QUEUE_CAPACITY: &str = "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY";
-// Default values for observability configuration
+/// Default values for observability configuration
 pub const DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY: usize = 10000;
 /// Default values for observability configuration


@@ -40,7 +40,7 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] }
 rustfs-utils = { workspace = true, features = ["ip", "path"] }
 async-trait = { workspace = true }
 chrono = { workspace = true }
-flexi_logger = { workspace = true, features = ["trc", "kv"] }
+flexi_logger = { workspace = true }
 nu-ansi-term = { workspace = true }
 nvml-wrapper = { workspace = true, optional = true }
 opentelemetry = { workspace = true }
@@ -62,6 +62,7 @@ serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
# Only enable kafka features and related dependencies on Linux
[target.'cfg(target_os = "linux")'.dependencies]
rdkafka = { workspace = true, features = ["tokio"], optional = true }


@@ -13,15 +13,19 @@
 // limitations under the License.
 use crate::OtelConfig;
-use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
+use flexi_logger::{
+    Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode,
+    WriteMode::{AsyncWith, BufferAndFlush},
+    style,
+};
 use nu_ansi_term::Color;
 use opentelemetry::trace::TracerProvider;
 use opentelemetry::{KeyValue, global};
 use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
 use opentelemetry_otlp::WithExportConfig;
-use opentelemetry_sdk::logs::SdkLoggerProvider;
 use opentelemetry_sdk::{
     Resource,
+    logs::SdkLoggerProvider,
     metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
     trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
 };
@@ -29,15 +33,19 @@ use opentelemetry_semantic_conventions::{
     SCHEMA_URL,
     attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
 };
-use rustfs_config::observability::{DEFAULT_OBS_ENVIRONMENT_PRODUCTION, ENV_OBS_LOG_DIRECTORY};
 use rustfs_config::{
     APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
+    observability::{
+        DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
+        ENV_OBS_LOG_DIRECTORY,
+    },
 };
 use rustfs_utils::get_local_ip_with_default;
 use smallvec::SmallVec;
 use std::borrow::Cow;
-use std::fs;
 use std::io::IsTerminal;
+use std::time::Duration;
+use std::{env, fs};
 use tracing::info;
 use tracing_error::ErrorLayer;
 use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
@@ -121,7 +129,7 @@ fn resource(config: &OtelConfig) -> Resource {
 /// Creates a periodic reader for stdout metrics
 fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
     PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
-        .with_interval(std::time::Duration::from_secs(interval))
+        .with_interval(Duration::from_secs(interval))
         .build()
 }
@@ -209,7 +217,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
         builder = builder.with_reader(
             PeriodicReader::builder(exporter)
-                .with_interval(std::time::Duration::from_secs(meter_interval))
+                .with_interval(Duration::from_secs(meter_interval))
                 .build(),
         );
@@ -291,7 +299,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
                 "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
                 endpoint,
                 logger_level,
-                std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
+                env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
             );
         }
     }
@@ -393,9 +401,18 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
     // Choose write mode based on environment
     let write_mode = if is_production {
-        WriteMode::Async
+        get_env_async_with().unwrap_or_else(|| {
+            eprintln!(
+                "Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables."
+            );
+            AsyncWith {
+                pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
+                message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
+                flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
+            }
+        })
     } else {
-        WriteMode::BufferAndFlush
+        BufferAndFlush
     };

     // Configure the flexi_logger with enhanced error handling
@@ -470,6 +487,26 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
     }
 }
+
+// Read the AsyncWith parameter from the environment variable
+fn get_env_async_with() -> Option<WriteMode> {
+    let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA")
+        .ok()
+        .and_then(|v| v.parse::<usize>().ok());
+    let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA")
+        .ok()
+        .and_then(|v| v.parse::<usize>().ok());
+    let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::<u64>().ok());
+
+    match (pool_capa, message_capa, flush_ms) {
+        (Some(pool), Some(msg), Some(flush)) => Some(AsyncWith {
+            pool_capa: pool,
+            message_capa: msg,
+            flush_interval: Duration::from_millis(flush),
+        }),
+        _ => None,
+    }
+}
+
 fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
     let level = default_level.unwrap_or(logger_level);
     let mut filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(level));

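For orientation only (editorial note, not part of this commit): a minimal flexi_logger builder that consumes a WriteMode like the one selected above. The calls shown (Logger::try_with_str, log_to_file, rotate, write_mode, start) are standard flexi_logger 0.31 API, but the directory, level, and rotation settings are placeholders; the real builder in this crate wires them from the RUSTFS_OBS_* configuration.

use flexi_logger::{Age, Cleanup, Criterion, FileSpec, FlexiLoggerError, Logger, LoggerHandle, Naming, WriteMode};

// Start a rotating file logger with the chosen write mode.
// The returned handle must be kept alive; dropping it shuts the logger down.
fn start_file_logger(directory: &str, write_mode: WriteMode) -> Result<LoggerHandle, FlexiLoggerError> {
    Logger::try_with_str("info")?
        .log_to_file(FileSpec::default().directory(directory))
        .rotate(
            Criterion::Age(Age::Hour),  // cf. RUSTFS_OBS_LOG_ROTATION_TIME
            Naming::Timestamps,
            Cleanup::KeepLogFiles(7),   // cf. RUSTFS_OBS_LOG_KEEP_FILES
        )
        .write_mode(write_mode)         // AsyncWith in production, BufferAndFlush otherwise
        .start()
}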

@@ -16,7 +16,7 @@ use crate::admin::console::static_handler;
 use crate::config::Opt;
 use axum::{Router, extract::Request, middleware, response::Json, routing::get};
 use axum_server::tls_rustls::RustlsConfig;
-use http::{HeaderValue, Method, header};
+use http::{HeaderValue, Method};
 use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
 use rustfs_utils::net::parse_and_resolve_address;
 use serde_json::json;
@@ -230,15 +230,15 @@ async fn health_check() -> Json<serde_json::Value> {
 pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
     let cors_layer = CorsLayer::new()
         .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS])
-        .allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::ACCEPT, header::ORIGIN]);
+        .allow_headers(Any);

     match origins {
-        Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
+        Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
         Some(origins_str) => {
             let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
             if origins.is_empty() {
                 warn!("Empty CORS origins provided, using permissive CORS");
-                cors_layer.allow_origin(Any)
+                cors_layer.allow_origin(Any).expose_headers(Any)
             } else {
                 // Parse origins with proper error handling
                 let mut valid_origins = Vec::new();
@@ -255,10 +255,10 @@ pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
                 if valid_origins.is_empty() {
                     warn!("No valid CORS origins found, using permissive CORS");
-                    cors_layer.allow_origin(Any)
+                    cors_layer.allow_origin(Any).expose_headers(Any)
                 } else {
                     info!("Console CORS origins configured: {:?}", valid_origins);
-                    cors_layer.allow_origin(AllowOrigin::list(valid_origins))
+                    cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
                 }
             }
         }


@@ -65,26 +65,15 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
             Method::HEAD,
             Method::OPTIONS,
         ])
-        .allow_headers([
-            http::header::CONTENT_TYPE,
-            http::header::AUTHORIZATION,
-            http::header::ACCEPT,
-            http::header::ORIGIN,
-            // Note: X_AMZ_* headers are custom and may need to be defined
-            // http::header::X_AMZ_CONTENT_SHA256,
-            // http::header::X_AMZ_DATE,
-            // http::header::X_AMZ_SECURITY_TOKEN,
-            // http::header::X_AMZ_USER_AGENT,
-            http::header::RANGE,
-        ]);
+        .allow_headers(Any);

     match origins {
-        Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
+        Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
         Some(origins_str) => {
             let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
             if origins.is_empty() {
                 warn!("Empty CORS origins provided, using permissive CORS");
-                cors_layer.allow_origin(Any)
+                cors_layer.allow_origin(Any).expose_headers(Any)
             } else {
                 // Parse origins with proper error handling
                 let mut valid_origins = Vec::new();
@@ -101,16 +90,16 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
                 if valid_origins.is_empty() {
                     warn!("No valid CORS origins found, using permissive CORS");
-                    cors_layer.allow_origin(Any)
+                    cors_layer.allow_origin(Any).expose_headers(Any)
                 } else {
                     info!("Endpoint CORS origins configured: {:?}", valid_origins);
-                    cors_layer.allow_origin(AllowOrigin::list(valid_origins))
+                    cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
                 }
             }
         }
         None => {
             debug!("No CORS origins configured for endpoint, using permissive CORS");
-            cors_layer.allow_origin(Any)
+            cors_layer.allow_origin(Any).expose_headers(Any)
         }
     }
 }

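Editorial side note on the two CORS hunks above: allow_headers(Any) plus expose_headers(Any) makes both layers fully permissive for headers, so browsers can read non-safelisted response headers (for example ETag or x-amz-* values) without maintaining an explicit list. A minimal sketch of attaching such a layer to an axum router, using the same tower-http/axum APIs the diff already relies on; the route and handler are made up for illustration:

use axum::{Router, routing::get};
use http::Method;
use tower_http::cors::{Any, CorsLayer};

// Builds a router with a fully permissive CORS layer, mirroring the "*" branch above.
fn router_with_permissive_cors() -> Router {
    let cors = CorsLayer::new()
        .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS])
        .allow_headers(Any)
        .allow_origin(Any)
        .expose_headers(Any);

    // Hypothetical health route; any router is layered the same way.
    Router::new()
        .route("/health", get(|| async { "ok" }))
        .layer(cors)
}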

@@ -64,6 +64,9 @@ export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
 export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
 export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
 export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB
+export RUSTFS_OBS_LOG_POOL_CAPA=10240
+export RUSTFS_OBS_LOG_MESSAGE_CAPA=32768
+export RUSTFS_OBS_LOG_FLUSH_MS=300
 export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs"
 export RUSTFS_SINKS_FILE_BUFFER_SIZE=12