metrics(scanner): Add metrics to scanner (#1823)

Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
This commit is contained in:
evan slack
2026-02-15 05:36:40 -05:00
committed by GitHub
parent bffeacf1d2
commit 9786d9b004
9 changed files with 218 additions and 601 deletions

1
Cargo.lock generated
View File

@@ -6960,6 +6960,7 @@ version = "0.0.5"
dependencies = [
"async-trait",
"chrono",
"metrics",
"path-clean",
"rmp-serde",
"rustfs-filemeta",

View File

@@ -32,6 +32,7 @@ tokio = { workspace = true }
tonic = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
metrics = { workspace = true }
rustfs-madmin = { workspace = true }
rustfs-filemeta = { workspace = true }
serde = { workspace = true }
@@ -39,4 +40,4 @@ path-clean = { workspace = true }
rmp-serde = { workspace = true }
async-trait = { workspace = true }
s3s = { workspace = true }
tracing = { workspace = true }
tracing = { workspace = true }

View File

@@ -14,7 +14,8 @@
use crate::last_minute::{AccElem, LastMinuteLatency};
use chrono::{DateTime, Utc};
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use rustfs_madmin::metrics::{ScannerMetrics as M_ScannerMetrics, TimedAction};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::Display,
@@ -64,6 +65,37 @@ impl IlmAction {
|| *self == Self::DeleteAllVersionsAction
|| *self == Self::DelMarkerDeleteAllVersionsAction
}
pub fn as_str(&self) -> &'static str {
match self {
Self::NoneAction => "none",
Self::DeleteAction => "delete",
Self::DeleteVersionAction => "delete_version",
Self::TransitionAction => "transition",
Self::TransitionVersionAction => "transition_version",
Self::DeleteRestoredAction => "delete_restored",
Self::DeleteRestoredVersionAction => "delete_restored_version",
Self::DeleteAllVersionsAction => "delete_all_versions",
Self::DelMarkerDeleteAllVersionsAction => "del_marker_delete_all_versions",
Self::ActionCount => "action_count",
}
}
/// Inverse of the discriminant: map an index back to its action.
/// Returns `None` for indices outside `0..=9`.
pub fn from_index(i: usize) -> Option<Self> {
    // Table order mirrors declaration order (discriminants 0..=9).
    let all = [
        Self::NoneAction,
        Self::DeleteAction,
        Self::DeleteVersionAction,
        Self::TransitionAction,
        Self::TransitionVersionAction,
        Self::DeleteRestoredAction,
        Self::DeleteRestoredVersionAction,
        Self::DeleteAllVersionsAction,
        Self::DelMarkerDeleteAllVersionsAction,
        Self::ActionCount,
    ];
    all.get(i).copied()
}
}
impl Display for IlmAction {
@@ -272,14 +304,76 @@ pub struct Metrics {
cycle_info: Arc<RwLock<Option<CurrentCycle>>>,
}
// Scanner cycle bookkeeping; serialized/deserialized via rmp-serde (see marshal/unmarshal below).
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct CurrentCycle {
    // Cycle number currently in progress.
    pub current: u64,
    // Cycle number to run next.
    pub next: u64,
    // Completion timestamps of finished cycles.
    pub cycle_completed: Vec<DateTime<Utc>>,
    // When the current cycle started.
    pub started: DateTime<Utc>,
}
impl CurrentCycle {
    /// Restore cycle state from a MessagePack buffer, replacing `self` wholesale.
    pub fn unmarshal(&mut self, buf: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let decoded: CurrentCycle = rmp_serde::from_slice(buf)?;
        *self = decoded;
        Ok(())
    }

