mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
perf(obs): optimize metrics recorder and telemetry initialization (#1859)
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -10480,9 +10480,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zlib-rs"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7948af682ccbc3342b6e9420e8c51c1fe5d7bf7756002b4a3c6cabfe96a7e3c"
|
||||
checksum = "3a33bbf307b25a1774cee0687694ec72fa7814b3ab5c1c12a9d2fc6a36fc439c"
|
||||
|
||||
[[package]]
|
||||
name = "zmij"
|
||||
|
||||
@@ -28,6 +28,20 @@ use std::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
};
|
||||
use tracing::error;
|
||||
|
||||
macro_rules! configure_builder {
|
||||
($builder:expr, $metadata:expr) => {{
|
||||
let mut builder = $builder;
|
||||
if let Some(metadata) = $metadata {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
builder
|
||||
}};
|
||||
}
|
||||
|
||||
/// A builder for constructing a [`Recorder`].
|
||||
#[derive(Debug)]
|
||||
@@ -67,16 +81,7 @@ impl Builder {
|
||||
let provider = self.builder.build();
|
||||
let meter = provider.meter_with_scope(self.scope.build());
|
||||
|
||||
(
|
||||
provider,
|
||||
Recorder {
|
||||
meter,
|
||||
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
|
||||
cached_counters: Arc::new(RwLock::new(HashMap::new())),
|
||||
cached_gauges: Arc::new(RwLock::new(HashMap::new())),
|
||||
cached_histograms: Arc::new(RwLock::new(HashMap::new())),
|
||||
},
|
||||
)
|
||||
(provider, Recorder::with_meter(meter))
|
||||
}
|
||||
|
||||
/// Builds a [`Recorder`] and sets it as the global recorder for the [`metrics`]
|
||||
@@ -135,12 +140,60 @@ impl Recorder {
|
||||
pub fn with_meter(meter: Meter) -> Self {
|
||||
Recorder {
|
||||
meter,
|
||||
metrics_metadata: Arc::new(Mutex::new(HashMap::new())),
|
||||
cached_counters: Arc::new(RwLock::new(HashMap::new())),
|
||||
cached_gauges: Arc::new(RwLock::new(HashMap::new())),
|
||||
cached_histograms: Arc::new(RwLock::new(HashMap::new())),
|
||||
metrics_metadata: Default::default(),
|
||||
cached_counters: Default::default(),
|
||||
cached_gauges: Default::default(),
|
||||
cached_histograms: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_cached_metric<T: Clone>(lock: &RwLock<HashMap<Key, T>>, key: &Key, metric_type: &str) -> Option<T> {
|
||||
let cache = match lock.read() {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
error!("{} cache read lock poisoned: {}", metric_type, e);
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
cache.get(key).cloned()
|
||||
}
|
||||
|
||||
fn insert_cached_metric<T: Clone>(lock: &RwLock<HashMap<Key, T>>, key: Key, value: T, metric_type: &str) -> T {
|
||||
let mut cache = match lock.write() {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
error!("{} cache write lock poisoned: {}", metric_type, e);
|
||||
e.into_inner()
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(v) = cache.get(&key) {
|
||||
return v.clone();
|
||||
}
|
||||
cache.insert(key, value.clone());
|
||||
value
|
||||
}
|
||||
|
||||
fn with_metadata_lock<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut HashMap<KeyName, MetricMetadata>) -> R,
|
||||
{
|
||||
let mut guard = self.metrics_metadata.lock().unwrap_or_else(|e| {
|
||||
error!("metrics_metadata lock poisoned: {}", e);
|
||||
e.into_inner()
|
||||
});
|
||||
f(&mut guard)
|
||||
}
|
||||
|
||||
fn describe_metric(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
self.with_metadata_lock(|metadata| {
|
||||
metadata.insert(key, MetricMetadata { unit, description });
|
||||
});
|
||||
}
|
||||
|
||||
fn get_metadata_for_builder(&self, key_name: &str) -> Option<MetricMetadata> {
|
||||
self.with_metadata_lock(|metadata| metadata.remove(key_name))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Recorder {
|
||||
@@ -153,32 +206,25 @@ impl Deref for Recorder {
|
||||
|
||||
impl metrics::Recorder for Recorder {
|
||||
fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
self.describe_metric(key, unit, description);
|
||||
}
|
||||
|
||||
fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
self.describe_metric(key, unit, description);
|
||||
}
|
||||
|
||||
fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
|
||||
let mut metrics_metadata = self.metrics_metadata.lock().unwrap();
|
||||
metrics_metadata.insert(key, MetricMetadata { unit, description });
|
||||
self.describe_metric(key, unit, description);
|
||||
}
|
||||
|
||||
fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
|
||||
if let Some(cached) = self.cached_counters.read().unwrap().get(key) {
|
||||
return cached.clone();
|
||||
if let Some(counter) = Self::get_cached_metric(&self.cached_counters, key, "counter") {
|
||||
return counter;
|
||||
}
|
||||
|
||||
let mut builder = self.meter.u64_counter(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().get(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
let builder = self.meter.u64_counter(key.name().to_owned());
|
||||
let metadata = self.get_metadata_for_builder(key.name());
|
||||
let builder = configure_builder!(builder, metadata);
|
||||
|
||||
let counter = builder.build();
|
||||
let labels = key
|
||||
@@ -192,29 +238,17 @@ impl metrics::Recorder for Recorder {
|
||||
value: AtomicU64::new(0),
|
||||
}));
|
||||
|
||||
// Take write lock only after building
|
||||
let mut cache = self.cached_counters.write().unwrap();
|
||||
// Quick check it wasn't created between last read and finish building
|
||||
if let Some(cached) = cache.get(key) {
|
||||
return cached.clone();
|
||||
}
|
||||
|
||||
cache.insert(key.clone(), handle.clone());
|
||||
handle
|
||||
Self::insert_cached_metric(&self.cached_counters, key.clone(), handle, "counter")
|
||||
}
|
||||
|
||||
fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
|
||||
if let Some(cached) = self.cached_gauges.read().unwrap().get(key) {
|
||||
return cached.clone();
|
||||
if let Some(gauge) = Self::get_cached_metric(&self.cached_gauges, key, "gauge") {
|
||||
return gauge;
|
||||
}
|
||||
|
||||
let mut builder = self.meter.f64_gauge(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().get(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
let builder = self.meter.f64_gauge(key.name().to_owned());
|
||||
let metadata = self.get_metadata_for_builder(key.name());
|
||||
let builder = configure_builder!(builder, metadata);
|
||||
|
||||
let gauge = builder.build();
|
||||
let labels = key
|
||||
@@ -228,29 +262,17 @@ impl metrics::Recorder for Recorder {
|
||||
value: AtomicU64::new(0),
|
||||
}));
|
||||
|
||||
// Take write lock only after building
|
||||
let mut cache = self.cached_gauges.write().unwrap();
|
||||
// Quick check it wasn't created between last read and finish building
|
||||
if let Some(cached) = cache.get(key) {
|
||||
return cached.clone();
|
||||
}
|
||||
|
||||
cache.insert(key.clone(), handle.clone());
|
||||
handle
|
||||
Self::insert_cached_metric(&self.cached_gauges, key.clone(), handle, "gauge")
|
||||
}
|
||||
|
||||
fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
|
||||
if let Some(cached) = self.cached_histograms.read().unwrap().get(key) {
|
||||
return cached.clone();
|
||||
if let Some(histogram) = Self::get_cached_metric(&self.cached_histograms, key, "histogram") {
|
||||
return histogram;
|
||||
}
|
||||
|
||||
let mut builder = self.meter.f64_histogram(key.name().to_owned());
|
||||
if let Some(metadata) = self.metrics_metadata.lock().unwrap().get(key.name()) {
|
||||
if let Some(unit) = metadata.unit {
|
||||
builder = builder.with_unit(unit.as_canonical_label());
|
||||
}
|
||||
builder = builder.with_description(metadata.description.to_string());
|
||||
}
|
||||
let builder = self.meter.f64_histogram(key.name().to_owned());
|
||||
let metadata = self.get_metadata_for_builder(key.name());
|
||||
let builder = configure_builder!(builder, metadata);
|
||||
|
||||
let histogram = builder.build();
|
||||
let labels = key
|
||||
@@ -260,15 +282,7 @@ impl metrics::Recorder for Recorder {
|
||||
|
||||
let handle = Histogram::from_arc(Arc::new(WrappedHistogram { histogram, labels }));
|
||||
|
||||
// Take write lock only after building
|
||||
let mut cache = self.cached_histograms.write().unwrap();
|
||||
// Quick check it wasn't created between last read and finish building
|
||||
if let Some(cached) = cache.get(key) {
|
||||
return cached.clone();
|
||||
}
|
||||
|
||||
cache.insert(key.clone(), handle.clone());
|
||||
handle
|
||||
Self::insert_cached_metric(&self.cached_histograms, key.clone(), handle, "histogram")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -477,6 +477,7 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
|
||||
.build();
|
||||
global::set_meter_provider(provider.clone());
|
||||
metrics::set_global_recorder(recorder).map_err(|e| TelemetryError::InstallMetricsRecorder(e.to_string()))?;
|
||||
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
|
||||
Some(provider)
|
||||
}
|
||||
};
|
||||
@@ -544,7 +545,6 @@ fn init_observability_http(config: &OtelConfig, logger_level: &str, is_productio
|
||||
.with(metrics_layer)
|
||||
.init();
|
||||
|
||||
OBSERVABILITY_METRIC_ENABLED.set(true).ok();
|
||||
counter!("rustfs.start.total").increment(1);
|
||||
info!(
|
||||
"Init observability (HTTP): trace='{}', metric='{}', log='{}'",
|
||||
|
||||
Reference in New Issue
Block a user