feat: add TraceLayer for HTTP service and improve metrics (#361)

* improve code for opentelemetry and add system metrics

* feat: add TraceLayer for HTTP service and improve metrics

- Add TraceLayer to HTTP server for request tracing
- Implement system metrics for process monitoring
- Optimize init_telemetry method for better resource management
- Add graceful shutdown handling for telemetry components
- Fix GracefulShutdown ownership issues with Arc wrapper

* improve code for init_process_observer

* remove tomlfmt.toml

* Translation comment

* improve code for console CompressionLayer params
This commit is contained in:
houseme
2025-04-24 19:04:57 +08:00
committed by GitHub
parent a095a607c0
commit 86353d98d5
21 changed files with 900 additions and 407 deletions

153
Cargo.lock generated
View File

@@ -525,7 +525,6 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c"
dependencies = [
"brotli",
"bzip2",
"flate2",
"futures-core",
@@ -5251,6 +5250,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -5413,6 +5421,29 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3c00a0c9600379bd32f8972de90676a7672cba3bf4886986bc05902afc1e093"
[[package]]
name = "nvml-wrapper"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9bff0aa1d48904a1385ea2a8b97576fbdcbc9a3cfccd0d31fe978e1c4038c5"
dependencies = [
"bitflags 2.9.0",
"libloading 0.8.6",
"nvml-wrapper-sys",
"static_assertions",
"thiserror 1.0.69",
"wrapcenum-derive",
]
[[package]]
name = "nvml-wrapper-sys"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "698d45156f28781a4e79652b6ebe2eaa0589057d588d3aec1333f6466f13fcb5"
dependencies = [
"libloading 0.8.6",
]
[[package]]
name = "objc"
version = "0.2.7"
@@ -5716,19 +5747,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "098a71a4430bb712be6130ed777335d2e5b19bc8566de5f2edddfce906def6ab"
dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"prometheus",
"tracing",
]
[[package]]
name = "opentelemetry-proto"
version = "0.29.0"
@@ -6487,21 +6505,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "prometheus"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot 0.12.3",
"protobuf",
"thiserror 2.0.12",
]
[[package]]
name = "prost"
version = "0.13.5"
@@ -7248,6 +7251,7 @@ dependencies = [
"mime",
"mime_guess",
"netif",
"opentelemetry",
"pin-project-lite",
"policy",
"prost-build",
@@ -7332,18 +7336,19 @@ dependencies = [
"chrono",
"config",
"local-ip-address",
"nvml-wrapper",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk",
"prometheus",
"rdkafka",
"reqwest",
"serde",
"serde_json",
"smallvec",
"sysinfo",
"thiserror 2.0.12",
"tokio",
"tracing",
@@ -8274,6 +8279,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "sysinfo"
version = "0.34.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4b93974b3d3aeaa036504b8eefd4c039dced109171c1ae973f1dc63b2c7e4b2"
dependencies = [
"libc",
"memchr",
"ntapi",
"objc2-core-foundation",
"windows 0.57.0",
]
[[package]]
name = "system-configuration"
version = "0.6.1"
@@ -8342,7 +8360,7 @@ dependencies = [
"tao-macros",
"unicode-segmentation",
"url",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-version",
"x11-dl",
@@ -9483,7 +9501,7 @@ checksum = "6f61ff3d9d0ee4efcb461b14eb3acfda2702d10dc329f339303fc3e57215ae2c"
dependencies = [
"webview2-com-macros",
"webview2-com-sys",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-implement 0.58.0",
"windows-interface 0.58.0",
@@ -9507,7 +9525,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3a3e2eeb58f82361c93f9777014668eb3d07e7d174ee4c819575a9208011886"
dependencies = [
"thiserror 1.0.69",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
]
@@ -9554,6 +9572,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143"
dependencies = [
"windows-core 0.57.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.58.0"
@@ -9564,6 +9592,18 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d"
dependencies = [
"windows-implement 0.57.0",
"windows-interface 0.57.0",
"windows-result 0.1.2",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.58.0"
@@ -9590,6 +9630,17 @@ dependencies = [
"windows-strings 0.4.0",
]
[[package]]
name = "windows-implement"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-implement"
version = "0.58.0"
@@ -9612,6 +9663,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.58.0"
@@ -9651,6 +9713,15 @@ dependencies = [
"windows-targets 0.53.0",
]
[[package]]
name = "windows-result"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-result"
version = "0.2.0"
@@ -10020,6 +10091,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "wrapcenum-derive"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a76ff259533532054cfbaefb115c613203c73707017459206380f03b3b3f266e"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "write16"
version = "1.0.0"
@@ -10066,7 +10149,7 @@ dependencies = [
"webkit2gtk",
"webkit2gtk-sys",
"webview2-com",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-version",
"x11-dl",

View File

@@ -1,21 +1,21 @@
[workspace]
members = [
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"crates/event-notifier", # Event notification system
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
]
resolver = "2"
@@ -93,6 +93,7 @@ md-5 = "0.10.6"
mime = "0.3.17"
mime_guess = "2.0.5"
netif = "0.1.6"
nvml-wrapper = "0.10.0"
object_store = "0.11.2"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = [
@@ -102,10 +103,7 @@ opentelemetry-appender-tracing = { version = "0.29.1", features = [
opentelemetry_sdk = { version = "0.29.0" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-prometheus = { version = "0.29.1" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = [
"semconv_experimental",
] }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
parking_lot = "0.12.3"
pin-project-lite = "0.2.16"
prometheus = "0.14.0"
@@ -143,10 +141,11 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
smallvec = { version = "1.15.0", features = ["serde"] }
strum = { version = "0.27.1", features = ["derive"] }
sha2 = "0.10.8"
smallvec = { version = "1.15.0", features = ["serde"] }
snafu = "0.8.5"
strum = { version = "0.27.1", features = ["derive"] }
sysinfo = "0.34.2"
tempfile = "3.19.1"
test-case = "3.3.1"
thiserror = "2.0.12"
@@ -192,7 +191,10 @@ inherits = "dev"
[profile.release]
opt-level = 3
lto = "thin"
lto = "fat"
codegen-units = 1
panic = "abort" # Optional, remove the panic expansion code
strip = true # strip symbol information to reduce binary size
[profile.production]
inherits = "release"

View File

@@ -149,7 +149,6 @@ impl NotifierConfig {
)
.build()
.unwrap_or_default();
println!("Loaded config: {:?}", app_config);
match app_config.try_deserialize::<NotifierConfig>() {
Ok(app_config) => {
println!("Parsed AppConfig: {:?} \n", app_config);

View File

@@ -11,32 +11,35 @@ workspace = true
[features]
default = ["file"]
file = []
gpu = ["dep:nvml-wrapper"]
kafka = ["dep:rdkafka"]
webhook = ["dep:reqwest"]
file = []
full = ["file", "gpu", "kafka", "webhook"]
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] }
opentelemetry-prometheus = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
prometheus = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true, default-features = false }
serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
local-ip-address = { workspace = true }

View File

@@ -25,7 +25,7 @@ batch_timeout_ms = 1000 # Default is 100ms if not specified
[sinks.file]
enabled = true
path = "logs/app.log"
path = "deploy/logs/app.log"
batch_size = 100
batch_timeout_ms = 1000 # Default is 100ms if not specified

View File

@@ -1,8 +1,8 @@
use opentelemetry::global;
use rustfs_obs::{get_logger, init_obs, load_config, log_info, BaseLogEntry, ServerLogEntry};
use rustfs_obs::{get_logger, init_obs, init_process_observer, load_config, log_info, BaseLogEntry, ServerLogEntry};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tracing::{info, instrument};
use tracing::{error, info, instrument};
use tracing_core::Level;
#[tokio::main]
@@ -38,6 +38,11 @@ async fn run(bucket: String, object: String, user: String, service_name: String)
&[opentelemetry::KeyValue::new("operation", "run")],
);
match init_process_observer(meter).await {
Ok(_) => info!("Process observer initialized successfully"),
Err(e) => error!("Failed to initialize process observer: {:?}", e),
}
let base_entry = BaseLogEntry::new()
.message(Some("run logger api_handler info".to_string()))
.request_id(Some("request_id".to_string()))

View File

@@ -178,7 +178,6 @@ pub struct AppConfig {
pub logger: Option<LoggerConfig>,
}
// 为 AppConfig 实现 Default
impl AppConfig {
pub fn new() -> Self {
Self {
@@ -189,6 +188,7 @@ impl AppConfig {
}
}
// implement default for AppConfig
impl Default for AppConfig {
fn default() -> Self {
Self::new()

View File

@@ -16,11 +16,27 @@ static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
/// Error type for global guard operations
#[derive(Debug, thiserror::Error)]
pub enum GuardError {
pub enum GlobalError {
#[error("Failed to set global guard: {0}")]
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
#[error("Global guard not initialized")]
NotInitialized,
#[error("Global system metrics err: {0}")]
MetricsError(String),
#[error("Failed to get current PID: {0}")]
PidError(String),
#[error("Process with PID {0} not found")]
ProcessNotFound(u32),
#[error("Failed to get physical core count")]
CoreCountError,
#[error("GPU initialization failed: {0}")]
GpuInitError(String),
#[error("GPU device not found: {0}")]
GpuDeviceError(String),
#[error("Failed to send log: {0}")]
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
}
/// Set the global guard for OpenTelemetry
@@ -43,9 +59,9 @@ pub enum GuardError {
/// Ok(())
/// }
/// ```
pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> {
pub fn set_global_guard(guard: OtelGuard) -> Result<(), GlobalError> {
info!("Initializing global OpenTelemetry guard");
GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GuardError::SetError)
GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GlobalError::SetError)
}
/// Get the global guard for OpenTelemetry
@@ -65,8 +81,8 @@ pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> {
/// Ok(())
/// }
/// ```
pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GuardError> {
GLOBAL_GUARD.get().cloned().ok_or(GuardError::NotInitialized)
pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GlobalError> {
GLOBAL_GUARD.get().cloned().ok_or(GlobalError::NotInitialized)
}
/// Try to get the global guard for OpenTelemetry
@@ -84,6 +100,6 @@ mod tests {
#[tokio::test]
async fn test_get_uninitialized_guard() {
let result = get_global_guard();
assert!(matches!(result, Err(GuardError::NotInitialized)));
assert!(matches!(result, Err(GlobalError::NotInitialized)));
}
}

View File

@@ -1,23 +1,24 @@
/// # obs
///
/// `obs` is a logging and observability library for Rust.
/// It provides a simple and easy-to-use interface for logging and observability.
/// It is built on top of the `log` crate and `opentelemetry` crate.
///
/// ## Features
/// - Structured logging
/// - Distributed tracing
/// - Metrics collection
/// - Log processing worker
/// - Multiple sinks
/// - Configuration-based setup
/// - Telemetry guard
/// - Global logger
/// - Log levels
/// - Log entry types
/// - Log record
/// - Object version
/// - Local IP address
//! # RustFS Observability
//!
//! provides tools for system and service monitoring
//!
//! ## feature mark
//!
//! - `file`: enable file logging (enabled by default)
//! - `gpu`: gpu monitoring function
//! - `kafka`: enable kafka metric output
//! - `webhook`: enable webhook notifications
//! - `full`: includes all functions
//!
//! to enable gpu monitoring add in cargo toml
//!
//! ```toml
//! # using gpu monitoring
//! rustfs-obs = { version = "0.1.0", features = ["gpu"] }
//!
//! # use all functions
//! rustfs-obs = { version = "0.1.0", features = ["full"] }
//! ```
///
/// ## Usage
///
@@ -32,28 +33,34 @@ mod entry;
mod global;
mod logger;
mod sink;
mod system;
mod telemetry;
mod utils;
mod worker;
use crate::logger::InitLogStatus;
pub use config::load_config;
pub use config::{AppConfig, OtelConfig};
#[cfg(feature = "file")]
pub use config::FileSinkConfig;
#[cfg(feature = "kafka")]
pub use config::KafkaSinkConfig;
#[cfg(feature = "webhook")]
pub use config::WebhookSinkConfig;
pub use config::{AppConfig, LoggerConfig, OtelConfig, SinkConfig};
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GuardError};
pub use logger::{ensure_logger_initialized, log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
pub use logger::{get_global_logger, init_global_logger, locked_logger, start_logger};
pub use logger::{log_init_state, InitLogStatus};
pub use logger::{LogError, Logger};
pub use sink::Sink;
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError};
pub use logger::Logger;
pub use logger::{get_global_logger, init_global_logger, start_logger};
pub use logger::{log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
use std::sync::Arc;
pub use system::{init_process_observer, init_process_observer_for_pid};
pub use telemetry::init_telemetry;
pub use telemetry::{get_global_registry, metrics};
use tokio::sync::Mutex;
pub use utils::{get_local_ip, get_local_ip_with_default};
pub use worker::start_worker;
use tracing::{error, info};
/// Initialize the observability module
///
@@ -74,6 +81,19 @@ pub async fn init_obs(config: AppConfig) -> (Arc<Mutex<Logger>>, telemetry::Otel
let guard = init_telemetry(&config.observability);
let sinks = sink::create_sinks(&config).await;
let logger = init_global_logger(&config, sinks).await;
let obs_config = config.observability.clone();
tokio::spawn(async move {
let result = InitLogStatus::init_start_log(&obs_config).await;
match result {
Ok(_) => {
info!("Logger initialized successfully");
}
Err(e) => {
error!("Failed to initialize logger: {}", e);
}
}
});
(logger, guard)
}

View File

@@ -1,5 +1,6 @@
use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION};
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, OtelConfig, ServerLogEntry, Sink, UnifiedLogEntry};
use crate::sink::Sink;
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -43,26 +44,26 @@ impl Logger {
/// Log a server entry
#[tracing::instrument(skip(self), fields(log_source = "logger_server"))]
pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), LogError> {
pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), GlobalError> {
self.log_entry(UnifiedLogEntry::Server(entry)).await
}
/// Log an audit entry
#[tracing::instrument(skip(self), fields(log_source = "logger_audit"))]
pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), LogError> {
pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), GlobalError> {
self.log_entry(UnifiedLogEntry::Audit(Box::new(entry))).await
}
/// Log a console entry
#[tracing::instrument(skip(self), fields(log_source = "logger_console"))]
pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), LogError> {
pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), GlobalError> {
self.log_entry(UnifiedLogEntry::Console(entry)).await
}
/// Asynchronous logging of unified log entries
#[tracing::instrument(skip(self), fields(log_source = "logger"))]
#[tracing::instrument(level = "error", skip_all)]
pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), LogError> {
pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), GlobalError> {
// Extract information for tracing based on entry type
match &entry {
UnifiedLogEntry::Server(server) => {
@@ -123,11 +124,11 @@ impl Logger {
tracing::warn!("Log queue full, applying backpressure");
match tokio::time::timeout(std::time::Duration::from_millis(500), self.sender.send(entry)).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err(LogError::SendFailed("Channel closed")),
Err(_) => Err(LogError::Timeout("Queue backpressure timeout")),
Ok(Err(_)) => Err(GlobalError::SendFailed("Channel closed")),
Err(_) => Err(GlobalError::Timeout("Queue backpressure timeout")),
}
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(LogError::SendFailed("Logger channel closed")),
Err(mpsc::error::TrySendError::Closed(_)) => Err(GlobalError::SendFailed("Logger channel closed")),
}
}
@@ -160,7 +161,7 @@ impl Logger {
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Result<(), LogError> {
) -> Result<(), GlobalError> {
let base = BaseLogEntry::new().message(Some(message.to_string())).request_id(request_id);
let server_entry = ServerLogEntry::new(level, source.to_string())
@@ -190,7 +191,7 @@ impl Logger {
/// let _ = logger.write("This is an information message", "example", Level::INFO).await;
/// }
/// ```
pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), LogError> {
pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), GlobalError> {
self.write_with_context(message, source, level, None, None, Vec::new()).await
}
@@ -208,31 +209,12 @@ impl Logger {
/// let _ = logger.shutdown().await;
/// }
/// ```
pub async fn shutdown(self) -> Result<(), LogError> {
pub async fn shutdown(self) -> Result<(), GlobalError> {
drop(self.sender); //Close the sending end so that the receiver knows that there is no new message
Ok(())
}
}
/// Log error type
/// This enum defines the error types that can occur when logging.
/// It is used to provide more detailed error information.
/// # Example
/// ```
/// use rustfs_obs::LogError;
/// use thiserror::Error;
///
/// LogError::SendFailed("Failed to send log");
/// LogError::Timeout("Operation timed out");
/// ```
#[derive(Debug, thiserror::Error)]
pub enum LogError {
#[error("Failed to send log: {0}")]
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
}
/// Start the log module
/// This function starts the log module.
/// It initializes the logger and starts the worker to process logs.
@@ -297,48 +279,6 @@ pub fn get_global_logger() -> &'static Arc<Mutex<Logger>> {
GLOBAL_LOGGER.get().expect("Logger not initialized")
}
/// Get the global logger instance with a lock
/// This function returns a reference to the global logger instance with a lock.
/// It is used to ensure that the logger is thread-safe.
///
/// # Returns
/// A reference to the global logger instance with a lock
///
/// # Example
/// ```
/// use rustfs_obs::locked_logger;
///
/// async fn example() {
/// let logger = locked_logger().await;
/// }
/// ```
pub async fn locked_logger() -> tokio::sync::MutexGuard<'static, Logger> {
get_global_logger().lock().await
}
/// Initialize with default empty logger if needed (optional)
/// This function initializes the logger with a default empty logger if needed.
/// It is used to ensure that the logger is initialized before logging.
///
/// # Returns
/// A reference to the global logger instance
///
/// # Example
/// ```
/// use rustfs_obs::ensure_logger_initialized;
///
/// let logger = ensure_logger_initialized();
/// ```
pub fn ensure_logger_initialized() -> &'static Arc<Mutex<Logger>> {
if GLOBAL_LOGGER.get().is_none() {
let config = AppConfig::default();
let sinks = vec![];
let logger = Arc::new(Mutex::new(start_logger(&config, sinks)));
let _ = GLOBAL_LOGGER.set(logger);
}
GLOBAL_LOGGER.get().unwrap()
}
/// Log information
/// This function logs information messages.
///
@@ -357,7 +297,7 @@ pub fn ensure_logger_initialized() -> &'static Arc<Mutex<Logger>> {
/// let _ = log_info("This is an information message", "example").await;
/// }
/// ```
pub async fn log_info(message: &str, source: &str) -> Result<(), LogError> {
pub async fn log_info(message: &str, source: &str) -> Result<(), GlobalError> {
get_global_logger().lock().await.write(message, source, Level::INFO).await
}
@@ -375,7 +315,7 @@ pub async fn log_info(message: &str, source: &str) -> Result<(), LogError> {
/// async fn example() {
/// let _ = log_error("This is an error message", "example").await;
/// }
pub async fn log_error(message: &str, source: &str) -> Result<(), LogError> {
pub async fn log_error(message: &str, source: &str) -> Result<(), GlobalError> {
get_global_logger().lock().await.write(message, source, Level::ERROR).await
}
@@ -395,7 +335,7 @@ pub async fn log_error(message: &str, source: &str) -> Result<(), LogError> {
/// let _ = log_warn("This is a warning message", "example").await;
/// }
/// ```
pub async fn log_warn(message: &str, source: &str) -> Result<(), LogError> {
pub async fn log_warn(message: &str, source: &str) -> Result<(), GlobalError> {
get_global_logger().lock().await.write(message, source, Level::WARN).await
}
@@ -415,7 +355,7 @@ pub async fn log_warn(message: &str, source: &str) -> Result<(), LogError> {
/// let _ = log_debug("This is a debug message", "example").await;
/// }
/// ```
pub async fn log_debug(message: &str, source: &str) -> Result<(), LogError> {
pub async fn log_debug(message: &str, source: &str) -> Result<(), GlobalError> {
get_global_logger().lock().await.write(message, source, Level::DEBUG).await
}
@@ -436,7 +376,7 @@ pub async fn log_debug(message: &str, source: &str) -> Result<(), LogError> {
/// let _ = log_trace("This is a trace message", "example").await;
/// }
/// ```
pub async fn log_trace(message: &str, source: &str) -> Result<(), LogError> {
pub async fn log_trace(message: &str, source: &str) -> Result<(), GlobalError> {
get_global_logger().lock().await.write(message, source, Level::TRACE).await
}
@@ -467,7 +407,7 @@ pub async fn log_with_context(
request_id: Option<String>,
user_id: Option<String>,
fields: Vec<(String, String)>,
) -> Result<(), LogError> {
) -> Result<(), GlobalError> {
get_global_logger()
.lock()
.await
@@ -477,7 +417,7 @@ pub async fn log_with_context(
/// Log initialization status
#[derive(Debug)]
pub struct InitLogStatus {
pub(crate) struct InitLogStatus {
pub timestamp: SystemTime,
pub service_name: String,
pub version: String,
@@ -508,14 +448,14 @@ impl InitLogStatus {
}
}
pub async fn init_start_log(config: &OtelConfig) -> Result<(), LogError> {
pub async fn init_start_log(config: &OtelConfig) -> Result<(), GlobalError> {
let status = Self::new_config(config);
log_init_state(Some(status)).await
}
}
/// Log initialization details during system startup
pub async fn log_init_state(status: Option<InitLogStatus>) -> Result<(), LogError> {
async fn log_init_state(status: Option<InitLogStatus>) -> Result<(), GlobalError> {
let status = status.unwrap_or_default();
let base_entry = BaseLogEntry::new()

View File

@@ -0,0 +1,44 @@
use crate::GlobalError;
use opentelemetry::KeyValue;
use sysinfo::{Pid, System};
pub const PROCESS_PID: opentelemetry::Key = opentelemetry::Key::from_static_str("process.pid");
pub const PROCESS_EXECUTABLE_NAME: opentelemetry::Key = opentelemetry::Key::from_static_str("process.executable.name");
pub const PROCESS_EXECUTABLE_PATH: opentelemetry::Key = opentelemetry::Key::from_static_str("process.executable.path");
pub const PROCESS_COMMAND: opentelemetry::Key = opentelemetry::Key::from_static_str("process.command");
/// Process attributes resolved once for an observed process and attached to
/// every metric sample: PID, executable name, executable path, and the full
/// command line.
pub struct ProcessAttributes {
    /// OpenTelemetry key/value pairs describing the observed process.
    pub attributes: Vec<KeyValue>,
}

impl ProcessAttributes {
    /// Creates a new instance of `ProcessAttributes` for the given PID.
    ///
    /// Refreshes the process table for `pid` only, then snapshots the process
    /// metadata into OpenTelemetry attributes.
    ///
    /// # Errors
    /// Returns `GlobalError::ProcessNotFound` if the process cannot be found
    /// after the refresh (e.g. it exited in the meantime).
    pub fn new(pid: Pid, system: &mut System) -> Result<Self, GlobalError> {
        system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
        let process = system
            .process(pid)
            .ok_or_else(|| GlobalError::ProcessNotFound(pid.as_u32()))?;

        // Join argv into one space-separated string. `to_string_lossy` keeps
        // non-UTF-8 arguments visible (as U+FFFD) instead of silently dropping
        // them, and `join` avoids both the leading-space artifact and the
        // repeated reallocation of the previous fold-based concatenation.
        let command = process
            .cmd()
            .iter()
            .map(|arg| arg.to_string_lossy())
            .collect::<Vec<_>>()
            .join(" ");

        let attributes = vec![
            KeyValue::new(PROCESS_PID, pid.as_u32() as i64),
            KeyValue::new(PROCESS_EXECUTABLE_NAME, process.name().to_os_string().into_string().unwrap_or_default()),
            KeyValue::new(
                PROCESS_EXECUTABLE_PATH,
                process
                    .exe()
                    .map(|path| path.to_string_lossy().into_owned())
                    .unwrap_or_default(),
            ),
            KeyValue::new(PROCESS_COMMAND, command),
        ];

        Ok(ProcessAttributes { attributes })
    }
}

View File

@@ -0,0 +1,156 @@
use crate::system::attributes::ProcessAttributes;
use crate::system::gpu::GpuCollector;
use crate::system::metrics::{Metrics, DIRECTION, INTERFACE, STATUS};
use crate::GlobalError;
use opentelemetry::KeyValue;
use std::time::SystemTime;
use sysinfo::{Networks, Pid, ProcessStatus, System};
use tokio::time::{sleep, Duration};
/// Collector is responsible for collecting system metrics and attributes.
/// It uses the sysinfo crate to gather information about the system and processes.
/// It also uses OpenTelemetry to record metrics.
pub struct Collector {
    // OpenTelemetry instruments the collected values are recorded into.
    metrics: Metrics,
    // Static process attributes (pid, executable, command) attached to samples.
    attributes: ProcessAttributes,
    // GPU metrics collector — presumably a no-op unless the `gpu` feature is
    // enabled; TODO confirm against system::gpu.
    gpu_collector: GpuCollector,
    // PID of the process being observed.
    pid: Pid,
    // sysinfo handle used to refresh process information on each pass.
    system: System,
    // Network interface list with cumulative and per-refresh counters.
    networks: Networks,
    // Physical core count, used to normalize CPU usage into utilization.
    core_count: usize,
    // Sampling interval between two `collect` passes, in milliseconds.
    interval_ms: u64,
}

impl Collector {
    /// Builds a collector for `pid`: snapshots the full system once, resolves
    /// the process attributes, creates the OTel instruments from `meter`, and
    /// takes an initial refreshed network-interface list.
    ///
    /// # Errors
    /// Propagates `ProcessNotFound` from attribute resolution, returns
    /// `CoreCountError` when the platform cannot report physical cores, and
    /// propagates any GPU-collector initialization error.
    pub fn new(pid: Pid, meter: opentelemetry::metrics::Meter, interval_ms: u64) -> Result<Self, GlobalError> {
        let mut system = System::new_all();
        let attributes = ProcessAttributes::new(pid, &mut system)?;
        let core_count = System::physical_core_count().ok_or(GlobalError::CoreCountError)?;
        let metrics = Metrics::new(&meter);
        let gpu_collector = GpuCollector::new(pid)?;
        let networks = Networks::new_with_refreshed_list();
        Ok(Collector {
            metrics,
            attributes,
            gpu_collector,
            pid,
            system,
            networks,
            core_count,
            interval_ms,
        })
    }

    /// Sampling loop: collect, trace, sleep `interval_ms`, repeat forever.
    /// Never returns `Ok` in normal operation — it only exits early if a
    /// `collect` pass fails (e.g. the observed process disappears).
    pub async fn run(&mut self) -> Result<(), GlobalError> {
        loop {
            self.collect()?;
            tracing::debug!("Collected metrics for PID: {} ,time: {:?}", self.pid, SystemTime::now());
            sleep(Duration::from_millis(self.interval_ms)).await;
        }
    }

    /// One sampling pass: refresh the observed process and the network list,
    /// then record CPU, memory, disk-I/O, network-I/O, process-status, and
    /// GPU metrics into the OTel instruments.
    fn collect(&mut self) -> Result<(), GlobalError> {
        // Refresh only the observed PID, not the whole process table.
        self.system
            .refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true);
        // refresh the network interface list and statistics
        self.networks.refresh(false);
        let process = self
            .system
            .process(self.pid)
            .ok_or_else(|| GlobalError::ProcessNotFound(self.pid.as_u32()))?;

        // CPU metrics
        // NOTE(review): cpu_usage is recorded with no attributes while every
        // other instrument carries the process attributes — confirm intended.
        let cpu_usage = process.cpu_usage();
        self.metrics.cpu_usage.record(cpu_usage as f64, &[]);
        // Utilization = raw usage normalized by physical core count.
        self.metrics
            .cpu_utilization
            .record((cpu_usage / self.core_count as f32) as f64, &self.attributes.attributes);

        // Memory metrics
        self.metrics
            .memory_usage
            .record(process.memory() as i64, &self.attributes.attributes);
        self.metrics
            .memory_virtual
            .record(process.virtual_memory() as i64, &self.attributes.attributes);

        // Disk I/O metrics, split by direction via the DIRECTION attribute.
        // NOTE(review): each record call concatenates a fresh attribute Vec;
        // acceptable at sampling-interval frequency.
        let disk_io = process.disk_usage();
        self.metrics.disk_io.record(
            disk_io.read_bytes as i64,
            &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "read")]].concat(),
        );
        self.metrics.disk_io.record(
            disk_io.written_bytes as i64,
            &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "write")]].concat(),
        );

        // Network I/O indicators (corresponding to /system/network/internode)
        let mut total_received: i64 = 0;
        let mut total_transmitted: i64 = 0;
        // statistics by interface: totals accumulate the cumulative counters,
        // while the per-interface instrument records the per-refresh deltas.
        for (interface_name, data) in self.networks.iter() {
            total_received += data.total_received() as i64;
            total_transmitted += data.total_transmitted() as i64;
            let received = data.received() as i64;
            let transmitted = data.transmitted() as i64;
            self.metrics.network_io_per_interface.record(
                received,
                &[
                    &self.attributes.attributes[..],
                    &[
                        KeyValue::new(INTERFACE, interface_name.to_string()),
                        KeyValue::new(DIRECTION, "received"),
                    ],
                ]
                .concat(),
            );
            self.metrics.network_io_per_interface.record(
                transmitted,
                &[
                    &self.attributes.attributes[..],
                    &[
                        KeyValue::new(INTERFACE, interface_name.to_string()),
                        KeyValue::new(DIRECTION, "transmitted"),
                    ],
                ]
                .concat(),
            );
        }
        // global statistics
        self.metrics.network_io.record(
            total_received,
            &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "received")]].concat(),
        );
        self.metrics.network_io.record(
            total_transmitted,
            &[&self.attributes.attributes[..], &[KeyValue::new(DIRECTION, "transmitted")]].concat(),
        );

        // Process status indicator (corresponding to /system/process).
        // Numeric encoding: 0=Run, 1=Sleep, 2=Zombie, 3=anything else; the
        // human-readable status also goes into the STATUS attribute.
        let status_value = match process.status() {
            ProcessStatus::Run => 0,
            ProcessStatus::Sleep => 1,
            ProcessStatus::Zombie => 2,
            _ => 3, // other status
        };
        self.metrics.process_status.record(
            status_value,
            &[
                &self.attributes.attributes[..],
                &[KeyValue::new(STATUS, format!("{:?}", process.status()))],
            ]
            .concat(),
        );

        // GPU Metrics (Optional) Non-MacOS
        self.gpu_collector.collect(&self.metrics, &self.attributes)?;
        Ok(())
    }
}

View File

@@ -0,0 +1,70 @@
#[cfg(feature = "gpu")]
use crate::system::attributes::ProcessAttributes;
#[cfg(feature = "gpu")]
use crate::system::metrics::Metrics;
#[cfg(feature = "gpu")]
use crate::GlobalError;
#[cfg(feature = "gpu")]
use nvml_wrapper::enums::device::UsedGpuMemory;
#[cfg(feature = "gpu")]
use nvml_wrapper::Nvml;
#[cfg(feature = "gpu")]
use sysinfo::Pid;
#[cfg(feature = "gpu")]
use tracing::warn;
/// `GpuCollector` is responsible for collecting GPU memory usage metrics.
#[cfg(feature = "gpu")]
pub struct GpuCollector {
    // Handle to the NVIDIA Management Library, initialized once in `new`.
    nvml: Nvml,
    // PID of the process whose GPU memory usage is reported.
    pid: Pid,
}
#[cfg(feature = "gpu")]
impl GpuCollector {
    /// Initializes NVML and binds the collector to `pid`.
    ///
    /// # Errors
    /// Returns `GlobalError::GpuInitError` when NVML cannot be initialized
    /// (e.g. no NVIDIA driver/library is available on the host).
    pub fn new(pid: Pid) -> Result<Self, GlobalError> {
        let nvml = Nvml::init().map_err(|e| GlobalError::GpuInitError(e.to_string()))?;
        Ok(GpuCollector { nvml, pid })
    }

    /// Records the GPU memory used by the observed process.
    ///
    /// Only GPU device index 0 is inspected. If the observed PID appears in
    /// the device's running compute processes, its used memory is recorded
    /// and the function returns early; in all other success paths (PID not
    /// found, or stats unavailable) a value of 0 is recorded instead.
    ///
    /// # Errors
    /// Returns `GlobalError::GpuDeviceError` when device 0 cannot be opened.
    pub fn collect(&self, metrics: &Metrics, attributes: &ProcessAttributes) -> Result<(), GlobalError> {
        if let Ok(device) = self.nvml.device_by_index(0) {
            if let Ok(gpu_stats) = device.running_compute_processes() {
                for stat in gpu_stats.iter() {
                    if stat.pid == self.pid.as_u32() {
                        let memory_used = match stat.used_gpu_memory {
                            UsedGpuMemory::Used(bytes) => bytes,
                            // NVML could not report a value for this process.
                            UsedGpuMemory::Unavailable => 0,
                        };
                        metrics.gpu_memory_usage.record(memory_used, &attributes.attributes);
                        return Ok(());
                    }
                }
            } else {
                warn!("Could not get GPU stats, recording 0 for GPU memory usage");
            }
        } else {
            return Err(GlobalError::GpuDeviceError("No GPU device found".to_string()));
        }
        // Process not found among GPU compute processes (or stats were
        // unavailable): record zero so the gauge always has a value.
        metrics.gpu_memory_usage.record(0, &attributes.attributes);
        Ok(())
    }
}
/// Zero-sized stub used when the crate is built without the "gpu" feature;
/// keeps the `Collector` code path identical on non-GPU builds.
#[cfg(not(feature = "gpu"))]
pub struct GpuCollector;
#[cfg(not(feature = "gpu"))]
impl GpuCollector {
    /// Always succeeds; the PID is ignored because nothing is collected.
    pub fn new(_pid: sysinfo::Pid) -> Result<Self, crate::GlobalError> {
        Ok(GpuCollector)
    }
    /// No-op: GPU metrics are not collected without the "gpu" feature.
    pub fn collect(
        &self,
        _metrics: &crate::system::metrics::Metrics,
        _attributes: &crate::system::attributes::ProcessAttributes,
    ) -> Result<(), crate::GlobalError> {
        Ok(())
    }
}

View File

@@ -0,0 +1,100 @@
// Metric instrument names, following the OpenTelemetry "process.*"
// naming convention.
pub const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
pub const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
pub const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
pub const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
pub const PROCESS_DISK_IO: &str = "process.disk.io";
pub const PROCESS_NETWORK_IO: &str = "process.network.io";
pub const PROCESS_NETWORK_IO_PER_INTERFACE: &str = "process.network.io.per_interface";
pub const PROCESS_STATUS: &str = "process.status";
#[cfg(feature = "gpu")]
pub const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
// Attribute keys attached to recorded data points (e.g. read/write
// direction, textual process status, network interface name).
pub const DIRECTION: opentelemetry::Key = opentelemetry::Key::from_static_str("direction");
pub const STATUS: opentelemetry::Key = opentelemetry::Key::from_static_str("status");
pub const INTERFACE: opentelemetry::Key = opentelemetry::Key::from_static_str("interface");
/// `Metrics` struct holds the OpenTelemetry metrics for process monitoring.
/// It contains various metrics such as CPU usage, memory usage,
/// disk I/O, network I/O, and process status.
///
/// The `Metrics` struct is designed to be used with OpenTelemetry's
/// metrics API to record and export these metrics.
///
/// The `new` method initializes the metrics using the provided
/// `opentelemetry::metrics::Meter`.
pub struct Metrics {
    // CPU usage as a percentage of a single core.
    pub cpu_usage: opentelemetry::metrics::Gauge<f64>,
    // CPU usage normalized by the physical core count.
    pub cpu_utilization: opentelemetry::metrics::Gauge<f64>,
    // Resident (physical) memory, in bytes.
    pub memory_usage: opentelemetry::metrics::Gauge<i64>,
    // Committed virtual memory, in bytes.
    pub memory_virtual: opentelemetry::metrics::Gauge<i64>,
    // Disk bytes transferred; direction is carried as an attribute.
    pub disk_io: opentelemetry::metrics::Gauge<i64>,
    // Total network bytes transferred across all interfaces.
    pub network_io: opentelemetry::metrics::Gauge<i64>,
    // Network bytes transferred, broken down per interface via attributes.
    pub network_io_per_interface: opentelemetry::metrics::Gauge<i64>,
    // Numeric process status (0: Running, 1: Sleeping, 2: Zombie, 3: other).
    pub process_status: opentelemetry::metrics::Gauge<i64>,
    // GPU memory used by the process, in bytes (gpu feature only).
    #[cfg(feature = "gpu")]
    pub gpu_memory_usage: opentelemetry::metrics::Gauge<u64>,
}
impl Metrics {
    /// Builds every process-monitoring gauge from the supplied `meter`.
    ///
    /// Each instrument is created with its conventional name (see the
    /// `PROCESS_*` constants), a human-readable description and, where
    /// meaningful, a unit. The GPU memory gauge exists only when the crate
    /// is compiled with the "gpu" feature.
    pub fn new(meter: &opentelemetry::metrics::Meter) -> Self {
        // The cfg-gated gauge is built up front so the struct literal below
        // can stay a single expression on both feature configurations.
        #[cfg(feature = "gpu")]
        let gpu_memory_usage = meter
            .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
            .with_description("The amount of physical GPU memory in use.")
            .with_unit("byte")
            .build();

        Metrics {
            cpu_usage: meter
                .f64_gauge(PROCESS_CPU_USAGE)
                .with_description("The percentage of CPU in use.")
                .with_unit("percent")
                .build(),
            cpu_utilization: meter
                .f64_gauge(PROCESS_CPU_UTILIZATION)
                .with_description("The amount of CPU in use.")
                .with_unit("percent")
                .build(),
            memory_usage: meter
                .i64_gauge(PROCESS_MEMORY_USAGE)
                .with_description("The amount of physical memory in use.")
                .with_unit("byte")
                .build(),
            memory_virtual: meter
                .i64_gauge(PROCESS_MEMORY_VIRTUAL)
                .with_description("The amount of committed virtual memory.")
                .with_unit("byte")
                .build(),
            disk_io: meter
                .i64_gauge(PROCESS_DISK_IO)
                .with_description("Disk bytes transferred.")
                .with_unit("byte")
                .build(),
            network_io: meter
                .i64_gauge(PROCESS_NETWORK_IO)
                .with_description("Network bytes transferred.")
                .with_unit("byte")
                .build(),
            network_io_per_interface: meter
                .i64_gauge(PROCESS_NETWORK_IO_PER_INTERFACE)
                .with_description("Network bytes transferred (per interface).")
                .with_unit("byte")
                .build(),
            process_status: meter
                .i64_gauge(PROCESS_STATUS)
                .with_description("Process status (0: Running, 1: Sleeping, 2: Zombie, etc.)")
                .build(),
            #[cfg(feature = "gpu")]
            gpu_memory_usage,
        }
    }
}

View File

@@ -0,0 +1,24 @@
use crate::GlobalError;
pub(crate) mod attributes;
mod collector;
pub(crate) mod gpu;
pub(crate) mod metrics;
/// Initialize the indicator collector for the current process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
///
/// # Errors
/// Returns `GlobalError::PidError` when the current PID cannot be resolved,
/// or any error produced while constructing or running the collector.
pub async fn init_process_observer(meter: opentelemetry::metrics::Meter) -> Result<(), GlobalError> {
    // Default collection interval (30 s), named instead of a magic number so
    // both observer entry points stay in sync.
    const DEFAULT_INTERVAL_MS: u64 = 30_000;
    let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
    let mut collector = collector::Collector::new(pid, meter, DEFAULT_INTERVAL_MS)?;
    collector.run().await
}
/// Initialize the metric collector for the specified PID process
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
///
/// # Errors
/// Returns any error produced while constructing or running the collector
/// (e.g. the target process does not exist at collection time).
pub async fn init_process_observer_for_pid(meter: opentelemetry::metrics::Meter, pid: u32) -> Result<(), GlobalError> {
    // Default collection interval (30 s), named instead of a magic number so
    // both observer entry points stay in sync.
    const DEFAULT_INTERVAL_MS: u64 = 30_000;
    let pid = sysinfo::Pid::from_u32(pid);
    let mut collector = collector::Collector::new(pid, meter, DEFAULT_INTERVAL_MS)?;
    collector.run().await
}

View File

@@ -1,5 +1,6 @@
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
use crate::{get_local_ip_with_default, OtelConfig};
use crate::utils::get_local_ip_with_default;
use crate::OtelConfig;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer;
@@ -14,11 +15,10 @@ use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
};
use prometheus::{Encoder, Registry, TextEncoder};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
use std::sync::Arc;
use tokio::sync::{Mutex, OnceCell};
use tracing::{info, warn};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@@ -67,66 +67,20 @@ impl Drop for OtelGuard {
}
}
/// Global registry for Prometheus metrics
static GLOBAL_REGISTRY: OnceCell<Arc<Mutex<Registry>>> = OnceCell::const_new();
/// Get the global registry instance
/// This function returns a reference to the global registry instance.
///
/// # Returns
/// A reference to the global registry instance
///
/// # Example
/// ```
/// use rustfs_obs::get_global_registry;
///
/// let registry = get_global_registry();
/// ```
pub fn get_global_registry() -> Arc<Mutex<Registry>> {
GLOBAL_REGISTRY.get().unwrap().clone()
}
/// Prometheus metric endpoints
/// This function returns a string containing the Prometheus metrics.
/// The metrics are collected from the global registry.
/// The function is used to expose the metrics via an HTTP endpoint.
///
/// # Returns
/// A string containing the Prometheus metrics
///
/// # Example
/// ```
/// use rustfs_obs::metrics;
///
/// async fn main() {
/// let metrics = metrics().await;
/// println!("{}", metrics);
/// }
/// ```
pub async fn metrics() -> String {
let encoder = TextEncoder::new();
// Get a reference to the registry for reading metrics
let registry = get_global_registry().lock().await.to_owned();
let metric_families = registry.gather();
if metric_families.is_empty() {
warn!("No metrics available in Prometheus registry");
} else {
info!("Metrics collected: {} families", metric_families.len());
}
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap_or_else(|_| "Error encoding metrics".to_string())
}
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
let config = config.clone();
Resource::builder()
.with_service_name(config.service_name.unwrap_or(SERVICE_NAME.to_string()))
.with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(SERVICE_NAME)).to_string())
.with_schema_url(
[
KeyValue::new(OTEL_SERVICE_VERSION, config.service_version.unwrap_or(SERVICE_VERSION.to_string())),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.unwrap_or(ENVIRONMENT.to_string())),
KeyValue::new(
OTEL_SERVICE_VERSION,
Cow::Borrowed(config.service_version.as_deref().unwrap_or(SERVICE_VERSION)).to_string(),
),
KeyValue::new(
DEPLOYMENT_ENVIRONMENT_NAME,
Cow::Borrowed(config.environment.as_deref().unwrap_or(ENVIRONMENT)).to_string(),
),
KeyValue::new(NETWORK_LOCAL_ADDRESS, get_local_ip_with_default()),
],
SCHEMA_URL,
@@ -141,167 +95,169 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
.build()
}
/// Initialize Meter Provider
fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
let mut builder = MeterProviderBuilder::default().with_resource(resource(config));
// If endpoint is empty, use stdout output
if config.endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
} else {
// If endpoint is not empty, use otlp output
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(config.meter_interval.unwrap_or(METER_INTERVAL)))
.build(),
);
// If use_stdout is true, output to stdout at the same time
if config.use_stdout.unwrap_or(USE_STDOUT) {
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
}
}
let registry = Registry::new();
// Set global registry
GLOBAL_REGISTRY.set(Arc::new(Mutex::new(registry.clone()))).unwrap();
// Create Prometheus exporter
let prometheus_exporter = opentelemetry_prometheus::exporter().with_registry(registry).build().unwrap();
// Build meter provider
let meter_provider = builder.with_reader(prometheus_exporter).build();
global::set_meter_provider(meter_provider.clone());
meter_provider
}
/// Initialize Tracer Provider
fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource(config));
let tracer_provider = if config.endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.build()
.unwrap();
if config.use_stdout.unwrap_or(USE_STDOUT) {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
}
.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
}
/// Initialize Telemetry
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let tracer_provider = init_tracer_provider(config);
let meter_provider = init_meter_provider(config);
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(LOGGER_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(SERVICE_NAME);
// Initialize logger provider based on configuration
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(resource(config));
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
if config.endpoint.is_empty() {
// Use stdout exporter when no endpoint is configured
builder = builder.with_simple_exporter(opentelemetry_stdout::LogExporter::default());
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
};
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
// Configure OTLP exporter when endpoint is provided
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
// Add stdout exporter if requested
if config.use_stdout.unwrap_or(USE_STDOUT) {
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
}
builder.build()
};
let config = config.clone();
let logger_level = config.logger_level.unwrap_or(LOGGER_LEVEL.to_string());
let logger_level = logger_level.as_str();
// Setup OpenTelemetryTracingBridge layer
let otel_layer = {
// Filter to prevent infinite telemetry loops
// This blocks events from OpenTelemetry and its dependent libraries (tonic, reqwest, etc.)
// from being sent back to OpenTelemetry itself
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
let mut filter = EnvFilter::new(logger_level);
for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
// configuring tracing
{
// optimize filter configuration
let otel_layer = {
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
EnvFilter::new(logger_level)
}
filter
}
_ => {
let mut filter = EnvFilter::new(logger_level);
for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
_ => {
let mut filter = EnvFilter::new(logger_level);
// use smallvec to avoid heap allocation
let directives: SmallVec<[&str; 5]> = smallvec::smallvec!["hyper", "tonic", "h2", "reqwest", "tower"];
for directive in directives {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
}
filter
}
filter
}
};
layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel)
};
layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel)
};
let tracer = tracer_provider.tracer(config.service_name.unwrap_or(SERVICE_NAME.to_string()));
let registry = tracing_subscriber::registry()
.with(switch_level(logger_level))
.with(OpenTelemetryLayer::new(tracer))
.with(MetricsLayer::new(meter_provider.clone()))
.with(otel_layer)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level)));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure formatting layer
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(enable_color)
.with_thread_names(true)
.with_file(true)
.with_line_number(true);
// Configure registry to avoid repeated calls to filter methods
let level_filter = switch_level(logger_level);
let registry = tracing_subscriber::registry()
.with(level_filter)
.with(OpenTelemetryLayer::new(tracer))
.with(MetricsLayer::new(meter_provider.clone()))
.with(otel_layer)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level)));
// Creating a formatting layer with explicit type to avoid type mismatches
let fmt_layer = fmt_layer.with_filter(
EnvFilter::new(logger_level).add_directive(
format!("opentelemetry={}", if config.endpoint.is_empty() { logger_level } else { "off" })
.parse()
.unwrap(),
),
);
// configure the formatting layer
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(enable_color)
.with_thread_names(true)
.with_file(true)
.with_line_number(true)
.with_filter(
EnvFilter::new(logger_level).add_directive(
format!("opentelemetry={}", if endpoint.is_empty() { logger_level } else { "off" })
.parse()
.unwrap(),
),
);
registry.with(ErrorLayer::default()).with(fmt_layer).init();
if !config.endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
config.endpoint, logger_level
);
registry.with(ErrorLayer::default()).with(fmt_layer).init();
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
endpoint, logger_level
);
}
}
OtelGuard {
@@ -313,12 +269,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
/// Switch log level
fn switch_level(logger_level: &str) -> tracing_subscriber::filter::LevelFilter {
use tracing_subscriber::filter::LevelFilter;
match logger_level {
"error" => tracing_subscriber::filter::LevelFilter::ERROR,
"warn" => tracing_subscriber::filter::LevelFilter::WARN,
"info" => tracing_subscriber::filter::LevelFilter::INFO,
"debug" => tracing_subscriber::filter::LevelFilter::DEBUG,
"trace" => tracing_subscriber::filter::LevelFilter::TRACE,
_ => tracing_subscriber::filter::LevelFilter::OFF,
"error" => LevelFilter::ERROR,
"warn" => LevelFilter::WARN,
"info" => LevelFilter::INFO,
"debug" => LevelFilter::DEBUG,
"trace" => LevelFilter::TRACE,
_ => LevelFilter::OFF,
}
}

View File

@@ -46,6 +46,7 @@ local-ip-address = { workspace = true }
matchit = { workspace = true }
mime.workspace = true
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
pin-project-lite.workspace = true
protos.workspace = true
query = { workspace = true }
@@ -76,7 +77,7 @@ tokio-stream.workspace = true
tonic = { workspace = true }
tower.workspace = true
transform-stream.workspace = true
tower-http = { workspace = true, features = ["trace", "compression-full", "cors"] }
tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors"] }
uuid = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]