    /// Serialize the cycle state to a MessagePack buffer.
    pub fn marshal(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
        let bytes = rmp_serde::to_vec(self)?;
        Ok(bytes)
    }
}
/// OTEL metric name constants for scanner metrics
// Counter: total objects visited by the scanner.
const OTEL_SCANNER_OBJECTS_SCANNED: &str = "rustfs_scanner_objects_scanned_total";
// Counter: total directories visited by the scanner.
const OTEL_SCANNER_DIRECTORIES_SCANNED: &str = "rustfs_scanner_directories_scanned_total";
// Counter: bucket-on-drive scans, labeled by result/bucket/disk.
const OTEL_SCANNER_BUCKETS_SCANNED: &str = "rustfs_scanner_buckets_scanned_total";
// Counter: full scan cycles, labeled by result.
const OTEL_SCANNER_CYCLES: &str = "rustfs_scanner_cycles_total";
// Gauge: duration of the last successful full cycle, in seconds.
const OTEL_SCANNER_CYCLE_DURATION_SECONDS: &str = "rustfs_scanner_cycle_duration_seconds";
// Histogram: per bucket-drive scan duration, in seconds.
const OTEL_SCANNER_BUCKET_DRIVE_DURATION_SECONDS: &str = "rustfs_scanner_bucket_drive_duration_seconds";
/// Emit an OTEL counter increment for the given scanner metric.
/// ScanCycle and ScanBucketDrive are handled by dedicated emit functions with labels.
fn emit_otel_counter(metric: usize, count: u64) {
match Metric::from_index(metric) {
Some(Metric::ScanObject) => {
metrics::counter!(OTEL_SCANNER_OBJECTS_SCANNED).increment(count);
}
Some(Metric::ScanFolder) => {
metrics::counter!(OTEL_SCANNER_DIRECTORIES_SCANNED).increment(count);
}
_ => {}
}
}
/// Emit OTel metrics for a completed scan cycle: a counter labeled with the
/// outcome, plus a gauge holding the duration of the last *successful* cycle.
pub fn emit_scan_cycle_complete(success: bool, duration: Duration) {
    let result = match success {
        true => "success",
        false => "error",
    };
    metrics::counter!(OTEL_SCANNER_CYCLES, "result" => result).increment(1);
    if !success {
        // Failed cycles never overwrite the last-success duration gauge.
        return;
    }
    metrics::gauge!(OTEL_SCANNER_CYCLE_DURATION_SECONDS).set(duration.as_secs_f64());
}
/// Emit OTel metrics for a completed bucket-drive scan: an outcome counter
/// labeled result/bucket/disk plus a duration histogram labeled bucket/disk.
pub fn emit_scan_bucket_drive_complete(success: bool, bucket: &str, disk: &str, duration: Duration) {
    let result = match success {
        true => "success",
        false => "error",
    };
    // The metrics macros take owned label values; clone once per label use.
    let bucket_label = bucket.to_owned();
    let disk_label = disk.to_owned();
    let scanned = metrics::counter!(
        OTEL_SCANNER_BUCKETS_SCANNED,
        "result" => result,
        "bucket" => bucket_label.clone(),
        "disk" => disk_label.clone()
    );
    scanned.increment(1);
    let elapsed = metrics::histogram!(
        OTEL_SCANNER_BUCKET_DRIVE_DURATION_SECONDS,
        "bucket" => bucket_label,
        "disk" => disk_label
    );
    elapsed.record(duration.as_secs_f64());
}
impl Metrics {
pub fn new() -> Self {
let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect();
@@ -307,6 +401,7 @@ impl Metrics {
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
emit_otel_counter(metric, 1);
// Update latency for realtime metrics (spawn async task for this)
if (metric) < Metric::LastRealtime as usize {
@@ -332,6 +427,7 @@ impl Metrics {
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
emit_otel_counter(metric, 1);
// Update latency for realtime metrics with size (spawn async task)
if (metric) < Metric::LastRealtime as usize {
@@ -352,6 +448,7 @@ impl Metrics {
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
emit_otel_counter(metric, 1);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
@@ -373,6 +470,7 @@ impl Metrics {
// Update operation count
global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed);
emit_otel_counter(metric, count as u64);
// Update latency for realtime metrics (spawn async task)
if (metric) < Metric::LastRealtime as usize {
@@ -408,6 +506,7 @@ impl Metrics {
let metric = metric as usize;
// Update operation count
global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
emit_otel_counter(metric, 1);
// Update latency for realtime metrics
if (metric) < Metric::LastRealtime as usize {
@@ -494,10 +593,43 @@ impl Metrics {
for i in 0..Metric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
if last_min.n > 0
&& let Some(_metric) = Metric::from_index(i)
&& let Some(metric) = Metric::from_index(i)
{
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
metrics.last_minute.actions.insert(
metric.as_str().to_string(),
TimedAction {
count: last_min.n,
acc_time: last_min.total,
bytes: last_min.size,
},
);
}
}
// Lifetime ILM operations
for i in 0..IlmAction::ActionCount as usize {
let count = self.actions[i].load(Ordering::Relaxed);
if count > 0
&& let Some(action) = IlmAction::from_index(i)
{
metrics.life_time_ilm.insert(action.as_str().to_string(), count);
}
}
// Last minute ILM latency
for i in 0..IlmAction::ActionCount as usize {
let last_min = self.actions_latency[i].total().await;
if last_min.n > 0
&& let Some(action) = IlmAction::from_index(i)
{
metrics.last_minute.ilm.insert(
action.as_str().to_string(),
TimedAction {
count: last_min.n,
acc_time: last_min.total,
bytes: last_min.size,
},
);
}
}
@@ -550,3 +682,31 @@ impl Default for Metrics {
Self::new()
}
}
/// Guard owning a `CloseDiskFn`; the callback runs explicitly via `close()`
/// or, best effort, when the guard is dropped (see the `Drop` impl).
pub struct CloseDiskGuard(CloseDiskFn);
impl CloseDiskGuard {
    /// Wrap a close callback in a guard.
    pub fn new(close_disk: CloseDiskFn) -> Self {
        Self(close_disk)
    }
    /// Run the close callback and await its completion.
    ///
    /// NOTE(review): `close()` takes `&self`, so the guard is still live
    /// afterwards and `Drop` will invoke the callback again — confirm the
    /// close fn is idempotent.
    pub async fn close(&self) {
        self.0().await;
    }
}
impl Drop for CloseDiskGuard {
    /// Best-effort, fire-and-forget cleanup: the async callback is spawned
    /// onto the current tokio runtime and is NOT awaited, so it may still be
    /// running after the guard is gone.
    fn drop(&mut self) {
        // Drop cannot be async, so we spawn the async cleanup task
        // The task will run in the background and complete asynchronously
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let close_fn = self.0.clone();
            handle.spawn(async move {
                close_fn().await;
            });
        } else {
            // If we're not in a tokio runtime context, we can't spawn
            // This is a best-effort cleanup, so we just skip it
        }
    }
}

View File

@@ -782,8 +782,9 @@ pub async fn transition_object(api: Arc<ECStore>, oi: &ObjectInfo, lae: LcAuditE
mod_time: oi.mod_time,
..Default::default()
};
time_ilm(1);
api.transition_object(&oi.bucket, &oi.name, &opts).await
let result = api.transition_object(&oi.bucket, &oi.name, &opts).await;
time_ilm(1)();
result
}
pub fn audit_tier_actions(_api: ECStore, _tier: &str, _bytes: i64) -> TimeFn {
@@ -1082,11 +1083,11 @@ pub async fn apply_expiry_on_transitioned_object(
lc_event: &lifecycle::Event,
src: &LcEventSrc,
) -> bool {
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
let time_ilm = Metrics::time_ilm(lc_event.action);
if let Err(_err) = expire_transitioned_object(api, oi, lc_event, src).await {
return false;
}
// let _ = time_ilm(1);
time_ilm(1)();
true
}
@@ -1114,7 +1115,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
opts.delete_prefix_object = true;
}
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
let time_ilm = Metrics::time_ilm(lc_event.action);
//debug!("lc_event.action: {:?}", lc_event.action);
//debug!("opts: {:?}", opts);
@@ -1152,11 +1153,11 @@ pub async fn apply_expiry_on_non_transitioned_objects(
});
if lc_event.action != lifecycle::IlmAction::NoneAction {
// let mut num_versions = 1_u64;
// if lc_event.action.delete_all() {
// num_versions = oi.num_versions as u64;
// }
// let _ = time_ilm(num_versions);
let mut num_versions = 1_u64;
if lc_event.action.delete_all() {
num_versions = oi.num_versions as u64;
}
time_ilm(num_versions)();
}
true

View File

@@ -23,7 +23,6 @@
pub mod data_usage_define;
pub mod error;
pub mod last_minute;
pub mod metrics;
pub mod scanner;
pub mod scanner_folder;
pub mod scanner_io;

View File

@@ -1,576 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::last_minute::{AccElem, LastMinuteLatency};
use chrono::{DateTime, Utc};
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::Display,
pin::Pin,
sync::{
Arc, OnceLock,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use tokio::sync::{Mutex, RwLock};
/// Lifecycle (ILM) actions the scanner can apply to an object or version.
/// Discriminants start at 0; `ActionCount` is a sentinel used to size arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IlmAction {
    NoneAction = 0,
    DeleteAction,
    DeleteVersionAction,
    TransitionAction,
    TransitionVersionAction,
    DeleteRestoredAction,
    DeleteRestoredVersionAction,
    DeleteAllVersionsAction,
    DelMarkerDeleteAllVersionsAction,
    ActionCount,
}

impl IlmAction {
    /// True when the action removes a previously restored copy.
    pub fn delete_restored(&self) -> bool {
        matches!(self, Self::DeleteRestoredAction | Self::DeleteRestoredVersionAction)
    }

    /// True when the action targets a specific object version.
    pub fn delete_versioned(&self) -> bool {
        matches!(self, Self::DeleteVersionAction | Self::DeleteRestoredVersionAction)
    }

    /// True when the action removes every version of the object.
    pub fn delete_all(&self) -> bool {
        matches!(self, Self::DeleteAllVersionsAction | Self::DelMarkerDeleteAllVersionsAction)
    }

    /// True for any deleting action, restored-copy deletions included.
    pub fn delete(&self) -> bool {
        self.delete_restored()
            || matches!(
                self,
                Self::DeleteVersionAction
                    | Self::DeleteAction
                    | Self::DeleteAllVersionsAction
                    | Self::DelMarkerDeleteAllVersionsAction
            )
    }
}

impl Display for IlmAction {
    /// Renders the derived `Debug` name (e.g. `DeleteAction`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self, f)
    }
}
// Process-wide scanner metrics instance, created lazily on first access.
pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
/// Return the global `Metrics` singleton, initializing it on first call.
pub fn global_metrics() -> &'static Arc<Metrics> {
    GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
}
/// Scanner metric identifiers.
///
/// Discriminants start at 0 and are sequential, so a `Metric` doubles as an
/// index into the per-metric vectors held by `Metrics`. `LastRealtime` and
/// `Last` are range sentinels, not real metrics: indices below `LastRealtime`
/// also record last-minute latency; indices below `Last` record lifetime
/// operation counts only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
pub enum Metric {
    // START Realtime metrics, that only records
    // last minute latencies and total operation count.
    ReadMetadata = 0,
    CheckMissing,
    SaveUsage,
    ApplyAll,
    ApplyVersion,
    TierObjSweep,
    HealCheck,
    Ilm,
    CheckReplication,
    Yield,
    CleanAbandoned,
    ApplyNonCurrent,
    HealAbandonedVersion,
    // START Trace metrics:
    StartTrace,
    ScanObject, // Scan object. All operations included.
    HealAbandonedObject,
    // END realtime metrics:
    LastRealtime,
    // Trace only metrics:
    ScanFolder,      // Scan a folder on disk, recursively.
    ScanCycle,       // Full cycle, cluster global.
    ScanBucketDrive, // Single bucket on one drive.
    CompactFolder,   // Folder compacted.
    // Must be last:
    Last,
}

impl Metric {
    /// Convert to string representation for metrics.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ReadMetadata => "read_metadata",
            Self::CheckMissing => "check_missing",
            Self::SaveUsage => "save_usage",
            Self::ApplyAll => "apply_all",
            Self::ApplyVersion => "apply_version",
            Self::TierObjSweep => "tier_obj_sweep",
            Self::HealCheck => "heal_check",
            Self::Ilm => "ilm",
            Self::CheckReplication => "check_replication",
            Self::Yield => "yield",
            Self::CleanAbandoned => "clean_abandoned",
            Self::ApplyNonCurrent => "apply_non_current",
            Self::HealAbandonedVersion => "heal_abandoned_version",
            Self::StartTrace => "start_trace",
            Self::ScanObject => "scan_object",
            Self::HealAbandonedObject => "heal_abandoned_object",
            Self::LastRealtime => "last_realtime",
            Self::ScanFolder => "scan_folder",
            Self::ScanCycle => "scan_cycle",
            Self::ScanBucketDrive => "scan_bucket_drive",
            Self::CompactFolder => "compact_folder",
            Self::Last => "last",
        }
    }

    /// Convert from index back to enum (safe version).
    ///
    /// Returns `None` for `Metric::Last as usize` (21) and anything beyond —
    /// `Last` is a sentinel, never a valid metric index.
    pub fn from_index(index: usize) -> Option<Self> {
        if index >= Self::Last as usize {
            return None;
        }
        // Safe conversion using match instead of unsafe transmute.
        // Note: index 21 (`Last`) is already rejected by the guard above, so
        // there is deliberately no arm for it (the former `21 => Some(Self::Last)`
        // arm was unreachable dead code).
        match index {
            0 => Some(Self::ReadMetadata),
            1 => Some(Self::CheckMissing),
            2 => Some(Self::SaveUsage),
            3 => Some(Self::ApplyAll),
            4 => Some(Self::ApplyVersion),
            5 => Some(Self::TierObjSweep),
            6 => Some(Self::HealCheck),
            7 => Some(Self::Ilm),
            8 => Some(Self::CheckReplication),
            9 => Some(Self::Yield),
            10 => Some(Self::CleanAbandoned),
            11 => Some(Self::ApplyNonCurrent),
            12 => Some(Self::HealAbandonedVersion),
            13 => Some(Self::StartTrace),
            14 => Some(Self::ScanObject),
            15 => Some(Self::HealAbandonedObject),
            16 => Some(Self::LastRealtime),
            17 => Some(Self::ScanFolder),
            18 => Some(Self::ScanCycle),
            19 => Some(Self::ScanBucketDrive),
            20 => Some(Self::CompactFolder),
            _ => None,
        }
    }
}
/// Thread-safe wrapper for `LastMinuteLatency` guarded by an async mutex.
///
/// Cloning is cheap and shares state: clones hold the same `Arc`, so they
/// observe and contribute to the same rolling one-minute window. The manual
/// `Clone` impl this type used to carry was byte-for-byte equivalent to the
/// derive (it only cloned the `Arc`), so it is derived now.
#[derive(Clone, Default)]
pub struct LockedLastMinuteLatency {
    latency: Arc<Mutex<LastMinuteLatency>>,
}

impl LockedLastMinuteLatency {
    /// Create a new, empty latency window.
    pub fn new() -> Self {
        Self {
            latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
        }
    }

    /// Add a duration measurement with no associated byte size.
    pub async fn add(&self, duration: Duration) {
        self.add_size(duration, 0).await;
    }

    /// Add a duration measurement with size.
    ///
    /// NOTE(review): `duration.as_secs()` truncates sub-second durations to
    /// zero, so operations faster than one second accumulate `total == 0` —
    /// confirm whether `AccElem.total` is intended to be whole seconds.
    pub async fn add_size(&self, duration: Duration, size: u64) {
        let mut latency = self.latency.lock().await;
        // Bucket key: current wall-clock time in whole seconds since the epoch.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let elem = AccElem {
            n: 1,
            total: duration.as_secs(),
            size,
        };
        latency.add_all(now, &elem);
    }

    /// Get total accumulated metrics for the last minute.
    pub async fn total(&self) -> AccElem {
        let mut latency = self.latency.lock().await;
        latency.get_total()
    }
}
/// Tracks the filesystem path a disk scanner is currently visiting.
struct CurrentPathTracker {
    current_path: Arc<RwLock<String>>,
}

impl CurrentPathTracker {
    /// Start tracking at `initial_path`.
    fn new(initial_path: String) -> Self {
        let current_path = Arc::new(RwLock::new(initial_path));
        Self { current_path }
    }

