feat(obs): unify metrics initialization and fix exporter move error (#851)

* feat(obs): unify metrics initialization and fix exporter move error

- Fix Rust E0382 (use after move) by removing duplicate MetricExporter consumption.
- Consolidate MeterProvider construction into single Recorder builder path.
- Remove redundant Recorder::builder(...).install_global() call.
- Ensure PeriodicReader setup is performed only once (HTTP + optional stdout).
- Set global meter provider and metrics recorder exactly once.
- Preserve existing behavior for stdout/file vs HTTP modes.
- Minor cleanup: consistent resource reuse and interval handling.

* update telemetry.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix

* fix

* fix

* fix: modify logger level from error to event

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
houseme
2025-11-14 00:50:07 +08:00
committed by GitHub
parent 93090adf7c
commit 51584986e1
12 changed files with 502 additions and 217 deletions

87
Cargo.lock generated
View File

@@ -2980,27 +2980,6 @@ dependencies = [
"syn 2.0.110",
]
[[package]]
name = "derive_more"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.110",
"unicode-xid",
]
[[package]]
name = "diff"
version = "0.1.13"
@@ -5080,18 +5059,6 @@ dependencies = [
"portable-atomic",
]
[[package]]
name = "metrics-exporter-opentelemetry"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85831c590c74bde49aac410386630e60f216a7d9473726708da160c5303ef803"
dependencies = [
"derive_more",
"metrics",
"opentelemetry 0.30.0",
"opentelemetry_sdk 0.30.0",
]
[[package]]
name = "mimalloc"
version = "0.1.48"
@@ -5555,20 +5522,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "opentelemetry"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
dependencies = [
"futures-core",
"futures-sink",
"js-sys",
"pin-project-lite",
"thiserror 2.0.17",
"tracing",
]
[[package]]
name = "opentelemetry"
version = "0.31.0"
@@ -5589,7 +5542,7 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef6a1ac5ca3accf562b8c306fa8483c85f4390f768185ab775f242f7fe8fdcc2"
dependencies = [
"opentelemetry 0.31.0",
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
@@ -5606,7 +5559,7 @@ dependencies = [
"async-trait",
"bytes",
"http 1.3.1",
"opentelemetry 0.31.0",
"opentelemetry",
"reqwest",
]
@@ -5617,10 +5570,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
dependencies = [
"http 1.3.1",
"opentelemetry 0.31.0",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk 0.31.0",
"opentelemetry_sdk",
"prost 0.14.1",
"reqwest",
"thiserror 2.0.17",
@@ -5634,8 +5587,8 @@ version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
dependencies = [
"opentelemetry 0.31.0",
"opentelemetry_sdk 0.31.0",
"opentelemetry",
"opentelemetry_sdk",
"prost 0.14.1",
"tonic",
"tonic-prost",
@@ -5654,21 +5607,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811"
dependencies = [
"chrono",
"opentelemetry 0.31.0",
"opentelemetry_sdk 0.31.0",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"opentelemetry 0.30.0",
"thiserror 2.0.17",
"opentelemetry",
"opentelemetry_sdk",
]
[[package]]
@@ -5680,7 +5620,7 @@ dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"opentelemetry 0.31.0",
"opentelemetry",
"percent-encoding",
"rand 0.9.2",
"thiserror 2.0.17",
@@ -7430,15 +7370,14 @@ version = "0.0.5"
dependencies = [
"flexi_logger",
"metrics",
"metrics-exporter-opentelemetry",
"nu-ansi-term",
"nvml-wrapper",
"opentelemetry 0.31.0",
"opentelemetry",
"opentelemetry-appender-tracing",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk 0.31.0",
"opentelemetry_sdk",
"rustfs-config",
"rustfs-utils",
"serde",
@@ -9424,8 +9363,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e"
dependencies = [
"js-sys",
"opentelemetry 0.31.0",
"opentelemetry_sdk 0.31.0",
"opentelemetry",
"opentelemetry_sdk",
"rustversion",
"smallvec",
"thiserror 2.0.17",

View File

@@ -204,7 +204,6 @@ matchit = "0.9.0"
md-5 = "0.11.0-rc.3"
md5 = "0.8.0"
metrics = "0.24.2"
metrics-exporter-opentelemetry = "0.1.2"
mime_guess = "2.0.5"
moka = { version = "0.12.11", features = ["future"] }
netif = "0.1.6"

View File

@@ -140,7 +140,7 @@ impl NotificationSystem {
info!("Initializing target: {}", target.id());
// Initialize the target
if let Err(e) = target.init().await {
error!("Target {} Initialization failed:{}", target.id(), e);
warn!("Target {} Initialization failed:{}", target.id(), e);
continue;
}
debug!("Target {} initialized successfully,enabled:{}", target_id, target.is_enabled());
@@ -422,7 +422,7 @@ impl NotificationSystem {
if !e.to_string().contains("ARN not found") {
return Err(NotificationError::BucketNotification(e.to_string()));
} else {
error!("{}", e);
error!("config validate failed, err: {}", e);
}
}

View File

@@ -38,7 +38,6 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] }
rustfs-utils = { workspace = true, features = ["ip", "path"] }
flexi_logger = { workspace = true }
metrics = { workspace = true }
metrics-exporter-opentelemetry = { workspace = true }
nu-ansi-term = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }

75
crates/obs/src/error.rs Normal file
View File

@@ -0,0 +1,75 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::OtelGuard;
use std::sync::{Arc, Mutex};
use tokio::sync::SetError;
/// Error type for global guard operations
#[derive(Debug, thiserror::Error)]
pub enum GlobalError {
/// Occurs when attempting to set a global recorder (e.g., via [`crate::Recorder::install_global`] or [`metrics::set_global_recorder`])
/// but a global recorder is already initialized.
///
/// [`crate::Recorder::install_global`]: crate::Recorder::install_global
/// [`metrics::set_global_recorder`]: https://docs.rs/metrics/latest/metrics/fn.set_global_recorder.html
#[error("Failed to set a global recorder: {0}")]
SetRecorder(#[from] metrics::SetRecorderError<crate::Recorder>),
#[error("Failed to set global guard: {0}")]
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
#[error("Global guard not initialized")]
NotInitialized,
#[error("Global system metrics err: {0}")]
MetricsError(String),
#[error("Failed to get current PID: {0}")]
PidError(String),
#[error("Process with PID {0} not found")]
ProcessNotFound(u32),
#[error("Failed to get physical core count")]
CoreCountError,
#[error("GPU initialization failed: {0}")]
GpuInitError(String),
#[error("GPU device not found: {0}")]
GpuDeviceError(String),
#[error("Failed to send log: {0}")]
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
#[error("Telemetry initialization failed: {0}")]
TelemetryError(#[from] TelemetryError),
}
#[derive(Debug, thiserror::Error)]
pub enum TelemetryError {
#[error("Span exporter build failed: {0}")]
BuildSpanExporter(String),
#[error("Metric exporter build failed: {0}")]
BuildMetricExporter(String),
#[error("Log exporter build failed: {0}")]
BuildLogExporter(String),
#[error("Install metrics recorder failed: {0}")]
InstallMetricsRecorder(String),
#[error("Tracing subscriber init failed: {0}")]
SubscriberInit(String),
#[error("I/O error: {0}")]
Io(String),
#[error("Set permissions failed: {0}")]
SetPermissions(String),
}
impl From<std::io::Error> for TelemetryError {
fn from(e: std::io::Error) -> Self {
TelemetryError::Io(e.to_string())
}
}

View File

@@ -12,47 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{AppConfig, OtelGuard, SystemObserver, TelemetryError, telemetry::init_telemetry};
use crate::{AppConfig, GlobalError, OtelGuard, SystemObserver, telemetry::init_telemetry};
use std::sync::{Arc, Mutex};
use tokio::sync::{OnceCell, SetError};
use tokio::sync::OnceCell;
use tracing::{error, info};
/// Global guard for OpenTelemetry tracing
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
/// Flag indicating if observability is enabled
pub(crate) static IS_OBSERVABILITY_ENABLED: OnceCell<bool> = OnceCell::const_new();
/// Flag indicating if observability metric is enabled
pub(crate) static OBSERVABILITY_METRIC_ENABLED: OnceCell<bool> = OnceCell::const_new();
/// Check whether Observability is enabled
pub fn is_observability_enabled() -> bool {
IS_OBSERVABILITY_ENABLED.get().copied().unwrap_or(false)
}
/// Error type for global guard operations
#[derive(Debug, thiserror::Error)]
pub enum GlobalError {
#[error("Failed to set global guard: {0}")]
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
#[error("Global guard not initialized")]
NotInitialized,
#[error("Global system metrics err: {0}")]
MetricsError(String),
#[error("Failed to get current PID: {0}")]
PidError(String),
#[error("Process with PID {0} not found")]
ProcessNotFound(u32),
#[error("Failed to get physical core count")]
CoreCountError,
#[error("GPU initialization failed: {0}")]
GpuInitError(String),
#[error("GPU device not found: {0}")]
GpuDeviceError(String),
#[error("Failed to send log: {0}")]
SendFailed(&'static str),
#[error("Operation timed out: {0}")]
Timeout(&'static str),
#[error("Telemetry initialization failed: {0}")]
TelemetryError(#[from] TelemetryError),
/// Check whether Observability metric is enabled
pub fn observability_metric_enabled() -> bool {
OBSERVABILITY_METRIC_ENABLED.get().copied().unwrap_or(false)
}
/// Initialize the observability module

View File

@@ -54,13 +54,17 @@
/// # }
/// ```
mod config;
mod error;
mod global;
mod metrics;
mod recorder;
mod system;
mod telemetry;
pub use config::*;
pub use error::*;
pub use global::*;
pub use metrics::*;
pub use recorder::*;
pub use system::SystemObserver;
pub use telemetry::{OtelGuard, TelemetryError};
pub use telemetry::OtelGuard;

323
crates/obs/src/recorder.rs Normal file
View File

@@ -0,0 +1,323 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::GlobalError;
use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, SharedString, Unit};
use opentelemetry::{
InstrumentationScope, InstrumentationScopeBuilder, KeyValue, global,
metrics::{Meter, MeterProvider},
};
use opentelemetry_sdk::metrics::{MeterProviderBuilder, SdkMeterProvider};
use std::{
borrow::Cow,
collections::HashMap,
ops::Deref,
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
};
/// A builder for constructing a [`Recorder`].
#[derive(Debug)]
pub struct Builder {
builder: MeterProviderBuilder,
scope: InstrumentationScopeBuilder,
}
impl Builder {
/// Runs the closure (`f`) to modify the [`MeterProviderBuilder`] to build a
/// [`MeterProvider`](MeterProvider).
pub fn with_meter_provider(mut self, f: impl FnOnce(MeterProviderBuilder) -> MeterProviderBuilder) -> Self {
self.builder = f(self.builder);
self
}
/// Modify the [`InstrumentationScope`] to provide additional metadata from the
/// closure (`f`).
pub fn with_instrumentation_scope(
mut self,
f: impl FnOnce(InstrumentationScopeBuilder) -> InstrumentationScopeBuilder,
) -> Self {
self.scope = f(self.scope);
self
}
/// Consumes the builder and builds a new [`Recorder`] and returns
/// a [`SdkMeterProvider`].
///
/// A [`SdkMeterProvider`] is provided so you have the responsibility to
/// do whatever you need to do with it.
///
/// This will not install the recorder as the global recorder for
/// the [`metrics`] crate, use [`Builder::install`]. This will not install a meter
/// provider to [`global`], use [`Builder::install_global`].
pub fn build(self) -> (SdkMeterProvider, Recorder) {
let provider = self.builder.build();
let meter = provider.meter_with_scope(self.scope.build());
(
provider,
Recorder {
meter,
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
},
)
}
/// Builds a [`Recorder`] and sets it as the global recorder for the [`metrics`]
/// crate.
///
/// This method will not call [`global::set_meter_provider`] for OpenTelemetry and
/// will be returned as the first element in the return's type tuple.
pub fn install(self) -> Result<(SdkMeterProvider, Recorder), GlobalError> {
let (provider, recorder) = self.build();
metrics::set_global_recorder(recorder.clone())?;
Ok((provider, recorder))
}
/// Builds the [`Recorder`] to record metrics to OpenTelemetry, set the global
/// recorder for the [`metrics`] crate, and calls [`global::set_meter_provider`]
/// to set the constructed [`SdkMeterProvider`].
pub fn install_global(self) -> Result<Recorder, GlobalError> {
let (provider, recorder) = self.install()?;
global::set_meter_provider(provider);
Ok(recorder)
}
}
#[derive(Debug)]
struct MetricMetadata {
unit: Option<Unit>,
description: SharedString,
}
/// A standard recorder that implements [`metrics::Recorder`].
///
/// This instance implements <code>[`Deref`]\<Target = [`Meter`]\></code>, so
/// you can still interact with the SDK's initialized [`Meter`] instance.
#[derive(Debug, Clone)]
pub struct Recorder {
meter: Meter,
metrics_metadata: Arc<Mutex<HashMap<KeyName, MetricMetadata>>>,
}
impl Recorder {
/// Creates a new [`Builder`] with a given name for instrumentation.
pub fn builder<S: Into<Cow<'static, str>>>(name: S) -> Builder {
Builder {
builder: MeterProviderBuilder::default(),
scope: InstrumentationScope::builder(name.into()),
}
}
/// Creates a [`Recorder`] with an already established [`Meter`].
pub fn with_meter(meter: Meter) -> Self {
Recorder {
meter,
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl Deref for Recorder {
type Target = Meter;
fn deref(&self) -> &Self::Target {
&self.meter
}
}
impl metrics::Recorder for Recorder {
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
metrics_metadata.insert(key, MetricMetadata { unit, description });
}
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
metrics_metadata.insert(key, MetricMetadata { unit, description });
}
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
metrics_metadata.insert(key, MetricMetadata { unit, description });
}
fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
let mut builder = self.meter.u64_counter(key.name().to_owned());
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
if let Some(unit) = metadata.unit {
builder = builder.with_unit(unit.as_canonical_label());
}
builder = builder.with_description(metadata.description.to_string());
}
let counter = builder.build();
let labels = key
.labels()
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
.collect();
Counter::from_arc(Arc::new(WrappedCounter {
counter,
labels,
value: AtomicU64::new(0),
}))
}
fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
let mut builder = self.meter.f64_gauge(key.name().to_owned());
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
if let Some(unit) = metadata.unit {
builder = builder.with_unit(unit.as_canonical_label());
}
builder = builder.with_description(metadata.description.to_string());
}
let gauge = builder.build();
let labels = key
.labels()
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
.collect();
Gauge::from_arc(Arc::new(WrappedGauge {
gauge,
labels,
value: AtomicU64::new(0),
}))
}
fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
let mut builder = self.meter.f64_histogram(key.name().to_owned());
if let Some(metadata) = self.metrics_metadata.lock().unwrap().remove(key.name()) {
if let Some(unit) = metadata.unit {
builder = builder.with_unit(unit.as_canonical_label());
}
builder = builder.with_description(metadata.description.to_string());
}
let histogram = builder.build();
let labels = key
.labels()
.map(|label| KeyValue::new(label.key().to_owned(), label.value().to_owned()))
.collect();
Histogram::from_arc(Arc::new(WrappedHistogram { histogram, labels }))
}
}
struct WrappedCounter {
counter: opentelemetry::metrics::Counter<u64>,
labels: Vec<KeyValue>,
value: AtomicU64,
}
impl CounterFn for WrappedCounter {
fn increment(&self, value: u64) {
self.value.fetch_add(value, Ordering::Relaxed);
self.counter.add(value, &self.labels);
}
fn absolute(&self, value: u64) {
let prev = self.value.swap(value, Ordering::Relaxed);
let diff = value.saturating_sub(prev);
self.counter.add(diff, &self.labels);
}
}
struct WrappedGauge {
gauge: opentelemetry::metrics::Gauge<f64>,
labels: Vec<KeyValue>,
value: AtomicU64,
}
impl GaugeFn for WrappedGauge {
fn increment(&self, value: f64) {
let mut current = self.value.load(Ordering::Relaxed);
let mut new = f64::from_bits(current) + value;
while let Err(val) = self
.value
.compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed)
{
current = val;
new = f64::from_bits(current) + value;
}
self.gauge.record(new, &self.labels);
}
fn decrement(&self, value: f64) {
let mut current = self.value.load(Ordering::Relaxed);
let mut new = f64::from_bits(current) - value;
while let Err(val) = self
.value
.compare_exchange(current, new.to_bits(), Ordering::AcqRel, Ordering::Relaxed)
{
current = val;
new = f64::from_bits(current) - value;
}
self.gauge.record(new, &self.labels);
}
fn set(&self, value: f64) {
self.value.store(value.to_bits(), Ordering::Relaxed);
self.gauge.record(value, &self.labels);
}
}
struct WrappedHistogram {
histogram: opentelemetry::metrics::Histogram<f64>,
labels: Vec<KeyValue>,
}
impl HistogramFn for WrappedHistogram {
fn record(&self, value: f64) {
self.histogram.record(value, &self.labels);
}
fn record_many(&self, value: f64, count: usize) {
for _ in 0..count {
self.histogram.record(value, &self.labels);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry_sdk::metrics::Temporality;
#[test]
fn standard_usage() {
let exporter = opentelemetry_stdout::MetricExporterBuilder::default()
.with_temporality(Temporality::Cumulative)
.build();
let (provider, recorder) = Recorder::builder("my-app")
.with_meter_provider(|builder| builder.with_periodic_exporter(exporter))
.build();
global::set_meter_provider(provider.clone());
metrics::set_global_recorder(recorder).unwrap();
let counter = metrics::counter!("my-counter");
counter.increment(1);
provider.force_flush().unwrap();
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{GlobalError, is_observability_enabled};
use crate::{GlobalError, observability_metric_enabled};
use opentelemetry::{global::meter, metrics::Meter};
use sysinfo::Pid;
@@ -29,7 +29,7 @@ impl SystemObserver {
/// This function will create a new `Collector` instance and start collecting metrics.
/// It will run indefinitely until the process is terminated.
pub async fn init_process_observer() -> Result<(), GlobalError> {
if is_observability_enabled() {
if observability_metric_enabled() {
let meter = meter("system");
let pid = sysinfo::get_current_pid().map_err(|e| GlobalError::PidError(e.to_string()))?;
return SystemObserver::init_process_observer_for_pid(meter, pid).await;

View File

@@ -13,45 +13,44 @@
// limitations under the License.
use crate::config::OtelConfig;
use crate::global::IS_OBSERVABILITY_ENABLED;
use crate::global::OBSERVABILITY_METRIC_ENABLED;
use crate::{Recorder, TelemetryError};
use flexi_logger::{DeferredNow, Record, WriteMode, WriteMode::AsyncWith, style};
use metrics::counter;
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry::{KeyValue, global, trace::TracerProvider};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{Compression, Protocol, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
metrics::{PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
};
use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::observability::{ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_OBS_LOG_STDOUT_ENABLED, ENVIRONMENT, METER_INTERVAL,
SAMPLE_RATIO, SERVICE_VERSION,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
ENV_OBS_LOG_DIRECTORY,
ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FLUSH_MS, ENV_OBS_LOG_MESSAGE_CAPA, ENV_OBS_LOG_POOL_CAPA,
},
};
use rustfs_utils::{get_env_u64, get_env_usize, get_local_ip_with_default};
use smallvec::SmallVec;
use std::borrow::Cow;
use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use std::{borrow::Cow, env, fs, io::IsTerminal, time::Duration};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::fmt::time::LocalTime;
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
use tracing_subscriber::{
EnvFilter, Layer,
fmt::{format::FmtSpan, time::LocalTime},
layer::SubscriberExt,
util::SubscriberInitExt,
};
/// A guard object that manages the lifecycle of OpenTelemetry components.
///
@@ -69,9 +68,7 @@ pub struct OtelGuard {
tracer_provider: Option<SdkTracerProvider>,
meter_provider: Option<SdkMeterProvider>,
logger_provider: Option<SdkLoggerProvider>,
// Add a flexi_logger handle to keep the logging alive
flexi_logger_handles: Option<flexi_logger::LoggerHandle>,
// WorkerGuard for writing tracing files
tracing_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}
@@ -112,35 +109,12 @@ impl Drop for OtelGuard {
}
if let Some(guard) = self.tracing_guard.take() {
// The guard will be dropped here, flushing any remaining logs
drop(guard);
println!("Tracing guard dropped, flushing logs.");
}
}
}
#[derive(Debug)]
pub enum TelemetryError {
BuildSpanExporter(String),
BuildMetricExporter(String),
BuildLogExporter(String),
InstallMetricsRecorder(String),
SubscriberInit(String),
}
impl std::fmt::Display for TelemetryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TelemetryError::BuildSpanExporter(e) => write!(f, "Span exporter build failed: {e}"),
TelemetryError::BuildMetricExporter(e) => write!(f, "Metric exporter build failed: {e}"),
TelemetryError::BuildLogExporter(e) => write!(f, "Log exporter build failed: {e}"),
TelemetryError::InstallMetricsRecorder(e) => write!(f, "Install metrics recorder failed: {e}"),
TelemetryError::SubscriberInit(e) => write!(f, "Tracing subscriber init failed: {e}"),
}
}
}
impl std::error::Error for TelemetryError {}
/// create OpenTelemetry Resource
fn resource(config: &OtelConfig) -> Resource {
Resource::builder()
@@ -170,16 +144,16 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
}
// Read the AsyncWith parameter from the environment variable
fn get_env_async_with() -> Option<WriteMode> {
fn get_env_async_with() -> WriteMode {
let pool_capa = get_env_usize(ENV_OBS_LOG_POOL_CAPA, DEFAULT_OBS_LOG_POOL_CAPA);
let message_capa = get_env_usize(ENV_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_MESSAGE_CAPA);
let flush_ms = get_env_u64(ENV_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_FLUSH_MS);
Some(AsyncWith {
AsyncWith {
pool_capa,
message_capa,
flush_interval: Duration::from_millis(flush_ms),
})
}
}
fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilter {
@@ -200,12 +174,9 @@ fn build_env_filter(logger_level: &str, default_level: Option<&str>) -> EnvFilte
fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
let level = record.level();
let level_style = style(level);
// Get the current thread information
let binding = std::thread::current();
let thread_name = binding.name().unwrap_or("unnamed");
let thread_id = format!("{:?}", std::thread::current().id());
writeln!(
w,
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
@@ -224,12 +195,9 @@ fn format_with_color(w: &mut dyn std::io::Write, now: &mut DeferredNow, record:
#[inline(never)]
fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &Record) -> Result<(), std::io::Error> {
let level = record.level();
// Get the current thread information
let binding = std::thread::current();
let thread_name = binding.name().unwrap_or("unnamed");
let thread_id = format!("{:?}", std::thread::current().id());
writeln!(
w,
"[{}] {} [{}] [{}:{}] [{}:{}] {}",
@@ -247,9 +215,7 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
/// stdout + span information (fix: retain WorkerGuard to avoid releasing after initialization)
fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
let env_filter = build_env_filter(logger_level, None);
let (nb, guard) = tracing_appender::non_blocking(std::io::stdout());
let enable_color = std::io::stdout().is_terminal();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_timer(LocalTime::rfc_3339())
@@ -264,17 +230,15 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production:
.with_current_span(true)
.with_span_list(true)
.with_span_events(if is_production { FmtSpan::CLOSE } else { FmtSpan::FULL });
tracing_subscriber::registry()
.with(env_filter)
.with(ErrorLayer::default())
.with(fmt_layer)
.init();
IS_OBSERVABILITY_ENABLED.set(false).ok();
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!("Init stdout logging (level: {})", logger_level);
OtelGuard {
tracer_provider: None,
meter_provider: None,
@@ -285,27 +249,49 @@ fn init_stdout_logging(_config: &OtelConfig, logger_level: &str, is_production:
}
/// File rolling log (size switching + number retained)
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> OtelGuard {
use flexi_logger::{
Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming,
WriteMode::{AsyncWith, BufferAndFlush},
};
fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: bool) -> Result<OtelGuard, TelemetryError> {
use flexi_logger::{Age, Cleanup, Criterion, FileSpec, LogSpecification, Naming};
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: create log dir '{}': {e}", log_directory);
return Err(TelemetryError::Io(e.to_string()));
}
#[cfg(unix)]
{
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
let _ = fs::set_permissions(log_directory, Permissions::from_mode(0o755));
let desired: u32 = 0o755;
match fs::metadata(log_directory) {
Ok(meta) => {
let current = meta.permissions().mode() & 0o777;
// Only tighten to 0755 if existing permissions are looser than target, avoid loosening
if (current & !desired) != 0 {
if let Err(e) = fs::set_permissions(log_directory, Permissions::from_mode(desired)) {
return Err(TelemetryError::SetPermissions(format!(
"dir='{}', want={:#o}, have={:#o}, err={}",
log_directory, desired, current, e
)));
}
// Second verification
if let Ok(meta2) = fs::metadata(log_directory) {
let after = meta2.permissions().mode() & 0o777;
if after != desired {
return Err(TelemetryError::SetPermissions(format!(
"dir='{}', want={:#o}, after={:#o}",
log_directory, desired, after
)));
}
}
}
}
Err(e) => {
return Err(TelemetryError::Io(format!("stat '{}' failed: {}", log_directory, e)));
}
}
}
// parsing level
@@ -345,25 +331,10 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
};
// write mode
let write_mode = get_env_async_with().unwrap_or(if is_production {
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
} else {
BufferAndFlush
});
let write_mode = get_env_async_with();
// Build
let mut builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
if !is_production {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
}
flexi_logger::Logger::with(log_spec.clone())
})
.unwrap_or(flexi_logger::Logger::with(log_spec.clone()))
.format_for_stderr(format_with_color)
.format_for_stdout(format_with_color)
.format_for_files(format_for_file)
@@ -379,7 +350,7 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
.use_utc();
// Optional copy to stdout (for local observation)
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) {
if config.log_stdout_enabled.unwrap_or(DEFAULT_OBS_LOG_STDOUT_ENABLED) || !is_production {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::All);
} else {
builder = builder.duplicate_to_stdout(flexi_logger::Duplicate::None);
@@ -393,20 +364,20 @@ fn init_file_logging(config: &OtelConfig, logger_level: &str, is_production: boo
}
};
IS_OBSERVABILITY_ENABLED.set(false).ok();
OBSERVABILITY_METRIC_ENABLED.set(false).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init file logging at '{}', roll size {:?}MB, keep {}",
log_directory, config.log_rotation_size_mb, keep_files
);
OtelGuard {
Ok(OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
flexi_logger_handles: handle,
tracing_guard: None,
}
})
}
/// Observability (HTTP export, supports three sub-endpoints; if not, fallback to unified endpoint)
@@ -464,24 +435,26 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
.build()
.map_err(|e| TelemetryError::BuildMetricExporter(e.to_string()))?;
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let mut builder = MeterProviderBuilder::default().with_resource(res.clone());
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
builder = builder.with_reader(create_periodic_reader(meter_interval));
}
let provider = builder.build();
let (provider, recorder) = Recorder::builder(service_name.clone())
.with_meter_provider(|b| {
let b = b.with_resource(res.clone()).with_reader(
PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
if use_stdout {
b.with_reader(create_periodic_reader(meter_interval))
} else {
b
}
})
.build();
global::set_meter_provider(provider.clone());
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
provider
};
// metrics crate -> OTel
let _ = metrics_exporter_opentelemetry::Recorder::builder(service_name.clone()).install_global();
// LoggerHTTP
let logger_provider = {
let exporter = opentelemetry_otlp::LogExporter::builder()
@@ -536,7 +509,7 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
.with(MetricsLayer::new(meter_provider.clone()))
.init();
IS_OBSERVABILITY_ENABLED.set(true).ok();
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
counter!("rustfs.start.total").increment(1);
info!(
"Init observability (HTTP): trace='{}', metric='{}', log='{}'",
@@ -571,7 +544,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> Result<OtelGuard, Telemetry
// Rule 2: The user has explicitly customized the log directory (determined by whether ENV_OBS_LOG_DIRECTORY is set)
let user_set_log_dir = env::var(ENV_OBS_LOG_DIRECTORY).is_ok();
if user_set_log_dir {
return Ok(init_file_logging(config, logger_level, is_production));
return init_file_logging(config, logger_level, is_production);
}
// Rule 1: Default stdout (error level)

View File

@@ -26,7 +26,7 @@ use tracing::{error, info, warn};
pub(crate) async fn start_audit_system() -> AuditResult<()> {
info!(
target: "rustfs::main::start_audit_system",
"Step 1: Initializing the audit system..."
"Initializing the audit system..."
);
// 1. Get the global configuration loaded by ecstore
@@ -39,7 +39,7 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> {
config.clone()
}
None => {
error!(
warn!(
target: "rustfs::main::start_audit_system",
"Audit system initialization failed: Global server configuration not loaded."
);

View File

@@ -14,7 +14,7 @@
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG;
use tracing::{error, info, instrument};
use tracing::{error, info, instrument, warn};
/// Shuts down the event notifier system gracefully
pub(crate) async fn shutdown_event_notifier() {
@@ -28,7 +28,7 @@ pub(crate) async fn shutdown_event_notifier() {
let system = match rustfs_notify::notification_system() {
Some(sys) => sys,
None => {
error!("Event notifier system is not initialized.");
info!("Event notifier system is not initialized.");
return;
}
};
@@ -49,7 +49,7 @@ pub(crate) async fn init_event_notifier() {
let server_config = match GLOBAL_SERVER_CONFIG.get() {
Some(config) => config.clone(), // Clone the config to pass ownership
None => {
error!("Event notifier initialization failed: Global server config not loaded.");
warn!("Event notifier initialization failed: Global server config not loaded.");
return;
}
};