diff --git a/.docker/observability/grafana/dashboards/rustfs.json b/.docker/observability/grafana/dashboards/rustfs.json index 41bd5ad2..81d45a38 100644 --- a/.docker/observability/grafana/dashboards/rustfs.json +++ b/.docker/observability/grafana/dashboards/rustfs.json @@ -2689,6 +2689,167 @@ ], "title": "Network", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "binBps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 80 + }, + "id": 39, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "editorMode": "code", + "expr": "sum by (bucket, targetArn) (rustfs_bucket_replication_bandwidth_current_bytes_per_second{job=~\"$job\",bucket=~\"$bucket\"})", + "legendFormat": "{{bucket}} | {{targetArn}}", + "range": true, + "refId": "Replication_Bandwidth_Current" + } + ], + "title": 
"Replication_Bandwidth_Current", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "binBps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 80 + }, + "id": 38, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.3.2", + "targets": [ + { + "editorMode": "builder", + "expr": "max by (bucket, targetArn) (rustfs_bucket_replication_bandwidth_limit_bytes_per_second{job=~\"$job\",bucket=~\"$bucket\"})", + "format": "time_series", + "legendFormat": "{{bucket}} | {{targetArn}}", + "range": true, + "refId": "Replication_Bandwidth_Limit" + } + ], + "title": "Replication_Bandwidth_Limit", + "type": "stat" } ], "preload": false, diff --git a/crates/ecstore/src/bucket/bandwidth/monitor.rs b/crates/ecstore/src/bucket/bandwidth/monitor.rs index 20357678..2da030b6 100644 --- a/crates/ecstore/src/bucket/bandwidth/monitor.rs +++ b/crates/ecstore/src/bucket/bandwidth/monitor.rs @@ -14,11 +14,16 @@ use crate::bucket::bandwidth::reader::BucketOptions; use ratelimit::{Error as RatelimitError, Ratelimiter}; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tracing::warn; +/// BETA_BUCKET is the weight used to calculate exponential moving average +const BETA_BUCKET: f64 = 0.1; + #[derive(Clone)] pub 
struct BucketThrottle { limiter: Arc>, @@ -71,25 +76,202 @@ impl BucketThrottle { } } +#[derive(Debug)] +pub struct BucketMeasurement { + bytes_since_last_window: AtomicU64, + start_time: Mutex>, + exp_moving_avg: Mutex, +} + +impl BucketMeasurement { + pub fn new(init_time: Instant) -> Self { + Self { + bytes_since_last_window: AtomicU64::new(0), + start_time: Mutex::new(Some(init_time)), + exp_moving_avg: Mutex::new(0.0), + } + } + + pub fn increment_bytes(&self, bytes: u64) { + self.bytes_since_last_window.fetch_add(bytes, Ordering::Relaxed); + } + + pub fn update_exponential_moving_average(&self, end_time: Instant) { + let mut start_time = self.start_time.lock().unwrap_or_else(|e| { + warn!("bucket measurement start_time mutex poisoned, recovering"); + e.into_inner() + }); + let previous_start = *start_time; + *start_time = Some(end_time); + + let Some(prev_start) = previous_start else { + return; + }; + + if prev_start > end_time { + return; + } + + let duration = end_time.duration_since(prev_start); + if duration.is_zero() { + return; + } + + let bytes_since_last_window = self.bytes_since_last_window.swap(0, Ordering::Relaxed); + let increment = bytes_since_last_window as f64 / duration.as_secs_f64(); + + let mut exp_moving_avg = self.exp_moving_avg.lock().unwrap_or_else(|e| { + warn!("bucket measurement exp_moving_avg mutex poisoned, recovering"); + e.into_inner() + }); + + if *exp_moving_avg == 0.0 { + *exp_moving_avg = increment; + return; + } + + *exp_moving_avg = exponential_moving_average(BETA_BUCKET, *exp_moving_avg, increment); + } + + pub fn get_exp_moving_avg_bytes_per_second(&self) -> f64 { + *self.exp_moving_avg.lock().unwrap_or_else(|e| { + warn!("bucket measurement exp_moving_avg mutex poisoned, recovering"); + e.into_inner() + }) + } +} + +fn exponential_moving_average(beta: f64, previous_avg: f64, increment_avg: f64) -> f64 { + (1f64 - beta) * increment_avg + beta * previous_avg +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct 
BandwidthDetails { + pub limit_bytes_per_sec: i64, + pub current_bandwidth_bytes_per_sec: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BucketBandwidthReport { + pub bucket_stats: HashMap, +} + pub struct Monitor { t_lock: RwLock>, + m_lock: RwLock>, pub node_count: u64, } impl Monitor { pub fn new(num_nodes: u64) -> Arc { let node_cnt = num_nodes.max(1); - Arc::new(Monitor { + let m = Arc::new(Monitor { t_lock: RwLock::new(HashMap::new()), + m_lock: RwLock::new(HashMap::new()), node_count: node_cnt, - }) + }); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let weak = Arc::downgrade(&m); + handle.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(2)); + loop { + interval.tick().await; + let Some(monitor) = weak.upgrade() else { break }; + monitor.update_moving_avg(); + } + }); + } + m + } + + pub fn init_measurement(&self, opts: &BucketOptions) { + let mut guard = self.m_lock.write().unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock write poisoned, recovering"); + e.into_inner() + }); + guard + .entry(opts.clone()) + .or_insert_with(|| BucketMeasurement::new(Instant::now())); + } + + pub fn update_measurement(&self, opts: &BucketOptions, bytes: u64) { + { + let guard = self.m_lock.read().unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock read poisoned, recovering"); + e.into_inner() + }); + if let Some(measurement) = guard.get(opts) { + measurement.increment_bytes(bytes); + return; + } + } + + // Miss path: write lock + insert once, then increment. + let mut guard = self.m_lock.write().unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock write poisoned, recovering"); + e.into_inner() + }); + + // Double-check after lock upgrade in case another thread inserted it. 
+ let measurement = guard + .entry(opts.clone()) + .or_insert_with(|| BucketMeasurement::new(Instant::now())); + + measurement.increment_bytes(bytes); + } + + pub fn update_moving_avg(&self) { + let now = Instant::now(); + let guard = self.m_lock.read().unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock read poisoned, recovering"); + e.into_inner() + }); + for measurement in guard.values() { + measurement.update_exponential_moving_average(now); + } + } + + pub fn get_report(&self, select_bucket: impl Fn(&str) -> bool) -> BucketBandwidthReport { + let t_guard = self.t_lock.read().unwrap_or_else(|e| { + warn!("bucket monitor throttle rwlock read poisoned, recovering"); + e.into_inner() + }); + let m_guard = self.m_lock.read().unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock read poisoned, recovering"); + e.into_inner() + }); + let mut bucket_stats = HashMap::new(); + for (opts, throttle) in t_guard.iter() { + if !select_bucket(&opts.name) { + continue; + } + let mut current_bandwidth_bytes_per_sec = 0.0; + if let Some(measurement) = m_guard.get(opts) { + current_bandwidth_bytes_per_sec = measurement.get_exp_moving_avg_bytes_per_second(); + } + bucket_stats.insert( + opts.clone(), + BandwidthDetails { + limit_bytes_per_sec: throttle.node_bandwidth_per_sec * self.node_count as i64, + current_bandwidth_bytes_per_sec, + }, + ); + } + BucketBandwidthReport { bucket_stats } } pub fn delete_bucket(&self, bucket: &str) { self.t_lock .write() .unwrap_or_else(|e| { - warn!("bucket monitor rwlock write poisoned, recovering"); + warn!("bucket monitor throttle rwlock write poisoned, recovering"); + e.into_inner() + }) + .retain(|opts, _| opts.name != bucket); + self.m_lock + .write() + .unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock write poisoned, recovering"); e.into_inner() }) .retain(|opts, _| opts.name != bucket); @@ -103,7 +285,14 @@ impl Monitor { self.t_lock .write() .unwrap_or_else(|e| { - warn!("bucket monitor rwlock write 
poisoned, recovering"); + warn!("bucket monitor throttle rwlock write poisoned, recovering"); + e.into_inner() + }) + .remove(&opts); + self.m_lock + .write() + .unwrap_or_else(|e| { + warn!("bucket monitor measurement rwlock write poisoned, recovering"); e.into_inner() }) .remove(&opts); @@ -113,7 +302,7 @@ impl Monitor { self.t_lock .read() .unwrap_or_else(|e| { - warn!("bucket monitor rwlock read poisoned, recovering"); + warn!("bucket monitor throttle rwlock read poisoned, recovering"); e.into_inner() }) .get(opts) @@ -160,7 +349,7 @@ impl Monitor { self.t_lock .write() .unwrap_or_else(|e| { - warn!("bucket monitor rwlock write poisoned, recovering"); + warn!("bucket monitor throttle rwlock write poisoned, recovering"); e.into_inner() }) .insert(opts, throttle); @@ -174,7 +363,7 @@ impl Monitor { self.t_lock .read() .unwrap_or_else(|e| { - warn!("bucket monitor rwlock read poisoned, recovering"); + warn!("bucket monitor throttle rwlock read poisoned, recovering"); e.into_inner() }) .contains_key(&opt) @@ -184,6 +373,7 @@ impl Monitor { #[cfg(test)] mod tests { use super::*; + use std::panic::{AssertUnwindSafe, catch_unwind}; #[test] fn test_set_and_get_throttle_with_node_split() { @@ -318,4 +508,82 @@ mod tests { assert_eq!(t2.burst(), 500); assert_eq!(t2.node_bandwidth_per_sec, 500); } + + #[test] + fn test_bucket_measurement_recovers_from_poisoned_mutexes() { + let measurement = BucketMeasurement::new(Instant::now()); + + let _ = catch_unwind(AssertUnwindSafe(|| { + let _guard = measurement.start_time.lock().unwrap(); + panic!("poison start_time mutex"); + })); + measurement.increment_bytes(64); + measurement.update_exponential_moving_average(Instant::now() + Duration::from_secs(1)); + + let _ = catch_unwind(AssertUnwindSafe(|| { + let _guard = measurement.exp_moving_avg.lock().unwrap(); + panic!("poison exp_moving_avg mutex"); + })); + measurement.increment_bytes(32); + measurement.update_exponential_moving_average(Instant::now() + Duration::from_secs(2)); + 
+ let value = measurement.get_exp_moving_avg_bytes_per_second(); + assert!(value.is_finite()); + assert!(value >= 0.0); + } + + #[test] + fn test_get_report_limit_and_current_bandwidth_after_measurement() { + let monitor = Monitor::new(4); + monitor.set_bandwidth_limit("b1", "arn1", 400); + let opts = BucketOptions { + name: "b1".to_string(), + replication_arn: "arn1".to_string(), + }; + monitor.init_measurement(&opts); + monitor.update_measurement(&opts, 500); + monitor.update_measurement(&opts, 500); + std::thread::sleep(Duration::from_millis(110)); + monitor.update_moving_avg(); + + let report = monitor.get_report(|name| name == "b1"); + let details = report.bucket_stats.get(&opts).expect("report should contain b1/arn1"); + assert_eq!(details.limit_bytes_per_sec, 400); + assert!( + details.current_bandwidth_bytes_per_sec > 0.0, + "current_bandwidth should be positive after update_measurement and update_moving_avg" + ); + assert!( + details.current_bandwidth_bytes_per_sec < 20000.0, + "current_bandwidth should be in reasonable range" + ); + } + + #[test] + fn test_get_report_select_bucket_filters() { + let monitor = Monitor::new(2); + monitor.set_bandwidth_limit("b1", "arn1", 100); + monitor.set_bandwidth_limit("b2", "arn2", 200); + let opts_b1 = BucketOptions { + name: "b1".to_string(), + replication_arn: "arn1".to_string(), + }; + let opts_b2 = BucketOptions { + name: "b2".to_string(), + replication_arn: "arn2".to_string(), + }; + monitor.init_measurement(&opts_b1); + monitor.init_measurement(&opts_b2); + + let report_all = monitor.get_report(|_| true); + assert_eq!(report_all.bucket_stats.len(), 2); + + let report_b1 = monitor.get_report(|name| name == "b1"); + assert_eq!(report_b1.bucket_stats.len(), 1); + assert_eq!(report_b1.bucket_stats.get(&opts_b1).unwrap().limit_bytes_per_sec, 100); + + let report_b2 = monitor.get_report(|name| name == "b2"); + assert_eq!(report_b2.bucket_stats.len(), 1); + 
assert_eq!(report_b2.bucket_stats.get(&opts_b2).unwrap().limit_bytes_per_sec, 200); + } } diff --git a/crates/ecstore/src/bucket/bandwidth/reader.rs b/crates/ecstore/src/bucket/bandwidth/reader.rs index af8b81af..d9c5fc89 100644 --- a/crates/ecstore/src/bucket/bandwidth/reader.rs +++ b/crates/ecstore/src/bucket/bandwidth/reader.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::bucket::bandwidth::monitor::Monitor; +use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -20,7 +21,7 @@ use tokio::io::{AsyncRead, ReadBuf}; use tokio::time::Sleep; use tracing::{debug, warn}; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct BucketOptions { pub name: String, pub replication_arn: String, @@ -54,6 +55,9 @@ impl MonitoredReader { header_size = opts.header_size, "MonitoredReader created" ); + if throttle.is_some() { + m.init_measurement(&opts.bucket_options); + } MonitoredReader { r, m, @@ -117,7 +121,15 @@ impl AsyncRead for MonitoredReader { } } - poll_limited_read(&mut this.r, cx, buf, need, &mut this.temp_buf) + let filled_before = buf.filled().len(); + let result = poll_limited_read(&mut this.r, cx, buf, need, &mut this.temp_buf); + if let Poll::Ready(Ok(())) = result { + let read_bytes = buf.filled().len().saturating_sub(filled_before) as u64; + if read_bytes > 0 { + this.m.update_measurement(&this.opts.bucket_options, read_bytes); + } + } + result } } diff --git a/crates/ecstore/src/bucket/replication/mod.rs b/crates/ecstore/src/bucket/replication/mod.rs index 01fb71ef..308f0063 100644 --- a/crates/ecstore/src/bucket/replication/mod.rs +++ b/crates/ecstore/src/bucket/replication/mod.rs @@ -23,4 +23,5 @@ pub use config::*; pub use datatypes::*; pub use replication_pool::*; pub use replication_resyncer::*; +pub use replication_state::BucketStats; pub use rule::*; diff --git 
a/crates/ecstore/src/bucket/replication/replication_resyncer.rs b/crates/ecstore/src/bucket/replication/replication_resyncer.rs index 320a9c2a..98b4a264 100644 --- a/crates/ecstore/src/bucket/replication/replication_resyncer.rs +++ b/crates/ecstore/src/bucket/replication/replication_resyncer.rs @@ -18,6 +18,7 @@ use crate::bucket::bucket_target_sys::{ }; use crate::bucket::metadata_sys; use crate::bucket::replication::ResyncStatusType; +use crate::bucket::replication::replication_pool::GLOBAL_REPLICATION_STATS; use crate::bucket::replication::{ObjectOpts, ReplicationConfigurationExt as _}; use crate::bucket::tagging::decode_tags_to_map; use crate::bucket::target::BucketTargets; @@ -1442,9 +1443,13 @@ pub async fn replicate_delete(dobj: DeletedObjectReplicationInfo, ) }; - for tgt in rinfos.targets.iter() { - if tgt.replication_status != tgt.prev_replication_status { - // TODO: update global replication status + if let Some(stats) = GLOBAL_REPLICATION_STATS.get() { + for tgt in rinfos.targets.iter() { + if tgt.replication_status != tgt.prev_replication_status { + stats + .update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone()) + .await; + } } } @@ -1900,7 +1905,15 @@ pub async fn replicate_object(roi: ReplicateObjectInfo, storage: object_info = u; } - // TODO: update stats + if let Some(stats) = GLOBAL_REPLICATION_STATS.get() { + for tgt in &rinfos.targets { + if tgt.replication_status != tgt.prev_replication_status { + stats + .update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone()) + .await; + } + } + } } let event_name = if replication_status == ReplicationStatusType::Completed { @@ -1918,9 +1931,17 @@ pub async fn replicate_object(roi: ReplicateObjectInfo, storage: ..Default::default() }); - if rinfos.replication_status() != ReplicationStatusType::Completed { - // TODO: update stats - // pool + if rinfos.replication_status() != ReplicationStatusType::Completed + && 
roi.replication_status_internal == rinfos.replication_status_internal() + && let Some(stats) = GLOBAL_REPLICATION_STATS.get() + { + for tgt in &rinfos.targets { + if tgt.replication_status != tgt.prev_replication_status { + stats + .update(&bucket, tgt, tgt.replication_status.clone(), tgt.prev_replication_status.clone()) + .await; + } + } } } diff --git a/crates/ecstore/src/bucket/replication/replication_state.rs b/crates/ecstore/src/bucket/replication/replication_state.rs index 28971c7b..2efa3adb 100644 --- a/crates/ecstore/src/bucket/replication/replication_state.rs +++ b/crates/ecstore/src/bucket/replication/replication_state.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::error::Error; +use crate::global::get_global_bucket_monitor; use rustfs_filemeta::{ReplicatedTargetInfo, ReplicationStatusType, ReplicationType}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -585,6 +586,8 @@ pub struct BucketReplicationStat { pub latency: LatencyStats, pub xfer_rate_lrg: XferStats, pub xfer_rate_sml: XferStats, + pub bandwidth_limit_bytes_per_sec: i64, + pub current_bandwidth_bytes_per_sec: f64, } impl BucketReplicationStat { @@ -1019,6 +1022,9 @@ impl ReplicationStats { latency: stat.latency.merge(&old_stat.latency), xfer_rate_lrg: lrg, xfer_rate_sml: sml, + bandwidth_limit_bytes_per_sec: stat.bandwidth_limit_bytes_per_sec, + current_bandwidth_bytes_per_sec: stat.current_bandwidth_bytes_per_sec + + old_stat.current_bandwidth_bytes_per_sec, }; tot_replicated_size += stat.replicated_size; @@ -1069,24 +1075,43 @@ impl ReplicationStats { // In actual implementation, statistics would be obtained from cluster // This is simplified to get from local cache let cache = self.cache.read().await; - if let Some(stats) = cache.get(bucket) { - BucketStats { - uptime: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() as i64, - replication_stats: stats.clone_stats(), - queue_stats: Default::default(), - 
proxy_stats: ProxyMetric::default(), - } + let mut replication_stats = if let Some(stats) = cache.get(bucket) { + stats.clone_stats() } else { - BucketStats { - uptime: 0, - replication_stats: BucketReplicationStats::new(), - queue_stats: Default::default(), - proxy_stats: ProxyMetric::default(), + BucketReplicationStats::new() + }; + let uptime = if cache.contains_key(bucket) { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64 + } else { + 0 + }; + drop(cache); + + if let Some(monitor) = get_global_bucket_monitor() { + let bw_report = monitor.get_report(|name| name == bucket); + for (opts, bw) in bw_report.bucket_stats { + let stat = replication_stats + .stats + .entry(opts.replication_arn) + .or_insert_with(|| BucketReplicationStat { + xfer_rate_lrg: XferStats::new(), + xfer_rate_sml: XferStats::new(), + ..Default::default() + }); + stat.bandwidth_limit_bytes_per_sec = bw.limit_bytes_per_sec; + stat.current_bandwidth_bytes_per_sec = bw.current_bandwidth_bytes_per_sec; } } + + BucketStats { + uptime, + replication_stats, + queue_stats: Default::default(), + proxy_stats: ProxyMetric::default(), + } } /// Increase queue statistics diff --git a/crates/metrics/src/collectors/bucket_replication.rs b/crates/metrics/src/collectors/bucket_replication.rs new file mode 100644 index 00000000..85c114d5 --- /dev/null +++ b/crates/metrics/src/collectors/bucket_replication.rs @@ -0,0 +1,119 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Bucket replication bandwidth metrics collector. + +use crate::MetricType; +use crate::format::PrometheusMetric; +use std::borrow::Cow; + +/// Bucket replication bandwidth stats for one replication target. +#[derive(Debug, Clone, Default)] +pub struct BucketReplicationBandwidthStats { + pub bucket: String, + pub target_arn: String, + pub limit_bytes_per_sec: i64, + pub current_bandwidth_bytes_per_sec: f64, +} + +const BUCKET_LABEL: &str = "bucket"; +const TARGET_ARN_LABEL: &str = "targetArn"; + +const METRIC_BANDWIDTH_LIMIT: &str = "rustfs_bucket_replication_bandwidth_limit_bytes_per_second"; +const METRIC_BANDWIDTH_CURRENT: &str = "rustfs_bucket_replication_bandwidth_current_bytes_per_second"; + +const HELP_BANDWIDTH_LIMIT: &str = "Configured bandwidth limit for replication in bytes per second"; +const HELP_BANDWIDTH_CURRENT: &str = "Current replication bandwidth in bytes per second (EWMA)"; + +/// Collect bucket replication bandwidth metrics for Prometheus/OpenTelemetry export. 
+#[must_use] +#[inline] +pub fn collect_bucket_replication_bandwidth_metrics(stats: &[BucketReplicationBandwidthStats]) -> Vec { + if stats.is_empty() { + return Vec::new(); + } + + let mut metrics = Vec::with_capacity(stats.len() * 2); + for stat in stats { + let bucket_label: Cow<'static, str> = Cow::Owned(stat.bucket.clone()); + let target_arn_label: Cow<'static, str> = Cow::Owned(stat.target_arn.clone()); + + metrics.push( + PrometheusMetric::new( + METRIC_BANDWIDTH_LIMIT, + MetricType::Gauge, + HELP_BANDWIDTH_LIMIT, + stat.limit_bytes_per_sec as f64, + ) + .with_label(BUCKET_LABEL, bucket_label.clone()) + .with_label(TARGET_ARN_LABEL, target_arn_label.clone()), + ); + + metrics.push( + PrometheusMetric::new( + METRIC_BANDWIDTH_CURRENT, + MetricType::Gauge, + HELP_BANDWIDTH_CURRENT, + stat.current_bandwidth_bytes_per_sec, + ) + .with_label(BUCKET_LABEL, bucket_label) + .with_label(TARGET_ARN_LABEL, target_arn_label), + ); + } + + metrics +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_collect_bucket_replication_bandwidth_metrics() { + let stats = vec![BucketReplicationBandwidthStats { + bucket: "b1".to_string(), + target_arn: "arn:rustfs:replication:us-east-1:1:test-2".to_string(), + limit_bytes_per_sec: 1_048_576, + current_bandwidth_bytes_per_sec: 204_800.0, + }]; + + let metrics = collect_bucket_replication_bandwidth_metrics(&stats); + assert_eq!(metrics.len(), 2); + + let limit_metric = metrics.iter().find(|m| m.name == METRIC_BANDWIDTH_LIMIT); + assert!(limit_metric.is_some()); + assert_eq!(limit_metric.map(|m| m.value), Some(1_048_576.0)); + assert!( + limit_metric + .and_then(|m| { + m.labels + .iter() + .find(|(k, _)| *k == TARGET_ARN_LABEL) + .map(|(_, v)| v.as_ref() == "arn:rustfs:replication:us-east-1:1:test-2") + }) + .unwrap_or(false) + ); + + let current_metric = metrics.iter().find(|m| m.name == METRIC_BANDWIDTH_CURRENT); + assert!(current_metric.is_some()); + assert_eq!(current_metric.map(|m| m.value), Some(204_800.0)); + } 
+ + #[test] + fn test_collect_bucket_replication_bandwidth_metrics_empty() { + let stats: Vec = Vec::new(); + let metrics = collect_bucket_replication_bandwidth_metrics(&stats); + assert!(metrics.is_empty()); + } +} diff --git a/crates/metrics/src/collectors/global.rs b/crates/metrics/src/collectors/global.rs index dcfc46dd..f856d433 100644 --- a/crates/metrics/src/collectors/global.rs +++ b/crates/metrics/src/collectors/global.rs @@ -13,17 +13,19 @@ // limitations under the License. use crate::collectors::{ - BucketStats, ClusterStats, DiskStats, ResourceStats, collect_bucket_metrics, collect_cluster_metrics, collect_node_metrics, - collect_resource_metrics, + BucketReplicationBandwidthStats, BucketStats, ClusterStats, DiskStats, ResourceStats, collect_bucket_metrics, + collect_bucket_replication_bandwidth_metrics, collect_cluster_metrics, collect_node_metrics, collect_resource_metrics, }; use crate::constants::{ - DEFAULT_BUCKET_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL, DEFAULT_NODE_METRICS_INTERVAL, - DEFAULT_RESOURCE_METRICS_INTERVAL, ENV_BUCKET_METRICS_INTERVAL, ENV_CLUSTER_METRICS_INTERVAL, ENV_DEFAULT_METRICS_INTERVAL, + DEFAULT_BUCKET_METRICS_INTERVAL, DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL, + DEFAULT_NODE_METRICS_INTERVAL, DEFAULT_RESOURCE_METRICS_INTERVAL, ENV_BUCKET_METRICS_INTERVAL, + ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, ENV_CLUSTER_METRICS_INTERVAL, ENV_DEFAULT_METRICS_INTERVAL, ENV_NODE_METRICS_INTERVAL, ENV_RESOURCE_METRICS_INTERVAL, }; use crate::format::report_metrics; use rustfs_ecstore::bucket::metadata_sys::get_quota_config; use rustfs_ecstore::data_usage::load_data_usage_from_backend; +use rustfs_ecstore::global::get_global_bucket_monitor; use rustfs_ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free}; use rustfs_ecstore::store_api::{BucketOperations, BucketOptions}; use rustfs_ecstore::{StorageAPI, new_object_layer_fn}; @@ -147,6 +149,25 @@ async 
fn collect_bucket_stats() -> Vec<BucketStats> { stats } +/// Collect bucket replication bandwidth stats from the global monitor. +fn collect_bucket_replication_bandwidth_stats() -> Vec<BucketReplicationBandwidthStats> { + let Some(monitor) = get_global_bucket_monitor() else { + return Vec::new(); + }; + + monitor + .get_report(|_| true) + .bucket_stats + .into_iter() + .map(|(opts, details)| BucketReplicationBandwidthStats { + bucket: opts.name, + target_arn: opts.replication_arn, + limit_bytes_per_sec: details.limit_bytes_per_sec, + current_bandwidth_bytes_per_sec: details.current_bandwidth_bytes_per_sec, + }) + .collect() +} + /// Collect disk statistics from the storage layer. async fn collect_disk_stats() -> Vec<DiskStats> { let Some(store) = new_object_layer_fn() else { return Vec::new(); }; @@ -235,6 +256,10 @@ pub fn init_metrics_collectors(token: CancellationToken) { let cluster_interval = get_interval(ENV_CLUSTER_METRICS_INTERVAL, DEFAULT_CLUSTER_METRICS_INTERVAL); let bucket_interval = get_interval(ENV_BUCKET_METRICS_INTERVAL, DEFAULT_BUCKET_METRICS_INTERVAL); + let bucket_replication_bandwidth_interval = get_interval( + ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, + DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL, + ); let node_interval = get_interval(ENV_NODE_METRICS_INTERVAL, DEFAULT_NODE_METRICS_INTERVAL); let resource_interval = get_interval(ENV_RESOURCE_METRICS_INTERVAL, DEFAULT_RESOURCE_METRICS_INTERVAL); @@ -295,6 +320,25 @@ pub fn init_metrics_collectors(token: CancellationToken) { } }); + // Spawn task for bucket replication bandwidth metrics + let token_clone = token.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(bucket_replication_bandwidth_interval); + loop { + tokio::select!
{ + _ = interval.tick() => { + let stats = collect_bucket_replication_bandwidth_stats(); + let metrics = collect_bucket_replication_bandwidth_metrics(&stats); + report_metrics(&metrics); + } + _ = token_clone.cancelled() => { + warn!("Metrics collection for bucket replication bandwidth stats cancelled."); + return; + } + } + } + }); + // Spawn task for resource metrics let token_clone = token.clone(); tokio::spawn(async move { diff --git a/crates/metrics/src/collectors/mod.rs b/crates/metrics/src/collectors/mod.rs index f8241f9b..6131dc4b 100644 --- a/crates/metrics/src/collectors/mod.rs +++ b/crates/metrics/src/collectors/mod.rs @@ -19,6 +19,7 @@ //! //! - [`cluster`]: Cluster-wide capacity and object statistics //! - [`bucket`]: Per-bucket usage and quota metrics +//! - [`bucket_replication`]: Per-target replication bandwidth metrics //! - [`node`]: Per-node disk capacity and health metrics //! - [`resource`]: System resource metrics (CPU, memory, uptime) //! @@ -61,12 +62,14 @@ //! ``` mod bucket; +mod bucket_replication; mod cluster; pub(crate) mod global; mod node; mod resource; pub use bucket::{BucketStats, collect_bucket_metrics}; +pub use bucket_replication::{BucketReplicationBandwidthStats, collect_bucket_replication_bandwidth_metrics}; pub use cluster::{ClusterStats, collect_cluster_metrics}; pub use global::init_metrics_collectors; pub use node::{DiskStats, collect_node_metrics}; diff --git a/crates/metrics/src/constants/mod.rs b/crates/metrics/src/constants/mod.rs index 6533997f..b393ee68 100644 --- a/crates/metrics/src/constants/mod.rs +++ b/crates/metrics/src/constants/mod.rs @@ -40,3 +40,8 @@ pub const DEFAULT_NODE_METRICS_INTERVAL: Duration = Duration::from_secs(60); pub const ENV_RESOURCE_METRICS_INTERVAL: &str = "RUSTFS_METRICS_RESOURCE_INTERVAL_SEC"; /// Default interval for collecting system resource metrics (CPU, memory). 
pub const DEFAULT_RESOURCE_METRICS_INTERVAL: Duration = Duration::from_secs(15); + +/// Environment variable key for replication bandwidth metrics interval (seconds). +pub const ENV_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL: &str = "RUSTFS_METRICS_BUCKET_REPLICATION_BANDWIDTH_INTERVAL_SEC"; +/// Default interval for collecting replication bandwidth metrics. +pub const DEFAULT_BUCKET_REPLICATION_BANDWIDTH_METRICS_INTERVAL: Duration = Duration::from_secs(30); diff --git a/crates/metrics/src/metrics_type/bucket_replication.rs b/crates/metrics/src/metrics_type/bucket_replication.rs index 2e9ec0dd..8234909c 100644 --- a/crates/metrics/src/metrics_type/bucket_replication.rs +++ b/crates/metrics/src/metrics_type/bucket_replication.rs @@ -190,6 +190,24 @@ pub static BUCKET_REPL_TOTAL_FAILED_COUNT_MD: LazyLock<MetricDescriptor> = LazyL ) }); +pub static BUCKET_REPL_BANDWIDTH_LIMIT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| { + new_gauge_md( + MetricName::BandwidthLimitBytesPerSecond, + "Configured bandwidth limit for replication in bytes per second", + &[BUCKET_L, TARGET_ARN_L], + subsystems::BUCKET_REPLICATION, + ) +}); + +pub static BUCKET_REPL_BANDWIDTH_CURRENT_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| { + new_gauge_md( + MetricName::BandwidthCurrentBytesPerSecond, + "Current replication bandwidth in bytes per second (EWMA)", + &[BUCKET_L, TARGET_ARN_L], + subsystems::BUCKET_REPLICATION, + ) +}); + // TODO - add a metric for the number of DELETE requests proxied to replication target pub static BUCKET_REPL_PROXIED_DELETE_TAGGING_REQUESTS_FAILURES_MD: LazyLock<MetricDescriptor> = LazyLock::new(|| { new_counter_md( diff --git a/crates/metrics/src/metrics_type/entry/metric_name.rs b/crates/metrics/src/metrics_type/entry/metric_name.rs index e8a5b44a..9a53b8c3 100644 --- a/crates/metrics/src/metrics_type/entry/metric_name.rs +++ b/crates/metrics/src/metrics_type/entry/metric_name.rs @@ -263,6 +263,8 @@ pub enum MetricName { ReplicationMaxQueuedCount, ReplicationMaxDataTransferRate, ReplicationRecentBacklogCount, +
BandwidthLimitBytesPerSecond, + BandwidthCurrentBytesPerSecond, // Scanner-related metrics ScannerBucketScansFinished, @@ -580,6 +582,8 @@ impl MetricName { Self::ReplicationMaxQueuedCount => "max_queued_count".to_string(), Self::ReplicationMaxDataTransferRate => "max_data_transfer_rate".to_string(), Self::ReplicationRecentBacklogCount => "recent_backlog_count".to_string(), + Self::BandwidthLimitBytesPerSecond => "bandwidth_limit_bytes_per_second".to_string(), + Self::BandwidthCurrentBytesPerSecond => "bandwidth_current_bytes_per_second".to_string(), // Scanner-related metrics Self::ScannerBucketScansFinished => "bucket_scans_finished".to_string(), diff --git a/crates/policy/src/policy/action.rs b/crates/policy/src/policy/action.rs index 68e78ad5..45c39a45 100644 --- a/crates/policy/src/policy/action.rs +++ b/crates/policy/src/policy/action.rs @@ -496,6 +496,8 @@ pub enum AdminAction { GetBucketTargetAction, #[strum(serialize = "admin:ReplicationDiff")] ReplicationDiff, + #[strum(serialize = "admin:GetReplicationMetrics")] + GetReplicationMetricsAction, #[strum(serialize = "admin:ImportBucketMetadata")] ImportBucketMetadataAction, #[strum(serialize = "admin:ExportBucketMetadata")] @@ -581,6 +583,7 @@ impl AdminAction { | AdminAction::SetBucketTargetAction | AdminAction::GetBucketTargetAction | AdminAction::ReplicationDiff + | AdminAction::GetReplicationMetricsAction | AdminAction::ImportBucketMetadataAction | AdminAction::ExportBucketMetadataAction | AdminAction::SetTierAction @@ -670,4 +673,9 @@ mod tests { assert_eq!(arr.len(), 1); assert_eq!(arr[0].as_str().unwrap(), "s3:*"); } + + #[test] + fn test_get_replication_metrics_admin_action_is_valid() { + assert!(AdminAction::GetReplicationMetricsAction.is_valid()); + } } diff --git a/rustfs/src/admin/handlers/replication.rs b/rustfs/src/admin/handlers/replication.rs index a29f694d..c98648ed 100644 --- a/rustfs/src/admin/handlers/replication.rs +++ b/rustfs/src/admin/handlers/replication.rs @@ -24,7 +24,11 @@ use 
rustfs_config::MAX_ADMIN_REQUEST_BODY_SIZE; use rustfs_ecstore::bucket::bucket_target_sys::BucketTargetSys; use rustfs_ecstore::bucket::metadata::BUCKET_TARGETS_FILE; use rustfs_ecstore::bucket::metadata_sys; +use rustfs_ecstore::bucket::metadata_sys::get_replication_config; +use rustfs_ecstore::bucket::replication::BucketStats; +use rustfs_ecstore::bucket::replication::GLOBAL_REPLICATION_STATS; use rustfs_ecstore::bucket::target::BucketTarget; +use rustfs_ecstore::error::StorageError; use rustfs_ecstore::global::global_rustfs_port; use rustfs_ecstore::new_object_layer_fn; use rustfs_ecstore::store_api::{BucketOperations, BucketOptions}; @@ -32,7 +36,7 @@ use rustfs_policy::policy::action::{Action, AdminAction}; use s3s::header::CONTENT_TYPE; use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, s3_error}; use std::collections::HashMap; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use url::Host; fn extract_query_params(uri: &Uri) -> HashMap<String, String> { @@ -98,12 +102,52 @@ pub struct GetReplicationMetricsHandler {} #[async_trait::async_trait] impl Operation for GetReplicationMetricsHandler { async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> { - error!("GetReplicationMetricsHandler"); + validate_replication_admin_request(&req, AdminAction::GetReplicationMetricsAction).await?; + let queries = extract_query_params(&req.uri); - if let Some(bucket) = queries.get("bucket") { - error!("get bucket:{} metrics", bucket); + + let Some(bucket) = queries.get("bucket") else { + return Err(s3_error!(InvalidRequest, "bucket is required")); + }; + + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket is required")); } - Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string())))) + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + store + .get_bucket_info(bucket, &BucketOptions::default()) + .await
+ .map_err(ApiError::from)?; + + if let Err(err) = get_replication_config(bucket).await { + if err == StorageError::ConfigNotFound { + info!("replication configuration not found for bucket '{}'", bucket); + return Err(S3Error::with_message( + S3ErrorCode::ReplicationConfigurationNotFoundError, + "replication not found".to_string(), + )); + } + error!("get_replication_config unexpected error: {:?}", err); + return Err(ApiError::from(err).into()); + } + + // TODO cluster cache + // In actual implementation, statistics would be obtained from cluster + // This is simplified to get from local cache + let bucket_stats = match GLOBAL_REPLICATION_STATS.get() { + Some(s) => s.get_latest_replication_stats(bucket).await, + None => BucketStats::default(), + }; + + let data = serde_json::to_vec(&bucket_stats.replication_stats) + .map_err(|_| S3Error::with_message(S3ErrorCode::InternalError, "serialize failed"))?; + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), headers)) } }