    /// Replace the tracked path with `path`.
    async fn update_path(&self, path: String) {
        let mut guard = self.current_path.write().await;
        *guard = path;
    }

    /// Snapshot the tracked path as an owned string.
    async fn get_path(&self) -> String {
        self.current_path.read().await.to_owned()
    }
}
/// Main scanner metrics structure.
///
/// Lifetime counters are lock-free atomics; latency windows, path trackers
/// and cycle info sit behind async locks.
pub struct Metrics {
    // All fields must be accessed atomically and aligned.
    // Lifetime operation counts, one slot per `Metric` (indexed by discriminant).
    operations: Vec<AtomicU64>,
    // Last-minute latency windows for metrics below `Metric::LastRealtime`.
    latency: Vec<LockedLastMinuteLatency>,
    // Lifetime ILM action counts, one slot per `IlmAction`.
    actions: Vec<AtomicU64>,
    // Last-minute latency windows, one per ILM action.
    actions_latency: Vec<LockedLastMinuteLatency>,
    // Current paths contains disk -> tracker mappings
    current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
    // Cycle information
    cycle_info: Arc<RwLock<Option<CurrentCycle>>>,
}
// Scanner cycle bookkeeping, serialized via rmp-serde (MessagePack).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct CurrentCycle {
    // Cycle number currently in progress.
    pub current: u64,
    // Cycle number to run next.
    pub next: u64,
    // Completion timestamps of finished cycles.
    pub cycle_completed: Vec<DateTime<Utc>>,
    // When the current cycle started.
    pub started: DateTime<Utc>,
}
impl CurrentCycle {
    /// Replace `self` with state decoded from a MessagePack buffer.
    pub fn unmarshal(&mut self, buf: &[u8]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        *self = rmp_serde::from_slice(buf)?;
        Ok(())
    }
    /// Serialize the cycle state to a MessagePack buffer.
    pub fn marshal(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(rmp_serde::to_vec(self)?)
    }
}
impl Metrics {
    /// Create a fresh container: zeroed counters and empty latency windows
    /// for every `Metric` / `IlmAction` slot.
    pub fn new() -> Self {
        let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect();
        let latency = (0..Metric::LastRealtime as usize)
            .map(|_| LockedLastMinuteLatency::new())
            .collect();
        Self {
            operations,
            latency,
            actions: (0..IlmAction::ActionCount as usize).map(|_| AtomicU64::new(0)).collect(),
            actions_latency: vec![LockedLastMinuteLatency::default(); IlmAction::ActionCount as usize],
            current_paths: Arc::new(RwLock::new(HashMap::new())),
            cycle_info: Arc::new(RwLock::new(None)),
        }
    }
    /// Log scanner action with custom metadata - compatible with existing usage.
    /// Captures the start time now; the returned closure, when called, bumps
    /// the global lifetime counter and (for realtime metrics) records the
    /// elapsed latency on a spawned task.
    pub fn log(metric: Metric) -> impl Fn(&HashMap<String, String>) {
        let metric = metric as usize;
        let start_time = SystemTime::now();
        move |_custom: &HashMap<String, String>| {
            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
            // Update operation count
            global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
            // Update latency for realtime metrics (spawn async task for this)
            if (metric) < Metric::LastRealtime as usize {
                let metric_index = metric;
                tokio::spawn(async move {
                    global_metrics().latency[metric_index].add(duration).await;
                });
            }
            // Log trace metrics
            if metric as u8 > Metric::StartTrace as u8 {
                //debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
            }
        }
    }
    /// Time scanner action with size - returns function that takes size.
    /// The elapsed time is measured from this call to the closure invocation.
    pub fn time_size(metric: Metric) -> impl Fn(u64) {
        let metric = metric as usize;
        let start_time = SystemTime::now();
        move |size: u64| {
            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
            // Update operation count
            global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
            // Update latency for realtime metrics with size (spawn async task)
            if (metric) < Metric::LastRealtime as usize {
                let metric_index = metric;
                tokio::spawn(async move {
                    global_metrics().latency[metric_index].add_size(duration, size).await;
                });
            }
        }
    }
    /// Time a scanner action - returns a closure to call when done.
    pub fn time(metric: Metric) -> impl Fn() {
        let metric = metric as usize;
        let start_time = SystemTime::now();
        move || {
            let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
            // Update operation count
            global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
            // Update latency for realtime metrics (spawn async task)
            if (metric) < Metric::LastRealtime as usize {
                let metric_index = metric;
                tokio::spawn(async move {
                    global_metrics().latency[metric_index].add(duration).await;
                });
            }
        }
    }
    /// Time N scanner actions - returns function that takes count, then returns completion function.
    /// The count is added to the lifetime counter; latency is recorded once.
    pub fn time_n(metric: Metric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
        let metric = metric as usize;
        let start_time = SystemTime::now();
        Box::new(move |count: usize| {
            Box::new(move || {
                let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
                // Update operation count
                global_metrics().operations[metric].fetch_add(count as u64, Ordering::Relaxed);
                // Update latency for realtime metrics (spawn async task)
                if (metric) < Metric::LastRealtime as usize {
                    let metric_index = metric;
                    tokio::spawn(async move {
                        global_metrics().latency[metric_index].add(duration).await;
                    });
                }
            })
        })
    }
    /// Time ILM action with versions - returns function that takes versions, then returns completion function.
    /// `NoneAction` and out-of-range actions return a no-op closure.
    pub fn time_ilm(a: IlmAction) -> Box<dyn Fn(u64) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
        let a_clone = a as usize;
        if a_clone == IlmAction::NoneAction as usize || a_clone >= IlmAction::ActionCount as usize {
            return Box::new(move |_: u64| Box::new(move || {}));
        }
        let start = SystemTime::now();
        Box::new(move |versions: u64| {
            Box::new(move || {
                let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0));
                // Both the count bump and latency update happen on a spawned
                // task, so they land asynchronously after this closure returns.
                tokio::spawn(async move {
                    global_metrics().actions[a_clone].fetch_add(versions, Ordering::Relaxed);
                    global_metrics().actions_latency[a_clone].add(duration).await;
                });
            })
        })
    }
    /// Increment time with specific duration (no closure; applies immediately).
    pub async fn inc_time(metric: Metric, duration: Duration) {
        let metric = metric as usize;
        // Update operation count
        global_metrics().operations[metric].fetch_add(1, Ordering::Relaxed);
        // Update latency for realtime metrics
        if (metric) < Metric::LastRealtime as usize {
            global_metrics().latency[metric].add(duration).await;
        }
    }
    /// Get lifetime operation count for a metric (0 for out-of-range sentinels).
    pub fn lifetime(&self, metric: Metric) -> u64 {
        let metric = metric as usize;
        if (metric) >= Metric::Last as usize {
            return 0;
        }
        self.operations[metric].load(Ordering::Relaxed)
    }
    /// Get last minute statistics for a metric (default for non-realtime metrics).
    pub async fn last_minute(&self, metric: Metric) -> AccElem {
        let metric = metric as usize;
        if (metric) >= Metric::LastRealtime as usize {
            return AccElem::default();
        }
        self.latency[metric].total().await
    }
    /// Set current cycle information.
    pub async fn set_cycle(&self, cycle: Option<CurrentCycle>) {
        *self.cycle_info.write().await = cycle;
    }
    /// Get current cycle information.
    pub async fn get_cycle(&self) -> Option<CurrentCycle> {
        self.cycle_info.read().await.clone()
    }
    /// Get current active paths, each formatted as "disk/path".
    pub async fn get_current_paths(&self) -> Vec<String> {
        let mut result = Vec::new();
        let paths = self.current_paths.read().await;
        for (disk, tracker) in paths.iter() {
            let path = tracker.get_path().await;
            result.push(format!("{disk}/{path}"));
        }
        result
    }
    /// Get number of active drives (registered path trackers).
    pub async fn active_drives(&self) -> usize {
        self.current_paths.read().await.len()
    }
    /// Generate metrics report for admin APIs.
    pub async fn report(&self) -> M_ScannerMetrics {
        let mut metrics = M_ScannerMetrics::default();
        // Set cycle information
        if let Some(cycle) = self.get_cycle().await {
            metrics.current_cycle = cycle.current;
            metrics.cycles_completed_at = cycle.cycle_completed;
            metrics.current_started = cycle.started;
        }
        metrics.collected_at = Utc::now();
        metrics.active_paths = self.get_current_paths().await;
        // Lifetime operations
        for i in 0..Metric::Last as usize {
            let count = self.operations[i].load(Ordering::Relaxed);
            if count > 0
                && let Some(metric) = Metric::from_index(i)
            {
                metrics.life_time_ops.insert(metric.as_str().to_string(), count);
            }
        }
        // Last minute statistics for realtime metrics
        // NOTE(review): this loop computes totals but discards them — the
        // conversion to the madmin TimedAction format is not implemented, so
        // `metrics.last_minute` stays empty; the `total()` call may still
        // advance each rolling window. Confirm whether this is intentional.
        for i in 0..Metric::LastRealtime as usize {
            let last_min = self.latency[i].total().await;
            if last_min.n > 0
                && let Some(_metric) = Metric::from_index(i)
            {
                // Convert to madmin TimedAction format if needed
                // This would require implementing the conversion
            }
        }
        metrics
    }
}
// Type aliases for compatibility with existing code
/// Async callback reporting the path currently being scanned on a disk.
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Async callback that unregisters a disk's path tracker when scanning ends.
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress.
///
/// Registers a tracker for `disk` in the global metrics map and returns an
/// update fn (records the path currently being scanned) plus a done fn
/// (removes the tracker when the disk scan completes).
///
/// NOTE(review): registration happens in a spawned task, so callers invoking
/// the returned closures immediately may race the insertion — confirm this is
/// acceptable (worst case the path is briefly missing from reports).
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
    let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
    let disk_name = disk.to_string();
    // Store the tracker in global metrics (fire-and-forget; see NOTE above).
    let tracker_clone = Arc::clone(&tracker);
    let disk_clone = disk_name.clone();
    tokio::spawn(async move {
        global_metrics().current_paths.write().await.insert(disk_clone, tracker_clone);
    });
    let update_fn = {
        let tracker = Arc::clone(&tracker);
        Arc::new(move |path: &str| -> Pin<Box<dyn Future<Output = ()> + Send>> {
            let tracker = Arc::clone(&tracker);
            let path = path.to_string();
            Box::pin(async move {
                tracker.update_path(path).await;
            })
        })
    };
    let done_fn = {
        let disk_name = disk_name.clone();
        Arc::new(move || -> Pin<Box<dyn Future<Output = ()> + Send>> {
            let disk_name = disk_name.clone();
            Box::pin(async move {
                global_metrics().current_paths.write().await.remove(&disk_name);
            })
        })
    };
    (update_fn, done_fn)
}
impl Default for Metrics {
    /// Equivalent to `Metrics::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Guard owning a `CloseDiskFn`; the callback runs explicitly via `close()`
/// or, best effort, when the guard is dropped (see the `Drop` impl below).
pub struct CloseDiskGuard(CloseDiskFn);
impl CloseDiskGuard {
    /// Wrap a close callback in a guard.
    pub fn new(close_disk: CloseDiskFn) -> Self {
        Self(close_disk)
    }
    /// Run the close callback and await its completion.
    ///
    /// NOTE(review): `close()` takes `&self`, so `Drop` will invoke the
    /// callback again afterwards — confirm the close fn is idempotent.
    pub async fn close(&self) {
        self.0().await;
    }
}
impl Drop for CloseDiskGuard {
    /// Best-effort, fire-and-forget cleanup: the async callback is spawned on
    /// the current tokio runtime and not awaited; outside a runtime it is
    /// silently skipped.
    fn drop(&mut self) {
        // Drop cannot be async, so we spawn the async cleanup task
        // The task will run in the background and complete asynchronously
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            let close_fn = self.0.clone();
            handle.spawn(async move {
                close_fn().await;
            });
        } else {
            // If we're not in a tokio runtime context, we can't spawn
            // This is a best-effort cleanup, so we just skip it
        }
    }
}

