diff --git a/Cargo.lock b/Cargo.lock index c1c1608c..b94029ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7930,6 +7930,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "chrono", "futures", "lazy_static", "rmp-serde", @@ -7965,6 +7966,8 @@ dependencies = [ name = "rustfs-common" version = "0.0.5" dependencies = [ + "chrono", + "rustfs-madmin", "tokio", "tonic", "uuid", diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index fc4eae4b..5442fb71 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -34,6 +34,7 @@ url = { workspace = true } rustfs-lock = { workspace = true } lazy_static = { workspace = true } +chrono = { workspace = true } [dev-dependencies] rmp-serde = { workspace = true } diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 0496f70e..a98e1f9f 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -37,6 +37,7 @@ use crate::{ error::{Error, Result}, get_ahm_services_cancel_token, HealRequest, }; +use rustfs_common::metrics::{globalMetrics, Metric, Metrics}; use rustfs_ecstore::disk::RUSTFS_META_BUCKET; @@ -117,7 +118,7 @@ pub struct Scanner { config: Arc>, /// Scanner state state: Arc>, - /// Metrics collector + /// Local metrics collector (for backward compatibility) metrics: Arc, /// Bucket metrics cache bucket_metrics: Arc>>, @@ -286,10 +287,18 @@ impl Scanner { metrics } + /// Get global metrics from common crate + pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics { + globalMetrics.report().await + } + /// Perform a single scan cycle pub async fn scan_cycle(&self) -> Result<()> { let start_time = SystemTime::now(); + // Start global metrics collection for this cycle + let stop_fn = Metrics::time(Metric::ScanCycle); + info!("Starting scan cycle {} for all EC sets", self.metrics.get_metrics().current_cycle + 1); // Update state @@ -301,6 +310,14 @@ impl Scanner { state.scanning_disks.clear(); } + // Update global metrics cycle information + let cycle_info = rustfs_common::metrics::CurrentCycle { + current: self.state.read().await.current_cycle, + cycle_completed: vec![chrono::Utc::now()], + started: chrono::Utc::now(), + }; + globalMetrics.set_cycle(Some(cycle_info)).await; + self.metrics.set_current_cycle(self.state.read().await.current_cycle); self.metrics.increment_total_cycles(); @@ -392,6 +409,9 @@ impl Scanner { state.current_scan_duration = Some(scan_duration); } + // Complete global metrics collection for this cycle + stop_fn(); + info!( "Completed scan cycle in {:?} ({} successful, {} failed)", scan_duration, successful_scans, failed_scans @@ -475,6 +495,9 @@ impl Scanner { async fn scan_disk(&self, disk: &DiskStore) -> Result>> { let disk_path = disk.path().to_string_lossy().to_string(); + // Start global metrics collection for disk scan + let stop_fn = Metrics::time(Metric::ScanBucketDrive); + info!("Scanning disk: {}", disk_path); // Update disk metrics @@ -638,6 +661,9 @@ impl Scanner { state.scanning_disks.retain(|d| d != &disk_path); } + // Complete global metrics collection for disk scan + stop_fn(); + Ok(disk_objects) } @@ -646,6 +672,9 @@ impl Scanner { /// This method collects all objects from a disk for a specific bucket. /// It returns a map of object names to their metadata for later analysis. async fn scan_volume(&self, disk: &DiskStore, bucket: &str) -> Result> { + // Start global metrics collection for volume scan + let stop_fn = Metrics::time(Metric::ScanObject); + info!("Scanning bucket: {} on disk: {}", bucket, disk.to_string()); // Initialize bucket metrics if not exists @@ -785,6 +814,9 @@ impl Scanner { state.scanning_buckets.retain(|b| b != bucket); } + // Complete global metrics collection for volume scan + stop_fn(); + debug!( "Completed scanning bucket: {} on disk {} ({} objects, {} issues)", bucket, diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index 76603e53..f5dbb0c7 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -318,13 +318,13 @@ async fn test_heal_format_with_data() { let obj_dir = disk_paths[0].join(bucket_name).join(object_name); let target_part = WalkDir::new(&obj_dir) - .min_depth(2) - .max_depth(2) - .into_iter() - .filter_map(Result::ok) - .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) - .map(|e| e.into_path()) - .expect("Failed to locate part file to delete"); + .min_depth(2) + .max_depth(2) + .into_iter() + .filter_map(Result::ok) + .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) + .map(|e| e.into_path()) + .expect("Failed to locate part file to delete"); // ─── 1️⃣ delete format.json on one disk ────────────── let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 85d8bf97..daba9456 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -31,3 +31,5 @@ workspace = true tokio.workspace = true tonic = { workspace = true } uuid = { workspace = true } +chrono = { workspace = true } +rustfs-madmin = { workspace = true } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 7c163b34..3a41462a 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -17,6 +17,7 @@ pub mod bucket_stats; pub mod globals; pub mod heal_channel; pub mod last_minute; +pub mod metrics; // is ',' pub static DEFAULT_DELIMITER: u8 = 44; diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs new file mode 100644 index 00000000..d88e5a3a --- /dev/null +++ b/crates/common/src/metrics.rs @@ -0,0 +1,535 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::{DateTime, Utc}; +use lazy_static::lazy_static; +use rustfs_madmin::metrics::ScannerMetrics as M_ScannerMetrics; +use std::{ + collections::HashMap, + fmt::Display, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, SystemTime}, +}; +use tokio::sync::{Mutex, RwLock}; + +use crate::last_minute::{AccElem, LastMinuteLatency}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum IlmAction { + NoneAction = 0, + DeleteAction, + DeleteVersionAction, + TransitionAction, + TransitionVersionAction, + DeleteRestoredAction, + DeleteRestoredVersionAction, + DeleteAllVersionsAction, + DelMarkerDeleteAllVersionsAction, + ActionCount, +} + +impl IlmAction { + pub fn delete_restored(&self) -> bool { + *self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction + } + + pub fn delete_versioned(&self) -> bool { + *self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction + } + + pub fn delete_all(&self) -> bool { + *self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction + } + + pub fn delete(&self) -> bool { + if self.delete_restored() { + return true; + } + *self == Self::DeleteVersionAction + || *self == Self::DeleteAction + || *self == Self::DeleteAllVersionsAction + || *self == Self::DelMarkerDeleteAllVersionsAction + } +} + +impl Display for IlmAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +lazy_static! { + pub static ref globalMetrics: Arc = Arc::new(Metrics::new()); +} + +#[derive(Clone, Debug, PartialEq, PartialOrd)] +pub enum Metric { + // START Realtime metrics, that only records + // last minute latencies and total operation count. + ReadMetadata = 0, + CheckMissing, + SaveUsage, + ApplyAll, + ApplyVersion, + TierObjSweep, + HealCheck, + Ilm, + CheckReplication, + Yield, + CleanAbandoned, + ApplyNonCurrent, + HealAbandonedVersion, + + // START Trace metrics: + StartTrace, + ScanObject, // Scan object. All operations included. + HealAbandonedObject, + + // END realtime metrics: + LastRealtime, + + // Trace only metrics: + ScanFolder, // Scan a folder on disk, recursively. + ScanCycle, // Full cycle, cluster global. + ScanBucketDrive, // Single bucket on one drive. + CompactFolder, // Folder compacted. + + // Must be last: + Last, +} + +impl Metric { + /// Convert to string representation for metrics + pub fn as_str(self) -> &'static str { + match self { + Self::ReadMetadata => "read_metadata", + Self::CheckMissing => "check_missing", + Self::SaveUsage => "save_usage", + Self::ApplyAll => "apply_all", + Self::ApplyVersion => "apply_version", + Self::TierObjSweep => "tier_obj_sweep", + Self::HealCheck => "heal_check", + Self::Ilm => "ilm", + Self::CheckReplication => "check_replication", + Self::Yield => "yield", + Self::CleanAbandoned => "clean_abandoned", + Self::ApplyNonCurrent => "apply_non_current", + Self::HealAbandonedVersion => "heal_abandoned_version", + Self::StartTrace => "start_trace", + Self::ScanObject => "scan_object", + Self::HealAbandonedObject => "heal_abandoned_object", + Self::LastRealtime => "last_realtime", + Self::ScanFolder => "scan_folder", + Self::ScanCycle => "scan_cycle", + Self::ScanBucketDrive => "scan_bucket_drive", + Self::CompactFolder => "compact_folder", + Self::Last => "last", + } + } + + /// Convert from index back to enum (safe version) + pub fn from_index(index: usize) -> Option { + if index >= Self::Last as usize { + return None; + } + // Safe conversion using match instead of unsafe transmute + match index { + 0 => Some(Self::ReadMetadata), + 1 => Some(Self::CheckMissing), + 2 => Some(Self::SaveUsage), + 3 => Some(Self::ApplyAll), + 4 => Some(Self::ApplyVersion), + 5 => Some(Self::TierObjSweep), + 6 => Some(Self::HealCheck), + 7 => Some(Self::Ilm), + 8 => Some(Self::CheckReplication), + 9 => Some(Self::Yield), + 10 => Some(Self::CleanAbandoned), + 11 => Some(Self::ApplyNonCurrent), + 12 => Some(Self::HealAbandonedVersion), + 13 => Some(Self::StartTrace), + 14 => Some(Self::ScanObject), + 15 => Some(Self::HealAbandonedObject), + 16 => Some(Self::LastRealtime), + 17 => Some(Self::ScanFolder), + 18 => Some(Self::ScanCycle), + 19 => Some(Self::ScanBucketDrive), + 20 => Some(Self::CompactFolder), + 21 => Some(Self::Last), + _ => None, + } + } +} + +/// Thread-safe wrapper for LastMinuteLatency with atomic operations +#[derive(Default)] +pub struct LockedLastMinuteLatency { + latency: Arc>, +} + +impl Clone for LockedLastMinuteLatency { + fn clone(&self) -> Self { + Self { + latency: Arc::clone(&self.latency), + } + } +} + +impl LockedLastMinuteLatency { + pub fn new() -> Self { + Self { + latency: Arc::new(Mutex::new(LastMinuteLatency::default())), + } + } + + /// Add a duration measurement + pub async fn add(&self, duration: Duration) { + self.add_size(duration, 0).await; + } + + /// Add a duration measurement with size + pub async fn add_size(&self, duration: Duration, size: u64) { + let mut latency = self.latency.lock().await; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let elem = AccElem { + n: 1, + total: duration.as_secs(), + size, + }; + latency.add_all(now, &elem); + } + + /// Get total accumulated metrics for the last minute + pub async fn total(&self) -> AccElem { + let mut latency = self.latency.lock().await; + latency.get_total() + } +} + +/// Current path tracker for monitoring active scan paths +struct CurrentPathTracker { + current_path: Arc>, +} + +impl CurrentPathTracker { + fn new(initial_path: String) -> Self { + Self { + current_path: Arc::new(RwLock::new(initial_path)), + } + } + + async fn update_path(&self, path: String) { + *self.current_path.write().await = path; + } + + async fn get_path(&self) -> String { + self.current_path.read().await.clone() + } +} + +/// Main scanner metrics structure +pub struct Metrics { + // All fields must be accessed atomically and aligned. + operations: Vec, + latency: Vec, + actions: Vec, + actions_latency: Vec, + // Current paths contains disk -> tracker mappings + current_paths: Arc>>>, + + // Cycle information + cycle_info: Arc>>, +} + +// This is a placeholder. We'll need to define this struct. +#[derive(Clone, Debug)] +pub struct CurrentCycle { + pub current: u64, + pub cycle_completed: Vec>, + pub started: DateTime, +} + +impl Metrics { + pub fn new() -> Self { + let operations = (0..Metric::Last as usize).map(|_| AtomicU64::new(0)).collect(); + + let latency = (0..Metric::LastRealtime as usize) + .map(|_| LockedLastMinuteLatency::new()) + .collect(); + + Self { + operations, + latency, + actions: (0..IlmAction::ActionCount as usize).map(|_| AtomicU64::new(0)).collect(), + actions_latency: vec![LockedLastMinuteLatency::default(); IlmAction::ActionCount as usize], + current_paths: Arc::new(RwLock::new(HashMap::new())), + cycle_info: Arc::new(RwLock::new(None)), + } + } + + /// Log scanner action with custom metadata - compatible with existing usage + pub fn log(metric: Metric) -> impl Fn(&HashMap) { + let metric = metric as usize; + let start_time = SystemTime::now(); + move |_custom: &HashMap| { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task for this) + if (metric) < Metric::LastRealtime as usize { + let metric_index = metric; + tokio::spawn(async move { + globalMetrics.latency[metric_index].add(duration).await; + }); + } + + // Log trace metrics + if metric as u8 > Metric::StartTrace as u8 { + //debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric"); + } + } + } + + /// Time scanner action with size - returns function that takes size + pub fn time_size(metric: Metric) -> impl Fn(u64) { + let metric = metric as usize; + let start_time = SystemTime::now(); + move |size: u64| { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics with size (spawn async task) + if (metric) < Metric::LastRealtime as usize { + let metric_index = metric; + tokio::spawn(async move { + globalMetrics.latency[metric_index].add_size(duration, size).await; + }); + } + } + } + + /// Time a scanner action - returns a closure to call when done + pub fn time(metric: Metric) -> impl Fn() { + let metric = metric as usize; + let start_time = SystemTime::now(); + move || { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task) + if (metric) < Metric::LastRealtime as usize { + let metric_index = metric; + tokio::spawn(async move { + globalMetrics.latency[metric_index].add(duration).await; + }); + } + } + } + + /// Time N scanner actions - returns function that takes count, then returns completion function + pub fn time_n(metric: Metric) -> Box Box + Send + Sync> { + let metric = metric as usize; + let start_time = SystemTime::now(); + Box::new(move |count: usize| { + Box::new(move || { + let duration = SystemTime::now().duration_since(start_time).unwrap_or_default(); + + // Update operation count + globalMetrics.operations[metric].fetch_add(count as u64, Ordering::Relaxed); + + // Update latency for realtime metrics (spawn async task) + if (metric) < Metric::LastRealtime as usize { + let metric_index = metric; + tokio::spawn(async move { + globalMetrics.latency[metric_index].add(duration).await; + }); + } + }) + }) + } + + /// Time ILM action with versions - returns function that takes versions, then returns completion function + pub fn time_ilm(a: IlmAction) -> Box Box + Send + Sync> { + let a_clone = a as usize; + if a_clone == IlmAction::NoneAction as usize || a_clone >= IlmAction::ActionCount as usize { + return Box::new(move |_: u64| Box::new(move || {})); + } + let start = SystemTime::now(); + Box::new(move |versions: u64| { + Box::new(move || { + let duration = SystemTime::now().duration_since(start).unwrap_or(Duration::from_secs(0)); + tokio::spawn(async move { + globalMetrics.actions[a_clone].fetch_add(versions, Ordering::Relaxed); + globalMetrics.actions_latency[a_clone].add(duration).await; + }); + }) + }) + } + + /// Increment time with specific duration + pub async fn inc_time(metric: Metric, duration: Duration) { + let metric = metric as usize; + // Update operation count + globalMetrics.operations[metric].fetch_add(1, Ordering::Relaxed); + + // Update latency for realtime metrics + if (metric) < Metric::LastRealtime as usize { + globalMetrics.latency[metric].add(duration).await; + } + } + + /// Get lifetime operation count for a metric + pub fn lifetime(&self, metric: Metric) -> u64 { + let metric = metric as usize; + if (metric) >= Metric::Last as usize { + return 0; + } + self.operations[metric].load(Ordering::Relaxed) + } + + /// Get last minute statistics for a metric + pub async fn last_minute(&self, metric: Metric) -> AccElem { + let metric = metric as usize; + if (metric) >= Metric::LastRealtime as usize { + return AccElem::default(); + } + self.latency[metric].total().await + } + + /// Set current cycle information + pub async fn set_cycle(&self, cycle: Option) { + *self.cycle_info.write().await = cycle; + } + + /// Get current cycle information + pub async fn get_cycle(&self) -> Option { + self.cycle_info.read().await.clone() + } + + /// Get current active paths + pub async fn get_current_paths(&self) -> Vec { + let mut result = Vec::new(); + let paths = self.current_paths.read().await; + + for (disk, tracker) in paths.iter() { + let path = tracker.get_path().await; + result.push(format!("{disk}/{path}")); + } + + result + } + + /// Get number of active drives + pub async fn active_drives(&self) -> usize { + self.current_paths.read().await.len() + } + + /// Generate metrics report + pub async fn report(&self) -> M_ScannerMetrics { + let mut metrics = M_ScannerMetrics::default(); + + // Set cycle information + if let Some(cycle) = self.get_cycle().await { + metrics.current_cycle = cycle.current; + metrics.cycles_completed_at = cycle.cycle_completed; + metrics.current_started = cycle.started; + } + + metrics.collected_at = Utc::now(); + metrics.active_paths = self.get_current_paths().await; + + // Lifetime operations + for i in 0..Metric::Last as usize { + let count = self.operations[i].load(Ordering::Relaxed); + if count > 0 { + if let Some(metric) = Metric::from_index(i) { + metrics.life_time_ops.insert(metric.as_str().to_string(), count); + } + } + } + + // Last minute statistics for realtime metrics + for i in 0..Metric::LastRealtime as usize { + let last_min = self.latency[i].total().await; + if last_min.n > 0 { + if let Some(_metric) = Metric::from_index(i) { + // Convert to madmin TimedAction format if needed + // This would require implementing the conversion + } + } + } + + metrics + } +} + +// Type aliases for compatibility with existing code +pub type UpdateCurrentPathFn = Arc Pin + Send>> + Send + Sync>; +pub type CloseDiskFn = Arc Pin + Send>> + Send + Sync>; + +/// Create a current path updater for tracking scan progress +pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) { + let tracker = Arc::new(CurrentPathTracker::new(initial.to_string())); + let disk_name = disk.to_string(); + + // Store the tracker in global metrics + let tracker_clone = Arc::clone(&tracker); + let disk_clone = disk_name.clone(); + tokio::spawn(async move { + globalMetrics.current_paths.write().await.insert(disk_clone, tracker_clone); + }); + + let update_fn = { + let tracker = Arc::clone(&tracker); + Arc::new(move |path: &str| -> Pin + Send>> { + let tracker = Arc::clone(&tracker); + let path = path.to_string(); + Box::pin(async move { + tracker.update_path(path).await; + }) + }) + }; + + let done_fn = { + let disk_name = disk_name.clone(); + Arc::new(move || -> Pin + Send>> { + let disk_name = disk_name.clone(); + Box::pin(async move { + globalMetrics.current_paths.write().await.remove(&disk_name); + }) + }) + }; + + (update_fn, done_fn) +} + +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 14cfe998..40af33df 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -22,6 +22,7 @@ use async_channel::{Receiver as A_Receiver, Sender as A_Sender, bounded}; use futures::Future; use http::HeaderMap; use lazy_static::lazy_static; +use rustfs_common::metrics::{IlmAction, Metrics}; use s3s::Body; use sha2::{Digest, Sha256}; use std::any::Any; @@ -41,7 +42,7 @@ use xxhash_rust::xxh64; //use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; //use rustfs_notify::{initialize, notification_system}; use super::bucket_lifecycle_audit::{LcAuditEvent, LcEventSrc}; -use super::lifecycle::{self, ExpirationOptions, IlmAction, Lifecycle, TransitionOptions}; +use super::lifecycle::{self, ExpirationOptions, Lifecycle, TransitionOptions}; use super::tier_last_day_stats::{DailyAllTierStats, LastDayTierStats}; use super::tier_sweeper::{Jentry, delete_object_from_remote_tier}; use crate::bucket::{metadata_sys::get_lifecycle_config, versioning_sys::BucketVersioningSys}; @@ -54,7 +55,6 @@ use crate::global::GLOBAL_LocalNodeName; use crate::global::{GLOBAL_LifecycleSys, GLOBAL_TierConfigMgr, get_global_deployment_id}; use crate::heal::{ data_scanner::{apply_expiry_on_non_transitioned_objects, apply_expiry_on_transitioned_object}, - data_scanner_metric::ScannerMetrics, data_usage_cache::TierStats, }; use crate::store::ECStore; @@ -631,7 +631,7 @@ pub async fn enqueue_transition_immediate(oi: &ObjectInfo, src: LcEventSrc) { if !lc.is_none() { let event = lc.expect("err").eval(&oi.to_lifecycle_opts()).await; match event.action { - lifecycle::IlmAction::TransitionAction | lifecycle::IlmAction::TransitionVersionAction => { + IlmAction::TransitionAction | IlmAction::TransitionVersionAction => { if oi.delete_marker || oi.is_dir { return; } @@ -728,7 +728,7 @@ pub fn gen_transition_objname(bucket: &str) -> Result { } pub async fn transition_object(api: Arc, oi: &ObjectInfo, lae: LcAuditEvent) -> Result<(), Error> { - let time_ilm = ScannerMetrics::time_ilm(lae.event.action); + let time_ilm = Metrics::time_ilm(lae.event.action); let opts = ObjectOptions { transition: TransitionOptions { diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index 2431d6ba..3d232363 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -43,49 +43,7 @@ const _ERR_XML_NOT_WELL_FORMED: &str = const ERR_LIFECYCLE_BUCKET_LOCKED: &str = "ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an retention bucket"; -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum IlmAction { - NoneAction = 0, - DeleteAction, - DeleteVersionAction, - TransitionAction, - TransitionVersionAction, - DeleteRestoredAction, - DeleteRestoredVersionAction, - DeleteAllVersionsAction, - DelMarkerDeleteAllVersionsAction, - ActionCount, -} - -impl IlmAction { - pub fn delete_restored(&self) -> bool { - *self == Self::DeleteRestoredAction || *self == Self::DeleteRestoredVersionAction - } - - pub fn delete_versioned(&self) -> bool { - *self == Self::DeleteVersionAction || *self == Self::DeleteRestoredVersionAction - } - - pub fn delete_all(&self) -> bool { - *self == Self::DeleteAllVersionsAction || *self == Self::DelMarkerDeleteAllVersionsAction - } - - pub fn delete(&self) -> bool { - if self.delete_restored() { - return true; - } - *self == Self::DeleteVersionAction - || *self == Self::DeleteAction - || *self == Self::DeleteAllVersionsAction - || *self == Self::DelMarkerDeleteAllVersionsAction - } -} - -impl Display for IlmAction { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} +pub use rustfs_common::metrics::IlmAction; #[async_trait::async_trait] pub trait RuleValidate { diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index 12214f3e..649ca702 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -39,13 +39,13 @@ use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold}; use crate::heal::data_scanner::{ ScannerItem, ShouldSleepFn, SizeSummary, lc_has_active_rules, rep_has_active_rules, scan_data_folder, }; -use crate::heal::data_scanner_metric::{ScannerMetric, ScannerMetrics}; use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry}; use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE}; use crate::heal::heal_commands::{HealScanMode, HealingTracker}; use crate::heal::heal_ops::HEALING_TRACKER_FILENAME; use crate::new_object_layer_fn; use crate::store_api::{ObjectInfo, StorageAPI}; +use rustfs_common::metrics::{Metric, Metrics}; use rustfs_utils::path::{ GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, @@ -2323,9 +2323,9 @@ impl DiskAPI for LocalDisk { if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) { return Err(Error::other(ERR_SKIP_FILE).into()); } - let stop_fn = ScannerMetrics::log(ScannerMetric::ScanObject); + let stop_fn = Metrics::log(Metric::ScanObject); let mut res = HashMap::new(); - let done_sz = ScannerMetrics::time_size(ScannerMetric::ReadMetadata); + let done_sz = Metrics::time_size(Metric::ReadMetadata); let buf = match disk.read_metadata(item.path.clone()).await { Ok(buf) => buf, Err(err) => { @@ -2351,7 +2351,7 @@ impl DiskAPI for LocalDisk { } }; let mut size_s = SizeSummary::default(); - let done = ScannerMetrics::time(ScannerMetric::ApplyAll); + let done = Metrics::time(Metric::ApplyAll); let obj_infos = match item.apply_versions_actions(&fivs.versions).await { Ok(obj_infos) => obj_infos, Err(err) => { @@ -2369,7 +2369,7 @@ impl DiskAPI for LocalDisk { let mut obj_deleted = false; for info in obj_infos.iter() { - let done = ScannerMetrics::time(ScannerMetric::ApplyVersion); + let done = Metrics::time(Metric::ApplyVersion); let sz: i64; (obj_deleted, sz) = item.apply_actions(info, &mut size_s).await; done(); @@ -2405,7 +2405,7 @@ impl DiskAPI for LocalDisk { &item.object_path().to_string_lossy(), versioned, ); - let done = ScannerMetrics::time(ScannerMetric::TierObjSweep); + let done = Metrics::time(Metric::TierObjSweep); done(); } diff --git a/crates/ecstore/src/heal/data_scanner.rs b/crates/ecstore/src/heal/data_scanner.rs index c5ea666c..98a0924f 100644 --- a/crates/ecstore/src/heal/data_scanner.rs +++ b/crates/ecstore/src/heal/data_scanner.rs @@ -29,7 +29,6 @@ use std::{ use time::{self, OffsetDateTime}; use super::{ - data_scanner_metric::{ScannerMetric, ScannerMetrics, globalScannerMetrics}, data_usage::{DATA_USAGE_BLOOM_NAME_PATH, DataUsageInfo, store_data_usage_in_backend}, data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash}, heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN, HealScanMode}, @@ -39,6 +38,7 @@ use crate::bucket::{ utils::is_meta_bucketname, }; use crate::cmd::bucket_replication::queue_replication_heal; +use crate::disk::local::LocalDisk; use crate::event::name::EventName; use crate::{ bucket::{ @@ -57,7 +57,7 @@ use crate::{ bucket::{versioning::VersioningApi, versioning_sys::BucketVersioningSys}, cmd::bucket_replication::ReplicationStatusType, disk, - heal::data_usage::DATA_USAGE_ROOT, + // heal::data_usage::DATA_USAGE_ROOT, }; use crate::{ cache_value::metacache_set::{ListPathRawOptions, list_path_raw}, @@ -66,7 +66,7 @@ use crate::{ heal::Config, }, disk::{DiskInfoOptions, DiskStore}, - global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasure, GLOBAL_IsErasureSD}, + global::{GLOBAL_BackgroundHealState, GLOBAL_IsErasureSD}, heal::{ data_usage::BACKGROUND_HEAL_INFO_PATH, data_usage_cache::{DataUsageHashMap, hash_path}, @@ -83,7 +83,6 @@ use crate::{ disk::error::DiskError, error::{Error, Result}, }; -use crate::{disk::local::LocalDisk, heal::data_scanner_metric::current_path_updater}; use chrono::{DateTime, Utc}; use lazy_static::lazy_static; use rand::Rng; @@ -300,14 +299,14 @@ async fn run_data_scanner_cycle() { }; // Start metrics collection for this cycle - let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); + // let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle); // Update cycle information cycle_info.current = cycle_info.next; cycle_info.started = Utc::now(); // Update global scanner metrics - globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; + // globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; // Read background healing information and determine scan mode let bg_heal_info = read_background_heal_info(store.clone()).await; @@ -357,7 +356,7 @@ async fn run_data_scanner_cycle() { } // Update global metrics with completion info - globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; + // globalScannerMetrics.set_cycle(Some(cycle_info.clone())).await; // Persist updated cycle information // ignore error, continue. @@ -379,7 +378,7 @@ async fn run_data_scanner_cycle() { } // Complete metrics collection for this cycle - stop_fn(&scan_result); + // stop_fn(&scan_result); } /// Execute namespace scan with cancellation support @@ -781,7 +780,7 @@ impl ScannerItem { } pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) { - let done = ScannerMetrics::time(ScannerMetric::Ilm); + // let done = ScannerMetrics::time(ScannerMetric::Ilm); let (action, _size) = self.apply_lifecycle(oi).await; @@ -807,7 +806,7 @@ impl ScannerItem { self.heal_replication(&oi, _size_s).await; } - done(); + // done(); if action.delete_all() { return (true, 0); @@ -1572,68 +1571,69 @@ pub fn rep_has_active_rules(config: &ReplicationConfiguration, prefix: &str, rec pub type LocalDrive = Arc; pub async fn scan_data_folder( - disks: &[Option], - drive: LocalDrive, - cache: &DataUsageCache, - get_size_fn: GetSizeFn, - heal_scan_mode: HealScanMode, - should_sleep: ShouldSleepFn, + _disks: &[Option], + _drive: LocalDrive, + _cache: &DataUsageCache, + _get_size_fn: GetSizeFn, + _heal_scan_mode: HealScanMode, + _should_sleep: ShouldSleepFn, ) -> disk::error::Result { - if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT { - return Err(DiskError::other("internal error: root scan attempted")); - } + // if cache.info.name.is_empty() || cache.info.name == DATA_USAGE_ROOT { + // return Err(DiskError::other("internal error: root scan attempted")); + // } - let base_path = drive.to_string(); - let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name); - let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing { - AtomicBool::new(true) - } else { - AtomicBool::new(false) - }; - let mut s = FolderScanner { - root: base_path, - get_size: get_size_fn, - old_cache: cache.clone(), - new_cache: DataUsageCache { - info: cache.info.clone(), - ..Default::default() - }, - update_cache: DataUsageCache { - info: cache.info.clone(), - ..Default::default() - }, - data_usage_scanner_debug: false, - heal_object_select: 0, - scan_mode: heal_scan_mode, - updates: cache.info.updates.clone().unwrap(), - last_update: SystemTime::now(), - update_current_path: update_path, - disks: disks.to_vec(), - disks_quorum: disks.len() / 2, - skip_heal, - drive: drive.clone(), - we_sleep: should_sleep, - }; + // let base_path = drive.to_string(); + // // let (update_path, close_disk) = current_path_updater(&base_path, &cache.info.name); + // let skip_heal = if *GLOBAL_IsErasure.read().await || cache.info.skip_healing { + // AtomicBool::new(true) + // } else { + // AtomicBool::new(false) + // }; + // let mut s = FolderScanner { + // root: base_path, + // get_size: get_size_fn, + // old_cache: cache.clone(), + // new_cache: DataUsageCache { + // info: cache.info.clone(), + // ..Default::default() + // }, + // update_cache: DataUsageCache { + // info: cache.info.clone(), + // ..Default::default() + // }, + // data_usage_scanner_debug: false, + // heal_object_select: 0, + // scan_mode: heal_scan_mode, + // updates: cache.info.updates.clone().unwrap(), + // last_update: SystemTime::now(), + // update_current_path: update_path, + // disks: disks.to_vec(), + // disks_quorum: disks.len() / 2, + // skip_heal, + // drive: drive.clone(), + // we_sleep: should_sleep, + // }; - if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing { - s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32; - } + // if *GLOBAL_IsErasure.read().await || !cache.info.skip_healing { + // s.heal_object_select = HEAL_OBJECT_SELECT_PROB as u32; + // } - let mut root = DataUsageEntry::default(); - let folder = CachedFolder { - name: cache.info.name.clone(), - object_heal_prob_div: 1, - parent: DataUsageHash("".to_string()), - }; + // let mut root = DataUsageEntry::default(); + // let folder = CachedFolder { + // name: cache.info.name.clone(), + // object_heal_prob_div: 1, + // parent: DataUsageHash("".to_string()), + // }; - if s.scan_folder(&folder, &mut root).await.is_err() { - close_disk().await; - } - s.new_cache.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN as usize); - s.new_cache.info.last_update = Some(SystemTime::now()); - s.new_cache.info.next_cycle = cache.info.next_cycle; - close_disk().await; - Ok(s.new_cache) + // if s.scan_folder(&folder, &mut root).await.is_err() { + // close_disk().await; + // } + // s.new_cache.force_compact(DATA_SCANNER_COMPACT_AT_CHILDREN as usize); + // s.new_cache.info.last_update = Some(SystemTime::now()); + // s.new_cache.info.next_cycle = cache.info.next_cycle; + // close_disk().await; + // Ok(s.new_cache) + todo!() } pub async fn eval_action_from_lifecycle( @@ -1695,11 +1695,11 @@ pub async fn apply_expiry_on_transitioned_object( lc_event: &lifecycle::Event, src: &LcEventSrc, ) -> bool { - let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone()); + // let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone()); if let Err(_err) = expire_transitioned_object(api, oi, lc_event, src).await { return false; } - let _ = time_ilm(1); + // let _ = time_ilm(1); true } @@ -1727,7 +1727,7 @@ pub async fn apply_expiry_on_non_transitioned_objects( opts.delete_prefix_object = true; } - let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone()); + // let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone()); let mut dobj = api .delete_object(&oi.bucket, &encode_dir_object(&oi.name), opts) @@ -1759,11 +1759,11 @@ pub async fn apply_expiry_on_non_transitioned_objects( }); if lc_event.action != lifecycle::IlmAction::NoneAction { - let mut num_versions = 1_u64; - if lc_event.action.delete_all() { - num_versions = oi.num_versions as u64; - } - let _ = time_ilm(num_versions); + // let mut num_versions = 1_u64; + // if lc_event.action.delete_all() { + // num_versions = oi.num_versions as u64; + // } + // let _ = time_ilm(num_versions); } true diff --git a/crates/ecstore/src/heal/mod.rs b/crates/ecstore/src/heal/mod.rs index a92bfc33..0733d91d 100644 --- a/crates/ecstore/src/heal/mod.rs +++ b/crates/ecstore/src/heal/mod.rs @@ -14,7 +14,7 @@ pub mod background_heal_ops; pub mod data_scanner; -pub mod data_scanner_metric; +// pub mod data_scanner_metric; pub mod data_usage; pub mod data_usage_cache; pub mod error; diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index 298ff846..2363f1c0 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -15,7 +15,10 @@ use std::collections::{HashMap, HashSet}; use chrono::Utc; -use rustfs_common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}; +use rustfs_common::{ + globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}, + metrics::globalMetrics, +}; use rustfs_madmin::metrics::{DiskIOStats, DiskMetric, RealtimeMetrics}; use rustfs_utils::os::get_drive_stats; use serde::{Deserialize, Serialize}; @@ -23,10 +26,7 @@ use tracing::info; use crate::{ admin_server_info::get_local_server_property, - heal::{ - data_scanner_metric::globalScannerMetrics, - heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED}, - }, + heal::heal_commands::{DRIVE_STATE_OK, DRIVE_STATE_UNFORMATTED}, new_object_layer_fn, store_api::StorageAPI, // utils::os::get_drive_stats, @@ -108,7 +108,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) if types.contains(&MetricType::SCANNER) { info!("start get scanner metrics"); - let metrics = globalScannerMetrics.report().await; + let metrics = globalMetrics.report().await; real_time_metrics.aggregated.scanner = Some(metrics); } diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index eb4ecf43..4e24d7dc 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -31,7 +31,8 @@ use router::{AdminOperation, S3Router}; use rpc::register_rpc_route; use s3s::route::S3Route; -const ADMIN_PREFIX: &str = "/minio/admin"; +const ADMIN_PREFIX: &str = "/rustfs/admin"; +// const ADMIN_PREFIX: &str = "/minio/admin"; pub fn make_admin_route(console_enabled: bool) -> std::io::Result { let mut r: S3Router = S3Router::new(console_enabled);