Improve code for OpenTelemetry and add system metrics

This commit is contained in:
houseme
2025-04-24 01:41:47 +08:00
parent 873a04aed0
commit ef3f86ccf5
9 changed files with 745 additions and 270 deletions

152
Cargo.lock generated
View File

@@ -5250,6 +5250,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -5412,6 +5421,29 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3c00a0c9600379bd32f8972de90676a7672cba3bf4886986bc05902afc1e093"
[[package]]
name = "nvml-wrapper"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9bff0aa1d48904a1385ea2a8b97576fbdcbc9a3cfccd0d31fe978e1c4038c5"
dependencies = [
"bitflags 2.9.0",
"libloading 0.8.6",
"nvml-wrapper-sys",
"static_assertions",
"thiserror 1.0.69",
"wrapcenum-derive",
]
[[package]]
name = "nvml-wrapper-sys"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "698d45156f28781a4e79652b6ebe2eaa0589057d588d3aec1333f6466f13fcb5"
dependencies = [
"libloading 0.8.6",
]
[[package]]
name = "objc"
version = "0.2.7"
@@ -5715,19 +5747,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "098a71a4430bb712be6130ed777335d2e5b19bc8566de5f2edddfce906def6ab"
dependencies = [
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"prometheus",
"tracing",
]
[[package]]
name = "opentelemetry-proto"
version = "0.29.0"
@@ -6486,21 +6505,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "prometheus"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot 0.12.3",
"protobuf",
"thiserror 2.0.12",
]
[[package]]
name = "prost"
version = "0.13.5"
@@ -7247,6 +7251,7 @@ dependencies = [
"mime",
"mime_guess",
"netif",
"opentelemetry",
"pin-project-lite",
"policy",
"prost-build",
@@ -7331,18 +7336,19 @@ dependencies = [
"chrono",
"config",
"local-ip-address",
"nvml-wrapper",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk",
"prometheus",
"rdkafka",
"reqwest",
"serde",
"serde_json",
"smallvec",
"sysinfo",
"thiserror 2.0.12",
"tokio",
"tracing",
@@ -8273,6 +8279,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "sysinfo"
version = "0.34.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4b93974b3d3aeaa036504b8eefd4c039dced109171c1ae973f1dc63b2c7e4b2"
dependencies = [
"libc",
"memchr",
"ntapi",
"objc2-core-foundation",
"windows 0.57.0",
]
[[package]]
name = "system-configuration"
version = "0.6.1"
@@ -8341,7 +8360,7 @@ dependencies = [
"tao-macros",
"unicode-segmentation",
"url",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-version",
"x11-dl",
@@ -9482,7 +9501,7 @@ checksum = "6f61ff3d9d0ee4efcb461b14eb3acfda2702d10dc329f339303fc3e57215ae2c"
dependencies = [
"webview2-com-macros",
"webview2-com-sys",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-implement 0.58.0",
"windows-interface 0.58.0",
@@ -9506,7 +9525,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3a3e2eeb58f82361c93f9777014668eb3d07e7d174ee4c819575a9208011886"
dependencies = [
"thiserror 1.0.69",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
]
@@ -9553,6 +9572,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143"
dependencies = [
"windows-core 0.57.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.58.0"
@@ -9563,6 +9592,18 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d"
dependencies = [
"windows-implement 0.57.0",
"windows-interface 0.57.0",
"windows-result 0.1.2",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.58.0"
@@ -9589,6 +9630,17 @@ dependencies = [
"windows-strings 0.4.0",
]
[[package]]
name = "windows-implement"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-implement"
version = "0.58.0"
@@ -9611,6 +9663,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.58.0"
@@ -9650,6 +9713,15 @@ dependencies = [
"windows-targets 0.53.0",
]
[[package]]
name = "windows-result"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-result"
version = "0.2.0"
@@ -10019,6 +10091,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "wrapcenum-derive"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a76ff259533532054cfbaefb115c613203c73707017459206380f03b3b3f266e"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "write16"
version = "1.0.0"
@@ -10065,7 +10149,7 @@ dependencies = [
"webkit2gtk",
"webkit2gtk-sys",
"webview2-com",
"windows",
"windows 0.58.0",
"windows-core 0.58.0",
"windows-version",
"x11-dl",

View File

@@ -89,13 +89,13 @@ md-5 = "0.10.6"
mime = "0.3.17"
mime_guess = "2.0.5"
netif = "0.1.6"
nvml-wrapper = "0.10.0"
object_store = "0.11.2"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { version = "0.29.0" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-prometheus = { version = "0.29.1" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
parking_lot = "0.12.3"
pin-project-lite = "0.2.16"
@@ -123,10 +123,11 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
smallvec = { version = "1.15.0", features = ["serde"] }
strum = { version = "0.27.1", features = ["derive"] }
sha2 = "0.10.8"
smallvec = { version = "1.15.0", features = ["serde"] }
snafu = "0.8.5"
strum = { version = "0.27.1", features = ["derive"] }
sysinfo = "0.34.2"
tempfile = "3.19.1"
test-case = "3.3.1"
thiserror = "2.0.12"
@@ -172,7 +173,10 @@ inherits = "dev"
[profile.release]
opt-level = 3
lto = "thin"
lto = "fat"
codegen-units = 1
panic = "abort" # Optional: abort on panic to omit unwinding code and shrink the binary
strip = true # strip symbol information to reduce binary size
[profile.production]
inherits = "release"

View File

@@ -11,32 +11,35 @@ workspace = true
[features]
default = ["file"]
file = []
gpu = ["dep:nvml-wrapper"]
kafka = ["dep:rdkafka"]
webhook = ["dep:reqwest"]
file = []
full = ["file", "gpu", "kafka", "webhook"]
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
config = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] }
opentelemetry-prometheus = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
prometheus = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true, default-features = false }
serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
local-ip-address = { workspace = true }

View File

@@ -16,11 +16,24 @@ static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
/// Error type for global guard operations
#[derive(Debug, thiserror::Error)]
pub enum GuardError {
pub enum GlobalError {
#[error("Failed to set global guard: {0}")]
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
#[error("Global guard not initialized")]
NotInitialized,
#[error("Global system metrics err: {0}")]
MetricsError(String),
#[error("Failed to get process ID: {0}")]
GetPidError(String),
#[error("Type conversion error: {0}")]
ConversionError(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("System metrics error: {0}")]
SystemMetricsError(String),
#[cfg(feature = "gpu")]
#[error("GPU metrics error: {0}")]
GpuMetricsError(String),
}
/// Set the global guard for OpenTelemetry
@@ -43,9 +56,9 @@ pub enum GuardError {
/// Ok(())
/// }
/// ```
pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> {
pub fn set_global_guard(guard: OtelGuard) -> Result<(), GlobalError> {
info!("Initializing global OpenTelemetry guard");
GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GuardError::SetError)
GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GlobalError::SetError)
}
/// Get the global guard for OpenTelemetry
@@ -65,8 +78,8 @@ pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> {
/// Ok(())
/// }
/// ```
pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GuardError> {
GLOBAL_GUARD.get().cloned().ok_or(GuardError::NotInitialized)
pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GlobalError> {
GLOBAL_GUARD.get().cloned().ok_or(GlobalError::NotInitialized)
}
/// Try to get the global guard for OpenTelemetry
@@ -84,6 +97,6 @@ mod tests {
#[tokio::test]
async fn test_get_uninitialized_guard() {
let result = get_global_guard();
assert!(matches!(result, Err(GuardError::NotInitialized)));
assert!(matches!(result, Err(GlobalError::NotInitialized)));
}
}

View File

@@ -1,23 +1,24 @@
/// # obs
///
/// `obs` is a logging and observability library for Rust.
/// It provides a simple and easy-to-use interface for logging and observability.
/// It is built on top of the `log` crate and `opentelemetry` crate.
///
/// ## Features
/// - Structured logging
/// - Distributed tracing
/// - Metrics collection
/// - Log processing worker
/// - Multiple sinks
/// - Configuration-based setup
/// - Telemetry guard
/// - Global logger
/// - Log levels
/// - Log entry types
/// - Log record
/// - Object version
/// - Local IP address
//! # RustFS Observability
//!
//! Provides tools for system and service monitoring.
//!
//! ## Feature flags
//!
//! - `file`: file logging (enabled by default)
//! - `gpu`: GPU monitoring
//! - `kafka`: Kafka metric output
//! - `webhook`: webhook notifications
//! - `full`: all of the above
//!
//! To enable GPU monitoring, add the feature in `Cargo.toml`:
//!
//! ```toml
//! # enable GPU monitoring
//! rustfs-obs = { version = "0.1.0", features = ["gpu"] }
//!
//! # enable all features
//! rustfs-obs = { version = "0.1.0", features = ["full"] }
//! ```
///
/// ## Usage
///
@@ -31,10 +32,15 @@ mod config;
mod entry;
mod global;
mod logger;
mod metrics;
mod sink;
mod telemetry;
mod utils;
mod worker;
#[cfg(feature = "gpu")]
pub use crate::metrics::init_gpu_metrics;
pub use config::load_config;
pub use config::{AppConfig, OtelConfig};
pub use entry::args::Args;
@@ -42,15 +48,15 @@ pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GuardError};
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GlobalError};
pub use logger::{ensure_logger_initialized, log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
pub use logger::{get_global_logger, init_global_logger, locked_logger, start_logger};
pub use logger::{log_init_state, InitLogStatus};
pub use logger::{LogError, Logger};
pub use metrics::{init_system_metrics, init_system_metrics_for_pid};
pub use sink::Sink;
use std::sync::Arc;
pub use telemetry::init_telemetry;
pub use telemetry::{get_global_registry, metrics};
use tokio::sync::Mutex;
pub use utils::{get_local_ip, get_local_ip_with_default};
pub use worker::start_worker;
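Putting the re-exported pieces together, a service can install the telemetry providers and start the new system-metrics collection in a few lines. The sketch below is illustrative only: it assumes `OtelConfig` implements `Default` (otherwise build it from `load_config`), and error handling is reduced to `?`.

```rust
use opentelemetry::global;
use rustfs_obs::{init_system_metrics, init_telemetry, set_global_guard, OtelConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed: OtelConfig::default(); in practice the config would come from load_config().
    let config = OtelConfig::default();
    // Install tracer/meter/logger providers and keep the guard alive for the process lifetime.
    let guard = init_telemetry(&config);
    set_global_guard(guard)?;
    // Spawn background collection of CPU, memory, and disk metrics for this process.
    let meter = global::meter("rustfs-system-meter");
    init_system_metrics(meter).await?;
    Ok(())
}
```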

394
crates/obs/src/metrics.rs Normal file
View File

@@ -0,0 +1,394 @@
//! # Metrics
//! This module is part of the RustFS project.
//! Metrics currently observed:
//! - CPU
//! - Memory
//! - Disk I/O
//! - GPU memory (with the `gpu` feature)
//!
//! # Getting started
//!
//! ```
//! use opentelemetry::global;
//! use rustfs_obs::init_system_metrics;
//!
//! #[tokio::main]
//! async fn main() {
//! let meter = global::meter("rustfs-system-meter");
//! let result = init_system_metrics(meter).await;
//! }
//! ```
//!
use crate::GlobalError;
#[cfg(feature = "gpu")]
use nvml_wrapper::enums::device::UsedGpuMemory;
#[cfg(feature = "gpu")]
use nvml_wrapper::Nvml;
use opentelemetry::metrics::{Gauge, Meter};
use opentelemetry::Key;
use opentelemetry::KeyValue;
#[cfg(feature = "gpu")]
use std::sync::{Arc, Mutex};
use std::time::Duration;
use sysinfo::{get_current_pid, System};
#[cfg(feature = "gpu")]
use tokio::sync::OnceCell;
use tokio::time::sleep;
use tracing::warn;
// Gauge instrument aliases used throughout this module
type F64Gauge = Gauge<f64>;
type I64Gauge = Gauge<i64>;
type U64Gauge = Gauge<u64>;
const PROCESS_PID: Key = Key::from_static_str("process.pid");
const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
const PROCESS_DISK_IO: &str = "process.disk.io";
const DIRECTION: Key = Key::from_static_str("direction");
const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
// Lazily initialized NVML handle, created on first use
#[cfg(feature = "gpu")]
static NVML_INSTANCE: OnceCell<Arc<Mutex<Option<Result<Nvml, nvml_wrapper::error::NvmlError>>>>> = OnceCell::const_new();
// get or initialize an nvml instance
#[cfg(feature = "gpu")]
async fn get_or_init_nvml() -> &'static Arc<Mutex<Option<Result<Nvml, nvml_wrapper::error::NvmlError>>>> {
NVML_INSTANCE
.get_or_init(|| async { Arc::new(Mutex::new(Some(Nvml::init()))) })
.await
}
/// Asynchronously record information about the current process.
/// This function is useful for monitoring the current process.
///
/// # Arguments
/// * `meter` - The OpenTelemetry meter to use.
///
/// # Returns
/// * `Ok(())` if successful
/// * `Err(GlobalError)` if an error occurs
///
/// # Example
/// ```
/// use opentelemetry::global;
/// use rustfs_obs::init_system_metrics;
///
/// #[tokio::main]
/// async fn main() {
/// let meter = global::meter("rustfs-system-meter");
/// let result = init_system_metrics(meter).await;
/// }
/// ```
pub async fn init_system_metrics(meter: Meter) -> Result<(), GlobalError> {
let pid = get_current_pid().map_err(|err| GlobalError::MetricsError(err.to_string()))?;
register_system_metrics(meter, pid).await
}
/// Asynchronously record information about a specific process by its PID.
/// This function is useful for monitoring processes other than the current one.
///
/// # Arguments
/// * `meter` - The OpenTelemetry meter to use.
/// * `pid` - The PID of the process to monitor.
///
/// # Returns
/// * `Ok(())` if successful
/// * `Err(GlobalError)` if an error occurs
///
/// # Example
/// ```
/// use opentelemetry::global;
/// use rustfs_obs::init_system_metrics_for_pid;
///
/// #[tokio::main]
/// async fn main() {
/// let meter = global::meter("rustfs-system-meter");
/// // replace with the actual PID
/// let pid = 1234;
/// let result = init_system_metrics_for_pid(meter, pid).await;
/// }
/// ```
///
pub async fn init_system_metrics_for_pid(meter: Meter, pid: u32) -> Result<(), GlobalError> {
let pid = sysinfo::Pid::from_u32(pid);
register_system_metrics(meter, pid).await
}
/// Register system metrics for the given process.
/// This spawns background tasks that periodically refresh and record process metrics.
///
/// # Arguments
/// * `meter` - The OpenTelemetry meter to use.
/// * `pid` - The PID of the process to monitor.
///
/// # Returns
/// * `Ok(())` if successful
/// * `Err(GlobalError)` if an error occurs
///
async fn register_system_metrics(meter: Meter, pid: sysinfo::Pid) -> Result<(), GlobalError> {
// cache core counts to avoid repeated calculations
let core_count = System::physical_core_count()
.ok_or_else(|| GlobalError::SystemMetricsError("Could not get physical core count".to_string()))?;
let core_count_f32 = core_count as f32;
// create metric meter
let (
process_cpu_utilization,
process_cpu_usage,
process_memory_usage,
process_memory_virtual,
process_disk_io,
process_gpu_memory_usage,
) = create_metrics(&meter);
// initialize system object
let mut sys = System::new_all();
sys.refresh_all();
// Prepare the common attributes once to avoid rebuilding them inside the loops
let common_attributes = prepare_common_attributes(&sys, pid)?;
// Clone them for the GPU task before the CPU/memory task takes ownership
let gpu_attributes = common_attributes.clone();
// get the metric export interval
let interval = get_export_interval();
// Collect CPU, memory, and disk metrics on a background task so the caller is not blocked
tokio::spawn(async move {
loop {
sleep(Duration::from_millis(interval)).await;
if let Err(e) = update_process_metrics(
&mut sys,
pid,
&process_cpu_usage,
&process_cpu_utilization,
&process_memory_usage,
&process_memory_virtual,
&process_disk_io,
&common_attributes,
core_count_f32,
) {
warn!("Failed to update process metrics: {}", e);
}
}
});
// Collect GPU metrics on a separate background task
#[cfg(feature = "gpu")]
tokio::spawn(async move {
loop {
sleep(Duration::from_millis(interval)).await;
// lazily initialize NVML on first use
let nvml_arc = get_or_init_nvml().await;
let nvml_guard = nvml_arc.lock().unwrap();
if let Some(nvml) = nvml_guard.as_ref() {
if let Err(e) = update_gpu_metrics(nvml, pid, &process_gpu_memory_usage, &gpu_attributes) {
warn!("Failed to update GPU metrics: {}", e);
}
}
}
});
// Without the gpu feature, record a constant zero so the metric still exists
#[cfg(not(feature = "gpu"))]
tokio::spawn(async move {
loop {
sleep(Duration::from_millis(interval)).await;
process_gpu_memory_usage.record(0, &gpu_attributes);
}
});
// The collection tasks run for the lifetime of the process; return immediately
Ok(())
}
fn create_metrics(meter: &Meter) -> (F64Gauge, F64Gauge, I64Gauge, I64Gauge, I64Gauge, U64Gauge) {
let process_cpu_utilization = meter
.f64_gauge(PROCESS_CPU_UTILIZATION)
.with_description("The percentage of CPU in use, normalized by the number of physical cores.")
.with_unit("percent")
.build();
let process_cpu_usage = meter
.f64_gauge(PROCESS_CPU_USAGE)
.with_description("The amount of CPU in use.")
.with_unit("percent")
.build();
let process_memory_usage = meter
.i64_gauge(PROCESS_MEMORY_USAGE)
.with_description("The amount of physical memory in use.")
.with_unit("byte")
.build();
let process_memory_virtual = meter
.i64_gauge(PROCESS_MEMORY_VIRTUAL)
.with_description("The amount of committed virtual memory.")
.with_unit("byte")
.build();
let process_disk_io = meter
.i64_gauge(PROCESS_DISK_IO)
.with_description("Disk bytes transferred.")
.with_unit("byte")
.build();
let process_gpu_memory_usage = meter
.u64_gauge(PROCESS_GPU_MEMORY_USAGE)
.with_description("The amount of physical GPU memory in use.")
.with_unit("byte")
.build();
(
process_cpu_utilization,
process_cpu_usage,
process_memory_usage,
process_memory_virtual,
process_disk_io,
process_gpu_memory_usage,
)
}
fn prepare_common_attributes(sys: &System, pid: sysinfo::Pid) -> Result<[KeyValue; 4], GlobalError> {
let process = sys
.process(pid)
.ok_or_else(|| GlobalError::SystemMetricsError(format!("Process with PID {} not found", pid.as_u32())))?;
// optimize string operations and reduce allocation
let cmd = process.cmd().iter().filter_map(|s| s.to_str()).collect::<Vec<_>>().join(" ");
let executable_path = process
.exe()
.map(|path| path.to_string_lossy().into_owned())
.unwrap_or_default();
let name = process.name().to_os_string().into_string().unwrap_or_default();
Ok([
KeyValue::new(PROCESS_PID, pid.as_u32() as i64),
KeyValue::new(PROCESS_EXECUTABLE_NAME, name),
KeyValue::new(PROCESS_EXECUTABLE_PATH, executable_path),
KeyValue::new(PROCESS_COMMAND, cmd),
])
}
fn get_export_interval() -> u64 {
std::env::var("OTEL_METRIC_EXPORT_INTERVAL")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(30000)
}
fn update_process_metrics(
sys: &mut System,
pid: sysinfo::Pid,
process_cpu_usage: &F64Gauge,
process_cpu_utilization: &F64Gauge,
process_memory_usage: &I64Gauge,
process_memory_virtual: &I64Gauge,
process_disk_io: &I64Gauge,
common_attributes: &[KeyValue; 4],
core_count: f32,
) -> Result<(), GlobalError> {
// Only refresh the data of the required process to reduce system call overhead
sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
let process = match sys.process(pid) {
Some(p) => p,
None => {
return Err(GlobalError::SystemMetricsError(format!(
"Process with PID {} no longer exists",
pid.as_u32()
)))
}
};
// collect the values once, then record them
let cpu_usage = process.cpu_usage();
process_cpu_usage.record(cpu_usage.into(), &[]);
process_cpu_utilization.record((cpu_usage / core_count).into(), common_attributes);
// safe type conversion
let memory = process.memory();
let virtual_memory = process.virtual_memory();
// Avoid multiple error checks and use .map_err to handle errors in chain
let memory_i64 =
i64::try_from(memory).map_err(|_| GlobalError::ConversionError("Failed to convert memory usage to i64".to_string()))?;
let virtual_memory_i64 = i64::try_from(virtual_memory)
.map_err(|_| GlobalError::ConversionError("Failed to convert virtual memory to i64".to_string()))?;
process_memory_usage.record(memory_i64, common_attributes);
process_memory_virtual.record(virtual_memory_i64, common_attributes);
// process disk io metrics
let disk_io = process.disk_usage();
// batch conversion to reduce duplicate code
let read_bytes_i64 = i64::try_from(disk_io.read_bytes)
.map_err(|_| GlobalError::ConversionError("Failed to convert read bytes to i64".to_string()))?;
let written_bytes_i64 = i64::try_from(disk_io.written_bytes)
.map_err(|_| GlobalError::ConversionError("Failed to convert written bytes to i64".to_string()))?;
// Build per-direction attribute sets on top of the common attributes
let read_attributes = [KeyValue::new(DIRECTION, "read")];
let read_attrs = [common_attributes.as_slice(), &read_attributes].concat();
let write_attributes = [KeyValue::new(DIRECTION, "write")];
let write_attrs = [common_attributes.as_slice(), &write_attributes].concat();
process_disk_io.record(read_bytes_i64, &read_attrs);
process_disk_io.record(written_bytes_i64, &write_attrs);
Ok(())
}
// GPU metric update function, conditional compilation based on feature flags
#[cfg(feature = "gpu")]
fn update_gpu_metrics(
nvml: &Result<Nvml, nvml_wrapper::error::NvmlError>,
pid: sysinfo::Pid,
process_gpu_memory_usage: &U64Gauge,
common_attributes: &[KeyValue; 4],
) -> Result<(), GlobalError> {
match nvml {
Ok(nvml) => {
if let Ok(device) = nvml.device_by_index(0) {
if let Ok(gpu_stats) = device.running_compute_processes() {
for stat in gpu_stats.iter() {
if stat.pid == pid.as_u32() {
let memory_used = match stat.used_gpu_memory {
UsedGpuMemory::Used(bytes) => bytes,
UsedGpuMemory::Unavailable => 0,
};
process_gpu_memory_usage.record(memory_used, common_attributes);
return Ok(());
}
}
}
}
// If no GPU usage entry for this process was found, record 0
process_gpu_memory_usage.record(0, common_attributes);
Ok(())
}
Err(e) => {
warn!("Could not get NVML, recording 0 for GPU memory usage: {}", e);
process_gpu_memory_usage.record(0, common_attributes);
Ok(())
}
}
}
#[cfg(not(feature = "gpu"))]
fn update_gpu_metrics(
_: &(), // placeholder for the NVML handle, unused without the gpu feature
_: sysinfo::Pid,
process_gpu_memory_usage: &U64Gauge,
common_attributes: &[KeyValue; 4],
) -> Result<(), GlobalError> {
// without the gpu feature, always record 0
process_gpu_memory_usage.record(0, common_attributes);
Ok(())
}
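// NOTE: `lib.rs` re-exports `init_gpu_metrics` and `main.rs` calls it, but its definition is not
// visible in this hunk. The function below is a minimal sketch of what it could look like, reusing
// the helpers above; it is an illustration under that assumption, not the committed implementation.
#[cfg(feature = "gpu")]
pub async fn init_gpu_metrics(meter: Meter) -> Result<(), GlobalError> {
    let pid = get_current_pid().map_err(|err| GlobalError::GpuMetricsError(err.to_string()))?;
    // Dedicated gauge for the current process's GPU memory usage
    let gauge = meter
        .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
        .with_description("The amount of physical GPU memory in use.")
        .with_unit("byte")
        .build();
    let mut sys = System::new_all();
    sys.refresh_all();
    let attributes = prepare_common_attributes(&sys, pid)?;
    let interval = get_export_interval();
    tokio::spawn(async move {
        loop {
            sleep(Duration::from_millis(interval)).await;
            let nvml_arc = get_or_init_nvml().await;
            let nvml_guard = nvml_arc.lock().unwrap();
            if let Some(nvml) = nvml_guard.as_ref() {
                if let Err(e) = update_gpu_metrics(nvml, pid, &gauge, &attributes) {
                    warn!("Failed to update GPU metrics: {}", e);
                }
            }
        }
    });
    Ok(())
}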

View File

@@ -14,11 +14,10 @@ use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
SCHEMA_URL,
};
use prometheus::{Encoder, Registry, TextEncoder};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
use std::sync::Arc;
use tokio::sync::{Mutex, OnceCell};
use tracing::{info, warn};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
@@ -67,66 +66,20 @@ impl Drop for OtelGuard {
}
}
/// Global registry for Prometheus metrics
static GLOBAL_REGISTRY: OnceCell<Arc<Mutex<Registry>>> = OnceCell::const_new();
/// Get the global registry instance
/// This function returns a reference to the global registry instance.
///
/// # Returns
/// A reference to the global registry instance
///
/// # Example
/// ```
/// use rustfs_obs::get_global_registry;
///
/// let registry = get_global_registry();
/// ```
pub fn get_global_registry() -> Arc<Mutex<Registry>> {
GLOBAL_REGISTRY.get().unwrap().clone()
}
/// Prometheus metric endpoints
/// This function returns a string containing the Prometheus metrics.
/// The metrics are collected from the global registry.
/// The function is used to expose the metrics via an HTTP endpoint.
///
/// # Returns
/// A string containing the Prometheus metrics
///
/// # Example
/// ```
/// use rustfs_obs::metrics;
///
/// async fn main() {
/// let metrics = metrics().await;
/// println!("{}", metrics);
/// }
/// ```
pub async fn metrics() -> String {
let encoder = TextEncoder::new();
// Get a reference to the registry for reading metrics
let registry = get_global_registry().lock().await.to_owned();
let metric_families = registry.gather();
if metric_families.is_empty() {
warn!("No metrics available in Prometheus registry");
} else {
info!("Metrics collected: {} families", metric_families.len());
}
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap_or_else(|_| "Error encoding metrics".to_string())
}
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
let config = config.clone();
Resource::builder()
.with_service_name(config.service_name.unwrap_or(SERVICE_NAME.to_string()))
.with_service_name(Cow::Borrowed(config.service_name.as_deref().unwrap_or(SERVICE_NAME)).to_string())
.with_schema_url(
[
KeyValue::new(OTEL_SERVICE_VERSION, config.service_version.unwrap_or(SERVICE_VERSION.to_string())),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.unwrap_or(ENVIRONMENT.to_string())),
KeyValue::new(
OTEL_SERVICE_VERSION,
Cow::Borrowed(config.service_version.as_deref().unwrap_or(SERVICE_VERSION)).to_string(),
),
KeyValue::new(
DEPLOYMENT_ENVIRONMENT_NAME,
Cow::Borrowed(config.environment.as_deref().unwrap_or(ENVIRONMENT)).to_string(),
),
KeyValue::new(NETWORK_LOCAL_ADDRESS, get_local_ip_with_default()),
],
SCHEMA_URL,
@@ -141,163 +94,169 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
.build()
}
/// Initialize Meter Provider
fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
let mut builder = MeterProviderBuilder::default().with_resource(resource(config));
// If endpoint is empty, use stdout output
if config.endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
} else {
// If endpoint is not empty, use otlp output
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(config.meter_interval.unwrap_or(METER_INTERVAL)))
.build(),
);
// If use_stdout is true, output to stdout at the same time
if config.use_stdout.unwrap_or(USE_STDOUT) {
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
}
}
let registry = Registry::new();
// Set global registry
GLOBAL_REGISTRY.set(Arc::new(Mutex::new(registry.clone()))).unwrap();
// Create Prometheus exporter
let prometheus_exporter = opentelemetry_prometheus::exporter().with_registry(registry).build().unwrap();
// Build meter provider
let meter_provider = builder.with_reader(prometheus_exporter).build();
global::set_meter_provider(meter_provider.clone());
meter_provider
}
/// Initialize Tracer Provider
fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource(config));
let tracer_provider = if config.endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.build()
.unwrap();
if config.use_stdout.unwrap_or(USE_STDOUT) {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
}
.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
}
/// Initialize Telemetry
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let tracer_provider = init_tracer_provider(config);
let meter_provider = init_meter_provider(config);
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(LOGGER_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(SERVICE_NAME);
// Initialize logger provider based on configuration
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(resource(config));
// Pre-create resource objects to avoid repeated construction
let res = resource(config);
if config.endpoint.is_empty() {
// Use stdout exporter when no endpoint is configured
builder = builder.with_simple_exporter(opentelemetry_stdout::LogExporter::default());
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
};
let builder = SdkTracerProvider::builder()
.with_sampler(sampler)
.with_id_generator(RandomIdGenerator::default())
.with_resource(res.clone());
let tracer_provider = if endpoint.is_empty() {
builder
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
.build()
} else {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.unwrap();
let builder = if use_stdout {
builder
.with_batch_exporter(exporter)
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
} else {
builder.with_batch_exporter(exporter)
};
builder.build()
};
global::set_tracer_provider(tracer_provider.clone());
tracer_provider
};
// initialize meter provider
let meter_provider = {
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
if endpoint.is_empty() {
builder = builder.with_reader(create_periodic_reader(meter_interval));
} else {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
}
let meter_provider = builder.build();
global::set_meter_provider(meter_provider.clone());
meter_provider
};
// initialize logger provider
let logger_provider = {
let mut builder = SdkLoggerProvider::builder().with_resource(res);
if endpoint.is_empty() {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
} else {
// Configure OTLP exporter when endpoint is provided
let exporter = opentelemetry_otlp::LogExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.with_endpoint(endpoint)
.build()
.unwrap();
builder = builder.with_batch_exporter(exporter);
// Add stdout exporter if requested
if config.use_stdout.unwrap_or(USE_STDOUT) {
if use_stdout {
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
}
}
builder.build()
};
let config = config.clone();
let logger_level = config.logger_level.unwrap_or(LOGGER_LEVEL.to_string());
let logger_level = logger_level.as_str();
// Setup OpenTelemetryTracingBridge layer
let otel_layer = {
// Filter to prevent infinite telemetry loops
// This blocks events from OpenTelemetry and its dependent libraries (tonic, reqwest, etc.)
// from being sent back to OpenTelemetry itself
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
EnvFilter::new(logger_level)
}
_ => {
let mut filter = EnvFilter::new(logger_level);
for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
// configuring tracing
{
// optimize filter configuration
let otel_layer = {
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
EnvFilter::new(logger_level)
}
filter
}
_ => {
let mut filter = EnvFilter::new(logger_level);
// use smallvec to avoid heap allocation
let directives: SmallVec<[&str; 5]> = smallvec::smallvec!["hyper", "tonic", "h2", "reqwest", "tower"];
for directive in directives {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
}
filter
}
};
layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel)
};
layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel)
};
let tracer = tracer_provider.tracer(config.service_name.unwrap_or(SERVICE_NAME.to_string()));
let registry = tracing_subscriber::registry()
.with(switch_level(logger_level))
.with(OpenTelemetryLayer::new(tracer))
.with(MetricsLayer::new(meter_provider.clone()))
.with(otel_layer)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level)));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure formatting layer
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(enable_color)
.with_thread_names(true)
.with_file(true)
.with_line_number(true);
// Configure registry to avoid repeated calls to filter methods
let level_filter = switch_level(logger_level);
let registry = tracing_subscriber::registry()
.with(level_filter)
.with(OpenTelemetryLayer::new(tracer))
.with(MetricsLayer::new(meter_provider.clone()))
.with(otel_layer)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(logger_level)));
// Creating a formatting layer with explicit type to avoid type mismatches
let fmt_layer = fmt_layer.with_filter(
EnvFilter::new(logger_level).add_directive(
format!("opentelemetry={}", if config.endpoint.is_empty() { logger_level } else { "off" })
.parse()
.unwrap(),
),
);
// configure the formatting layer
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(enable_color)
.with_thread_names(true)
.with_file(true)
.with_line_number(true)
.with_filter(
EnvFilter::new(logger_level).add_directive(
format!("opentelemetry={}", if endpoint.is_empty() { logger_level } else { "off" })
.parse()
.unwrap(),
),
);
registry.with(ErrorLayer::default()).with(fmt_layer).init();
if !config.endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
config.endpoint, logger_level
);
registry.with(ErrorLayer::default()).with(fmt_layer).init();
if !endpoint.is_empty() {
info!(
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {}",
endpoint, logger_level
);
}
}
OtelGuard {
@@ -309,12 +268,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
/// Switch log level
fn switch_level(logger_level: &str) -> tracing_subscriber::filter::LevelFilter {
use tracing_subscriber::filter::LevelFilter;
match logger_level {
"error" => tracing_subscriber::filter::LevelFilter::ERROR,
"warn" => tracing_subscriber::filter::LevelFilter::WARN,
"info" => tracing_subscriber::filter::LevelFilter::INFO,
"debug" => tracing_subscriber::filter::LevelFilter::DEBUG,
"trace" => tracing_subscriber::filter::LevelFilter::TRACE,
_ => tracing_subscriber::filter::LevelFilter::OFF,
"error" => LevelFilter::ERROR,
"warn" => LevelFilter::WARN,
"info" => LevelFilter::INFO,
"debug" => LevelFilter::DEBUG,
"trace" => LevelFilter::TRACE,
_ => LevelFilter::OFF,
}
}

View File

@@ -46,12 +46,13 @@ local-ip-address = { workspace = true }
matchit = { workspace = true }
mime.workspace = true
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
pin-project-lite.workspace = true
protos.workspace = true
query = { workspace = true }
rmp-serde.workspace = true
rustfs-event-notifier = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-obs = { workspace = true, features = ["gpu"] }
rustls.workspace = true
rustls-pemfile.workspace = true
rustls-pki-types.workspace = true

View File

@@ -47,7 +47,7 @@ use iam::init_iam_sys;
use license::init_license;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_event_notifier::NotifierConfig;
use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus};
use rustfs_obs::{init_obs, init_system_metrics, load_config, set_global_guard, InitLogStatus};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
@@ -123,6 +123,16 @@ async fn main() -> Result<()> {
info!("event_config is empty");
}
let meter = opentelemetry::global::meter("system");
let _ = init_system_metrics(meter).await;
// If the GPU feature is enabled, also register GPU-specific metrics
#[cfg(feature = "gpu")]
{
let gpu_meter = opentelemetry::global::meter("system.gpu");
let _ = rustfs_obs::init_gpu_metrics(gpu_meter).await;
}
// Run parameters
run(opt).await
}
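Note that the `#[cfg(feature = "gpu")]` block above is gated on a feature of the `rustfs` binary crate itself; enabling `features = ["gpu"]` on the `rustfs-obs` dependency does not define such a cfg. If the crate does not already declare one, its `[features]` entry would look roughly like the sketch below (an assumption; the actual feature table is not part of this diff):

```toml
# rustfs/Cargo.toml (sketch, not part of this commit)
[features]
default = []
gpu = ["rustfs-obs/gpu"]
```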