mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -7933,6 +7933,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"futures",
|
||||
"lazy_static",
|
||||
"once_cell",
|
||||
"rmp-serde",
|
||||
"rustfs-common",
|
||||
"rustfs-ecstore",
|
||||
@@ -7942,14 +7943,18 @@ dependencies = [
|
||||
"rustfs-utils",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serial_test",
|
||||
"tempfile",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7968,6 +7973,7 @@ version = "0.0.5"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"lazy_static",
|
||||
"path-clean",
|
||||
"rmp-serde",
|
||||
"rustfs-filemeta",
|
||||
|
||||
@@ -1,16 +1,10 @@
|
||||
[package]
|
||||
name = "rustfs-ahm"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
version = "0.0.5"
|
||||
edition = "2021"
|
||||
authors = ["RustFS Team"]
|
||||
license.workspace = true
|
||||
license = "Apache-2.0"
|
||||
description = "RustFS AHM (Automatic Health Management) Scanner"
|
||||
repository.workspace = true
|
||||
rust-version.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation = "https://docs.rs/rustfs-ahm/latest/rustfs_ahm/"
|
||||
keywords = ["RustFS", "AHM", "health-management", "scanner", "Minio"]
|
||||
categories = ["web-programming", "development-tools", "filesystem"]
|
||||
|
||||
[dependencies]
|
||||
rustfs-ecstore = { workspace = true }
|
||||
@@ -38,5 +32,10 @@ chrono = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
rmp-serde = { workspace = true }
|
||||
tokio-test = { workspace = true }
|
||||
tokio-test = "0.4"
|
||||
serde_json = { workspace = true }
|
||||
serial_test = "3.2.0"
|
||||
once_cell = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
walkdir = "2.5.0"
|
||||
tempfile = "3.10"
|
||||
@@ -22,10 +22,7 @@ pub mod scanner;
|
||||
|
||||
pub use error::{Error, Result};
|
||||
pub use heal::{channel::HealChannelProcessor, HealManager, HealOptions, HealPriority, HealRequest, HealType};
|
||||
pub use scanner::{
|
||||
BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend,
|
||||
store_data_usage_in_backend,
|
||||
};
|
||||
pub use scanner::Scanner;
|
||||
|
||||
// Global cancellation token for AHM services (scanner and other background tasks)
|
||||
static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();
|
||||
|
||||
@@ -527,6 +527,7 @@ impl Scanner {
|
||||
mod_time: p.mod_time,
|
||||
index: p.index.clone(),
|
||||
checksums: p.checksums.clone(),
|
||||
error: None,
|
||||
})
|
||||
.collect(),
|
||||
erasure: rustfs_filemeta::ErasureInfo {
|
||||
@@ -1392,8 +1393,8 @@ mod tests {
|
||||
use rustfs_ecstore::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints};
|
||||
use rustfs_ecstore::store::ECStore;
|
||||
use rustfs_ecstore::{
|
||||
StorageAPI,
|
||||
store_api::{MakeBucketOptions, ObjectIO, PutObjReader},
|
||||
StorageAPI,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::fs;
|
||||
|
||||
@@ -12,197 +12,258 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
/// Size interval for object size histogram
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SizeInterval {
|
||||
pub start: u64,
|
||||
pub end: u64,
|
||||
pub name: &'static str,
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info;
|
||||
|
||||
/// Scanner metrics
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct ScannerMetrics {
|
||||
/// Total objects scanned since server start
|
||||
pub objects_scanned: u64,
|
||||
/// Total object versions scanned since server start
|
||||
pub versions_scanned: u64,
|
||||
/// Total directories scanned since server start
|
||||
pub directories_scanned: u64,
|
||||
/// Total bucket scans started since server start
|
||||
pub bucket_scans_started: u64,
|
||||
/// Total bucket scans finished since server start
|
||||
pub bucket_scans_finished: u64,
|
||||
/// Total objects with health issues found
|
||||
pub objects_with_issues: u64,
|
||||
/// Total heal tasks queued
|
||||
pub heal_tasks_queued: u64,
|
||||
/// Total heal tasks completed
|
||||
pub heal_tasks_completed: u64,
|
||||
/// Total heal tasks failed
|
||||
pub heal_tasks_failed: u64,
|
||||
/// Total healthy objects found
|
||||
pub healthy_objects: u64,
|
||||
/// Total corrupted objects found
|
||||
pub corrupted_objects: u64,
|
||||
/// Last scan activity time
|
||||
pub last_activity: Option<SystemTime>,
|
||||
/// Current scan cycle
|
||||
pub current_cycle: u64,
|
||||
/// Total scan cycles completed
|
||||
pub total_cycles: u64,
|
||||
/// Current scan duration
|
||||
pub current_scan_duration: Option<Duration>,
|
||||
/// Average scan duration
|
||||
pub avg_scan_duration: Duration,
|
||||
/// Objects scanned per second
|
||||
pub objects_per_second: f64,
|
||||
/// Buckets scanned per second
|
||||
pub buckets_per_second: f64,
|
||||
/// Storage metrics by bucket
|
||||
pub bucket_metrics: HashMap<String, BucketMetrics>,
|
||||
/// Disk metrics
|
||||
pub disk_metrics: HashMap<String, DiskMetrics>,
|
||||
}
|
||||
|
||||
/// Version interval for object versions histogram
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VersionInterval {
|
||||
pub start: u64,
|
||||
pub end: u64,
|
||||
pub name: &'static str,
|
||||
/// Bucket-specific metrics
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct BucketMetrics {
|
||||
/// Bucket name
|
||||
pub bucket: String,
|
||||
/// Total objects in bucket
|
||||
pub total_objects: u64,
|
||||
/// Total size of objects in bucket (bytes)
|
||||
pub total_size: u64,
|
||||
/// Objects with health issues
|
||||
pub objects_with_issues: u64,
|
||||
/// Last scan time
|
||||
pub last_scan_time: Option<SystemTime>,
|
||||
/// Scan duration
|
||||
pub scan_duration: Option<Duration>,
|
||||
/// Heal tasks queued for this bucket
|
||||
pub heal_tasks_queued: u64,
|
||||
/// Heal tasks completed for this bucket
|
||||
pub heal_tasks_completed: u64,
|
||||
/// Heal tasks failed for this bucket
|
||||
pub heal_tasks_failed: u64,
|
||||
}
|
||||
|
||||
/// Object size histogram intervals
|
||||
pub const OBJECTS_HISTOGRAM_INTERVALS: &[SizeInterval] = &[
|
||||
SizeInterval {
|
||||
start: 0,
|
||||
end: 1024 - 1,
|
||||
name: "LESS_THAN_1_KiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 1024,
|
||||
end: 1024 * 1024 - 1,
|
||||
name: "1_KiB_TO_1_MiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 1024 * 1024,
|
||||
end: 10 * 1024 * 1024 - 1,
|
||||
name: "1_MiB_TO_10_MiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 10 * 1024 * 1024,
|
||||
end: 64 * 1024 * 1024 - 1,
|
||||
name: "10_MiB_TO_64_MiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 64 * 1024 * 1024,
|
||||
end: 128 * 1024 * 1024 - 1,
|
||||
name: "64_MiB_TO_128_MiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 128 * 1024 * 1024,
|
||||
end: 512 * 1024 * 1024 - 1,
|
||||
name: "128_MiB_TO_512_MiB",
|
||||
},
|
||||
SizeInterval {
|
||||
start: 512 * 1024 * 1024,
|
||||
end: u64::MAX,
|
||||
name: "MORE_THAN_512_MiB",
|
||||
},
|
||||
];
|
||||
|
||||
/// Object version count histogram intervals
|
||||
pub const OBJECTS_VERSION_COUNT_INTERVALS: &[VersionInterval] = &[
|
||||
VersionInterval {
|
||||
start: 1,
|
||||
end: 1,
|
||||
name: "1_VERSION",
|
||||
},
|
||||
VersionInterval {
|
||||
start: 2,
|
||||
end: 10,
|
||||
name: "2_TO_10_VERSIONS",
|
||||
},
|
||||
VersionInterval {
|
||||
start: 11,
|
||||
end: 100,
|
||||
name: "11_TO_100_VERSIONS",
|
||||
},
|
||||
VersionInterval {
|
||||
start: 101,
|
||||
end: 1000,
|
||||
name: "101_TO_1000_VERSIONS",
|
||||
},
|
||||
VersionInterval {
|
||||
start: 1001,
|
||||
end: u64::MAX,
|
||||
name: "MORE_THAN_1000_VERSIONS",
|
||||
},
|
||||
];
|
||||
|
||||
/// Size histogram for object size distribution
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SizeHistogram {
|
||||
counts: Vec<u64>,
|
||||
/// Disk-specific metrics
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct DiskMetrics {
|
||||
/// Disk path
|
||||
pub disk_path: String,
|
||||
/// Total disk space (bytes)
|
||||
pub total_space: u64,
|
||||
/// Used disk space (bytes)
|
||||
pub used_space: u64,
|
||||
/// Free disk space (bytes)
|
||||
pub free_space: u64,
|
||||
/// Objects scanned on this disk
|
||||
pub objects_scanned: u64,
|
||||
/// Objects with issues on this disk
|
||||
pub objects_with_issues: u64,
|
||||
/// Last scan time
|
||||
pub last_scan_time: Option<SystemTime>,
|
||||
/// Whether disk is online
|
||||
pub is_online: bool,
|
||||
/// Whether disk is being scanned
|
||||
pub is_scanning: bool,
|
||||
}
|
||||
|
||||
/// Versions histogram for object version count distribution
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VersionsHistogram {
|
||||
counts: Vec<u64>,
|
||||
/// Thread-safe metrics collector
|
||||
pub struct MetricsCollector {
|
||||
/// Atomic counters for real-time metrics
|
||||
objects_scanned: AtomicU64,
|
||||
versions_scanned: AtomicU64,
|
||||
directories_scanned: AtomicU64,
|
||||
bucket_scans_started: AtomicU64,
|
||||
bucket_scans_finished: AtomicU64,
|
||||
objects_with_issues: AtomicU64,
|
||||
heal_tasks_queued: AtomicU64,
|
||||
heal_tasks_completed: AtomicU64,
|
||||
heal_tasks_failed: AtomicU64,
|
||||
current_cycle: AtomicU64,
|
||||
total_cycles: AtomicU64,
|
||||
healthy_objects: AtomicU64,
|
||||
corrupted_objects: AtomicU64,
|
||||
}
|
||||
|
||||
impl SizeHistogram {
|
||||
/// Create a new size histogram
|
||||
impl MetricsCollector {
|
||||
/// Create a new metrics collector
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
counts: vec![0; OBJECTS_HISTOGRAM_INTERVALS.len()],
|
||||
objects_scanned: AtomicU64::new(0),
|
||||
versions_scanned: AtomicU64::new(0),
|
||||
directories_scanned: AtomicU64::new(0),
|
||||
bucket_scans_started: AtomicU64::new(0),
|
||||
bucket_scans_finished: AtomicU64::new(0),
|
||||
objects_with_issues: AtomicU64::new(0),
|
||||
heal_tasks_queued: AtomicU64::new(0),
|
||||
heal_tasks_completed: AtomicU64::new(0),
|
||||
heal_tasks_failed: AtomicU64::new(0),
|
||||
current_cycle: AtomicU64::new(0),
|
||||
total_cycles: AtomicU64::new(0),
|
||||
healthy_objects: AtomicU64::new(0),
|
||||
corrupted_objects: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a size to the histogram
|
||||
pub fn add(&mut self, size: u64) {
|
||||
for (idx, interval) in OBJECTS_HISTOGRAM_INTERVALS.iter().enumerate() {
|
||||
if size >= interval.start && size <= interval.end {
|
||||
self.counts[idx] += 1;
|
||||
break;
|
||||
}
|
||||
/// Increment objects scanned count
|
||||
pub fn increment_objects_scanned(&self, count: u64) {
|
||||
self.objects_scanned.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment versions scanned count
|
||||
pub fn increment_versions_scanned(&self, count: u64) {
|
||||
self.versions_scanned.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment directories scanned count
|
||||
pub fn increment_directories_scanned(&self, count: u64) {
|
||||
self.directories_scanned.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment bucket scans started count
|
||||
pub fn increment_bucket_scans_started(&self, count: u64) {
|
||||
self.bucket_scans_started.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment bucket scans finished count
|
||||
pub fn increment_bucket_scans_finished(&self, count: u64) {
|
||||
self.bucket_scans_finished.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment objects with issues count
|
||||
pub fn increment_objects_with_issues(&self, count: u64) {
|
||||
self.objects_with_issues.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment heal tasks queued count
|
||||
pub fn increment_heal_tasks_queued(&self, count: u64) {
|
||||
self.heal_tasks_queued.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment heal tasks completed count
|
||||
pub fn increment_heal_tasks_completed(&self, count: u64) {
|
||||
self.heal_tasks_completed.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment heal tasks failed count
|
||||
pub fn increment_heal_tasks_failed(&self, count: u64) {
|
||||
self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Set current cycle
|
||||
pub fn set_current_cycle(&self, cycle: u64) {
|
||||
self.current_cycle.store(cycle, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment total cycles
|
||||
pub fn increment_total_cycles(&self) {
|
||||
self.total_cycles.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment healthy objects count
|
||||
pub fn increment_healthy_objects(&self) {
|
||||
self.healthy_objects.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment corrupted objects count
|
||||
pub fn increment_corrupted_objects(&self) {
|
||||
self.corrupted_objects.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Get current metrics snapshot
|
||||
pub fn get_metrics(&self) -> ScannerMetrics {
|
||||
ScannerMetrics {
|
||||
objects_scanned: self.objects_scanned.load(Ordering::Relaxed),
|
||||
versions_scanned: self.versions_scanned.load(Ordering::Relaxed),
|
||||
directories_scanned: self.directories_scanned.load(Ordering::Relaxed),
|
||||
bucket_scans_started: self.bucket_scans_started.load(Ordering::Relaxed),
|
||||
bucket_scans_finished: self.bucket_scans_finished.load(Ordering::Relaxed),
|
||||
objects_with_issues: self.objects_with_issues.load(Ordering::Relaxed),
|
||||
heal_tasks_queued: self.heal_tasks_queued.load(Ordering::Relaxed),
|
||||
heal_tasks_completed: self.heal_tasks_completed.load(Ordering::Relaxed),
|
||||
heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed),
|
||||
healthy_objects: self.healthy_objects.load(Ordering::Relaxed),
|
||||
corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed),
|
||||
last_activity: Some(SystemTime::now()),
|
||||
current_cycle: self.current_cycle.load(Ordering::Relaxed),
|
||||
total_cycles: self.total_cycles.load(Ordering::Relaxed),
|
||||
current_scan_duration: None, // Will be set by scanner
|
||||
avg_scan_duration: Duration::ZERO, // Will be calculated
|
||||
objects_per_second: 0.0, // Will be calculated
|
||||
buckets_per_second: 0.0, // Will be calculated
|
||||
bucket_metrics: HashMap::new(), // Will be populated by scanner
|
||||
disk_metrics: HashMap::new(), // Will be populated by scanner
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the histogram as a map
|
||||
pub fn to_map(&self) -> HashMap<String, u64> {
|
||||
let mut result = HashMap::new();
|
||||
for (idx, count) in self.counts.iter().enumerate() {
|
||||
let interval = &OBJECTS_HISTOGRAM_INTERVALS[idx];
|
||||
result.insert(interval.name.to_string(), *count);
|
||||
}
|
||||
result
|
||||
}
|
||||
/// Reset all metrics
|
||||
pub fn reset(&self) {
|
||||
self.objects_scanned.store(0, Ordering::Relaxed);
|
||||
self.versions_scanned.store(0, Ordering::Relaxed);
|
||||
self.directories_scanned.store(0, Ordering::Relaxed);
|
||||
self.bucket_scans_started.store(0, Ordering::Relaxed);
|
||||
self.bucket_scans_finished.store(0, Ordering::Relaxed);
|
||||
self.objects_with_issues.store(0, Ordering::Relaxed);
|
||||
self.heal_tasks_queued.store(0, Ordering::Relaxed);
|
||||
self.heal_tasks_completed.store(0, Ordering::Relaxed);
|
||||
self.heal_tasks_failed.store(0, Ordering::Relaxed);
|
||||
self.current_cycle.store(0, Ordering::Relaxed);
|
||||
self.total_cycles.store(0, Ordering::Relaxed);
|
||||
self.healthy_objects.store(0, Ordering::Relaxed);
|
||||
self.corrupted_objects.store(0, Ordering::Relaxed);
|
||||
|
||||
/// Merge another histogram into this one
|
||||
pub fn merge(&mut self, other: &SizeHistogram) {
|
||||
for (idx, count) in other.counts.iter().enumerate() {
|
||||
self.counts[idx] += count;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get total count
|
||||
pub fn total_count(&self) -> u64 {
|
||||
self.counts.iter().sum()
|
||||
}
|
||||
|
||||
/// Reset the histogram
|
||||
pub fn reset(&mut self) {
|
||||
for count in &mut self.counts {
|
||||
*count = 0;
|
||||
}
|
||||
info!("Scanner metrics reset");
|
||||
}
|
||||
}
|
||||
|
||||
impl VersionsHistogram {
|
||||
/// Create a new versions histogram
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
counts: vec![0; OBJECTS_VERSION_COUNT_INTERVALS.len()],
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a version count to the histogram
|
||||
pub fn add(&mut self, versions: u64) {
|
||||
for (idx, interval) in OBJECTS_VERSION_COUNT_INTERVALS.iter().enumerate() {
|
||||
if versions >= interval.start && versions <= interval.end {
|
||||
self.counts[idx] += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the histogram as a map
|
||||
pub fn to_map(&self) -> HashMap<String, u64> {
|
||||
let mut result = HashMap::new();
|
||||
for (idx, count) in self.counts.iter().enumerate() {
|
||||
let interval = &OBJECTS_VERSION_COUNT_INTERVALS[idx];
|
||||
result.insert(interval.name.to_string(), *count);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Merge another histogram into this one
|
||||
pub fn merge(&mut self, other: &VersionsHistogram) {
|
||||
for (idx, count) in other.counts.iter().enumerate() {
|
||||
self.counts[idx] += count;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get total count
|
||||
pub fn total_count(&self) -> u64 {
|
||||
self.counts.iter().sum()
|
||||
}
|
||||
|
||||
/// Reset the histogram
|
||||
pub fn reset(&mut self) {
|
||||
for count in &mut self.counts {
|
||||
*count = 0;
|
||||
}
|
||||
impl Default for MetricsCollector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,67 +272,35 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_size_histogram() {
|
||||
let mut histogram = SizeHistogram::new();
|
||||
|
||||
// Add some sizes
|
||||
histogram.add(512); // LESS_THAN_1_KiB
|
||||
histogram.add(1024); // 1_KiB_TO_1_MiB
|
||||
histogram.add(1024 * 1024); // 1_MiB_TO_10_MiB
|
||||
histogram.add(5 * 1024 * 1024); // 1_MiB_TO_10_MiB
|
||||
|
||||
let map = histogram.to_map();
|
||||
|
||||
assert_eq!(map.get("LESS_THAN_1_KiB"), Some(&1));
|
||||
assert_eq!(map.get("1_KiB_TO_1_MiB"), Some(&1));
|
||||
assert_eq!(map.get("1_MiB_TO_10_MiB"), Some(&2));
|
||||
assert_eq!(map.get("10_MiB_TO_64_MiB"), Some(&0));
|
||||
fn test_metrics_collector_creation() {
|
||||
let collector = MetricsCollector::new();
|
||||
let metrics = collector.get_metrics();
|
||||
assert_eq!(metrics.objects_scanned, 0);
|
||||
assert_eq!(metrics.versions_scanned, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_versions_histogram() {
|
||||
let mut histogram = VersionsHistogram::new();
|
||||
fn test_metrics_increment() {
|
||||
let collector = MetricsCollector::new();
|
||||
|
||||
// Add some version counts
|
||||
histogram.add(1); // 1_VERSION
|
||||
histogram.add(5); // 2_TO_10_VERSIONS
|
||||
histogram.add(50); // 11_TO_100_VERSIONS
|
||||
histogram.add(500); // 101_TO_1000_VERSIONS
|
||||
collector.increment_objects_scanned(10);
|
||||
collector.increment_versions_scanned(5);
|
||||
collector.increment_objects_with_issues(2);
|
||||
|
||||
let map = histogram.to_map();
|
||||
|
||||
assert_eq!(map.get("1_VERSION"), Some(&1));
|
||||
assert_eq!(map.get("2_TO_10_VERSIONS"), Some(&1));
|
||||
assert_eq!(map.get("11_TO_100_VERSIONS"), Some(&1));
|
||||
assert_eq!(map.get("101_TO_1000_VERSIONS"), Some(&1));
|
||||
let metrics = collector.get_metrics();
|
||||
assert_eq!(metrics.objects_scanned, 10);
|
||||
assert_eq!(metrics.versions_scanned, 5);
|
||||
assert_eq!(metrics.objects_with_issues, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_histogram_merge() {
|
||||
let mut histogram1 = SizeHistogram::new();
|
||||
histogram1.add(1024);
|
||||
histogram1.add(1024 * 1024);
|
||||
fn test_metrics_reset() {
|
||||
let collector = MetricsCollector::new();
|
||||
|
||||
let mut histogram2 = SizeHistogram::new();
|
||||
histogram2.add(1024);
|
||||
histogram2.add(5 * 1024 * 1024);
|
||||
collector.increment_objects_scanned(10);
|
||||
collector.reset();
|
||||
|
||||
histogram1.merge(&histogram2);
|
||||
|
||||
let map = histogram1.to_map();
|
||||
assert_eq!(map.get("1_KiB_TO_1_MiB"), Some(&2)); // 1 from histogram1 + 1 from histogram2
|
||||
assert_eq!(map.get("1_MiB_TO_10_MiB"), Some(&2)); // 1 from histogram1 + 1 from histogram2
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_histogram_reset() {
|
||||
let mut histogram = SizeHistogram::new();
|
||||
histogram.add(1024);
|
||||
histogram.add(1024 * 1024);
|
||||
|
||||
assert_eq!(histogram.total_count(), 2);
|
||||
|
||||
histogram.reset();
|
||||
assert_eq!(histogram.total_count(), 0);
|
||||
let metrics = collector.get_metrics();
|
||||
assert_eq!(metrics.objects_scanned, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,13 +13,8 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub mod data_scanner;
|
||||
pub mod data_usage;
|
||||
pub mod histogram;
|
||||
pub mod metrics;
|
||||
|
||||
// Re-export main types for convenience
|
||||
pub use data_scanner::Scanner;
|
||||
pub use data_usage::{
|
||||
BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, load_data_usage_from_backend, store_data_usage_in_backend,
|
||||
};
|
||||
pub use metrics::ScannerMetrics;
|
||||
|
||||
@@ -28,7 +28,8 @@ categories = ["web-programming", "development-tools", "data-structures"]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
tokio.workspace = true
|
||||
lazy_static = { workspace = true}
|
||||
tokio = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -21,9 +21,6 @@ use super::{
|
||||
};
|
||||
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
|
||||
|
||||
use crate::bucket::metadata_sys::{self};
|
||||
use crate::bucket::versioning::VersioningApi;
|
||||
use crate::bucket::versioning_sys::BucketVersioningSys;
|
||||
use crate::disk::error::FileAccessDeniedWithContext;
|
||||
use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error};
|
||||
use crate::disk::fs::{
|
||||
@@ -36,16 +33,6 @@ use crate::disk::{
|
||||
};
|
||||
use crate::disk::{FileWriter, STORAGE_FORMAT_FILE};
|
||||
use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold};
|
||||
use crate::heal::data_scanner::{
|
||||
ScannerItem, ShouldSleepFn, SizeSummary, lc_has_active_rules, rep_has_active_rules, scan_data_folder,
|
||||
};
|
||||
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry};
|
||||
use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE};
|
||||
use crate::heal::heal_commands::{HealScanMode, HealingTracker};
|
||||
use crate::heal::heal_ops::HEALING_TRACKER_FILENAME;
|
||||
use crate::new_object_layer_fn;
|
||||
use crate::store_api::{ObjectInfo, StorageAPI};
|
||||
use rustfs_common::metrics::{Metric, Metrics};
|
||||
use rustfs_utils::path::{
|
||||
GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix,
|
||||
path_join, path_join_buf,
|
||||
@@ -55,19 +42,18 @@ use tokio::time::interval;
|
||||
use crate::erasure_coding::bitrot_verify;
|
||||
use bytes::Bytes;
|
||||
use path_absolutize::Absolutize;
|
||||
use rustfs_common::defer;
|
||||
use rustfs_filemeta::{
|
||||
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
|
||||
get_file_info, read_xl_meta_no_data,
|
||||
};
|
||||
use rustfs_utils::HashAlgorithm;
|
||||
use rustfs_utils::os::get_info;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::io::SeekFrom;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
fs::Metadata,
|
||||
path::{Path, PathBuf},
|
||||
@@ -76,7 +62,6 @@ use time::OffsetDateTime;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -2268,184 +2253,6 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "info", skip_all)]
|
||||
async fn ns_scanner(
|
||||
&self,
|
||||
cache: &DataUsageCache,
|
||||
updates: Sender<DataUsageEntry>,
|
||||
scan_mode: HealScanMode,
|
||||
we_sleep: ShouldSleepFn,
|
||||
) -> Result<DataUsageCache> {
|
||||
self.scanning.fetch_add(1, Ordering::SeqCst);
|
||||
defer!(|| { self.scanning.fetch_sub(1, Ordering::SeqCst) });
|
||||
|
||||
// must before metadata_sys
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(Error::other("errServerNotInitialized"));
|
||||
};
|
||||
|
||||
let mut cache = cache.clone();
|
||||
// Check if the current bucket has a configured lifecycle policy
|
||||
if let Ok((lc, _)) = metadata_sys::get_lifecycle_config(&cache.info.name).await {
|
||||
if lc_has_active_rules(&lc, "") {
|
||||
cache.info.lifecycle = Some(lc);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the current bucket has replication configuration
|
||||
if let Ok((rcfg, _)) = metadata_sys::get_replication_config(&cache.info.name).await {
|
||||
if rep_has_active_rules(&rcfg, "", true) {
|
||||
// TODO: globalBucketTargetSys
|
||||
}
|
||||
}
|
||||
|
||||
let vcfg = BucketVersioningSys::get(&cache.info.name).await.ok();
|
||||
|
||||
let loc = self.get_disk_location();
|
||||
// TODO: 这里需要处理错误
|
||||
let disks = store
|
||||
.get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap())
|
||||
.await
|
||||
.map_err(|e| Error::other(e.to_string()))?;
|
||||
let disk = Arc::new(LocalDisk::new(&self.endpoint(), false).await?);
|
||||
let disk_clone = disk.clone();
|
||||
cache.info.updates = Some(updates.clone());
|
||||
let mut data_usage_info = scan_data_folder(
|
||||
&disks,
|
||||
disk,
|
||||
&cache,
|
||||
Box::new(move |item: &ScannerItem| {
|
||||
let mut item = item.clone();
|
||||
let disk = disk_clone.clone();
|
||||
let vcfg = vcfg.clone();
|
||||
Box::pin(async move {
|
||||
if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) {
|
||||
return Err(Error::other(ERR_SKIP_FILE).into());
|
||||
}
|
||||
let stop_fn = Metrics::log(Metric::ScanObject);
|
||||
let mut res = HashMap::new();
|
||||
let done_sz = Metrics::time_size(Metric::ReadMetadata);
|
||||
let buf = match disk.read_metadata(item.path.clone()).await {
|
||||
Ok(buf) => buf,
|
||||
Err(err) => {
|
||||
res.insert("err".to_string(), err.to_string());
|
||||
stop_fn(&res);
|
||||
return Err(Error::other(ERR_SKIP_FILE).into());
|
||||
}
|
||||
};
|
||||
done_sz(buf.len() as u64);
|
||||
res.insert("metasize".to_string(), buf.len().to_string());
|
||||
item.transform_meta_dir();
|
||||
let meta_cache = MetaCacheEntry {
|
||||
name: item.object_path().to_string_lossy().to_string(),
|
||||
metadata: buf,
|
||||
..Default::default()
|
||||
};
|
||||
let fivs = match meta_cache.file_info_versions(&item.bucket) {
|
||||
Ok(fivs) => fivs,
|
||||
Err(err) => {
|
||||
res.insert("err".to_string(), err.to_string());
|
||||
stop_fn(&res);
|
||||
return Err(Error::other(ERR_SKIP_FILE).into());
|
||||
}
|
||||
};
|
||||
let mut size_s = SizeSummary::default();
|
||||
let done = Metrics::time(Metric::ApplyAll);
|
||||
let obj_infos = match item.apply_versions_actions(&fivs.versions).await {
|
||||
Ok(obj_infos) => obj_infos,
|
||||
Err(err) => {
|
||||
res.insert("err".to_string(), err.to_string());
|
||||
stop_fn(&res);
|
||||
return Err(Error::other(ERR_SKIP_FILE).into());
|
||||
}
|
||||
};
|
||||
|
||||
let versioned = if let Some(vcfg) = vcfg.as_ref() {
|
||||
vcfg.versioned(item.object_path().to_str().unwrap_or_default())
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let mut obj_deleted = false;
|
||||
for info in obj_infos.iter() {
|
||||
let done = Metrics::time(Metric::ApplyVersion);
|
||||
let sz: i64;
|
||||
(obj_deleted, sz) = item.apply_actions(info, &mut size_s).await;
|
||||
done();
|
||||
|
||||
if obj_deleted {
|
||||
break;
|
||||
}
|
||||
|
||||
let actual_sz = match info.get_actual_size() {
|
||||
Ok(size) => size,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if info.delete_marker {
|
||||
size_s.delete_markers += 1;
|
||||
}
|
||||
|
||||
if info.version_id.is_some() && sz == actual_sz {
|
||||
size_s.versions += 1;
|
||||
}
|
||||
|
||||
size_s.total_size += sz as usize;
|
||||
|
||||
if info.delete_marker {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for free_version in fivs.free_versions.iter() {
|
||||
let _obj_info = ObjectInfo::from_file_info(
|
||||
free_version,
|
||||
&item.bucket,
|
||||
&item.object_path().to_string_lossy(),
|
||||
versioned,
|
||||
);
|
||||
let done = Metrics::time(Metric::TierObjSweep);
|
||||
done();
|
||||
}
|
||||
|
||||
// todo: global trace
|
||||
if obj_deleted {
|
||||
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
|
||||
}
|
||||
done();
|
||||
Ok(size_s)
|
||||
})
|
||||
}),
|
||||
scan_mode,
|
||||
we_sleep,
|
||||
)
|
||||
.await?;
|
||||
data_usage_info.info.last_update = Some(SystemTime::now());
|
||||
debug!("ns_scanner completed: {data_usage_info:?}");
|
||||
Ok(data_usage_info)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn healing(&self) -> Option<HealingTracker> {
|
||||
let healing_file = path_join(&[
|
||||
self.path(),
|
||||
PathBuf::from(RUSTFS_META_BUCKET),
|
||||
PathBuf::from(BUCKET_META_PREFIX),
|
||||
PathBuf::from(HEALING_TRACKER_FILENAME),
|
||||
]);
|
||||
let b = match fs::read(healing_file).await {
|
||||
Ok(b) => b,
|
||||
Err(_) => return None,
|
||||
};
|
||||
if b.is_empty() {
|
||||
return None;
|
||||
}
|
||||
match HealingTracker::unmarshal_msg(&b) {
|
||||
Ok(h) => Some(h),
|
||||
Err(_) => Some(HealingTracker::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> {
|
||||
|
||||
@@ -21,9 +21,9 @@ use rustfs_protos::{
|
||||
node_service_time_out_client,
|
||||
proto_gen::node_service::{
|
||||
CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest,
|
||||
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest,
|
||||
ReadAllRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest,
|
||||
RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
|
||||
DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest,
|
||||
ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest,
|
||||
StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -32,26 +32,15 @@ use crate::disk::{
|
||||
ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions,
|
||||
endpoint::Endpoint,
|
||||
};
|
||||
use crate::disk::{FileReader, FileWriter};
|
||||
use crate::{
|
||||
disk::error::{Error, Result},
|
||||
rpc::build_auth_headers,
|
||||
};
|
||||
use crate::{
|
||||
disk::{FileReader, FileWriter},
|
||||
heal::{
|
||||
data_scanner::ShouldSleepFn,
|
||||
data_usage_cache::{DataUsageCache, DataUsageEntry},
|
||||
heal_commands::{HealScanMode, HealingTracker},
|
||||
},
|
||||
};
|
||||
use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo};
|
||||
use rustfs_protos::proto_gen::node_service::RenamePartRequest;
|
||||
use rustfs_rio::{HttpReader, HttpWriter};
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
sync::mpsc::{self, Sender},
|
||||
};
|
||||
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
|
||||
use tokio::io::AsyncWrite;
|
||||
use tonic::Request;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
@@ -927,55 +916,6 @@ impl DiskAPI for RemoteDisk {
|
||||
|
||||
Ok(disk_info)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self, cache, scan_mode, _we_sleep))]
|
||||
async fn ns_scanner(
|
||||
&self,
|
||||
cache: &DataUsageCache,
|
||||
updates: Sender<DataUsageEntry>,
|
||||
scan_mode: HealScanMode,
|
||||
_we_sleep: ShouldSleepFn,
|
||||
) -> Result<DataUsageCache> {
|
||||
info!("ns_scanner");
|
||||
let cache = serde_json::to_string(cache)?;
|
||||
let mut client = node_service_time_out_client(&self.addr)
|
||||
.await
|
||||
.map_err(|err| Error::other(format!("can not get client, err: {err}")))?;
|
||||
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let in_stream = ReceiverStream::new(rx);
|
||||
let mut response = client.ns_scanner(in_stream).await?.into_inner();
|
||||
let request = NsScannerRequest {
|
||||
disk: self.endpoint.to_string(),
|
||||
cache,
|
||||
scan_mode: scan_mode as u64,
|
||||
};
|
||||
tx.send(request)
|
||||
.await
|
||||
.map_err(|err| Error::other(format!("can not send request, err: {err}")))?;
|
||||
|
||||
loop {
|
||||
match response.next().await {
|
||||
Some(Ok(resp)) => {
|
||||
if !resp.update.is_empty() {
|
||||
let data_usage_cache = serde_json::from_str::<DataUsageEntry>(&resp.update)?;
|
||||
let _ = updates.send(data_usage_cache).await;
|
||||
} else if !resp.data_usage_cache.is_empty() {
|
||||
let data_usage_cache = serde_json::from_str::<DataUsageCache>(&resp.data_usage_cache)?;
|
||||
return Ok(data_usage_cache);
|
||||
} else {
|
||||
return Err(Error::other("scan was interrupted"));
|
||||
}
|
||||
}
|
||||
_ => return Err(Error::other("scan was interrupted")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn healing(&self) -> Option<HealingTracker> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -970,6 +970,7 @@ pub trait StorageAPI: ObjectIO {
|
||||
// Walk TODO:
|
||||
|
||||
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()>;
|
||||
async fn copy_object(
|
||||
&self,
|
||||
src_bucket: &str,
|
||||
|
||||
@@ -54,6 +54,7 @@ use rustfs_iam::init_iam_sys;
|
||||
use rustfs_obs::{init_obs, set_global_guard};
|
||||
use rustfs_utils::net::parse_and_resolve_address;
|
||||
use std::io::{Error, Result};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
#[cfg(all(target_os = "linux", target_env = "gnu"))]
|
||||
|
||||
Reference in New Issue
Block a user