View File

@@ -271,7 +271,7 @@ pub async fn start_static_file_server(
.route("/config.json", get(config_handler))
.fallback_service(get(static_handler))
.layer(cors)
.layer(tower_http::compression::CompressionLayer::new())
.layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true))
.layer(TraceLayer::new_for_http());
let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address");
info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port());

View File

@@ -14,6 +14,7 @@ use crate::console::{init_console_cfg, CONSOLE_CONFIG};
// Ensure the correct path for parse_license is imported
use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, ShutdownSignal, SHUTDOWN_TIMEOUT};
use crate::utils::error;
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
use common::{
@@ -37,6 +38,7 @@ use ecstore::{
};
use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use grpc::make_server;
use http::{HeaderMap, Request as HttpRequest, Response};
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
@@ -47,17 +49,20 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_event_notifier::NotifierConfig;
use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus};
use rustfs_obs::{init_obs, init_process_observer, load_config, set_global_guard};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing::Span;
use tracing::{debug, error, info, info_span, warn};
#[cfg(all(target_os = "linux", target_env = "gnu"))]
@@ -103,9 +108,6 @@ async fn main() -> Result<()> {
// Store in global storage
set_global_guard(guard)?;
// Log initialization status
InitLogStatus::init_start_log(&config.observability).await?;
// Initialize event notifier
let notifier_config = opt.clone().event_config;
if notifier_config.is_some() {
@@ -247,6 +249,20 @@ async fn run(opt: config::Opt) -> Result<()> {
b.build()
};
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");
let obs_result = init_process_observer(meter).await;
match obs_result {
Ok(_) => {
info!("Process observer initialized successfully");
}
Err(e) => {
error!("Failed to initialize process observer: {}", e);
}
}
});
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let tls_path = opt.tls_path.clone().unwrap_or_default();
@@ -300,18 +316,53 @@ async fn run(opt: config::Opt) -> Result<()> {
let mut sigint_inner = sigint_inner;
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::debug_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
{
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
debug!("started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("status_code", tracing::field::display(response.status()));
debug!("response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
debug!("sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
debug!("request error: {:?} in {:?}", _error, latency)
}),
)
.layer(CorsLayer::permissive())
.service(hybrid(s3_service, rpc_service)),
);
let http_server = ConnBuilder::new(TokioExecutor::new());
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = GracefulShutdown::new();
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
// The service is ready
worker_state_manager.update(ServiceState::Ready);
let value = hybrid_service.clone();
loop {
debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs);
// Wait for a connection
@@ -366,12 +417,17 @@ async fn run(opt: config::Opt) -> Result<()> {
continue;
}
};
let conn = http_server.serve_connection(TokioIo::new(tls_socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
let http_server_clone = http_server.clone();
let value_clone = value.clone();
let graceful_clone = graceful.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new()
.expect("Failed to create runtime")
.block_on(async move {
let conn = http_server_clone.serve_connection(TokioIo::new(tls_socket), value_clone);
let conn = graceful_clone.watch(conn);
if let Err(err) = conn.await {
error!("Https Connection error: {}", err);
}
@@ -380,9 +436,13 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
let http_server_clone = http_server.clone();
let value_clone = value.clone();
let graceful_clone = graceful.clone();
tokio::spawn(async move {
let conn = http_server_clone.serve_connection(TokioIo::new(socket), value_clone);
let conn = graceful_clone.watch(conn);
if let Err(err) = conn.await {
error!("Http Connection error: {}", err);
}
@@ -391,12 +451,24 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
worker_state_manager.update(ServiceState::Stopping);
tokio::select! {
() = graceful.shutdown() => {
debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
debug!("Waited 10 seconds for graceful shutdown, aborting...");
match Arc::try_unwrap(graceful) {
Ok(g) => {
// Successfully obtaining unique ownership, you can call shutdown
tokio::select! {
() = g.shutdown() => {
debug!("Gracefully shutdown!");
},
() = tokio::time::sleep(Duration::from_secs(10)) => {
debug!("Waited 10 seconds for graceful shutdown, aborting...");
}
}
}
Err(arc_graceful) => {
// There are other references that cannot be obtained for unique ownership
error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful);
// In this case, we can only wait for the timeout
tokio::time::sleep(Duration::from_secs(10)).await;
debug!("Timeout reached, forcing shutdown");
}
}
worker_state_manager.update(ServiceState::Stopped);

View File

@@ -166,7 +166,8 @@ impl SimpleQueryDispatcher {
if let Some(delimiter) = csv.field_delimiter.as_ref() {
file_format = file_format.with_delimiter(delimiter.as_bytes().first().copied().unwrap_or_default());
}
if csv.file_header_info.is_some() {}
// TODO waiting for processing @junxiang Mu
// if csv.file_header_info.is_some() {}
match csv.file_header_info.as_ref() {
Some(info) => {
if *info == *NONE {

View File

@@ -40,13 +40,13 @@ export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
# 如下变量需要必须参数都有值才可以,以及会覆盖配置文件中的值
export RUSTFS__OBSERVABILITY__ENDPOINT=http://localhost:4317
export RUSTFS__OBSERVABILITY__USE_STDOUT=true
export RUSTFS__OBSERVABILITY__USE_STDOUT=false
export RUSTFS__OBSERVABILITY__SAMPLE_RATIO=2.0
export RUSTFS__OBSERVABILITY__METER_INTERVAL=30
export RUSTFS__OBSERVABILITY__SERVICE_NAME=rustfs
export RUSTFS__OBSERVABILITY__SERVICE_VERSION=0.1.0
export RUSTFS__OBSERVABILITY__ENVIRONMENT=develop
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=info
export RUSTFS__OBSERVABILITY__LOGGER_LEVEL=debug
export RUSTFS__SINKS__FILE__ENABLED=true
export RUSTFS__SINKS__FILE__PATH="./deploy/logs/rustfs.log"
export RUSTFS__SINKS__WEBHOOK__ENABLED=false