feat: integrate global metrics system into AHM scanner

- Add global metrics system to common crate for cross-module usage
- Integrate global metrics collection into AHM scanner operations
- Update ECStore to use common metrics system instead of local implementation
- Add chrono dependency to AHM crate for timestamp handling
- Re-export IlmAction from common metrics in ECStore lifecycle module
- Update scanner methods to use global metrics for cycle, disk, and volume scans
- Maintain backward compatibility with local metrics collector
- Fix clippy warnings and ensure proper code formatting

This change enables unified metrics collection across the entire RustFS system,
allowing better monitoring and observability of scanner operations.
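
A hedged sketch of the pattern this enables (call site hypothetical; API names
taken from this change):

    use rustfs_common::metrics::{globalMetrics, Metric, Metrics};

    async fn timed_scan_step() {
        // Metrics::time starts the clock and returns a closure; invoking the
        // closure records the elapsed time and bumps the operation counter.
        let stop_fn = Metrics::time(Metric::ScanCycle);
        // ... perform the scan work ...
        stop_fn();

        // Any module can read the aggregated report from the global singleton.
        let report = globalMetrics.report().await;
        println!("current cycle: {}", report.current_cycle);
    }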

Signed-off-by: junxiang Mu <1948535941@qq.com>
Author: junxiang Mu
Date: 2025-07-21 11:32:43 +08:00
parent 7d3b2b774c
commit e7d0a8d4b9
14 changed files with 677 additions and 144 deletions

Cargo.lock (generated)

@@ -7930,6 +7930,7 @@ dependencies = [
"anyhow",
"async-trait",
"bytes",
"chrono",
"futures",
"lazy_static",
"rmp-serde",
@@ -7965,6 +7966,8 @@ dependencies = [
name = "rustfs-common"
version = "0.0.5"
dependencies = [
"chrono",
"rustfs-madmin",
"tokio",
"tonic",
"uuid",


@@ -34,6 +34,7 @@ url = { workspace = true }
rustfs-lock = { workspace = true }
lazy_static = { workspace = true }
chrono = { workspace = true }
[dev-dependencies]
rmp-serde = { workspace = true }


@@ -37,6 +37,7 @@ use crate::{
error::{Error, Result},
get_ahm_services_cancel_token, HealRequest,
};
use rustfs_common::metrics::{globalMetrics, Metric, Metrics};
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
@@ -117,7 +118,7 @@ pub struct Scanner {
config: Arc<RwLock<ScannerConfig>>,
/// Scanner state
state: Arc<RwLock<ScannerState>>,
/// Metrics collector
/// Local metrics collector (for backward compatibility)
metrics: Arc<MetricsCollector>,
/// Bucket metrics cache
bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>,
@@ -286,10 +287,18 @@ impl Scanner {
metrics
}
/// Get global metrics from common crate
pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics {
globalMetrics.report().await
}
/// Perform a single scan cycle
pub async fn scan_cycle(&self) -> Result<()> {
let start_time = SystemTime::now();
// Start global metrics collection for this cycle
let stop_fn = Metrics::time(Metric::ScanCycle);
info!("Starting scan cycle {} for all EC sets", self.metrics.get_metrics().current_cycle + 1);
// Update state
@@ -301,6 +310,14 @@ impl Scanner {
state.scanning_disks.clear();
}
// Update global metrics cycle information
let cycle_info = rustfs_common::metrics::CurrentCycle {
current: self.state.read().await.current_cycle,
cycle_completed: vec![chrono::Utc::now()],
started: chrono::Utc::now(),
};
globalMetrics.set_cycle(Some(cycle_info)).await;
self.metrics.set_current_cycle(self.state.read().await.current_cycle);
self.metrics.increment_total_cycles();
@@ -392,6 +409,9 @@ impl Scanner {
state.current_scan_duration = Some(scan_duration);
}
// Complete global metrics collection for this cycle
stop_fn();
info!(
"Completed scan cycle in {:?} ({} successful, {} failed)",
scan_duration, successful_scans, failed_scans
@@ -475,6 +495,9 @@ impl Scanner {
async fn scan_disk(&self, disk: &DiskStore) -> Result<HashMap<String, HashMap<String, rustfs_filemeta::FileMeta>>> {
let disk_path = disk.path().to_string_lossy().to_string();
// Start global metrics collection for disk scan
let stop_fn = Metrics::time(Metric::ScanBucketDrive);
info!("Scanning disk: {}", disk_path);
// Update disk metrics
@@ -638,6 +661,9 @@ impl Scanner {
state.scanning_disks.retain(|d| d != &disk_path);
}
// Complete global metrics collection for disk scan
stop_fn();
Ok(disk_objects)
}
@@ -646,6 +672,9 @@ impl Scanner {
/// This method collects all objects from a disk for a specific bucket.
/// It returns a map of object names to their metadata for later analysis.
async fn scan_volume(&self, disk: &DiskStore, bucket: &str) -> Result<HashMap<String, rustfs_filemeta::FileMeta>> {
// Start global metrics collection for volume scan
let stop_fn = Metrics::time(Metric::ScanObject);
info!("Scanning bucket: {} on disk: {}", bucket, disk.to_string());
// Initialize bucket metrics if not exists
@@ -785,6 +814,9 @@ impl Scanner {
state.scanning_buckets.retain(|b| b != bucket);
}
// Complete global metrics collection for volume scan
stop_fn();
debug!(
"Completed scanning bucket: {} on disk {} ({} objects, {} issues)",
bucket,


@@ -318,13 +318,13 @@ async fn test_heal_format_with_data() {
let obj_dir = disk_paths[0].join(bucket_name).join(object_name);
let target_part = WalkDir::new(&obj_dir)
.min_depth(2)
.max_depth(2)
.into_iter()
.filter_map(Result::ok)
.find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false))
.map(|e| e.into_path())
.expect("Failed to locate part file to delete");
// ─── 1⃣ delete format.json on one disk ──────────────
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");


@@ -31,3 +31,5 @@ workspace = true
tokio.workspace = true
tonic = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
rustfs-madmin = { workspace = true }


@@ -17,6 +17,7 @@ pub mod bucket_stats;
pub mod globals;
pub mod heal_channel;
pub mod last_minute;
pub mod metrics;
// DEFAULT_DELIMITER is ',' (ASCII 44)
pub static DEFAULT_DELIMITER: u8 = 44;


@@ -0,0 +1,535 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use std::{
collections::HashMap,
fmt::Display,
pin::Pin,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use tokio::sync::{Mutex, RwLock};
use crate::last_minute::{AccElem, LastMinuteLatency};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IlmAction {
NoneAction = 0,
DeleteAction,
DeleteVersionAction,
TransitionAction,
TransitionVersionAction,
DeleteRestoredAction,
DeleteRestoredVersionAction,
DeleteAllVersionsAction,
DelMarkerDeleteAllVersionsAction,
ActionCount,
}
impl IlmAction {
pub fn delete_restored(&self) -> bool {
*self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_versioned(&self) -> bool {
*self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_all(&self) -> bool {
*self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction
}
pub fn delete(&self) -> bool {
if self.delete_restored() {
return true;
}
*self == Self::DeleteVersionAction
|| *self == Self::DeleteAction
|| *self == Self::DeleteAllVersionsAction
|| *self == Self::DelMarkerDeleteAllVersionsAction
}
}
impl Display for IlmAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
lazy_static! {
pub static ref globalMetrics: Arc<Metrics> = Arc::new(Metrics::new());
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum Metric {
// START Realtime metrics, which only record
// last-minute latencies and total operation counts.
ReadMetadata = 0,
CheckMissing,
SaveUsage,
ApplyAll,
ApplyVersion,
TierObjSweep,
HealCheck,
Ilm,
CheckReplication,
Yield,
CleanAbandoned,
ApplyNonCurrent,
HealAbandonedVersion,
// START Trace metrics:
StartTrace,
ScanObject, // Scan object. All operations included.
HealAbandonedObject,
// END realtime metrics:
LastRealtime,
// Trace only metrics:
ScanFolder, // Scan a folder on disk, recursively.
ScanCycle, // Full cycle, cluster global.
ScanBucketDrive, // Single bucket on one drive.
CompactFolder, // Folder compacted.
// Must be last:
Last,
}
impl Metric {
/// Convert to string representation for metrics
pub fn as_str(self) -> &'static str {
match self {
Self::ReadMetadata => "read_metadata",
Self::CheckMissing => "check_missing",
Self::SaveUsage => "save_usage",
Self::ApplyAll => "apply_all",
Self::ApplyVersion => "apply_version",
Self::TierObjSweep => "tier_obj_sweep",
Self::HealCheck => "heal_check",
Self::Ilm => "ilm",
Self::CheckReplication => "check_replication",
Self::Yield => "yield",
Self::CleanAbandoned => "clean_abandoned",
Self::ApplyNonCurrent => "apply_non_current",
Self::HealAbandonedVersion => "heal_abandoned_version",
Self::StartTrace => "start_trace",
Self::ScanObject => "scan_object",
Self::HealAbandonedObject => "heal_abandoned_object",
Self::LastRealtime => "last_realtime",
Self::ScanFolder => "scan_folder",
Self::ScanCycle => "scan_cycle",
Self::ScanBucketDrive => "scan_bucket_drive",
Self::CompactFolder => "compact_folder",
Self::Last => "last",
}
}
/// Convert from index back to enum (safe version)
pub fn from_index(index: usize) -> Option<Self> {
if index >= Self::Last as usize {
return None;
}
// Safe conversion using match instead of unsafe transmute
match index {
0 => Some(Self::ReadMetadata),
1 => Some(Self::CheckMissing),
2 => Some(Self::SaveUsage),
3 => Some(Self::ApplyAll),
4 => Some(Self::ApplyVersion),
5 => Some(Self::TierObjSweep),
6 => Some(Self::HealCheck),
7 => Some(Self::Ilm),
8 => Some(Self::CheckReplication),
9 => Some(Self::Yield),
10 => Some(Self::CleanAbandoned),
11 => Some(Self::ApplyNonCurrent),
12 => Some(Self::HealAbandonedVersion),
13 => Some(Self::StartTrace),
14 => Some(Self::ScanObject),
15 => Some(Self::HealAbandonedObject),
16 => Some(Self::LastRealtime),
17 => Some(Self::ScanFolder),
18 => Some(Self::ScanCycle),
19 => Some(Self::ScanBucketDrive),
20 => Some(Self::CompactFolder),
21 => Some(Self::Last),
_ => None,
}
}
}
/// Thread-safe wrapper for LastMinuteLatency with atomic operations
#[derive(Default)]
pub struct LockedLastMinuteLatency {
latency: Arc<Mutex<LastMinuteLatency>>,
}
impl Clone for LockedLastMinuteLatency {
fn clone(&self) -> Self {
Self {
latency: Arc::clone(&self.latency),
}
}
}
impl LockedLastMinuteLatency {
pub fn new() -> Self {
Self {
latency: Arc::new(Mutex::new(LastMinuteLatency::default())),
}
}
/// Add a duration measurement
pub async fn add(&self, duration: Duration) {
self.add_size(duration, 0).await;
}
/// Add a duration measurement with size
pub async fn add_size(&self, duration: Duration, size: u64) {
let mut latency = self.latency.lock().await;
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let elem = AccElem {
n: 1,
total: duration.as_secs(),
size,
};
latency.add_all(now, &elem);
}
/// Get total accumulated metrics for the last minute
pub async fn total(&self) -> AccElem {
let mut latency = self.latency.lock().await;
latency.get_total()
}
}
/// Current path tracker for monitoring active scan paths
struct CurrentPathTracker {
current_path: Arc<RwLock<String>>,
}
impl CurrentPathTracker {
fn new(initial_path: String) -> Self {
Self {
current_path: Arc::new(RwLock::new(initial_path)),
}
}
async fn update_path(&self, path: String) {
*self.current_path.write().await = path;
}
async fn get_path(&self) -> String {
self.current_path.read().await.clone()
}
}
/// Main scanner metrics structure
pub struct Metrics {
// All fields must be accessed atomically and aligned.
operations: Vec<AtomicU64>,
latency: Vec<LockedLastMinuteLatency>,
actions: Vec<AtomicU64>,
actions_latency: Vec<LockedLastMinuteLatency>,
// Current paths contains disk -> tracker mappings
current_paths: Arc<RwLock<HashMap<String, Arc<CurrentPathTracker>>>>,
// Cycle information
cycle_info: Arc<RwLock<Option<CurrentCycle>>>,
}
/// Current scan cycle information tracked by the global metrics.
#[derive(Clone, Debug)]
pub struct CurrentCycle {
pub current: u64,
pub cycle_completed: Vec<DateTime<Utc>>,
pub started: DateTime<Utc>,
}
impl Metrics {
pub fn new() -> Self {
let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect();
let latency = (0..Metric::LastRealtime as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect();
Self {
operations,
latency,
actions: (0..IlmAction::ActionCount as usize).map(|_| AtomicU64::new(0)).collect(),
// Build each slot separately: `vec![elem; n]` clones the inner Arc,
// which would make every ILM action share one latency tracker.
actions_latency: (0..IlmAction::ActionCount as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect(),
current_paths: Arc::new(RwLock::new(HashMap::new())),
cycle_info: Arc::new(RwLock::new(None)),
}
}
/// Log scanner action with custom metadata - compatible with existing usage
pub fn log(metric: Metric) -> impl Fn(&HashMap<String, String>) {
let metric = metric as usize;
let start_time = SystemTime::now();
move |_custom: &HashMap<String, String>| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task for this)
if metric < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
globalMetrics.latency[metric_index].add(duration).await;
});
}
// Log trace metrics
if metric > Metric::StartTrace as usize {
//debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
}
}
}
/// Time scanner action with size - returns function that takes size
pub fn time_size(metric: Metric) -> impl Fn(u64) {
let metric = metric as usize;
let start_time = SystemTime::now();
move |size: u64| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics with size (spawn async task)
if metric < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
globalMetrics.latency[metric_index].add_size(duration, size).await;
});
}
}
}
/// Time a scanner action - returns a closure to call when done
pub fn time(metric: Metric) -> impl Fn() {
let metric = metric as usize;
let start_time = SystemTime::now();
move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if metric < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
globalMetrics.latency[metric_index].add(duration).await;
});
}
}
}
/// Time N scanner actions - returns function that takes count, then returns completion function
pub fn time_n(metric: Metric) -> Box<dyn Fn(usize) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
let metric = metric as usize;
let start_time = SystemTime::now();
Box::new(move |count: usize| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalMetrics.operations[metric].fetch_add(count as u64, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if metric < Metric::LastRealtime as usize {
let metric_index = metric;
tokio::spawn(async move {
globalMetrics.latency[metric_index].add(duration).await;
});
}
})
})
}
/// Time ILM action with versions - returns function that takes versions, then returns completion function
pub fn time_ilm(a: IlmAction) -> Box<dyn Fn(u64) -> Box<dyn Fn() + Send + Sync> + Send + Sync> {
let a_clone = a as usize;
if a_clone == IlmAction::NoneAction as usize || a_clone >= IlmAction::ActionCount as usize {
return Box::new(move |_: u64| Box::new(move || {}));
}
let start = SystemTime::now();
Box::new(move |versions: u64| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start).unwrap_or_default();
tokio::spawn(async move {
globalMetrics.actions[a_clone].fetch_add(versions, Ordering::Relaxed);
globalMetrics.actions_latency[a_clone].add(duration).await;
});
})
})
}
/// Increment time with specific duration
pub async fn inc_time(metric: Metric, duration: Duration) {
let metric = metric as usize;
// Update operation count
globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics
if metric < Metric::LastRealtime as usize {
globalMetrics.latency[metric].add(duration).await;
}
}
/// Get lifetime operation count for a metric
pub fn lifetime(&self, metric: Metric) -> u64 {
let metric = metric as usize;
if metric >= Metric::Last as usize {
return 0;
}
self.operations[metric].load(Ordering::Relaxed)
}
/// Get last minute statistics for a metric
pub async fn last_minute(&self, metric: Metric) -> AccElem {
let metric = metric as usize;
if metric >= Metric::LastRealtime as usize {
return AccElem::default();
}
self.latency[metric].total().await
}
/// Set current cycle information
pub async fn set_cycle(&self, cycle: Option<CurrentCycle>) {
*self.cycle_info.write().await = cycle;
}
/// Get current cycle information
pub async fn get_cycle(&self) -> Option<CurrentCycle> {
self.cycle_info.read().await.clone()
}
/// Get current active paths
pub async fn get_current_paths(&self) -> Vec<String> {
let mut result = Vec::new();
let paths = self.current_paths.read().await;
for (disk, tracker) in paths.iter() {
let path = tracker.get_path().await;
result.push(format!("{disk}/{path}"));
}
result
}
/// Get number of active drives
pub async fn active_drives(&self) -> usize {
self.current_paths.read().await.len()
}
/// Generate metrics report
pub async fn report(&self) -> M_ScannerMetrics {
let mut metrics = M_ScannerMetrics::default();
// Set cycle information
if let Some(cycle) = self.get_cycle().await {
metrics.current_cycle = cycle.current;
metrics.cycles_completed_at = cycle.cycle_completed;
metrics.current_started = cycle.started;
}
metrics.collected_at = Utc::now();
metrics.active_paths = self.get_current_paths().await;
// Lifetime operations
for i in 0..Metric::Last as usize {
let count = self.operations[i].load(Ordering::Relaxed);
if count > 0 {
if let Some(metric) = Metric::from_index(i) {
metrics.life_time_ops.insert(metric.as_str().to_string(), count);
}
}
}
// Last minute statistics for realtime metrics
for i in 0..Metric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
if last_min.n > 0 {
if let Some(_metric) = Metric::from_index(i) {
// Convert to madmin TimedAction format if needed
// This would require implementing the conversion
}
}
}
metrics
}
}
// Type aliases for compatibility with existing code
pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
let disk_name = disk.to_string();
// Store the tracker in global metrics
let tracker_clone = Arc::clone(&tracker);
let disk_clone = disk_name.clone();
tokio::spawn(async move {
globalMetrics.current_paths.write().await.insert(disk_clone, tracker_clone);
});
let update_fn = {
let tracker = Arc::clone(&tracker);
Arc::new(move |path: &str| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let tracker = Arc::clone(&tracker);
let path = path.to_string();
Box::pin(async move {
tracker.update_path(path).await;
})
})
};
let done_fn = {
let disk_name = disk_name.clone();
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
globalMetrics.current_paths.write().await.remove(&disk_name);
})
})
};
(update_fn, done_fn)
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
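
A self-contained sketch exercising this module's API (the drive path and counts
are hypothetical; a tokio runtime with the time feature is assumed, since the
closures spawn tasks):

use std::time::Duration;
use rustfs_common::metrics::{current_path_updater, globalMetrics, IlmAction, Metric, Metrics};

#[tokio::main]
async fn main() {
    // Track which path a (hypothetical) drive is currently scanning.
    let (update_path, close_disk) = current_path_updater("/mnt/disk1", "bucket-a/");
    (*update_path)("bucket-a/object-1").await; // explicit deref through the Arc

    // Record a realtime metric with an explicit duration.
    Metrics::inc_time(Metric::ReadMetadata, Duration::from_millis(5)).await;

    // ILM timing is two-stage: first fix the version count, then mark completion.
    let time_ilm = Metrics::time_ilm(IlmAction::DeleteAction);
    let done = time_ilm(3); // three versions affected
    done(); // spawns a task that updates counters and latency

    // Tracker registration and the ILM update run on spawned tasks; give
    // them a moment to settle before reading the report.
    tokio::time::sleep(Duration::from_millis(10)).await;
    let report = globalMetrics.report().await;
    println!("active paths: {:?}", report.active_paths);

    (*close_disk)().await; // unregister the drive's tracker
}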


@@ -22,6 +22,7 @@ use async_channel::{Receiver as A_Receiver, Sender as A_Sender, bounded};
use futures::Future;
use http::HeaderMap;
use lazy_static::lazy_static;
use rustfs_common::metrics::{IlmAction, Metrics};
use s3s::Body;
use sha2::{Digest, Sha256};
use std::any::Any;
@@ -41,7 +42,7 @@ use xxhash_rust::xxh64;
//use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
//use rustfs_notify::{initialize, notification_system};
use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc};
use super::lifecycle::{self, ExpirationOptions, IlmAction, Lifecycle, TransitionOptions};
use super::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions};
use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats};
use super::tier_sweeper::{Jentry, delete_object_from_remote_tier};
use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys};
@@ -54,7 +55,6 @@ use crate::global::GLOBAL_LocalNodeName;
use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id};
use crate::heal::{
data_scanner::{apply_expiry_on_non_transitioned_objects, apply_expiry_on_transitioned_object},
data_scanner_metric::ScannerMetrics,
data_usage_cache::TierStats,
};
use crate::store::ECStore;
@@ -631,7 +631,7 @@ pub async fn enqueue_transition_immediate(oi: &ObjectInfo, src: LcEventSrc) {
if !lc.is_none() {
let event = lc.expect("err").eval(&oi.to_lifecycle_opts()).await;
match event.action {
lifecycle::IlmAction::TransitionAction | lifecycle::IlmAction::TransitionVersionAction => {
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => {
if oi.delete_marker || oi.is_dir {
return;
}
@@ -728,7 +728,7 @@ pub fn gen_transition_objname(bucket: &str) -> Result<String, Error> {
}
pub async fn transition_object(api: Arc<ECStore>, oi: &ObjectInfo, lae: LcAuditEvent) -> Result<(), Error> {
let time_ilm = ScannerMetrics::time_ilm(lae.event.action);
let time_ilm = Metrics::time_ilm(lae.event.action);
let opts = ObjectOptions {
transition: TransitionOptions {


@@ -43,49 +43,7 @@ const _ERR_XML_NOT_WELL_FORMED: &str =
const ERR_LIFECYCLE_BUCKET_LOCKED: &str =
"ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an retention bucket";
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IlmAction {
NoneAction = 0,
DeleteAction,
DeleteVersionAction,
TransitionAction,
TransitionVersionAction,
DeleteRestoredAction,
DeleteRestoredVersionAction,
DeleteAllVersionsAction,
DelMarkerDeleteAllVersionsAction,
ActionCount,
}
impl IlmAction {
pub fn delete_restored(&self) -> bool {
*self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_versioned(&self) -> bool {
*self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction
}
pub fn delete_all(&self) -> bool {
*self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction
}
pub fn delete(&self) -> bool {
if self.delete_restored() {
return true;
}
*self == Self::DeleteVersionAction
|| *self == Self::DeleteAction
|| *self == Self::DeleteAllVersionsAction
|| *self == Self::DelMarkerDeleteAllVersionsAction
}
}
impl Display for IlmAction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
pub use rustfs_common::metrics::IlmAction;
#[async_trait::async_trait]
pub trait RuleValidate {


@@ -39,13 +39,13 @@ use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
use crate::heal::data_scanner::{
ScannerItem, ShouldSleepFn, SizeSummary, lc_has_active_rules, rep_has_active_rules, scan_data_folder,
};
use crate::heal::data_scanner_metric::{ScannerMetric, ScannerMetrics};
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
use crate::heal::heal_commands::{HealScanMode, HealingTracker};
use crate::heal::heal_ops::HEALING_TRACKER_FILENAME;
use crate::new_object_layer_fn;
use crate::store_api::{ObjectInfo, StorageAPI};
use rustfs_common::metrics::{Metric, Metrics};
use rustfs_utils::path::{
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
path_join, path_join_buf,
@@ -2323,9 +2323,9 @@ impl DiskAPI for LocalDisk {
if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) {
return Err(Error::other(ERR_SKIP_FILE).into());
}
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject);
let stop_fn = Metrics::log(Metric::ScanObject);
let mut res = HashMap::new();
let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata);
let done_sz = Metrics::time_size(Metric::ReadMetadata);
let buf = match disk.read_metadata(item.path.clone()).await {
Ok(buf) => buf,
Err(err) => {
@@ -2351,7 +2351,7 @@ impl DiskAPI for LocalDisk {
}
};
let mut size_s = SizeSummary::default();
let done = ScannerMetrics::time(ScannerMetric::ApplyAll);
let done = Metrics::time(Metric::ApplyAll);
let obj_infos = match item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(err) => {
@@ -2369,7 +2369,7 @@ impl DiskAPI for LocalDisk {
let mut obj_deleted = false;
for info in obj_infos.iter() {
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
let done = Metrics::time(Metric::ApplyVersion);
let sz: i64;
(obj_deleted, sz) = item.apply_actions(info, &mut size_s).await;
done();
@@ -2405,7 +2405,7 @@ impl DiskAPI for LocalDisk {
&item.object_path().to_string_lossy(),
versioned,
);
let done = ScannerMetrics::time(ScannerMetric::TierObjSweep);
let done = Metrics::time(Metric::TierObjSweep);
done();
}


@@ -29,7 +29,6 @@ use std::{
use time::{self, OffsetDateTime};
use super::{
data_scanner_metric::{ScannerMetric, ScannerMetrics, globalScannerMetrics},
data_usage::{DATA_USAGE_BLOOM_NAME_PATH, DataUsageInfo, store_data_usage_in_backend},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN, HealScanMode},
@@ -39,6 +38,7 @@ use crate::bucket::{
utils::is_meta_bucketname,
};
use crate::cmd::bucket_replication::queue_replication_heal;
use crate::disk::local::LocalDisk;
use crate::event::name::EventName;
use crate::{
bucket::{
@@ -57,7 +57,7 @@ use crate::{
bucket::{versioning::VersioningApi, versioning_sys::BucketVersioningSys},
cmd::bucket_replication::ReplicationStatusType,
disk,
heal::data_usage::DATA_USAGE_ROOT,
// heal::data_usage::DATA_USAGE_ROOT,
};
use crate::{
cache_value::metacache_set::{ListPathRawOptions, list_path_raw},
@@ -66,7 +66,7 @@ use crate::{
heal::Config,
},
disk::{DiskInfoOptions, DiskStore},
global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD},
global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasureSD},
heal::{
data_usage::BACKGROUND_HEAL_INFO_PATH,
data_usage_cache::{DataUsageHashMap, hash_path},
@@ -83,7 +83,6 @@ use crate::{
disk::error::DiskError,
error::{Error, Result},
};
use crate::{disk::local::LocalDisk, heal::data_scanner_metric::current_path_updater};
use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use rand::Rng;
@@ -300,14 +299,14 @@ async fn run_data_scanner_cycle() {
};
// Start metrics collection for this cycle
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
// let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
// Update cycle information
cycle_info.current = cycle_info.next;
cycle_info.started = Utc::now();
// Update global scanner metrics
globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// Read background healing information and determine scan mode
let bg_heal_info = read_background_heal_info(store.clone()).await;
@@ -357,7 +356,7 @@ async fn run_data_scanner_cycle() {
}
// Update global metrics with completion info
globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await;
// Persist updated cycle information
// ignore error, continue.
@@ -379,7 +378,7 @@ async fn run_data_scanner_cycle() {
}
// Complete metrics collection for this cycle
stop_fn(&scan_result);
// stop_fn(&scan_result);
}
/// Execute namespace scan with cancellation support
@@ -781,7 +780,7 @@ impl ScannerItem {
}
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
// let done = ScannerMetrics::time(ScannerMetric::Ilm);
let (action, _size) = self.apply_lifecycle(oi).await;
@@ -807,7 +806,7 @@ impl ScannerItem {
self.heal_replication(&oi, _size_s).await;
}
done();
// done();
if action.delete_all() {
return (true, 0);
@@ -1572,68 +1571,69 @@ pub fn rep_has_active_rules(config: &ReplicationConfiguration, prefix: &str, rec
pub type LocalDrive = Arc<LocalDisk>;
pub async fn scan_data_folder(
disks: &[Option<DiskStore>],
drive: LocalDrive,
cache: &DataUsageCache,
get_size_fn: GetSizeFn,
heal_scan_mode: HealScanMode,
should_sleep: ShouldSleepFn,
_disks: &[Option<DiskStore>],
_drive: LocalDrive,
_cache: &DataUsageCache,
_get_size_fn: GetSizeFn,
_heal_scan_mode: HealScanMode,
_should_sleep: ShouldSleepFn,
) -> disk::error::Result<DataUsageCache> {
if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
return Err(DiskError::other("internal error: root scan attempted"));
}
// if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT {
// return Err(DiskError::other("internal error: root scan attempted"));
// }
let base_path = drive.to_string();
let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name);
let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
};
let mut s = FolderScanner {
root: base_path,
get_size: get_size_fn,
old_cache: cache.clone(),
new_cache: DataUsageCache {
info: cache.info.clone(),
..Default::default()
},
update_cache: DataUsageCache {
info: cache.info.clone(),
..Default::default()
},
data_usage_scanner_debug: false,
heal_object_select: 0,
scan_mode: heal_scan_mode,
updates: cache.info.updates.clone().unwrap(),
last_update: SystemTime::now(),
update_current_path: update_path,
disks: disks.to_vec(),
disks_quorum: disks.len() / 2,
skip_heal,
drive: drive.clone(),
we_sleep: should_sleep,
};
// let base_path = drive.to_string();
// // let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name);
// let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing {
// AtomicBool::new(true)
// } else {
// AtomicBool::new(false)
// };
// let mut s = FolderScanner {
// root: base_path,
// get_size: get_size_fn,
// old_cache: cache.clone(),
// new_cache: DataUsageCache {
// info: cache.info.clone(),
// ..Default::default()
// },
// update_cache: DataUsageCache {
// info: cache.info.clone(),
// ..Default::default()
// },
// data_usage_scanner_debug: false,
// heal_object_select: 0,
// scan_mode: heal_scan_mode,
// updates: cache.info.updates.clone().unwrap(),
// last_update: SystemTime::now(),
// update_current_path: update_path,
// disks: disks.to_vec(),
// disks_quorum: disks.len() / 2,
// skip_heal,
// drive: drive.clone(),
// we_sleep: should_sleep,
// };
if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing {
s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32;
}
// if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing {
// s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32;
// }
let mut root = DataUsageEntry::default();
let folder = CachedFolder {
name: cache.info.name.clone(),
object_heal_prob_div: 1,
parent: DataUsageHash("".to_string()),
};
// let mut root = DataUsageEntry::default();
// let folder = CachedFolder {
// name: cache.info.name.clone(),
// object_heal_prob_div: 1,
// parent: DataUsageHash("".to_string()),
// };
if s.scan_folder(&folder, &mut root).await.is_err() {
close_disk().await;
}
s.new_cache.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN as usize);
s.new_cache.info.last_update = Some(SystemTime::now());
s.new_cache.info.next_cycle = cache.info.next_cycle;
close_disk().await;
Ok(s.new_cache)
// if s.scan_folder(&folder, &mut root).await.is_err() {
// close_disk().await;
// }
// s.new_cache.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN as usize);
// s.new_cache.info.last_update = Some(SystemTime::now());
// s.new_cache.info.next_cycle = cache.info.next_cycle;
// close_disk().await;
// Ok(s.new_cache)
todo!()
}
pub async fn eval_action_from_lifecycle(
@@ -1695,11 +1695,11 @@ pub async fn apply_expiry_on_transitioned_object(
lc_event: &lifecycle::Event,
src: &LcEventSrc,
) -> bool {
let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
if let Err(_err) = expire_transitioned_object(api, oi, lc_event, src).await {
return false;
}
let _ = time_ilm(1);
// let _ = time_ilm(1);
true
}
@@ -1727,7 +1727,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
opts.delete_prefix_object = true;
}
let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
let mut dobj = api
.delete_object(&oi.bucket, &encode_dir_object(&oi.name), opts)
@@ -1759,11 +1759,11 @@ pub async fn apply_expiry_on_non_transitioned_objects(
});
if lc_event.action != lifecycle::IlmAction::NoneAction {
let mut num_versions = 1_u64;
if lc_event.action.delete_all() {
num_versions = oi.num_versions as u64;
}
let _ = time_ilm(num_versions);
// let mut num_versions = 1_u64;
// if lc_event.action.delete_all() {
// num_versions = oi.num_versions as u64;
// }
// let _ = time_ilm(num_versions);
}
true


@@ -14,7 +14,7 @@
pub mod background_heal_ops;
pub mod data_scanner;
pub mod data_scanner_metric;
// pub mod data_scanner_metric;
pub mod data_usage;
pub mod data_usage_cache;
pub mod error;


@@ -15,7 +15,10 @@
use std::collections::{HashMap, HashSet};
use chrono::Utc;
use rustfs_common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr};
use rustfs_common::{
globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr},
metrics::globalMetrics,
};
use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics};
use rustfs_utils::os::get_drive_stats;
use serde::{Deserialize, Serialize};
@@ -23,10 +26,7 @@ use tracing::info;
use crate::{
admin_server_info::get_local_server_property,
heal::{
data_scanner_metric::globalScannerMetrics,
heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED},
},
heal::heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED},
new_object_layer_fn,
store_api::StorageAPI,
// utils::os::get_drive_stats,
@@ -108,7 +108,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts)
if types.contains(&MetricType::SCANNER) {
info!("start get scanner metrics");
let metrics = globalScannerMetrics.report().await;
let metrics = globalMetrics.report().await;
real_time_metrics.aggregated.scanner = Some(metrics);
}


@@ -31,7 +31,8 @@ use router::{AdminOperation, S3Router};
use rpc::register_rpc_route;
use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/minio/admin";
const ADMIN_PREFIX: &str = "/rustfs/admin";
// const ADMIN_PREFIX: &str = "/minio/admin";
pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new(console_enabled);