From 51584986e1ec0f4b5a7d239a97bfc5841cde0b88 Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 14 Nov 2025 00:50:07 +0800 Subject: [PATCH] feat(obs): unify metrics initialization and fix exporter move error (#851) * feat(obs): unify metrics initialization and fix exporter move error - Fix Rust E0382 (use after move) by removing duplicate MetricExporter consumption. - Consolidate MeterProvider construction into single Recorder builder path. - Remove redundant Recorder::builder(...).install_global() call. - Ensure PeriodicReader setup is performed only once (HTTP + optional stdout). - Set global meter provider and metrics recorder exactly once. - Preserve existing behavior for stdout/file vs HTTP modes. - Minor cleanup: consistent resource reuse and interval handling. * update telemetry.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix * fix * fix * fix: modify logger level from error to event --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Cargo.lock | 87 ++------- Cargo.toml | 1 - crates/notify/src/integration.rs | 4 +- crates/obs/Cargo.toml | 1 - crates/obs/src/error.rs | 75 +++++++ crates/obs/src/global.rs | 41 +--- crates/obs/src/lib.rs | 6 +- crates/obs/src/recorder.rs | 323 +++++++++++++++++++++++++++++++ crates/obs/src/system/mod.rs | 4 +- crates/obs/src/telemetry.rs | 167 +++++++--------- rustfs/src/server/audit.rs | 4 +- rustfs/src/server/event.rs | 6 +- 12 files changed, 502 insertions(+), 217 deletions(-) create mode 100644 crates/obs/src/error.rs create mode 100644 crates/obs/src/recorder.rs diff --git a/Cargo.lock b/Cargo.lock index d1a999c4..8b680915 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2980,27 +2980,6 @@ dependencies = [ "syn 2.0.110", ] -[[package]] -name = "derive_more" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" -dependencies = [ - "derive_more-impl", -] - -[[package]] -name = "derive_more-impl" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.110", - "unicode-xid", -] - [[package]] name = "diff" version = "0.1.13" @@ -5080,18 +5059,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "metrics-exporter-opentelemetry" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85831c590c74bde49aac410386630e60f216a7d9473726708da160c5303ef803" -dependencies = [ - "derive_more", - "metrics", - "opentelemetry 0.30.0", - "opentelemetry_sdk 0.30.0", -] - [[package]] name = "mimalloc" version = "0.1.48" @@ -5555,20 +5522,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" -[[package]] -name = "opentelemetry" -version = "0.30.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" -dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "pin-project-lite", - "thiserror 2.0.17", - "tracing", -] - [[package]] name = "opentelemetry" version = "0.31.0" @@ -5589,7 +5542,7 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2" dependencies = [ - "opentelemetry 0.31.0", + "opentelemetry", "tracing", "tracing-core", "tracing-log", @@ -5606,7 +5559,7 @@ dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry 0.31.0", + "opentelemetry", "reqwest", ] @@ -5617,10 +5570,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ "http 1.3.1", - "opentelemetry 0.31.0", + "opentelemetry", "opentelemetry-http", "opentelemetry-proto", - "opentelemetry_sdk 0.31.0", + "opentelemetry_sdk", "prost 0.14.1", "reqwest", "thiserror 2.0.17", @@ -5634,8 +5587,8 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ - "opentelemetry 0.31.0", - "opentelemetry_sdk 0.31.0", + "opentelemetry", + "opentelemetry_sdk", "prost 0.14.1", "tonic", "tonic-prost", @@ -5654,21 +5607,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811" dependencies = [ "chrono", - "opentelemetry 0.31.0", - "opentelemetry_sdk 0.31.0", -] - -[[package]] -name = "opentelemetry_sdk" -version = "0.30.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" -dependencies = [ - "futures-channel", - "futures-executor", - "futures-util", - "opentelemetry 0.30.0", - "thiserror 2.0.17", + "opentelemetry", + "opentelemetry_sdk", ] [[package]] @@ -5680,7 +5620,7 @@ dependencies = [ "futures-channel", "futures-executor", "futures-util", - "opentelemetry 0.31.0", + "opentelemetry", "percent-encoding", "rand 0.9.2", "thiserror 2.0.17", @@ -7430,15 +7370,14 @@ version = "0.0.5" dependencies = [ "flexi_logger", "metrics", - "metrics-exporter-opentelemetry", "nu-ansi-term", "nvml-wrapper", - "opentelemetry 0.31.0", + "opentelemetry", "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry-stdout", - "opentelemetry_sdk 0.31.0", + "opentelemetry_sdk", "rustfs-config", "rustfs-utils", "serde", @@ -9424,8 +9363,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" dependencies = [ "js-sys", - "opentelemetry 0.31.0", - "opentelemetry_sdk 0.31.0", + "opentelemetry", + "opentelemetry_sdk", "rustversion", "smallvec", "thiserror 2.0.17", diff --git a/Cargo.toml b/Cargo.toml index 4c1e8627..8fc40c35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -204,7 +204,6 @@ matchit = "0.9.0" md-5 = "0.11.0-rc.3" md5 = "0.8.0" metrics = "0.24.2" -metrics-exporter-opentelemetry = "0.1.2" mime_guess = "2.0.5" moka = { version = "0.12.11", features = ["future"] } netif = "0.1.6" diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index ef661706..4afa0145 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -140,7 +140,7 @@ impl NotificationSystem { info!("Initializing target: {}", target.id()); // Initialize the target if let Err(e) = target.init().await { - error!("Target {} Initialization failed:{}", target.id(), e); + warn!("Target {} Initialization failed:{}", target.id(), e); continue; } debug!("Target {} initialized successfully,enabled:{}", target_id, target.is_enabled()); @@ -422,7 +422,7 @@ impl NotificationSystem { if !e.to_string().contains("ARN not found") { return Err(NotificationError::BucketNotification(e.to_string())); } else { - error!("{}", e); + error!("config validate failed, err: {}", e); } } diff --git a/crates/obs/Cargo.toml b/crates/obs/Cargo.toml index b27ae42b..01fcb3e1 100644 --- a/crates/obs/Cargo.toml +++ b/crates/obs/Cargo.toml @@ -38,7 +38,6 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] } rustfs-utils = { workspace = true, features = ["ip", "path"] } flexi_logger = { workspace = true } metrics = { workspace = true } -metrics-exporter-opentelemetry = { workspace = true } nu-ansi-term = { workspace = true } nvml-wrapper = { workspace = true, optional = true } opentelemetry = { workspace = true } diff --git a/crates/obs/src/error.rs b/crates/obs/src/error.rs new file mode 100644 index 00000000..c3dcff80 --- /dev/null +++ b/crates/obs/src/error.rs @@ -0,0 +1,75 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::OtelGuard; +use std::sync::{Arc, Mutex}; +use tokio::sync::SetError; + +/// Error type for global guard operations +#[derive(Debug, thiserror::Error)] +pub enum GlobalError { + /// Occurs when attempting to set a global recorder (e.g., via [`crate::Recorder::install_global`] or [`metrics::set_global_recorder`]) + /// but a global recorder is already initialized. + /// + /// [`crate::Recorder::install_global`]: crate::Recorder::install_global + /// [`metrics::set_global_recorder`]: https://docs.rs/metrics/latest/metrics/fn.set_global_recorder.html + #[error("Failed to set a global recorder: {0}")] + SetRecorder(#[from] metrics::SetRecorderError), + #[error("Failed to set global guard: {0}")] + SetError(#[from] SetError>>), + #[error("Global guard not initialized")] + NotInitialized, + #[error("Global system metrics err: {0}")] + MetricsError(String), + #[error("Failed to get current PID: {0}")] + PidError(String), + #[error("Process with PID {0} not found")] + ProcessNotFound(u32), + #[error("Failed to get physical core count")] + CoreCountError, + #[error("GPU initialization failed: {0}")] + GpuInitError(String), + #[error("GPU device not found: {0}")] + GpuDeviceError(String), + #[error("Failed to send log: {0}")] + SendFailed(&'static str), + #[error("Operation timed out: {0}")] + Timeout(&'static str), + #[error("Telemetry initialization failed: {0}")] + TelemetryError(#[from] TelemetryError), +} + +#[derive(Debug, thiserror::Error)] +pub enum TelemetryError { + #[error("Span exporter build failed: {0}")] + BuildSpanExporter(String), + #[error("Metric exporter build failed: {0}")] + BuildMetricExporter(String), + #[error("Log exporter build failed: {0}")] + BuildLogExporter(String), + #[error("Install metrics recorder failed: {0}")] + InstallMetricsRecorder(String), + #[error("Tracing subscriber init failed: {0}")] + SubscriberInit(String), + #[error("I/O error: {0}")] + Io(String), + #[error("Set permissions failed: {0}")] + SetPermissions(String), +} + +impl From for TelemetryError { + fn from(e: std::io::Error) -> Self { + TelemetryError::Io(e.to_string()) + } +} diff --git a/crates/obs/src/global.rs b/crates/obs/src/global.rs index 7eade486..dc69185c 100644 --- a/crates/obs/src/global.rs +++ b/crates/obs/src/global.rs @@ -12,47 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry}; +use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry}; use std::sync::{Arc, Mutex}; -use tokio::sync::{OnceCell, SetError}; +use tokio::sync::OnceCell; use tracing::{error, info}; /// Global guard for OpenTelemetry tracing static GLOBAL_GUARD: OnceCell>> = OnceCell::const_new(); -/// Flag indicating if observability is enabled -pub(crate) static IS_OBSERVABILITY_ENABLED: OnceCell = OnceCell::const_new(); +/// Flag indicating if observability metric is enabled +pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell = OnceCell::const_new(); -/// Check whether Observability is enabled -pub fn is_observability_enabled() -> bool { - IS_OBSERVABILITY_ENABLED.get().copied().unwrap_or(false) -} - -/// Error type for global guard operations -#[derive(Debug, thiserror::Error)] -pub enum GlobalError { - #[error("Failed to set global guard: {0}")] - SetError(#[from] SetError>>), - #[error("Global guard not initialized")] - NotInitialized, - #[error("Global system metrics err: {0}")] - MetricsError(String), - #[error("Failed to get current PID: {0}")] - PidError(String), - #[error("Process with PID {0} not found")] - ProcessNotFound(u32), - #[error("Failed to get physical core count")] - CoreCountError, - #[error("GPU initialization failed: {0}")] - GpuInitError(String), - #[error("GPU device not found: {0}")] - GpuDeviceError(String), - #[error("Failed to send log: {0}")] - SendFailed(&'static str), - #[error("Operation timed out: {0}")] - Timeout(&'static str), - #[error("Telemetry initialization failed: {0}")] - TelemetryError(#[from] TelemetryError), +/// Check whether Observability metric is enabled +pub fn observability_metric_enabled() -> bool { + OBSERVABILITY_METRIC_ENABLED.get().copied().unwrap_or(false) } /// Initialize the observability module diff --git a/crates/obs/src/lib.rs b/crates/obs/src/lib.rs index 6ea59552..3d749d78 100644 --- a/crates/obs/src/lib.rs +++ b/crates/obs/src/lib.rs @@ -54,13 +54,17 @@ /// # } /// ``` mod config; +mod error; mod global; mod metrics; +mod recorder; mod system; mod telemetry; pub use config::*; +pub use error::*; pub use global::*; pub use metrics::*; +pub use recorder::*; pub use system::SystemObserver; -pub use telemetry::{OtelGuard, TelemetryError}; +pub use telemetry::OtelGuard; diff --git a/crates/obs/src/recorder.rs b/crates/obs/src/recorder.rs new file mode 100644 index 00000000..92d69ead --- /dev/null +++ b/crates/obs/src/recorder.rs @@ -0,0 +1,323 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::GlobalError; +use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, SharedString, Unit}; +use opentelemetry::{ + InstrumentationScope, InstrumentationScopeBuilder, KeyValue, global, + metrics::{Meter, MeterProvider}, +}; +use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider}; +use std::{ + borrow::Cow, + collections::HashMap, + ops::Deref, + sync::{ + Arc, Mutex, + atomic::{AtomicU64, Ordering}, + }, +}; + +/// A builder for constructing a [`Recorder`]. +#[derive(Debug)] +pub struct Builder { + builder: MeterProviderBuilder, + scope: InstrumentationScopeBuilder, +} + +impl Builder { + /// Runs the closure (`f`) to modify the [`MeterProviderBuilder`] to build a + /// [`MeterProvider`](MeterProvider). + pub fn with_meter_provider(mut self, f: impl FnOnce(MeterProviderBuilder) -> MeterProviderBuilder) -> Self { + self.builder = f(self.builder); + self + } + + /// Modify the [`InstrumentationScope`] to provide additional metadata from the + /// closure (`f`). + pub fn with_instrumentation_scope( + mut self, + f: impl FnOnce(InstrumentationScopeBuilder) -> InstrumentationScopeBuilder, + ) -> Self { + self.scope = f(self.scope); + self + } + + /// Consumes the builder and builds a new [`Recorder`] and returns + /// a [`SdkMeterProvider`]. + /// + /// A [`SdkMeterProvider`] is provided so you have the responsibility to + /// do whatever you need to do with it. + /// + /// This will not install the recorder as the global recorder for + /// the [`metrics`] crate, use [`Builder::install`]. This will not install a meter + /// provider to [`global`], use [`Builder::install_global`]. + pub fn build(self) -> (SdkMeterProvider, Recorder) { + let provider = self.builder.build(); + let meter = provider.meter_with_scope(self.scope.build()); + + ( + provider, + Recorder { + meter, + metrics_metadata: Arc::new(Mutex::new(HashMap::new())), + }, + ) + } + + /// Builds a [`Recorder`] and sets it as the global recorder for the [`metrics`] + /// crate. + /// + /// This method will not call [`global::set_meter_provider`] for OpenTelemetry and + /// will be returned as the first element in the return's type tuple. + pub fn install(self) -> Result<(SdkMeterProvider, Recorder), GlobalError> { + let (provider, recorder) = self.build(); + metrics::set_global_recorder(recorder.clone())?; + + Ok((provider, recorder)) + } + + /// Builds the [`Recorder`] to record metrics to OpenTelemetry, set the global + /// recorder for the [`metrics`] crate, and calls [`global::set_meter_provider`] + /// to set the constructed [`SdkMeterProvider`]. + pub fn install_global(self) -> Result { + let (provider, recorder) = self.install()?; + global::set_meter_provider(provider); + + Ok(recorder) + } +} + +#[derive(Debug)] +struct MetricMetadata { + unit: Option, + description: SharedString, +} + +/// A standard recorder that implements [`metrics::Recorder`]. +/// +/// This instance implements [`Deref`]\, so +/// you can still interact with the SDK's initialized [`Meter`] instance. +#[derive(Debug, Clone)] +pub struct Recorder { + meter: Meter, + metrics_metadata: Arc>>, +} + +impl Recorder { + /// Creates a new [`Builder`] with a given name for instrumentation. + pub fn builder>>(name: S) -> Builder { + Builder { + builder: MeterProviderBuilder::default(), + scope: InstrumentationScope::builder(name.into()), + } + } + + /// Creates a [`Recorder`] with an already established [`Meter`]. + pub fn with_meter(meter: Meter) -> Self { + Recorder { + meter, + metrics_metadata: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Deref for Recorder { + type Target = Meter; + + fn deref(&self) -> &Self::Target { + &self.meter + } +} + +impl metrics::Recorder for Recorder { + fn describe_counter(&self, key: KeyName, unit: Option, description: SharedString) { + let mut metrics_metadata = self.metrics_metadata.lock().unwrap(); + metrics_metadata.insert(key, MetricMetadata { unit, description }); + } + + fn describe_gauge(&self, key: KeyName, unit: Option, description: SharedString) { + let mut metrics_metadata = self.metrics_metadata.lock().unwrap(); + metrics_metadata.insert(key, MetricMetadata { unit, description }); + } + + fn describe_histogram(&self, key: KeyName, unit: Option, description: SharedString) { + let mut metrics_metadata = self.metrics_metadata.lock().unwrap(); + metrics_metadata.insert(key, MetricMetadata { unit, description }); + } + + fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter { + let mut builder = self.meter.u64_counter(key.name().to_owned()); + if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) { + if let Some(unit) = metadata.unit { + builder = builder.with_unit(unit.as_canonical_label()); + } + builder = builder.with_description(metadata.description.to_string()); + } + + let counter = builder.build(); + let labels = key + .labels() + .map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned())) + .collect(); + + Counter::from_arc(Arc::new(WrappedCounter { + counter, + labels, + value: AtomicU64::new(0), + })) + } + + fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge { + let mut builder = self.meter.f64_gauge(key.name().to_owned()); + if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) { + if let Some(unit) = metadata.unit { + builder = builder.with_unit(unit.as_canonical_label()); + } + builder = builder.with_description(metadata.description.to_string()); + } + + let gauge = builder.build(); + let labels = key + .labels() + .map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned())) + .collect(); + + Gauge::from_arc(Arc::new(WrappedGauge { + gauge, + labels, + value: AtomicU64::new(0), + })) + } + + fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram { + let mut builder = self.meter.f64_histogram(key.name().to_owned()); + if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) { + if let Some(unit) = metadata.unit { + builder = builder.with_unit(unit.as_canonical_label()); + } + builder = builder.with_description(metadata.description.to_string()); + } + + let histogram = builder.build(); + let labels = key + .labels() + .map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned())) + .collect(); + + Histogram::from_arc(Arc::new(WrappedHistogram { histogram, labels })) + } +} + +struct WrappedCounter { + counter: opentelemetry::metrics::Counter, + labels: Vec, + value: AtomicU64, +} + +impl CounterFn for WrappedCounter { + fn increment(&self, value: u64) { + self.value.fetch_add(value, Ordering::Relaxed); + self.counter.add(value, &self.labels); + } + + fn absolute(&self, value: u64) { + let prev = self.value.swap(value, Ordering::Relaxed); + let diff = value.saturating_sub(prev); + self.counter.add(diff, &self.labels); + } +} + +struct WrappedGauge { + gauge: opentelemetry::metrics::Gauge, + labels: Vec, + value: AtomicU64, +} + +impl GaugeFn for WrappedGauge { + fn increment(&self, value: f64) { + let mut current = self.value.load(Ordering::Relaxed); + let mut new = f64::from_bits(current) + value; + while let Err(val) = self + .value + .compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed) + { + current = val; + new = f64::from_bits(current) + value; + } + + self.gauge.record(new, &self.labels); + } + + fn decrement(&self, value: f64) { + let mut current = self.value.load(Ordering::Relaxed); + let mut new = f64::from_bits(current) - value; + while let Err(val) = self + .value + .compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed) + { + current = val; + new = f64::from_bits(current) - value; + } + + self.gauge.record(new, &self.labels); + } + + fn set(&self, value: f64) { + self.value.store(value.to_bits(), Ordering::Relaxed); + self.gauge.record(value, &self.labels); + } +} + +struct WrappedHistogram { + histogram: opentelemetry::metrics::Histogram, + labels: Vec, +} + +impl HistogramFn for WrappedHistogram { + fn record(&self, value: f64) { + self.histogram.record(value, &self.labels); + } + + fn record_many(&self, value: f64, count: usize) { + for _ in 0..count { + self.histogram.record(value, &self.labels); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry_sdk::metrics::Temporality; + + #[test] + fn standard_usage() { + let exporter = opentelemetry_stdout::MetricExporterBuilder::default() + .with_temporality(Temporality::Cumulative) + .build(); + + let (provider, recorder) = Recorder::builder("my-app") + .with_meter_provider(|builder| builder.with_periodic_exporter(exporter)) + .build(); + + global::set_meter_provider(provider.clone()); + metrics::set_global_recorder(recorder).unwrap(); + + let counter = metrics::counter!("my-counter"); + counter.increment(1); + + provider.force_flush().unwrap(); + } +} diff --git a/crates/obs/src/system/mod.rs b/crates/obs/src/system/mod.rs index 7174c11e..4050465e 100644 --- a/crates/obs/src/system/mod.rs +++ b/crates/obs/src/system/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{GlobalError, is_observability_enabled}; +use crate::{GlobalError, observability_metric_enabled}; use opentelemetry::{global::meter, metrics::Meter}; use sysinfo::Pid; @@ -29,7 +29,7 @@ impl SystemObserver { /// This function will create a new `Collector` instance and start collecting metrics. /// It will run indefinitely until the process is terminated. pub async fn init_process_observer() -> Result<(), GlobalError> { - if is_observability_enabled() { + if observability_metric_enabled() { let meter = meter("system"); let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?; return SystemObserver::init_process_observer_for_pid(meter, pid).await; diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 29e8d10d..e52ec230 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -13,45 +13,44 @@ // limitations under the License. use crate::config::OtelConfig; -use crate::global::IS_OBSERVABILITY_ENABLED; +use crate::global::OBSERVABILITY_METRIC_ENABLED; +use crate::{Recorder, TelemetryError}; use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, style}; use metrics::counter; use nu_ansi_term::Color; -use opentelemetry::trace::TracerProvider; -use opentelemetry::{KeyValue, global}; +use opentelemetry::{KeyValue, global, trace::TracerProvider}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::{ Resource, logs::SdkLoggerProvider, - metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider}, + metrics::{PeriodicReader, SdkMeterProvider}, trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; use opentelemetry_semantic_conventions::{ SCHEMA_URL, attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION}, }; -use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA}; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, observability::{ DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA, - ENV_OBS_LOG_DIRECTORY, + ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA, }, }; use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default}; use smallvec::SmallVec; -use std::borrow::Cow; -use std::io::IsTerminal; -use std::time::Duration; -use std::{env, fs}; +use std::{borrow::Cow, env, fs, io::IsTerminal, time::Duration}; use tracing::info; use tracing_error::ErrorLayer; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::fmt::time::LocalTime; -use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{ + EnvFilter, Layer, + fmt::{format::FmtSpan, time::LocalTime}, + layer::SubscriberExt, + util::SubscriberInitExt, +}; /// A guard object that manages the lifecycle of OpenTelemetry components. /// @@ -69,9 +68,7 @@ pub struct OtelGuard { tracer_provider: Option, meter_provider: Option, logger_provider: Option, - // Add a flexi_logger handle to keep the logging alive flexi_logger_handles: Option, - // WorkerGuard for writing tracing files tracing_guard: Option, } @@ -112,35 +109,12 @@ impl Drop for OtelGuard { } if let Some(guard) = self.tracing_guard.take() { - // The guard will be dropped here, flushing any remaining logs drop(guard); println!("Tracing guard dropped, flushing logs."); } } } -#[derive(Debug)] -pub enum TelemetryError { - BuildSpanExporter(String), - BuildMetricExporter(String), - BuildLogExporter(String), - InstallMetricsRecorder(String), - SubscriberInit(String), -} - -impl std::fmt::Display for TelemetryError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"), - TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"), - TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"), - TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"), - TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"), - } - } -} -impl std::error::Error for TelemetryError {} - /// create OpenTelemetry Resource fn resource(config: &OtelConfig) -> Resource { Resource::builder() @@ -170,16 +144,16 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader Option { +fn get_env_async_with() -> WriteMode { let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA); let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA); let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS); - Some(AsyncWith { + AsyncWith { pool_capa, message_capa, flush_interval: Duration::from_millis(flush_ms), - }) + } } fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter { @@ -200,12 +174,9 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilte fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> { let level = record.level(); let level_style = style(level); - - // Get the current thread information let binding = std::thread::current(); let thread_name = binding.name().unwrap_or("unnamed"); let thread_id = format!("{:?}", std::thread::current().id()); - writeln!( w, "[{}] {} [{}] [{}:{}] [{}:{}] {}", @@ -224,12 +195,9 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: #[inline(never)] fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> { let level = record.level(); - - // Get the current thread information let binding = std::thread::current(); let thread_name = binding.name().unwrap_or("unnamed"); let thread_id = format!("{:?}", std::thread::current().id()); - writeln!( w, "[{}] {} [{}] [{}:{}] [{}:{}] {}", @@ -247,9 +215,7 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R /// stdout + span information (fix: retain WorkerGuard to avoid releasing after initialization) fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard { let env_filter = build_env_filter(logger_level, None); - let (nb, guard) = tracing_appender::non_blocking(std::io::stdout()); - let enable_color = std::io::stdout().is_terminal(); let fmt_layer = tracing_subscriber::fmt::layer() .with_timer(LocalTime::rfc_3339()) @@ -264,17 +230,15 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: .with_current_span(true) .with_span_list(true) .with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL }); - tracing_subscriber::registry() .with(env_filter) .with(ErrorLayer::default()) .with(fmt_layer) .init(); - IS_OBSERVABILITY_ENABLED.set(false).ok(); + OBSERVABILITY_METRIC_ENABLED.set(false).ok(); counter!("rustfs.start.total").increment(1); info!("Init stdout logging (level: {})", logger_level); - OtelGuard { tracer_provider: None, meter_provider: None, @@ -285,27 +249,49 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: } /// File rolling log (size switching + number retained) -fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard { - use flexi_logger::{ - Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming, - WriteMode::{AsyncWith, BufferAndFlush}, - }; +fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result { + use flexi_logger::{Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming}; let service_name = config.service_name.as_deref().unwrap_or(APP_NAME); let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY); let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str()); let log_filename = config.log_filename.as_deref().unwrap_or(service_name); let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES); - if let Err(e) = fs::create_dir_all(log_directory) { - eprintln!("ERROR: create log dir '{}': {e}", log_directory); + return Err(TelemetryError::Io(e.to_string())); } - #[cfg(unix)] { use std::fs::Permissions; use std::os::unix::fs::PermissionsExt; - let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755)); + let desired: u32 = 0o755; + match fs::metadata(log_directory) { + Ok(meta) => { + let current = meta.permissions().mode() & 0o777; + // Only tighten to 0755 if existing permissions are looser than target, avoid loosening + if (current & !desired) != 0 { + if let Err(e) = fs::set_permissions(log_directory, Permissions::from_mode(desired)) { + return Err(TelemetryError::SetPermissions(format!( + "dir='{}', want={:#o}, have={:#o}, err={}", + log_directory, desired, current, e + ))); + } + // Second verification + if let Ok(meta2) = fs::metadata(log_directory) { + let after = meta2.permissions().mode() & 0o777; + if after != desired { + return Err(TelemetryError::SetPermissions(format!( + "dir='{}', want={:#o}, after={:#o}", + log_directory, desired, after + ))); + } + } + } + } + Err(e) => { + return Err(TelemetryError::Io(format!("stat '{}' failed: {}", log_directory, e))); + } + } } // parsing level @@ -345,25 +331,10 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo }; // write mode - let write_mode = get_env_async_with().unwrap_or(if is_production { - AsyncWith { - pool_capa: DEFAULT_OBS_LOG_POOL_CAPA, - message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA, - flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS), - } - } else { - BufferAndFlush - }); - + let write_mode = get_env_async_with(); // Build let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level) - .unwrap_or_else(|e| { - if !is_production { - eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}"); - eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}"); - } - flexi_logger::Logger::with(log_spec.clone()) - }) + .unwrap_or(flexi_logger::Logger::with(log_spec.clone())) .format_for_stderr(format_with_color) .format_for_stdout(format_with_color) .format_for_files(format_for_file) @@ -379,7 +350,7 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo .use_utc(); // Optional copy to stdout (for local observation) - if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) { + if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production { builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All); } else { builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::None); @@ -393,20 +364,20 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo } }; - IS_OBSERVABILITY_ENABLED.set(false).ok(); + OBSERVABILITY_METRIC_ENABLED.set(false).ok(); counter!("rustfs.start.total").increment(1); info!( "Init file logging at '{}', roll size {:?}MB, keep {}", log_directory, config.log_rotation_size_mb, keep_files ); - OtelGuard { + Ok(OtelGuard { tracer_provider: None, meter_provider: None, logger_provider: None, flexi_logger_handles: handle, tracing_guard: None, - } + }) } /// Observability (HTTP export, supports three sub-endpoints; if not, fallback to unified endpoint) @@ -464,24 +435,26 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio .build() .map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?; let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL); - let mut builder = MeterProviderBuilder::default().with_resource(res.clone()); - builder = builder.with_reader( - PeriodicReader::builder(exporter) - .with_interval(Duration::from_secs(meter_interval)) - .build(), - ); - if use_stdout { - builder = builder.with_reader(create_periodic_reader(meter_interval)); - } - let provider = builder.build(); + let (provider, recorder) = Recorder::builder(service_name.clone()) + .with_meter_provider(|b| { + let b = b.with_resource(res.clone()).with_reader( + PeriodicReader::builder(exporter) + .with_interval(Duration::from_secs(meter_interval)) + .build(), + ); + if use_stdout { + b.with_reader(create_periodic_reader(meter_interval)) + } else { + b + } + }) + .build(); global::set_meter_provider(provider.clone()); + metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?; provider }; - // metrics crate -> OTel - let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global(); - // Logger(HTTP) let logger_provider = { let exporter = opentelemetry_otlp::LogExporter::builder() @@ -536,7 +509,7 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio .with(MetricsLayer::new(meter_provider.clone())) .init(); - IS_OBSERVABILITY_ENABLED.set(true).ok(); + OBSERVABILITY_METRIC_ENABLED.set(true).ok(); counter!("rustfs.start.total").increment(1); info!( "Init observability (HTTP): trace='{}', metric='{}', log='{}'", @@ -571,7 +544,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> Result AuditResult<()> { info!( target: "rustfs::main::start_audit_system", - "Step 1: Initializing the audit system..." + "Initializing the audit system..." ); // 1. Get the global configuration loaded by ecstore @@ -39,7 +39,7 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> { config.clone() } None => { - error!( + warn!( target: "rustfs::main::start_audit_system", "Audit system initialization failed: Global server configuration not loaded." ); diff --git a/rustfs/src/server/event.rs b/rustfs/src/server/event.rs index d0cf551b..9103fb8f 100644 --- a/rustfs/src/server/event.rs +++ b/rustfs/src/server/event.rs @@ -14,7 +14,7 @@ use rustfs_config::DEFAULT_DELIMITER; use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; /// Shuts down the event notifier system gracefully pub(crate) async fn shutdown_event_notifier() { @@ -28,7 +28,7 @@ pub(crate) async fn shutdown_event_notifier() { let system = match rustfs_notify::notification_system() { Some(sys) => sys, None => { - error!("Event notifier system is not initialized."); + info!("Event notifier system is not initialized."); return; } }; @@ -49,7 +49,7 @@ pub(crate) async fn init_event_notifier() { let server_config = match GLOBAL_SERVER_CONFIG.get() { Some(config) => config.clone(), // Clone the config to pass ownership None => { - error!("Event notifier initialization failed: Global server config not loaded."); + warn!("Event notifier initialization failed: Global server config not loaded."); return; } };