From 86353d98d5a1fc60c7a91aecd6dd93f0fbf47295 Mon Sep 17 00:00:00 2001 From: houseme Date: Thu, 24 Apr 2025 19:04:57 +0800 Subject: [PATCH] feat: add TraceLayer for HTTP service and improve metrics (#361) * improve code for opentelemetry and add system metrics * feat: add TraceLayer for HTTP service and improve metrics - Add TraceLayer to HTTP server for request tracing - Implement system metrics for process monitoring - Optimize init_telemetry method for better resource management - Add graceful shutdown handling for telemetry components - Fix GracefulShutdown ownership issues with Arc wrapper * improve code for init_process_observer * remove tomlfmt.toml * Translation comment * improve code for console CompressionLayer params --- Cargo.lock | 153 +++++++--- Cargo.toml | 46 +-- crates/event-notifier/src/config.rs | 1 - crates/obs/Cargo.toml | 11 +- crates/obs/examples/config.toml | 2 +- crates/obs/examples/server.rs | 9 +- crates/obs/src/config.rs | 2 +- crates/obs/src/global.rs | 28 +- crates/obs/src/lib.rs | 80 +++-- crates/obs/src/logger.rs | 102 ++----- crates/obs/src/system/attributes.rs | 44 +++ crates/obs/src/system/collector.rs | 156 ++++++++++ crates/obs/src/system/gpu.rs | 70 +++++ crates/obs/src/system/metrics.rs | 100 +++++++ crates/obs/src/system/mod.rs | 24 ++ crates/obs/src/telemetry.rs | 361 ++++++++++------------- rustfs/Cargo.toml | 3 +- rustfs/src/console.rs | 2 +- rustfs/src/main.rs | 106 +++++-- s3select/query/src/dispatcher/manager.rs | 3 +- scripts/run.sh | 4 +- 21 files changed, 900 insertions(+), 407 deletions(-) create mode 100644 crates/obs/src/system/attributes.rs create mode 100644 crates/obs/src/system/collector.rs create mode 100644 crates/obs/src/system/gpu.rs create mode 100644 crates/obs/src/system/metrics.rs create mode 100644 crates/obs/src/system/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 53ba1e2b..81d6b4c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -525,7 +525,6 @@ version = "0.4.19" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" dependencies = [ - "brotli", "bzip2", "flate2", "futures-core", @@ -5251,6 +5250,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5413,6 +5421,29 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3c00a0c9600379bd32f8972de90676a7672cba3bf4886986bc05902afc1e093" +[[package]] +name = "nvml-wrapper" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c9bff0aa1d48904a1385ea2a8b97576fbdcbc9a3cfccd0d31fe978e1c4038c5" +dependencies = [ + "bitflags 2.9.0", + "libloading 0.8.6", + "nvml-wrapper-sys", + "static_assertions", + "thiserror 1.0.69", + "wrapcenum-derive", +] + +[[package]] +name = "nvml-wrapper-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "698d45156f28781a4e79652b6ebe2eaa0589057d588d3aec1333f6466f13fcb5" +dependencies = [ + "libloading 0.8.6", +] + [[package]] name = "objc" version = "0.2.7" @@ -5716,19 +5747,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-prometheus" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "098a71a4430bb712be6130ed777335d2e5b19bc8566de5f2edddfce906def6ab" -dependencies = [ - "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "prometheus", - "tracing", -] - [[package]] name = "opentelemetry-proto" version = "0.29.0" @@ -6487,21 +6505,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "prometheus" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" -dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot 0.12.3", - "protobuf", - "thiserror 2.0.12", -] - [[package]] name = "prost" version = "0.13.5" @@ -7248,6 +7251,7 @@ dependencies = [ "mime", "mime_guess", "netif", + "opentelemetry", "pin-project-lite", "policy", "prost-build", @@ -7332,18 +7336,19 @@ dependencies = [ "chrono", "config", "local-ip-address", + "nvml-wrapper", "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", - "opentelemetry-prometheus", "opentelemetry-semantic-conventions", "opentelemetry-stdout", "opentelemetry_sdk", - "prometheus", "rdkafka", "reqwest", "serde", "serde_json", + "smallvec", + "sysinfo", "thiserror 2.0.12", "tokio", "tracing", @@ -8274,6 +8279,19 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "sysinfo" +version = "0.34.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4b93974b3d3aeaa036504b8eefd4c039dced109171c1ae973f1dc63b2c7e4b2" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "windows 0.57.0", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -8342,7 +8360,7 @@ dependencies = [ "tao-macros", "unicode-segmentation", "url", - "windows", + "windows 0.58.0", "windows-core 0.58.0", "windows-version", "x11-dl", @@ -9483,7 +9501,7 @@ checksum = "6f61ff3d9d0ee4efcb461b14eb3acfda2702d10dc329f339303fc3e57215ae2c" dependencies = [ "webview2-com-macros", "webview2-com-sys", - "windows", + "windows 0.58.0", "windows-core 0.58.0", "windows-implement 0.58.0", "windows-interface 0.58.0", @@ -9507,7 +9525,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3a3e2eeb58f82361c93f9777014668eb3d07e7d174ee4c819575a9208011886" dependencies = [ "thiserror 1.0.69", - "windows", + "windows 0.58.0", "windows-core 0.58.0", ] @@ -9554,6 +9572,16 @@ version = "0.4.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" +dependencies = [ + "windows-core 0.57.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows" version = "0.58.0" @@ -9564,6 +9592,18 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement 0.57.0", + "windows-interface 0.57.0", + "windows-result 0.1.2", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.58.0" @@ -9590,6 +9630,17 @@ dependencies = [ "windows-strings 0.4.0", ] +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "windows-implement" version = "0.58.0" @@ -9612,6 +9663,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "windows-interface" version = "0.58.0" @@ -9651,6 +9713,15 @@ dependencies = [ "windows-targets 0.53.0", ] +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + 
"windows-targets 0.52.6", +] + [[package]] name = "windows-result" version = "0.2.0" @@ -10020,6 +10091,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "wrapcenum-derive" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76ff259533532054cfbaefb115c613203c73707017459206380f03b3b3f266e" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "write16" version = "1.0.0" @@ -10066,7 +10149,7 @@ dependencies = [ "webkit2gtk", "webkit2gtk-sys", "webview2-com", - "windows", + "windows 0.58.0", "windows-core 0.58.0", "windows-version", "x11-dl", diff --git a/Cargo.toml b/Cargo.toml index d0eb484d..b9eb8dfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,21 +1,21 @@ [workspace] members = [ - "madmin", # Management dashboard and admin API interface - "rustfs", # Core file system implementation - "ecstore", # Erasure coding storage implementation - "e2e_test", # End-to-end test suite - "common/common", # Shared utilities and data structures - "common/lock", # Distributed locking implementation - "common/protos", # Protocol buffer definitions - "common/workers", # Worker thread pools and task scheduling - "iam", # Identity and Access Management - "crypto", # Cryptography and security features - "cli/rustfs-gui", # Graphical user interface client - "crates/obs", # Observability utilities + "madmin", # Management dashboard and admin API interface + "rustfs", # Core file system implementation + "ecstore", # Erasure coding storage implementation + "e2e_test", # End-to-end test suite + "common/common", # Shared utilities and data structures + "common/lock", # Distributed locking implementation + "common/protos", # Protocol buffer definitions + "common/workers", # Worker thread pools and task scheduling + "iam", # Identity and Access Management + "crypto", # Cryptography and security features + "cli/rustfs-gui", # Graphical user interface client + "crates/obs", # Observability 
utilities "crates/event-notifier", # Event notification system - "s3select/api", # S3 Select API interface - "s3select/query", # S3 Select query engine - "appauth", # Application authentication and authorization + "s3select/api", # S3 Select API interface + "s3select/query", # S3 Select query engine + "appauth", # Application authentication and authorization ] resolver = "2" @@ -93,6 +93,7 @@ md-5 = "0.10.6" mime = "0.3.17" mime_guess = "2.0.5" netif = "0.1.6" +nvml-wrapper = "0.10.0" object_store = "0.11.2" opentelemetry = { version = "0.29.1" } opentelemetry-appender-tracing = { version = "0.29.1", features = [ @@ -102,10 +103,7 @@ opentelemetry-appender-tracing = { version = "0.29.1", features = [ opentelemetry_sdk = { version = "0.29.0" } opentelemetry-stdout = { version = "0.29.0" } opentelemetry-otlp = { version = "0.29.0" } -opentelemetry-prometheus = { version = "0.29.1" } -opentelemetry-semantic-conventions = { version = "0.29.0", features = [ - "semconv_experimental", -] } +opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } parking_lot = "0.12.3" pin-project-lite = "0.2.16" prometheus = "0.14.0" @@ -143,10 +141,11 @@ serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_urlencoded = "0.7.1" serde_with = "3.12.0" -smallvec = { version = "1.15.0", features = ["serde"] } -strum = { version = "0.27.1", features = ["derive"] } sha2 = "0.10.8" +smallvec = { version = "1.15.0", features = ["serde"] } snafu = "0.8.5" +strum = { version = "0.27.1", features = ["derive"] } +sysinfo = "0.34.2" tempfile = "3.19.1" test-case = "3.3.1" thiserror = "2.0.12" @@ -192,7 +191,10 @@ inherits = "dev" [profile.release] opt-level = 3 -lto = "thin" +lto = "fat" +codegen-units = 1 +panic = "abort" # Optional, remove the panic expansion code +strip = true # strip symbol information to reduce binary size [profile.production] inherits = "release" diff --git a/crates/event-notifier/src/config.rs 
b/crates/event-notifier/src/config.rs index ae718abc..ab46d8e8 100644 --- a/crates/event-notifier/src/config.rs +++ b/crates/event-notifier/src/config.rs @@ -149,7 +149,6 @@ impl NotifierConfig { ) .build() .unwrap_or_default(); - println!("Loaded config: {:?}", app_config); match app_config.try_deserialize::() { Ok(app_config) => { println!("Parsed AppConfig: {:?} \n", app_config); diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index 4290b76b..f6d9c478 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -11,32 +11,35 @@ workspace = true [features] default = ["file"] +file = [] +gpu = ["dep:nvml-wrapper"] kafka = ["dep:rdkafka"] webhook = ["dep:reqwest"] -file = [] +full = ["file", "gpu", "kafka", "webhook"] [dependencies] async-trait = { workspace = true } chrono = { workspace = true } config = { workspace = true } +nvml-wrapper = { workspace = true, optional = true } opentelemetry = { workspace = true } opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] } opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } opentelemetry-stdout = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] } -opentelemetry-prometheus = { workspace = true } opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] } -prometheus = { workspace = true } serde = { workspace = true } +smallvec = { workspace = true, features = ["serde"] } tracing = { workspace = true, features = ["std", "attributes"] } tracing-core = { workspace = true } tracing-error = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] } -tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread"] } +tokio = { workspace = true, features = ["sync", "fs", 
"rt-multi-thread", "rt", "time", "macros"] } rdkafka = { workspace = true, features = ["tokio"], optional = true } reqwest = { workspace = true, optional = true, default-features = false } serde_json = { workspace = true } +sysinfo = { workspace = true } thiserror = { workspace = true } local-ip-address = { workspace = true } diff --git a/crates/obs/examples/config.toml b/crates/obs/examples/config.toml index 929867f0..c1b3df14 100644 --- a/crates/obs/examples/config.toml +++ b/crates/obs/examples/config.toml @@ -25,7 +25,7 @@ batch_timeout_ms = 1000 # Default is 100ms if not specified [sinks.file] enabled = true -path = "logs/app.log" +path = "deploy/logs/app.log" batch_size = 100 batch_timeout_ms = 1000 # Default is 8192 bytes if not specified diff --git a/crates/obs/examples/server.rs b/crates/obs/examples/server.rs index 55977b10..0310c158 100644 --- a/crates/obs/examples/server.rs +++ b/crates/obs/examples/server.rs @@ -1,8 +1,8 @@ use opentelemetry::global; -use rustfs_obs::{get_logger, init_obs, load_config, log_info, BaseLogEntry, ServerLogEntry}; +use rustfs_obs::{get_logger, init_obs, init_process_observer, load_config, log_info, BaseLogEntry, ServerLogEntry}; use std::collections::HashMap; use std::time::{Duration, SystemTime}; -use tracing::{info, instrument}; +use tracing::{error, info, instrument}; use tracing_core::Level; #[tokio::main] @@ -38,6 +38,11 @@ async fn run(bucket: String, object: String, user: String, service_name: String) &[opentelemetry::KeyValue::new("operation", "run")], ); + match init_process_observer(meter).await { + Ok(_) => info!("Process observer initialized successfully"), + Err(e) => error!("Failed to initialize process observer: {:?}", e), + } + let base_entry = BaseLogEntry::new() .message(Some("run logger api_handler info".to_string())) .request_id(Some("request_id".to_string())) diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index a9e62016..33f974f9 100644 --- a/crates/obs/src/config.rs +++ 
b/crates/obs/src/config.rs @@ -178,7 +178,6 @@ pub struct AppConfig { pub logger: Option, } -// 为 AppConfig 实现 Default impl AppConfig { pub fn new() -> Self { Self { @@ -189,6 +188,7 @@ impl AppConfig { } } +// implement default for AppConfig impl Default for AppConfig { fn default() -> Self { Self::new() diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 41c5e555..8d392ad9 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -16,11 +16,27 @@ static GLOBAL_GUARD: OnceCell>> = OnceCell::const_new(); /// Error type for global guard operations #[derive(Debug, thiserror::Error)] -pub enum GuardError { +pub enum GlobalError { #[error("Failed to set global guard: {0}")] SetError(#[from] SetError>>), #[error("Global guard not initialized")] NotInitialized, + #[error("Global system metrics err: {0}")] + MetricsError(String), + #[error("Failed to get current PID: {0}")] + PidError(String), + #[error("Process with PID {0} not found")] + ProcessNotFound(u32), + #[error("Failed to get physical core count")] + CoreCountError, + #[error("GPU initialization failed: {0}")] + GpuInitError(String), + #[error("GPU device not found: {0}")] + GpuDeviceError(String), + #[error("Failed to send log: {0}")] + SendFailed(&'static str), + #[error("Operation timed out: {0}")] + Timeout(&'static str), } /// Set the global guard for OpenTelemetry @@ -43,9 +59,9 @@ pub enum GuardError { /// Ok(()) /// } /// ``` -pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> { +pub fn set_global_guard(guard: OtelGuard) -> Result<(), GlobalError> { info!("Initializing global OpenTelemetry guard"); - GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GuardError::SetError) + GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GlobalError::SetError) } /// Get the global guard for OpenTelemetry @@ -65,8 +81,8 @@ pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> { /// Ok(()) /// } /// ``` -pub fn get_global_guard() -> Result>, 
GuardError> { - GLOBAL_GUARD.get().cloned().ok_or(GuardError::NotInitialized) +pub fn get_global_guard() -> Result>, GlobalError> { + GLOBAL_GUARD.get().cloned().ok_or(GlobalError::NotInitialized) } /// Try to get the global guard for OpenTelemetry @@ -84,6 +100,6 @@ mod tests { #[tokio::test] async fn test_get_uninitialized_guard() { let result = get_global_guard(); - assert!(matches!(result, Err(GuardError::NotInitialized))); + assert!(matches!(result, Err(GlobalError::NotInitialized))); } } diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index 843c5236..d41d0e66 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ -1,23 +1,24 @@ -/// # obs -/// -/// `obs` is a logging and observability library for Rust. -/// It provides a simple and easy-to-use interface for logging and observability. -/// It is built on top of the `log` crate and `opentelemetry` crate. -/// -/// ## Features -/// - Structured logging -/// - Distributed tracing -/// - Metrics collection -/// - Log processing worker -/// - Multiple sinks -/// - Configuration-based setup -/// - Telemetry guard -/// - Global logger -/// - Log levels -/// - Log entry types -/// - Log record -/// - Object version -/// - Local IP address +//! # RustFS Observability +//! +//! provides tools for system and service monitoring +//! +//! ## feature mark +//! +//! - `file`: enable file logging enabled by default +//! - `gpu`: gpu monitoring function +//! - `kafka`: enable kafka metric output +//! - `webhook`: enable webhook notifications +//! - `full`: includes all functions +//! +//! to enable gpu monitoring add in cargo toml +//! +//! ```toml +//! # using gpu monitoring +//! rustfs-obs = { version = "0.1.0", features = ["gpu"] } +//! +//! # use all functions +//! rustfs-obs = { version = "0.1.0", features = ["full"] } +//! 
``` /// /// ## Usage /// @@ -32,28 +33,34 @@ mod entry; mod global; mod logger; mod sink; +mod system; mod telemetry; mod utils; mod worker; + +use crate::logger::InitLogStatus; pub use config::load_config; -pub use config::{AppConfig, OtelConfig}; +#[cfg(feature = "file")] +pub use config::FileSinkConfig; +#[cfg(feature = "kafka")] +pub use config::KafkaSinkConfig; +#[cfg(feature = "webhook")] +pub use config::WebhookSinkConfig; +pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig}; pub use entry::args::Args; pub use entry::audit::{ApiDetails, AuditLogEntry}; pub use entry::base::BaseLogEntry; pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry}; pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel}; -pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GuardError}; -pub use logger::{ensure_logger_initialized, log_debug, log_error, log_info, log_trace, log_warn, log_with_context}; -pub use logger::{get_global_logger, init_global_logger, locked_logger, start_logger}; -pub use logger::{log_init_state, InitLogStatus}; -pub use logger::{LogError, Logger}; -pub use sink::Sink; +pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError}; +pub use logger::Logger; +pub use logger::{get_global_logger, init_global_logger, start_logger}; +pub use logger::{log_debug, log_error, log_info, log_trace, log_warn, log_with_context}; use std::sync::Arc; +pub use system::{init_process_observer, init_process_observer_for_pid}; pub use telemetry::init_telemetry; -pub use telemetry::{get_global_registry, metrics}; use tokio::sync::Mutex; -pub use utils::{get_local_ip, get_local_ip_with_default}; -pub use worker::start_worker; +use tracing::{error, info}; /// Initialize the observability module /// @@ -74,6 +81,19 @@ pub async fn init_obs(config: AppConfig) -> (Arc>, telemetry::Otel let guard = init_telemetry(&config.observability); let sinks = sink::create_sinks(&config).await; let 
logger = init_global_logger(&config, sinks).await; + let obs_config = config.observability.clone(); + tokio::spawn(async move { + let result = InitLogStatus::init_start_log(&obs_config).await; + match result { + Ok(_) => { + info!("Logger initialized successfully"); + } + Err(e) => { + error!("Failed to initialize logger: {}", e); + } + } + }); + (logger, guard) } diff --git a/crates/obs/src/logger.rs b/crates/obs/src/logger.rs index 1e62712c..6329ab8f 100644 --- a/crates/obs/src/logger.rs +++ b/crates/obs/src/logger.rs @@ -1,5 +1,6 @@ use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION}; -use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, OtelConfig, ServerLogEntry, Sink, UnifiedLogEntry}; +use crate::sink::Sink; +use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry}; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -43,26 +44,26 @@ impl Logger { /// Log a server entry #[tracing::instrument(skip(self), fields(log_source = "logger_server"))] - pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), LogError> { + pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), GlobalError> { self.log_entry(UnifiedLogEntry::Server(entry)).await } /// Log an audit entry #[tracing::instrument(skip(self), fields(log_source = "logger_audit"))] - pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), LogError> { + pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), GlobalError> { self.log_entry(UnifiedLogEntry::Audit(Box::new(entry))).await } /// Log a console entry #[tracing::instrument(skip(self), fields(log_source = "logger_console"))] - pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), LogError> { + pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), GlobalError> { 
self.log_entry(UnifiedLogEntry::Console(entry)).await } /// Asynchronous logging of unified log entries #[tracing::instrument(skip(self), fields(log_source = "logger"))] #[tracing::instrument(level = "error", skip_all)] - pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), LogError> { + pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), GlobalError> { // Extract information for tracing based on entry type match &entry { UnifiedLogEntry::Server(server) => { @@ -123,11 +124,11 @@ impl Logger { tracing::warn!("Log queue full, applying backpressure"); match tokio::time::timeout(std::time::Duration::from_millis(500), self.sender.send(entry)).await { Ok(Ok(_)) => Ok(()), - Ok(Err(_)) => Err(LogError::SendFailed("Channel closed")), - Err(_) => Err(LogError::Timeout("Queue backpressure timeout")), + Ok(Err(_)) => Err(GlobalError::SendFailed("Channel closed")), + Err(_) => Err(GlobalError::Timeout("Queue backpressure timeout")), } } - Err(mpsc::error::TrySendError::Closed(_)) => Err(LogError::SendFailed("Logger channel closed")), + Err(mpsc::error::TrySendError::Closed(_)) => Err(GlobalError::SendFailed("Logger channel closed")), } } @@ -160,7 +161,7 @@ impl Logger { request_id: Option, user_id: Option, fields: Vec<(String, String)>, - ) -> Result<(), LogError> { + ) -> Result<(), GlobalError> { let base = BaseLogEntry::new().message(Some(message.to_string())).request_id(request_id); let server_entry = ServerLogEntry::new(level, source.to_string()) @@ -190,7 +191,7 @@ impl Logger { /// let _ = logger.write("This is an information message", "example", Level::INFO).await; /// } /// ``` - pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), LogError> { + pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), GlobalError> { self.write_with_context(message, source, level, None, None, Vec::new()).await } @@ -208,31 +209,12 @@ impl Logger { /// let _ = logger.shutdown().await; /// } /// ``` 
- pub async fn shutdown(self) -> Result<(), LogError> { + pub async fn shutdown(self) -> Result<(), GlobalError> { drop(self.sender); //Close the sending end so that the receiver knows that there is no new message Ok(()) } } -/// Log error type -/// This enum defines the error types that can occur when logging. -/// It is used to provide more detailed error information. -/// # Example -/// ``` -/// use rustfs_obs::LogError; -/// use thiserror::Error; -/// -/// LogError::SendFailed("Failed to send log"); -/// LogError::Timeout("Operation timed out"); -/// ``` -#[derive(Debug, thiserror::Error)] -pub enum LogError { - #[error("Failed to send log: {0}")] - SendFailed(&'static str), - #[error("Operation timed out: {0}")] - Timeout(&'static str), -} - /// Start the log module /// This function starts the log module. /// It initializes the logger and starts the worker to process logs. @@ -297,48 +279,6 @@ pub fn get_global_logger() -> &'static Arc> { GLOBAL_LOGGER.get().expect("Logger not initialized") } -/// Get the global logger instance with a lock -/// This function returns a reference to the global logger instance with a lock. -/// It is used to ensure that the logger is thread-safe. -/// -/// # Returns -/// A reference to the global logger instance with a lock -/// -/// # Example -/// ``` -/// use rustfs_obs::locked_logger; -/// -/// async fn example() { -/// let logger = locked_logger().await; -/// } -/// ``` -pub async fn locked_logger() -> tokio::sync::MutexGuard<'static, Logger> { - get_global_logger().lock().await -} - -/// Initialize with default empty logger if needed (optional) -/// This function initializes the logger with a default empty logger if needed. -/// It is used to ensure that the logger is initialized before logging. 
-/// -/// # Returns -/// A reference to the global logger instance -/// -/// # Example -/// ``` -/// use rustfs_obs::ensure_logger_initialized; -/// -/// let logger = ensure_logger_initialized(); -/// ``` -pub fn ensure_logger_initialized() -> &'static Arc> { - if GLOBAL_LOGGER.get().is_none() { - let config = AppConfig::default(); - let sinks = vec![]; - let logger = Arc::new(Mutex::new(start_logger(&config, sinks))); - let _ = GLOBAL_LOGGER.set(logger); - } - GLOBAL_LOGGER.get().unwrap() -} - /// Log information /// This function logs information messages. /// @@ -357,7 +297,7 @@ pub fn ensure_logger_initialized() -> &'static Arc> { /// let _ = log_info("This is an information message", "example").await; /// } /// ``` -pub async fn log_info(message: &str, source: &str) -> Result<(), LogError> { +pub async fn log_info(message: &str, source: &str) -> Result<(), GlobalError> { get_global_logger().lock().await.write(message, source, Level::INFO).await } @@ -375,7 +315,7 @@ pub async fn log_info(message: &str, source: &str) -> Result<(), LogError> { /// async fn example() { /// let _ = log_error("This is an error message", "example").await; /// } -pub async fn log_error(message: &str, source: &str) -> Result<(), LogError> { +pub async fn log_error(message: &str, source: &str) -> Result<(), GlobalError> { get_global_logger().lock().await.write(message, source, Level::ERROR).await } @@ -395,7 +335,7 @@ pub async fn log_error(message: &str, source: &str) -> Result<(), LogError> { /// let _ = log_warn("This is a warning message", "example").await; /// } /// ``` -pub async fn log_warn(message: &str, source: &str) -> Result<(), LogError> { +pub async fn log_warn(message: &str, source: &str) -> Result<(), GlobalError> { get_global_logger().lock().await.write(message, source, Level::WARN).await } @@ -415,7 +355,7 @@ pub async fn log_warn(message: &str, source: &str) -> Result<(), LogError> { /// let _ = log_debug("This is a debug message", "example").await; /// } /// ``` -pub 
async fn log_debug(message: &str, source: &str) -> Result<(), LogError> { +pub async fn log_debug(message: &str, source: &str) -> Result<(), GlobalError> { get_global_logger().lock().await.write(message, source, Level::DEBUG).await } @@ -436,7 +376,7 @@ pub async fn log_debug(message: &str, source: &str) -> Result<(), LogError> { /// let _ = log_trace("This is a trace message", "example").await; /// } /// ``` -pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> { +pub async fn log_trace(message: &str, source: &str) -> Result<(), GlobalError> { get_global_logger().lock().await.write(message, source, Level::TRACE).await } @@ -467,7 +407,7 @@ pub async fn log_with_context( request_id: Option, user_id: Option, fields: Vec<(String, String)>, -) -> Result<(), LogError> { +) -> Result<(), GlobalError> { get_global_logger() .lock() .await @@ -477,7 +417,7 @@ pub async fn log_with_context( /// Log initialization status #[derive(Debug)] -pub struct InitLogStatus { +pub(crate) struct InitLogStatus { pub timestamp: SystemTime, pub service_name: String, pub version: String, @@ -508,14 +448,14 @@ impl InitLogStatus { } } - pub async fn init_start_log(config: &OtelConfig) -> Result<(), LogError> { + pub async fn init_start_log(config: &OtelConfig) -> Result<(), GlobalError> { let status = Self::new_config(config); log_init_state(Some(status)).await } } /// Log initialization details during system startup -pub async fn log_init_state(status: Option) -> Result<(), LogError> { +async fn log_init_state(status: Option) -> Result<(), GlobalError> { let status = status.unwrap_or_default(); let base_entry = BaseLogEntry::new() diff --git a/crates/obs/src/system/attributes.rs b/crates/obs/src/system/attributes.rs new file mode 100644 index 00000000..289b4b66 --- /dev/null +++ b/crates/obs/src/system/attributes.rs @@ -0,0 +1,44 @@ +use crate::GlobalError; +use opentelemetry::KeyValue; +use sysinfo::{Pid, System}; + +pub const PROCESS_PID: opentelemetry::Key = 
opentelemetry::Key::from_static_str("process.pid"); +pub const PROCESS_EXECUTABLE_NAME: opentelemetry::Key = opentelemetry::Key::from_static_str("process.executable.name"); +pub const PROCESS_EXECUTABLE_PATH: opentelemetry::Key = opentelemetry::Key::from_static_str("process.executable.path"); +pub const PROCESS_COMMAND: opentelemetry::Key = opentelemetry::Key::from_static_str("process.command"); + +/// Struct to hold process attributes +pub struct ProcessAttributes { + pub attributes: Vec, +} + +impl ProcessAttributes { + /// Creates a new instance of `ProcessAttributes` for the given PID. + pub fn new(pid: Pid, system: &mut System) -> Result { + system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true); + let process = system + .process(pid) + .ok_or_else(|| GlobalError::ProcessNotFound(pid.as_u32()))?; + + let attributes = vec![ + KeyValue::new(PROCESS_PID, pid.as_u32() as i64), + KeyValue::new(PROCESS_EXECUTABLE_NAME, process.name().to_os_string().into_string().unwrap_or_default()), + KeyValue::new( + PROCESS_EXECUTABLE_PATH, + process + .exe() + .map(|path| path.to_string_lossy().into_owned()) + .unwrap_or_default(), + ), + KeyValue::new( + PROCESS_COMMAND, + process + .cmd() + .iter() + .fold(String::new(), |t1, t2| t1 + " " + t2.to_str().unwrap_or_default()), + ), + ]; + + Ok(ProcessAttributes { attributes }) + } +} diff --git a/crates/obs/src/system/collector.rs b/crates/obs/src/system/collector.rs new file mode 100644 index 00000000..c4184051 --- /dev/null +++ b/crates/obs/src/system/collector.rs @@ -0,0 +1,156 @@ +use crate::system::attributes::ProcessAttributes; +use crate::system::gpu::GpuCollector; +use crate::system::metrics::{Metrics, DIRECTION, INTERFACE, STATUS}; +use crate::GlobalError; +use opentelemetry::KeyValue; +use std::time::SystemTime; +use sysinfo::{Networks, Pid, ProcessStatus, System}; +use tokio::time::{sleep, Duration}; + +/// Collector is responsible for collecting system metrics and attributes. 
+/// It uses the sysinfo crate to gather information about the system and processes. +/// It also uses OpenTelemetry to record metrics. +pub struct Collector { + metrics: Metrics, + attributes: ProcessAttributes, + gpu_collector: GpuCollector, + pid: Pid, + system: System, + networks: Networks, + core_count: usize, + interval_ms: u64, +} + +impl Collector { + pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result { + let mut system = System::new_all(); + let attributes = ProcessAttributes::new(pid, &mut system)?; + let core_count = System::physical_core_count().ok_or(GlobalError::CoreCountError)?; + let metrics = Metrics::new(&meter); + let gpu_collector = GpuCollector::new(pid)?; + let networks = Networks::new_with_refreshed_list(); + + Ok(Collector { + metrics, + attributes, + gpu_collector, + pid, + system, + networks, + core_count, + interval_ms, + }) + } + + pub async fn run(&mut self) -> Result<(), GlobalError> { + loop { + self.collect()?; + tracing::debug!("Collected metrics for PID: {} ,time: {:?}", self.pid, SystemTime::now()); + sleep(Duration::from_millis(self.interval_ms)).await; + } + } + + fn collect(&mut self) -> Result<(), GlobalError> { + self.system + .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true); + + // refresh the network interface list and statistics + self.networks.refresh(false); + + let process = self + .system + .process(self.pid) + .ok_or_else(|| GlobalError::ProcessNotFound(self.pid.as_u32()))?; + + // CPU metrics + let cpu_usage = process.cpu_usage(); + self.metrics.cpu_usage.record(cpu_usage as f64, &[]); + self.metrics + .cpu_utilization + .record((cpu_usage / self.core_count as f32) as f64, &self.attributes.attributes); + + // Memory metrics + self.metrics + .memory_usage + .record(process.memory() as i64, &self.attributes.attributes); + self.metrics + .memory_virtual + .record(process.virtual_memory() as i64, &self.attributes.attributes); + + // Disk I/O metrics + let disk_io 
= process.disk_usage(); + self.metrics.disk_io.record( + disk_io.read_bytes as i64, + &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "read")]].concat(), + ); + self.metrics.disk_io.record( + disk_io.written_bytes as i64, + &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "write")]].concat(), + ); + + // Network I/O indicators (corresponding to /system/network/internode) + let mut total_received: i64 = 0; + let mut total_transmitted: i64 = 0; + + // statistics by interface + for (interface_name, data) in self.networks.iter() { + total_received += data.total_received() as i64; + total_transmitted += data.total_transmitted() as i64; + + let received = data.received() as i64; + let transmitted = data.transmitted() as i64; + self.metrics.network_io_per_interface.record( + received, + &[ + &self.attributes.attributes[..], + &[ + KeyValue::new(INTERFACE, interface_name.to_string()), + KeyValue::new(DIRECTION, "received"), + ], + ] + .concat(), + ); + self.metrics.network_io_per_interface.record( + transmitted, + &[ + &self.attributes.attributes[..], + &[ + KeyValue::new(INTERFACE, interface_name.to_string()), + KeyValue::new(DIRECTION, "transmitted"), + ], + ] + .concat(), + ); + } + // global statistics + self.metrics.network_io.record( + total_received, + &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "received")]].concat(), + ); + self.metrics.network_io.record( + total_transmitted, + &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "transmitted")]].concat(), + ); + + // Process status indicator (corresponding to /system/process) + let status_value = match process.status() { + ProcessStatus::Run => 0, + ProcessStatus::Sleep => 1, + ProcessStatus::Zombie => 2, + _ => 3, // other status + }; + self.metrics.process_status.record( + status_value, + &[ + &self.attributes.attributes[..], + &[KeyValue::new(STATUS, format!("{:?}", process.status()))], + ] + .concat(), + ); + + // GPU Metrics (Optional) Non-MacOS + 
self.gpu_collector.collect(&self.metrics, &self.attributes)?; + + Ok(()) + } +} diff --git a/crates/obs/src/system/gpu.rs b/crates/obs/src/system/gpu.rs new file mode 100644 index 00000000..ce47f2c5 --- /dev/null +++ b/crates/obs/src/system/gpu.rs @@ -0,0 +1,70 @@ +#[cfg(feature = "gpu")] +use crate::system::attributes::ProcessAttributes; +#[cfg(feature = "gpu")] +use crate::system::metrics::Metrics; +#[cfg(feature = "gpu")] +use crate::GlobalError; +#[cfg(feature = "gpu")] +use nvml_wrapper::enums::device::UsedGpuMemory; +#[cfg(feature = "gpu")] +use nvml_wrapper::Nvml; +#[cfg(feature = "gpu")] +use sysinfo::Pid; +#[cfg(feature = "gpu")] +use tracing::warn; + +/// `GpuCollector` is responsible for collecting GPU memory usage metrics. +#[cfg(feature = "gpu")] +pub struct GpuCollector { + nvml: Nvml, + pid: Pid, +} + +#[cfg(feature = "gpu")] +impl GpuCollector { + pub fn new(pid: Pid) -> Result { + let nvml = Nvml::init().map_err(|e| GlobalError::GpuInitError(e.to_string()))?; + Ok(GpuCollector { nvml, pid }) + } + + pub fn collect(&self, metrics: &Metrics, attributes: &ProcessAttributes) -> Result<(), GlobalError> { + if let Ok(device) = self.nvml.device_by_index(0) { + if let Ok(gpu_stats) = device.running_compute_processes() { + for stat in gpu_stats.iter() { + if stat.pid == self.pid.as_u32() { + let memory_used = match stat.used_gpu_memory { + UsedGpuMemory::Used(bytes) => bytes, + UsedGpuMemory::Unavailable => 0, + }; + metrics.gpu_memory_usage.record(memory_used, &attributes.attributes); + return Ok(()); + } + } + } else { + warn!("Could not get GPU stats, recording 0 for GPU memory usage"); + } + } else { + return Err(GlobalError::GpuDeviceError("No GPU device found".to_string())); + } + metrics.gpu_memory_usage.record(0, &attributes.attributes); + Ok(()) + } +} + +#[cfg(not(feature = "gpu"))] +pub struct GpuCollector; + +#[cfg(not(feature = "gpu"))] +impl GpuCollector { + pub fn new(_pid: sysinfo::Pid) -> Result { + Ok(GpuCollector) + } + + pub fn collect( 
+ &self, + _metrics: &crate::system::metrics::Metrics, + _attributes: &crate::system::attributes::ProcessAttributes, + ) -> Result<(), crate::GlobalError> { + Ok(()) + } +} diff --git a/crates/obs/src/system/metrics.rs b/crates/obs/src/system/metrics.rs new file mode 100644 index 00000000..114c285e --- /dev/null +++ b/crates/obs/src/system/metrics.rs @@ -0,0 +1,100 @@ +pub const PROCESS_CPU_USAGE: &str = "process.cpu.usage"; +pub const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization"; +pub const PROCESS_MEMORY_USAGE: &str = "process.memory.usage"; +pub const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual"; +pub const PROCESS_DISK_IO: &str = "process.disk.io"; +pub const PROCESS_NETWORK_IO: &str = "process.network.io"; +pub const PROCESS_NETWORK_IO_PER_INTERFACE: &str = "process.network.io.per_interface"; +pub const PROCESS_STATUS: &str = "process.status"; +#[cfg(feature = "gpu")] +pub const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage"; +pub const DIRECTION: opentelemetry::Key = opentelemetry::Key::from_static_str("direction"); +pub const STATUS: opentelemetry::Key = opentelemetry::Key::from_static_str("status"); +pub const INTERFACE: opentelemetry::Key = opentelemetry::Key::from_static_str("interface"); + +/// `Metrics` struct holds the OpenTelemetry metrics for process monitoring. +/// It contains various metrics such as CPU usage, memory usage, +/// disk I/O, network I/O, and process status. +/// +/// The `Metrics` struct is designed to be used with OpenTelemetry's +/// metrics API to record and export these metrics. +/// +/// The `new` method initializes the metrics using the provided +/// `opentelemetry::metrics::Meter`. 
+pub struct Metrics { + pub cpu_usage: opentelemetry::metrics::Gauge, + pub cpu_utilization: opentelemetry::metrics::Gauge, + pub memory_usage: opentelemetry::metrics::Gauge, + pub memory_virtual: opentelemetry::metrics::Gauge, + pub disk_io: opentelemetry::metrics::Gauge, + pub network_io: opentelemetry::metrics::Gauge, + pub network_io_per_interface: opentelemetry::metrics::Gauge, + pub process_status: opentelemetry::metrics::Gauge, + #[cfg(feature = "gpu")] + pub gpu_memory_usage: opentelemetry::metrics::Gauge, +} + +impl Metrics { + pub fn new(meter: &opentelemetry::metrics::Meter) -> Self { + let cpu_usage = meter + .f64_gauge(PROCESS_CPU_USAGE) + .with_description("The percentage of CPU in use.") + .with_unit("percent") + .build(); + let cpu_utilization = meter + .f64_gauge(PROCESS_CPU_UTILIZATION) + .with_description("The amount of CPU in use.") + .with_unit("percent") + .build(); + let memory_usage = meter + .i64_gauge(PROCESS_MEMORY_USAGE) + .with_description("The amount of physical memory in use.") + .with_unit("byte") + .build(); + let memory_virtual = meter + .i64_gauge(PROCESS_MEMORY_VIRTUAL) + .with_description("The amount of committed virtual memory.") + .with_unit("byte") + .build(); + let disk_io = meter + .i64_gauge(PROCESS_DISK_IO) + .with_description("Disk bytes transferred.") + .with_unit("byte") + .build(); + let network_io = meter + .i64_gauge(PROCESS_NETWORK_IO) + .with_description("Network bytes transferred.") + .with_unit("byte") + .build(); + let network_io_per_interface = meter + .i64_gauge(PROCESS_NETWORK_IO_PER_INTERFACE) + .with_description("Network bytes transferred (per interface).") + .with_unit("byte") + .build(); + + let process_status = meter + .i64_gauge(PROCESS_STATUS) + .with_description("Process status (0: Running, 1: Sleeping, 2: Zombie, etc.)") + .build(); + + #[cfg(feature = "gpu")] + let gpu_memory_usage = meter + .u64_gauge(PROCESS_GPU_MEMORY_USAGE) + .with_description("The amount of physical GPU memory in use.") + 
.with_unit("byte") + .build(); + + Metrics { + cpu_usage, + cpu_utilization, + memory_usage, + memory_virtual, + disk_io, + network_io, + network_io_per_interface, + process_status, + #[cfg(feature = "gpu")] + gpu_memory_usage, + } + } +} diff --git a/crates/obs/src/system/mod.rs b/crates/obs/src/system/mod.rs new file mode 100644 index 00000000..a69bc343 --- /dev/null +++ b/crates/obs/src/system/mod.rs @@ -0,0 +1,24 @@ +use crate::GlobalError; + +pub(crate) mod attributes; +mod collector; +pub(crate) mod gpu; +pub(crate) mod metrics; + +/// Initialize the indicator collector for the current process +/// This function will create a new `Collector` instance and start collecting metrics. +/// It will run indefinitely until the process is terminated. +pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> { + let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?; + let mut collector = collector::Collector::new(pid, meter, 30000)?; + collector.run().await +} + +/// Initialize the metric collector for the specified PID process +/// This function will create a new `Collector` instance and start collecting metrics. +/// It will run indefinitely until the process is terminated. 
+pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> { + let pid = sysinfo::Pid::from_u32(pid); + let mut collector = collector::Collector::new(pid, meter, 30000)?; + collector.run().await +} diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 46fdbc8d..9ffc265d 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -1,5 +1,6 @@ use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT}; -use crate::{get_local_ip_with_default, OtelConfig}; +use crate::utils::get_local_ip_with_default; +use crate::OtelConfig; use opentelemetry::trace::TracerProvider; use opentelemetry::{global, KeyValue}; use opentelemetry_appender_tracing::layer; @@ -14,11 +15,10 @@ use opentelemetry_semantic_conventions::{ attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, SCHEMA_URL, }; -use prometheus::{Encoder, Registry, TextEncoder}; +use smallvec::SmallVec; +use std::borrow::Cow; use std::io::IsTerminal; -use std::sync::Arc; -use tokio::sync::{Mutex, OnceCell}; -use tracing::{info, warn}; +use tracing::info; use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; @@ -67,66 +67,20 @@ impl Drop for OtelGuard { } } -/// Global registry for Prometheus metrics -static GLOBAL_REGISTRY: OnceCell>> = OnceCell::const_new(); - -/// Get the global registry instance -/// This function returns a reference to the global registry instance. 
-/// -/// # Returns -/// A reference to the global registry instance -/// -/// # Example -/// ``` -/// use rustfs_obs::get_global_registry; -/// -/// let registry = get_global_registry(); -/// ``` -pub fn get_global_registry() -> Arc> { - GLOBAL_REGISTRY.get().unwrap().clone() -} - -/// Prometheus metric endpoints -/// This function returns a string containing the Prometheus metrics. -/// The metrics are collected from the global registry. -/// The function is used to expose the metrics via an HTTP endpoint. -/// -/// # Returns -/// A string containing the Prometheus metrics -/// -/// # Example -/// ``` -/// use rustfs_obs::metrics; -/// -/// async fn main() { -/// let metrics = metrics().await; -/// println!("{}", metrics); -/// } -/// ``` -pub async fn metrics() -> String { - let encoder = TextEncoder::new(); - // Get a reference to the registry for reading metrics - let registry = get_global_registry().lock().await.to_owned(); - let metric_families = registry.gather(); - if metric_families.is_empty() { - warn!("No metrics available in Prometheus registry"); - } else { - info!("Metrics collected: {} families", metric_families.len()); - } - let mut buffer = Vec::new(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap_or_else(|_| "Error encoding metrics".to_string()) -} - /// create OpenTelemetry Resource fn resource(config: &OtelConfig) -> Resource { - let config = config.clone(); Resource::builder() - .with_service_name(config.service_name.unwrap_or(SERVICE_NAME.to_string())) + .with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(SERVICE_NAME)).to_string()) .with_schema_url( [ - KeyValue::new(OTEL_SERVICE_VERSION, config.service_version.unwrap_or(SERVICE_VERSION.to_string())), - KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.unwrap_or(ENVIRONMENT.to_string())), + KeyValue::new( + OTEL_SERVICE_VERSION, + 
Cow::Borrowed(config.service_version.as_deref().unwrap_or(SERVICE_VERSION)).to_string(), + ), + KeyValue::new( + DEPLOYMENT_ENVIRONMENT_NAME, + Cow::Borrowed(config.environment.as_deref().unwrap_or(ENVIRONMENT)).to_string(), + ), KeyValue::new(NETWORK_LOCAL_ADDRESS, get_local_ip_with_default()), ], SCHEMA_URL, @@ -141,167 +95,169 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader SdkMeterProvider { - let mut builder = MeterProviderBuilder::default().with_resource(resource(config)); - // If endpoint is empty, use stdout output - if config.endpoint.is_empty() { - builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL))); - } else { - // If endpoint is not empty, use otlp output - let exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(&config.endpoint) - .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) - .build() - .unwrap(); - builder = builder.with_reader( - PeriodicReader::builder(exporter) - .with_interval(std::time::Duration::from_secs(config.meter_interval.unwrap_or(METER_INTERVAL))) - .build(), - ); - // If use_stdout is true, output to stdout at the same time - if config.use_stdout.unwrap_or(USE_STDOUT) { - builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL))); - } - } - let registry = Registry::new(); - // Set global registry - GLOBAL_REGISTRY.set(Arc::new(Mutex::new(registry.clone()))).unwrap(); - // Create Prometheus exporter - let prometheus_exporter = opentelemetry_prometheus::exporter().with_registry(registry).build().unwrap(); - // Build meter provider - let meter_provider = builder.with_reader(prometheus_exporter).build(); - global::set_meter_provider(meter_provider.clone()); - meter_provider -} - -/// Initialize Tracer Provider -fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider { - let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO); - let sampler = if 
sample_ratio > 0.0 && sample_ratio < 1.0 { - Sampler::TraceIdRatioBased(sample_ratio) - } else { - Sampler::AlwaysOn - }; - let builder = SdkTracerProvider::builder() - .with_sampler(sampler) - .with_id_generator(RandomIdGenerator::default()) - .with_resource(resource(config)); - - let tracer_provider = if config.endpoint.is_empty() { - builder - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) - .build() - } else { - let exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(&config.endpoint) - .build() - .unwrap(); - if config.use_stdout.unwrap_or(USE_STDOUT) { - builder - .with_batch_exporter(exporter) - .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) - } else { - builder.with_batch_exporter(exporter) - } - .build() - }; - - global::set_tracer_provider(tracer_provider.clone()); - tracer_provider -} - /// Initialize Telemetry pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { - let tracer_provider = init_tracer_provider(config); - let meter_provider = init_meter_provider(config); + // avoid repeated access to configuration fields + let endpoint = &config.endpoint; + let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT); + let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); + let logger_level = config.logger_level.as_deref().unwrap_or(LOGGER_LEVEL); + let service_name = config.service_name.as_deref().unwrap_or(SERVICE_NAME); - // Initialize logger provider based on configuration - let logger_provider = { - let mut builder = SdkLoggerProvider::builder().with_resource(resource(config)); + // Pre-create resource objects to avoid repeated construction + let res = resource(config); - if config.endpoint.is_empty() { - // Use stdout exporter when no endpoint is configured - builder = builder.with_simple_exporter(opentelemetry_stdout::LogExporter::default()); + // initialize tracer provider + let tracer_provider = { + let sample_ratio = 
config.sample_ratio.unwrap_or(SAMPLE_RATIO); + let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 { + Sampler::TraceIdRatioBased(sample_ratio) + } else { + Sampler::AlwaysOn + }; + + let builder = SdkTracerProvider::builder() + .with_sampler(sampler) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(res.clone()); + + let tracer_provider = if endpoint.is_empty() { + builder + .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) + .build() + } else { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .build() + .unwrap(); + + let builder = if use_stdout { + builder + .with_batch_exporter(exporter) + .with_batch_exporter(opentelemetry_stdout::SpanExporter::default()) + } else { + builder.with_batch_exporter(exporter) + }; + + builder.build() + }; + + global::set_tracer_provider(tracer_provider.clone()); + tracer_provider + }; + + // initialize meter provider + let meter_provider = { + let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); + + if endpoint.is_empty() { + builder = builder.with_reader(create_periodic_reader(meter_interval)); + } else { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build() + .unwrap(); + + builder = builder.with_reader( + PeriodicReader::builder(exporter) + .with_interval(std::time::Duration::from_secs(meter_interval)) + .build(), + ); + + if use_stdout { + builder = builder.with_reader(create_periodic_reader(meter_interval)); + } + } + + let meter_provider = builder.build(); + global::set_meter_provider(meter_provider.clone()); + meter_provider + }; + + // initialize logger provider + let logger_provider = { + let mut builder = SdkLoggerProvider::builder().with_resource(res); + + if endpoint.is_empty() { + builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); 
} else { - // Configure OTLP exporter when endpoint is provided let exporter = opentelemetry_otlp::LogExporter::builder() .with_tonic() - .with_endpoint(&config.endpoint) + .with_endpoint(endpoint) .build() .unwrap(); builder = builder.with_batch_exporter(exporter); - // Add stdout exporter if requested - if config.use_stdout.unwrap_or(USE_STDOUT) { + if use_stdout { builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default()); } } builder.build() }; - let config = config.clone(); - let logger_level = config.logger_level.unwrap_or(LOGGER_LEVEL.to_string()); - let logger_level = logger_level.as_str(); - // Setup OpenTelemetryTracingBridge layer - let otel_layer = { - // Filter to prevent infinite telemetry loops - // This blocks events from OpenTelemetry and its dependent libraries (tonic, reqwest, etc.) - // from being sent back to OpenTelemetry itself - let filter_otel = match logger_level { - "trace" | "debug" => { - info!("OpenTelemetry tracing initialized with level: {}", logger_level); - let mut filter = EnvFilter::new(logger_level); - for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] { - filter = filter.add_directive(format!("{}=off", directive).parse().unwrap()); + + // configuring tracing + { + // optimize filter configuration + let otel_layer = { + let filter_otel = match logger_level { + "trace" | "debug" => { + info!("OpenTelemetry tracing initialized with level: {}", logger_level); + EnvFilter::new(logger_level) } - filter - } - _ => { - let mut filter = EnvFilter::new(logger_level); - for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] { - filter = filter.add_directive(format!("{}=off", directive).parse().unwrap()); + _ => { + let mut filter = EnvFilter::new(logger_level); + + // use smallvec to avoid heap allocation + let directives: SmallVec<[&str; 5]> = smallvec::smallvec!["hyper", "tonic", "h2", "reqwest", "tower"]; + + for directive in directives { + filter = filter.add_directive(format!("{}=off", 
directive).parse().unwrap()); + } + filter } - filter - } + }; + layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel) }; - layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel) - }; - let tracer = tracer_provider.tracer(config.service_name.unwrap_or(SERVICE_NAME.to_string())); - let registry = tracing_subscriber::registry() - .with(switch_level(logger_level)) - .with(OpenTelemetryLayer::new(tracer)) - .with(MetricsLayer::new(meter_provider.clone())) - .with(otel_layer) - .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level))); + let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string()); - // Configure formatting layer - let enable_color = std::io::stdout().is_terminal(); - let fmt_layer = tracing_subscriber::fmt::layer() - .with_ansi(enable_color) - .with_thread_names(true) - .with_file(true) - .with_line_number(true); + // Configure registry to avoid repeated calls to filter methods + let level_filter = switch_level(logger_level); + let registry = tracing_subscriber::registry() + .with(level_filter) + .with(OpenTelemetryLayer::new(tracer)) + .with(MetricsLayer::new(meter_provider.clone())) + .with(otel_layer) + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level))); - // Creating a formatting layer with explicit type to avoid type mismatches - let fmt_layer = fmt_layer.with_filter( - EnvFilter::new(logger_level).add_directive( - format!("opentelemetry={}", if config.endpoint.is_empty() { logger_level } else { "off" }) - .parse() - .unwrap(), - ), - ); + // configure the formatting layer + let enable_color = std::io::stdout().is_terminal(); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_ansi(enable_color) + .with_thread_names(true) + .with_file(true) + .with_line_number(true) + .with_filter( + EnvFilter::new(logger_level).add_directive( + format!("opentelemetry={}", if endpoint.is_empty() { logger_level } else 
{ "off" }) + .parse() + .unwrap(), + ), + ); - registry.with(ErrorLayer::default()).with(fmt_layer).init(); - if !config.endpoint.is_empty() { - info!( - "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}", - config.endpoint, logger_level - ); + registry.with(ErrorLayer::default()).with(fmt_layer).init(); + + if !endpoint.is_empty() { + info!( + "OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}", + endpoint, logger_level + ); + } } OtelGuard { @@ -313,12 +269,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { /// Switch log level fn switch_level(logger_level: &str) -> tracing_subscriber::filter::LevelFilter { + use tracing_subscriber::filter::LevelFilter; match logger_level { - "error" => tracing_subscriber::filter::LevelFilter::ERROR, - "warn" => tracing_subscriber::filter::LevelFilter::WARN, - "info" => tracing_subscriber::filter::LevelFilter::INFO, - "debug" => tracing_subscriber::filter::LevelFilter::DEBUG, - "trace" => tracing_subscriber::filter::LevelFilter::TRACE, - _ => tracing_subscriber::filter::LevelFilter::OFF, + "error" => LevelFilter::ERROR, + "warn" => LevelFilter::WARN, + "info" => LevelFilter::INFO, + "debug" => LevelFilter::DEBUG, + "trace" => LevelFilter::TRACE, + _ => LevelFilter::OFF, } } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 49ac8541..3019c060 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -46,6 +46,7 @@ local-ip-address = { workspace = true } matchit = { workspace = true } mime.workspace = true mime_guess = { workspace = true } +opentelemetry = { workspace = true } pin-project-lite.workspace = true protos.workspace = true query = { workspace = true } @@ -76,7 +77,7 @@ tokio-stream.workspace = true tonic = { workspace = true } tower.workspace = true transform-stream.workspace = true -tower-http = { workspace = true, features = ["trace", "compression-full", "cors"] } +tower-http = { workspace = true, features = ["trace", "compression-deflate", 
"compression-gzip", "cors"] } uuid = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs index 127d2144..1125b24d 100644 --- a/rustfs/src/console.rs +++ b/rustfs/src/console.rs @@ -271,7 +271,7 @@ pub async fn start_static_file_server( .route("/config.json", get(config_handler)) .fallback_service(get(static_handler)) .layer(cors) - .layer(tower_http::compression::CompressionLayer::new()) + .layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true)) .layer(TraceLayer::new_for_http()); let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address"); info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port()); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index f86f951f..ba1579ab 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -14,6 +14,7 @@ use crate::console::{init_console_cfg, CONSOLE_CONFIG}; // Ensure the correct path for parse_license is imported use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT}; use crate::utils::error; +use bytes::Bytes; use chrono::Datelike; use clap::Parser; use common::{ @@ -37,6 +38,7 @@ use ecstore::{ }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use grpc::make_server; +use http::{HeaderMap, Request as HttpRequest, Response}; use hyper_util::server::graceful::GracefulShutdown; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -47,17 +49,20 @@ use iam::init_iam_sys; use license::init_license; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_event_notifier::NotifierConfig; -use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus}; +use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard}; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use 
service::hybrid; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpListener; use tokio::signal::unix::{signal, SignalKind}; use tokio_rustls::TlsAcceptor; use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; +use tower_http::trace::TraceLayer; +use tracing::Span; use tracing::{debug, error, info, info_span, warn}; #[cfg(all(target_os = "linux", target_env = "gnu"))] @@ -103,9 +108,6 @@ async fn main() -> Result<()> { // Store in global storage set_global_guard(guard)?; - // Log initialization status - InitLogStatus::init_start_log(&config.observability).await?; - // Initialize event notifier let notifier_config = opt.clone().event_config; if notifier_config.is_some() { @@ -247,6 +249,20 @@ async fn run(opt: config::Opt) -> Result<()> { b.build() }; + tokio::spawn(async move { + // Record the PID-related metrics of the current process + let meter = opentelemetry::global::meter("system"); + let obs_result = init_process_observer(meter).await; + match obs_result { + Ok(_) => { + info!("Process observer initialized successfully"); + } + Err(e) => { + error!("Failed to initialize process observer: {}", e); + } + } + }); + let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); let tls_path = opt.tls_path.clone().unwrap_or_default(); @@ -300,18 +316,53 @@ async fn run(opt: config::Opt) -> Result<()> { let mut sigint_inner = sigint_inner; let hybrid_service = TowerToHyperService::new( tower::ServiceBuilder::new() + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &HttpRequest<_>| { + let span = tracing::debug_span!("http-request", + status_code = tracing::field::Empty, + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + ); + for (header_name, header_value) in request.headers() { + if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" + { + span.record(header_name.as_str(), 
header_value.to_str().unwrap_or("invalid")); + } + } + + span + }) + .on_request(|request: &HttpRequest<_>, _span: &Span| { + debug!("started method: {}, url path: {}", request.method(), request.uri().path()) + }) + .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { + _span.record("status_code", tracing::field::display(response.status())); + debug!("response generated in {:?}", latency) + }) + .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { + debug!("sending {} bytes in {:?}", chunk.len(), latency) + }) + .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { + debug!("stream closed after {:?}", stream_duration) + }) + .on_failure(|_error, latency: Duration, _span: &Span| { + debug!("request error: {:?} in {:?}", _error, latency) + }), + ) .layer(CorsLayer::permissive()) .service(hybrid(s3_service, rpc_service)), ); - let http_server = ConnBuilder::new(TokioExecutor::new()); + let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new())); let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); - let graceful = GracefulShutdown::new(); + let graceful = Arc::new(GracefulShutdown::new()); debug!("graceful initiated"); // 服务准备就绪 worker_state_manager.update(ServiceState::Ready); - + let value = hybrid_service.clone(); loop { debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs); // Wait for a connection @@ -366,12 +417,17 @@ async fn run(opt: config::Opt) -> Result<()> { continue; } }; - let conn = http_server.serve_connection(TokioIo::new(tls_socket), hybrid_service.clone()); - let conn = graceful.watch(conn.into_owned()); + + let http_server_clone = http_server.clone(); + let value_clone = value.clone(); + let graceful_clone = graceful.clone(); + tokio::task::spawn_blocking(move || { tokio::runtime::Runtime::new() .expect("Failed to create runtime") .block_on(async move { + let conn = http_server_clone.serve_connection(TokioIo::new(tls_socket), value_clone); + let conn = 
graceful_clone.watch(conn); if let Err(err) = conn.await { error!("Https Connection error: {}", err); } }); @@ -380,9 +436,13 @@ debug!("TLS handshake success"); } else { debug!("Http handshake start"); - let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone()); - let conn = graceful.watch(conn.into_owned()); + + let http_server_clone = http_server.clone(); + let value_clone = value.clone(); + let graceful_clone = graceful.clone(); tokio::spawn(async move { + let conn = http_server_clone.serve_connection(TokioIo::new(socket), value_clone); + let conn = graceful_clone.watch(conn); if let Err(err) = conn.await { error!("Http Connection error: {}", err); } @@ -391,12 +451,24 @@ } } worker_state_manager.update(ServiceState::Stopping); - tokio::select! { - () = graceful.shutdown() => { - debug!("Gracefully shutdown!"); - }, - () = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - debug!("Waited 10 seconds for graceful shutdown, aborting..."); + match Arc::try_unwrap(graceful) { + Ok(g) => { + // Unique ownership obtained successfully, so shutdown can be called + tokio::select! 
{ + () = g.shutdown() => { + debug!("Gracefully shutdown!"); + }, + () = tokio::time::sleep(Duration::from_secs(10)) => { + debug!("Waited 10 seconds for graceful shutdown, aborting..."); + } + } + } + Err(arc_graceful) => { + // Other references still exist, so unique ownership cannot be obtained + error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful); + // In this case, we can only wait for the timeout + tokio::time::sleep(Duration::from_secs(10)).await; + debug!("Timeout reached, forcing shutdown"); } } worker_state_manager.update(ServiceState::Stopped); diff --git a/s3select/query/src/dispatcher/manager.rs b/s3select/query/src/dispatcher/manager.rs index 05f76343..80543ed6 100644 --- a/s3select/query/src/dispatcher/manager.rs +++ b/s3select/query/src/dispatcher/manager.rs @@ -166,7 +166,8 @@ impl SimpleQueryDispatcher { if let Some(delimiter) = csv.field_delimiter.as_ref() { file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default()); } - if csv.file_header_info.is_some() {} + // TODO waiting for processing @junxiang Mu + // if csv.file_header_info.is_some() {} match csv.file_header_info.as_ref() { Some(info) => { if *info == *NONE { diff --git a/scripts/run.sh b/scripts/run.sh index 2cbd115a..c31ce7ed 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -40,13 +40,13 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml" # 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值 export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317 -export RUSTFS__OBSERVABILITY__USE_STDOUT=true +export RUSTFS__OBSERVABILITY__USE_STDOUT=false export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0 export RUSTFS__OBSERVABILITY__METER_INTERVAL=30 export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0 export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop -export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info +export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug export 
RUSTFS__SINKS__FILE__ENABLED=true export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log" export RUSTFS__SINKS__WEBHOOK__ENABLED=false