View File

@@ -15,13 +15,12 @@
use std::sync::Arc;
use crate::data_usage_define::{BACKGROUND_HEAL_INFO_PATH, DATA_USAGE_BLOOM_NAME_PATH, DATA_USAGE_OBJ_NAME_PATH};
use crate::metrics::CurrentCycle;
use crate::metrics::global_metrics;
use crate::scanner_folder::data_usage_update_dir_cycles;
use crate::scanner_io::ScannerIO;
use crate::{DataUsageInfo, ScannerError};
use chrono::{DateTime, Utc};
use rustfs_common::heal_channel::HealScanMode;
use rustfs_common::metrics::{CurrentCycle, Metric, Metrics, emit_scan_cycle_complete, global_metrics};
use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS};
use rustfs_ecstore::StorageAPI as _;
use rustfs_ecstore::config::com::{read_config, save_config};
@@ -199,9 +198,14 @@ pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) ->
});
let done_cycle = Metrics::time(Metric::ScanCycle);
let cycle_start = std::time::Instant::now();
if let Err(e) = storeapi.clone().nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode).await {
error!("Failed to scan namespace: {e}");
emit_scan_cycle_complete(false, cycle_start.elapsed());
} else {
done_cycle();
emit_scan_cycle_complete(true, cycle_start.elapsed());
info!("Namespace scanned successfully");
cycle_info.next +=1;

View File

@@ -20,10 +20,9 @@ use std::time::{Duration, SystemTime};
use crate::ReplTargetSizeSummary;
use crate::data_usage_define::{DataUsageCache, DataUsageEntry, DataUsageHash, DataUsageHashMap, SizeSummary, hash_path};
use crate::error::ScannerError;
use crate::metrics::{UpdateCurrentPathFn, current_path_updater};
use crate::scanner_io::ScannerIODisk as _;
use rustfs_common::heal_channel::{HEAL_DELETE_DANGLING, HealChannelRequest, HealOpts, HealScanMode, send_heal_request};
use rustfs_common::metrics::IlmAction;
use rustfs_common::metrics::{IlmAction, Metric, Metrics, UpdateCurrentPathFn, current_path_updater};
use rustfs_ecstore::StorageAPI;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_rule;
@@ -202,11 +201,13 @@ impl ScannerItem {
let mut size = actual_size;
let done_ilm = Metrics::time_ilm(event.action);
match event.action {
IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
remaining_versions = 0;
debug!("apply_actions: applying expiry rule for object: {} {}", oi.name, event.action);
apply_expiry_rule(event, &LcEventSrc::Scanner, oi).await;
done_ilm(1)();
break 'eventLoop;
}
@@ -218,6 +219,7 @@ impl ScannerItem {
debug!("apply_actions: applying expiry rule for object: {} {}", oi.name, event.action);
apply_expiry_rule(event, &LcEventSrc::Scanner, oi).await;
done_ilm(1)();
}
IlmAction::DeleteVersionAction => {
remaining_versions -= 1;
@@ -230,10 +232,12 @@ impl ScannerItem {
});
}
noncurrent_events.push(event.clone());
done_ilm(1)();
}
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => {
debug!("apply_actions: applying transition rule for object: {} {}", oi.name, event.action);
apply_transition_rule(event, &LcEventSrc::Scanner, oi).await;
done_ilm(1)();
}
IlmAction::NoneAction | IlmAction::ActionCount => {
@@ -324,6 +328,7 @@ impl ScannerItem {
}
async fn apply_heal<S: StorageAPI>(&mut self, store: Arc<S>, oi: &ObjectInfo) -> i64 {
let done_heal = Metrics::time(Metric::HealAbandonedObject);
debug!(
"apply_heal: bucket: {}, object_path: {}, version_id: {}",
self.bucket,
@@ -337,7 +342,7 @@ impl ScannerItem {
HealScanMode::Normal
};
match store
let result = match store
.clone()
.heal_object(
self.bucket.as_str(),
@@ -364,7 +369,9 @@ impl ScannerItem {
warn!("apply_heal: failed to heal object: {}", e);
0
}
}
};
done_heal();
result
}
fn alert_excessive_versions(&self, _object_infos_length: usize, _cumulative_size: i64) {
@@ -465,6 +472,8 @@ impl FolderScanner {
folder: CachedFolder,
into: &mut DataUsageEntry,
) -> Result<(), ScannerError> {
let done_folder = Metrics::time(Metric::ScanFolder);
if ctx.is_cancelled() {
return Err(ScannerError::Other("Operation cancelled".to_string()));
}
@@ -1031,11 +1040,13 @@ impl FolderScanner {
// Compact if too many children...
if !into.compacted {
let done_compact = Metrics::time(Metric::CompactFolder);
self.new_cache.reduce_children_of(
&this_hash,
DATA_SCANNER_COMPACT_AT_CHILDREN,
self.new_cache.info.name != folder.name,
);
done_compact();
}
if self.update_cache.cache.contains_key(&this_hash.key()) && !was_compacted {
@@ -1046,6 +1057,8 @@ impl FolderScanner {
}
}
done_folder();
Ok(())
}

View File

@@ -20,6 +20,7 @@ use crate::{
use futures::future::join_all;
use rand::seq::SliceRandom as _;
use rustfs_common::heal_channel::HealScanMode;
use rustfs_common::metrics::{Metric, Metrics, emit_scan_bucket_drive_complete};
use rustfs_ecstore::bucket::bucket_target_sys::BucketTargetSys;
use rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle;
use rustfs_ecstore::bucket::metadata_sys::{get_lifecycle_config, get_object_lock_config, get_replication_config};
@@ -483,6 +484,8 @@ impl ScannerIOCache for SetDisks {
#[async_trait::async_trait]
impl ScannerIODisk for Disk {
async fn get_size(&self, mut item: ScannerItem) -> Result<SizeSummary> {
let done_object = Metrics::time(Metric::ScanObject);
if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) {
return Err(StorageError::other("skip file".to_string()));
}
@@ -554,6 +557,8 @@ impl ScannerIODisk for Disk {
item.apply_actions(ecstore, object_infos, lock_config, &mut size_summary)
.await;
done_object();
// TODO: enqueueFreeVersion
Ok(size_summary)
@@ -565,6 +570,10 @@ impl ScannerIODisk for Disk {
updates: Option<mpsc::Sender<DataUsageEntry>>,
scan_mode: HealScanMode,
) -> Result<DataUsageCache> {
let done_drive = Metrics::time(Metric::ScanBucketDrive);
let drive_start = std::time::Instant::now();
let bucket = cache.info.name.clone();
let disk_path = self.path().to_string_lossy().to_string();
let _guard = self.start_scan();
let mut cache = cache;
@@ -631,10 +640,15 @@ impl ScannerIODisk for Disk {
match result {
Ok(mut data_usage_info) => {
done_drive();
emit_scan_bucket_drive_complete(true, &bucket, &disk_path, drive_start.elapsed());
data_usage_info.info.last_update = Some(SystemTime::now());
Ok(data_usage_info)
}
Err(e) => Err(StorageError::other(format!("Failed to scan data folder: {e}"))),
Err(e) => {
emit_scan_bucket_drive_complete(false, &bucket, &disk_path, drive_start.elapsed());
Err(StorageError::other(format!("Failed to scan data folder: {e}")))
}
}
}
}