diff --git a/Cargo.lock b/Cargo.lock index 683f5a05..3098046b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7933,6 +7933,7 @@ dependencies = [ "chrono", "futures", "lazy_static", + "once_cell", "rmp-serde", "rustfs-common", "rustfs-ecstore", @@ -7942,14 +7943,18 @@ dependencies = [ "rustfs-utils", "serde", "serde_json", + "serial_test", + "tempfile", "thiserror 2.0.12", "time", "tokio", "tokio-test", "tokio-util", "tracing", + "tracing-subscriber", "url", "uuid", + "walkdir", ] [[package]] @@ -7968,6 +7973,7 @@ version = "0.0.5" dependencies = [ "async-trait", "chrono", + "lazy_static", "path-clean", "rmp-serde", "rustfs-filemeta", diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index 5442fb71..817e3019 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -1,16 +1,10 @@ [package] name = "rustfs-ahm" -version.workspace = true -edition.workspace = true +version = "0.0.5" +edition = "2021" authors = ["RustFS Team"] -license.workspace = true +license = "Apache-2.0" description = "RustFS AHM (Automatic Health Management) Scanner" -repository.workspace = true -rust-version.workspace = true -homepage.workspace = true -documentation = "https://docs.rs/rustfs-ahm/latest/rustfs_ahm/" -keywords = ["RustFS", "AHM", "health-management", "scanner", "Minio"] -categories = ["web-programming", "development-tools", "filesystem"] [dependencies] rustfs-ecstore = { workspace = true } @@ -38,5 +32,10 @@ chrono = { workspace = true } [dev-dependencies] rmp-serde = { workspace = true } -tokio-test = { workspace = true } +tokio-test = "0.4" serde_json = { workspace = true } +serial_test = "3.2.0" +once_cell = { workspace = true } +tracing-subscriber = { workspace = true } +walkdir = "2.5.0" +tempfile = "3.10" \ No newline at end of file diff --git a/crates/ahm/src/lib.rs b/crates/ahm/src/lib.rs index 17a6b71b..7bc1b689 100644 --- a/crates/ahm/src/lib.rs +++ b/crates/ahm/src/lib.rs @@ -22,10 +22,7 @@ pub mod scanner; pub use error::{Error, Result}; pub use heal::{channel::HealChannelProcessor, HealManager, HealOptions, HealPriority, HealRequest, HealType}; -pub use scanner::{ - BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend, - store_data_usage_in_backend, -}; +pub use scanner::Scanner; // Global cancellation token for AHM services (scanner and other background tasks) static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new(); diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index ab19595c..5a28186a 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -527,6 +527,7 @@ impl Scanner { mod_time: p.mod_time, index: p.index.clone(), checksums: p.checksums.clone(), + error: None, }) .collect(), erasure: rustfs_filemeta::ErasureInfo { @@ -1392,8 +1393,8 @@ mod tests { use rustfs_ecstore::endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}; use rustfs_ecstore::store::ECStore; use rustfs_ecstore::{ - StorageAPI, store_api::{MakeBucketOptions, ObjectIO, PutObjReader}, + StorageAPI, }; use serial_test::serial; use std::fs; diff --git a/crates/ahm/src/scanner/histogram.rs b/crates/ahm/src/scanner/histogram.rs index 778569ce..f5d7b73b 100644 --- a/crates/ahm/src/scanner/histogram.rs +++ b/crates/ahm/src/scanner/histogram.rs @@ -12,197 +12,258 @@ // See the License for the specific language governing permissions and // limitations under the License.
-use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU64, Ordering}, + time::{Duration, SystemTime}, +}; -/// Size interval for object size histogram -#[derive(Debug, Clone)] -pub struct SizeInterval { - pub start: u64, - pub end: u64, - pub name: &'static str, +use serde::{Deserialize, Serialize}; +use tracing::info; + +/// Scanner metrics +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ScannerMetrics { + /// Total objects scanned since server start + pub objects_scanned: u64, + /// Total object versions scanned since server start + pub versions_scanned: u64, + /// Total directories scanned since server start + pub directories_scanned: u64, + /// Total bucket scans started since server start + pub bucket_scans_started: u64, + /// Total bucket scans finished since server start + pub bucket_scans_finished: u64, + /// Total objects with health issues found + pub objects_with_issues: u64, + /// Total heal tasks queued + pub heal_tasks_queued: u64, + /// Total heal tasks completed + pub heal_tasks_completed: u64, + /// Total heal tasks failed + pub heal_tasks_failed: u64, + /// Total healthy objects found + pub healthy_objects: u64, + /// Total corrupted objects found + pub corrupted_objects: u64, + /// Last scan activity time + pub last_activity: Option<SystemTime>, + /// Current scan cycle + pub current_cycle: u64, + /// Total scan cycles completed + pub total_cycles: u64, + /// Current scan duration + pub current_scan_duration: Option<Duration>, + /// Average scan duration + pub avg_scan_duration: Duration, + /// Objects scanned per second + pub objects_per_second: f64, + /// Buckets scanned per second + pub buckets_per_second: f64, + /// Storage metrics by bucket + pub bucket_metrics: HashMap<String, BucketMetrics>, + /// Disk metrics + pub disk_metrics: HashMap<String, DiskMetrics>, +} -/// Version interval for object versions histogram -#[derive(Debug, Clone)] -pub struct VersionInterval { - pub start: u64, - pub end: u64, - pub name: &'static str, +/// Bucket-specific metrics +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct BucketMetrics { + /// Bucket name + pub bucket: String, + /// Total objects in bucket + pub total_objects: u64, + /// Total size of objects in bucket (bytes) + pub total_size: u64, + /// Objects with health issues + pub objects_with_issues: u64, + /// Last scan time + pub last_scan_time: Option<SystemTime>, + /// Scan duration + pub scan_duration: Option<Duration>, + /// Heal tasks queued for this bucket + pub heal_tasks_queued: u64, + /// Heal tasks completed for this bucket + pub heal_tasks_completed: u64, + /// Heal tasks failed for this bucket + pub heal_tasks_failed: u64, +} -/// Object size histogram intervals -pub const OBJECTS_HISTOGRAM_INTERVALS: &[SizeInterval] = &[ - SizeInterval { - start: 0, - end: 1024 - 1, - name: "LESS_THAN_1_KiB", - }, - SizeInterval { - start: 1024, - end: 1024 * 1024 - 1, - name: "1_KiB_TO_1_MiB", - }, - SizeInterval { - start: 1024 * 1024, - end: 10 * 1024 * 1024 - 1, - name: "1_MiB_TO_10_MiB", - }, - SizeInterval { - start: 10 * 1024 * 1024, - end: 64 * 1024 * 1024 - 1, - name: "10_MiB_TO_64_MiB", - }, - SizeInterval { - start: 64 * 1024 * 1024, - end: 128 * 1024 * 1024 - 1, - name: "64_MiB_TO_128_MiB", - }, - SizeInterval { - start: 128 * 1024 * 1024, - end: 512 * 1024 * 1024 - 1, - name: "128_MiB_TO_512_MiB", - }, - SizeInterval { - start: 512 * 1024 * 1024, - end: u64::MAX, - name: "MORE_THAN_512_MiB", - }, -]; - -/// Object version count histogram intervals -pub const OBJECTS_VERSION_COUNT_INTERVALS: &[VersionInterval] = &[ -
VersionInterval { - start: 1, - end: 1, - name: "1_VERSION", - }, - VersionInterval { - start: 2, - end: 10, - name: "2_TO_10_VERSIONS", - }, - VersionInterval { - start: 11, - end: 100, - name: "11_TO_100_VERSIONS", - }, - VersionInterval { - start: 101, - end: 1000, - name: "101_TO_1000_VERSIONS", - }, - VersionInterval { - start: 1001, - end: u64::MAX, - name: "MORE_THAN_1000_VERSIONS", - }, -]; - -/// Size histogram for object size distribution -#[derive(Debug, Clone, Default)] -pub struct SizeHistogram { - counts: Vec<u64>, +/// Disk-specific metrics +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct DiskMetrics { + /// Disk path + pub disk_path: String, + /// Total disk space (bytes) + pub total_space: u64, + /// Used disk space (bytes) + pub used_space: u64, + /// Free disk space (bytes) + pub free_space: u64, + /// Objects scanned on this disk + pub objects_scanned: u64, + /// Objects with issues on this disk + pub objects_with_issues: u64, + /// Last scan time + pub last_scan_time: Option<SystemTime>, + /// Whether disk is online + pub is_online: bool, + /// Whether disk is being scanned + pub is_scanning: bool, } -/// Versions histogram for object version count distribution -#[derive(Debug, Clone, Default)] -pub struct VersionsHistogram { - counts: Vec<u64>, +/// Thread-safe metrics collector +pub struct MetricsCollector { + /// Atomic counters for real-time metrics + objects_scanned: AtomicU64, + versions_scanned: AtomicU64, + directories_scanned: AtomicU64, + bucket_scans_started: AtomicU64, + bucket_scans_finished: AtomicU64, + objects_with_issues: AtomicU64, + heal_tasks_queued: AtomicU64, + heal_tasks_completed: AtomicU64, + heal_tasks_failed: AtomicU64, + current_cycle: AtomicU64, + total_cycles: AtomicU64, + healthy_objects: AtomicU64, + corrupted_objects: AtomicU64, } -impl SizeHistogram { - /// Create a new size histogram +impl MetricsCollector { + /// Create a new metrics collector pub fn new() -> Self { Self { - counts: vec![0; OBJECTS_HISTOGRAM_INTERVALS.len()], + objects_scanned: AtomicU64::new(0), + versions_scanned: AtomicU64::new(0), + directories_scanned: AtomicU64::new(0), + bucket_scans_started: AtomicU64::new(0), + bucket_scans_finished: AtomicU64::new(0), + objects_with_issues: AtomicU64::new(0), + heal_tasks_queued: AtomicU64::new(0), + heal_tasks_completed: AtomicU64::new(0), + heal_tasks_failed: AtomicU64::new(0), + current_cycle: AtomicU64::new(0), + total_cycles: AtomicU64::new(0), + healthy_objects: AtomicU64::new(0), + corrupted_objects: AtomicU64::new(0), } } - /// Add a size to the histogram - pub fn add(&mut self, size: u64) { - for (idx, interval) in OBJECTS_HISTOGRAM_INTERVALS.iter().enumerate() { - if size >= interval.start && size <= interval.end { - self.counts[idx] += 1; - break; - } + /// Increment objects scanned count + pub fn increment_objects_scanned(&self, count: u64) { + self.objects_scanned.fetch_add(count, Ordering::Relaxed); + } + + /// Increment versions scanned count + pub fn increment_versions_scanned(&self, count: u64) { + self.versions_scanned.fetch_add(count, Ordering::Relaxed); + } + + /// Increment directories scanned count + pub fn increment_directories_scanned(&self, count: u64) { + self.directories_scanned.fetch_add(count, Ordering::Relaxed); + } + + /// Increment bucket scans started count + pub fn increment_bucket_scans_started(&self, count: u64) { + self.bucket_scans_started.fetch_add(count, Ordering::Relaxed); + } + + /// Increment bucket scans finished count + pub fn increment_bucket_scans_finished(&self, count: u64) { +
self.bucket_scans_finished.fetch_add(count, Ordering::Relaxed); + } + + /// Increment objects with issues count + pub fn increment_objects_with_issues(&self, count: u64) { + self.objects_with_issues.fetch_add(count, Ordering::Relaxed); + } + + /// Increment heal tasks queued count + pub fn increment_heal_tasks_queued(&self, count: u64) { + self.heal_tasks_queued.fetch_add(count, Ordering::Relaxed); + } + + /// Increment heal tasks completed count + pub fn increment_heal_tasks_completed(&self, count: u64) { + self.heal_tasks_completed.fetch_add(count, Ordering::Relaxed); + } + + /// Increment heal tasks failed count + pub fn increment_heal_tasks_failed(&self, count: u64) { + self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed); + } + + /// Set current cycle + pub fn set_current_cycle(&self, cycle: u64) { + self.current_cycle.store(cycle, Ordering::Relaxed); + } + + /// Increment total cycles + pub fn increment_total_cycles(&self) { + self.total_cycles.fetch_add(1, Ordering::Relaxed); + } + + /// Increment healthy objects count + pub fn increment_healthy_objects(&self) { + self.healthy_objects.fetch_add(1, Ordering::Relaxed); + } + + /// Increment corrupted objects count + pub fn increment_corrupted_objects(&self) { + self.corrupted_objects.fetch_add(1, Ordering::Relaxed); + } + + /// Get current metrics snapshot + pub fn get_metrics(&self) -> ScannerMetrics { + ScannerMetrics { + objects_scanned: self.objects_scanned.load(Ordering::Relaxed), + versions_scanned: self.versions_scanned.load(Ordering::Relaxed), + directories_scanned: self.directories_scanned.load(Ordering::Relaxed), + bucket_scans_started: self.bucket_scans_started.load(Ordering::Relaxed), + bucket_scans_finished: self.bucket_scans_finished.load(Ordering::Relaxed), + objects_with_issues: self.objects_with_issues.load(Ordering::Relaxed), + heal_tasks_queued: self.heal_tasks_queued.load(Ordering::Relaxed), + heal_tasks_completed: self.heal_tasks_completed.load(Ordering::Relaxed), + heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed), + healthy_objects: self.healthy_objects.load(Ordering::Relaxed), + corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed), + last_activity: Some(SystemTime::now()), + current_cycle: self.current_cycle.load(Ordering::Relaxed), + total_cycles: self.total_cycles.load(Ordering::Relaxed), + current_scan_duration: None, // Will be set by scanner + avg_scan_duration: Duration::ZERO, // Will be calculated + objects_per_second: 0.0, // Will be calculated + buckets_per_second: 0.0, // Will be calculated + bucket_metrics: HashMap::new(), // Will be populated by scanner + disk_metrics: HashMap::new(), // Will be populated by scanner } } - /// Get the histogram as a map - pub fn to_map(&self) -> HashMap<String, u64> { - let mut result = HashMap::new(); - for (idx, count) in self.counts.iter().enumerate() { - let interval = &OBJECTS_HISTOGRAM_INTERVALS[idx]; - result.insert(interval.name.to_string(), *count); - } - result - } + /// Reset all metrics + pub fn reset(&self) { + self.objects_scanned.store(0, Ordering::Relaxed); + self.versions_scanned.store(0, Ordering::Relaxed); + self.directories_scanned.store(0, Ordering::Relaxed); + self.bucket_scans_started.store(0, Ordering::Relaxed); + self.bucket_scans_finished.store(0, Ordering::Relaxed); + self.objects_with_issues.store(0, Ordering::Relaxed); + self.heal_tasks_queued.store(0, Ordering::Relaxed); + self.heal_tasks_completed.store(0, Ordering::Relaxed); + self.heal_tasks_failed.store(0, Ordering::Relaxed); + self.current_cycle.store(0,
Ordering::Relaxed); + self.total_cycles.store(0, Ordering::Relaxed); + self.healthy_objects.store(0, Ordering::Relaxed); + self.corrupted_objects.store(0, Ordering::Relaxed); - /// Merge another histogram into this one - pub fn merge(&mut self, other: &SizeHistogram) { - for (idx, count) in other.counts.iter().enumerate() { - self.counts[idx] += count; - } - } - - /// Get total count - pub fn total_count(&self) -> u64 { - self.counts.iter().sum() - } - - /// Reset the histogram - pub fn reset(&mut self) { - for count in &mut self.counts { - *count = 0; - } + info!("Scanner metrics reset"); } } -impl VersionsHistogram { - /// Create a new versions histogram - pub fn new() -> Self { - Self { - counts: vec![0; OBJECTS_VERSION_COUNT_INTERVALS.len()], - } - } - - /// Add a version count to the histogram - pub fn add(&mut self, versions: u64) { - for (idx, interval) in OBJECTS_VERSION_COUNT_INTERVALS.iter().enumerate() { - if versions >= interval.start && versions <= interval.end { - self.counts[idx] += 1; - break; - } - } - } - - /// Get the histogram as a map - pub fn to_map(&self) -> HashMap<String, u64> { - let mut result = HashMap::new(); - for (idx, count) in self.counts.iter().enumerate() { - let interval = &OBJECTS_VERSION_COUNT_INTERVALS[idx]; - result.insert(interval.name.to_string(), *count); - } - result - } - - /// Merge another histogram into this one - pub fn merge(&mut self, other: &VersionsHistogram) { - for (idx, count) in other.counts.iter().enumerate() { - self.counts[idx] += count; - } - } - - /// Get total count - pub fn total_count(&self) -> u64 { - self.counts.iter().sum() - } - - /// Reset the histogram - pub fn reset(&mut self) { - for count in &mut self.counts { - *count = 0; - } +impl Default for MetricsCollector { + fn default() -> Self { + Self::new() } } @@ -211,67 +272,35 @@ mod tests { use super::*; #[test] - fn test_size_histogram() { - let mut histogram = SizeHistogram::new(); - - // Add some sizes - histogram.add(512); // LESS_THAN_1_KiB - histogram.add(1024); // 1_KiB_TO_1_MiB - histogram.add(1024 * 1024); // 1_MiB_TO_10_MiB - histogram.add(5 * 1024 * 1024); // 1_MiB_TO_10_MiB - - let map = histogram.to_map(); - - assert_eq!(map.get("LESS_THAN_1_KiB"), Some(&1)); - assert_eq!(map.get("1_KiB_TO_1_MiB"), Some(&1)); - assert_eq!(map.get("1_MiB_TO_10_MiB"), Some(&2)); - assert_eq!(map.get("10_MiB_TO_64_MiB"), Some(&0)); + fn test_metrics_collector_creation() { + let collector = MetricsCollector::new(); + let metrics = collector.get_metrics(); + assert_eq!(metrics.objects_scanned, 0); + assert_eq!(metrics.versions_scanned, 0); } #[test] - fn test_versions_histogram() { - let mut histogram = VersionsHistogram::new(); + fn test_metrics_increment() { + let collector = MetricsCollector::new(); - // Add some version counts - histogram.add(1); // 1_VERSION - histogram.add(5); // 2_TO_10_VERSIONS - histogram.add(50); // 11_TO_100_VERSIONS - histogram.add(500); // 101_TO_1000_VERSIONS + collector.increment_objects_scanned(10); + collector.increment_versions_scanned(5); + collector.increment_objects_with_issues(2); - let map = histogram.to_map(); - - assert_eq!(map.get("1_VERSION"), Some(&1)); - assert_eq!(map.get("2_TO_10_VERSIONS"), Some(&1)); - assert_eq!(map.get("11_TO_100_VERSIONS"), Some(&1)); - assert_eq!(map.get("101_TO_1000_VERSIONS"), Some(&1)); + let metrics = collector.get_metrics(); + assert_eq!(metrics.objects_scanned, 10); + assert_eq!(metrics.versions_scanned, 5); + assert_eq!(metrics.objects_with_issues, 2); } #[test] - fn test_histogram_merge() { - let mut histogram1
= SizeHistogram::new(); - histogram1.add(1024); - histogram1.add(1024 * 1024); + fn test_metrics_reset() { + let collector = MetricsCollector::new(); - let mut histogram2 = SizeHistogram::new(); - histogram2.add(1024); - histogram2.add(5 * 1024 * 1024); + collector.increment_objects_scanned(10); + collector.reset(); - histogram1.merge(&histogram2); - - let map = histogram1.to_map(); - assert_eq!(map.get("1_KiB_TO_1_MiB"), Some(&2)); // 1 from histogram1 + 1 from histogram2 - assert_eq!(map.get("1_MiB_TO_10_MiB"), Some(&2)); // 1 from histogram1 + 1 from histogram2 - } - - #[test] - fn test_histogram_reset() { - let mut histogram = SizeHistogram::new(); - histogram.add(1024); - histogram.add(1024 * 1024); - - assert_eq!(histogram.total_count(), 2); - - histogram.reset(); - assert_eq!(histogram.total_count(), 0); + let metrics = collector.get_metrics(); + assert_eq!(metrics.objects_scanned, 0); } } diff --git a/crates/ahm/src/scanner/mod.rs b/crates/ahm/src/scanner/mod.rs index 025bbb38..d299c143 100644 --- a/crates/ahm/src/scanner/mod.rs +++ b/crates/ahm/src/scanner/mod.rs @@ -13,13 +13,8 @@ // limitations under the License. pub mod data_scanner; -pub mod data_usage; pub mod histogram; pub mod metrics; -// Re-export main types for convenience pub use data_scanner::Scanner; -pub use data_usage::{ - BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, load_data_usage_from_backend, store_data_usage_in_backend, -}; pub use metrics::ScannerMetrics; diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 88c8a3f4..a2b57c66 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -28,7 +28,8 @@ categories = ["web-programming", "development-tools", "data-structures"] workspace = true [dependencies] -tokio.workspace = true +lazy_static = { workspace = true } +tokio = { workspace = true } tonic = { workspace = true } uuid = { workspace = true } chrono = { workspace = true } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index 649ca702..18aafeca 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -21,9 +21,6 @@ use super::{ }; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; -use crate::bucket::metadata_sys::{self}; -use crate::bucket::versioning::VersioningApi; -use crate::bucket::versioning_sys::BucketVersioningSys; use crate::disk::error::FileAccessDeniedWithContext; use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error}; use crate::disk::fs::{ @@ -36,16 +33,6 @@ use crate::disk::{ }; use crate::disk::{FileWriter, STORAGE_FORMAT_FILE}; use crate::global::{GLOBAL_IsErasureSD, GLOBAL_RootDiskThreshold}; -use crate::heal::data_scanner::{ - ScannerItem, ShouldSleepFn, SizeSummary, lc_has_active_rules, rep_has_active_rules, scan_data_folder, -}; -use crate::heal::data_usage_cache::{DataUsageCache, DataUsageEntry}; -use crate::heal::error::{ERR_IGNORE_FILE_CONTRIB, ERR_SKIP_FILE}; -use crate::heal::heal_commands::{HealScanMode, HealingTracker}; -use crate::heal::heal_ops::HEALING_TRACKER_FILENAME; -use crate::new_object_layer_fn; -use crate::store_api::{ObjectInfo, StorageAPI}; -use rustfs_common::metrics::{Metric, Metrics}; use rustfs_utils::path::{ GLOBAL_DIR_SUFFIX, GLOBAL_DIR_SUFFIX_WITH_SLASH, SLASH_SEPARATOR, clean, decode_dir_object, encode_dir_object, has_suffix, path_join, path_join_buf, @@ -55,19 +42,18 @@ use tokio::time::interval; use crate::erasure_coding::bitrot_verify; use bytes::Bytes; use path_absolutize::Absolutize;
-use rustfs_common::defer; use rustfs_filemeta::{ Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn, get_file_info, read_xl_meta_no_data, }; use rustfs_utils::HashAlgorithm; use rustfs_utils::os::get_info; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt::Debug; use std::io::SeekFrom; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; -use std::time::{Duration, SystemTime}; +use std::time::Duration; use std::{ fs::Metadata, path::{Path, PathBuf}, @@ -76,7 +62,6 @@ use time::OffsetDateTime; use tokio::fs::{self, File}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind}; use tokio::sync::RwLock; -use tokio::sync::mpsc::Sender; use tracing::{debug, error, info, warn}; use uuid::Uuid; @@ -2268,184 +2253,6 @@ impl DiskAPI for LocalDisk { Ok(info) } - - #[tracing::instrument(level = "info", skip_all)] - async fn ns_scanner( - &self, - cache: &DataUsageCache, - updates: Sender<DataUsageEntry>, - scan_mode: HealScanMode, - we_sleep: ShouldSleepFn, - ) -> Result<DataUsageCache> { - self.scanning.fetch_add(1, Ordering::SeqCst); - defer!(|| { self.scanning.fetch_sub(1, Ordering::SeqCst) }); - - // must before metadata_sys - let Some(store) = new_object_layer_fn() else { - return Err(Error::other("errServerNotInitialized")); - }; - - let mut cache = cache.clone(); - // Check if the current bucket has a configured lifecycle policy - if let Ok((lc, _)) = metadata_sys::get_lifecycle_config(&cache.info.name).await { - if lc_has_active_rules(&lc, "") { - cache.info.lifecycle = Some(lc); - } - } - - // Check if the current bucket has replication configuration - if let Ok((rcfg, _)) = metadata_sys::get_replication_config(&cache.info.name).await { - if rep_has_active_rules(&rcfg, "", true) { - // TODO: globalBucketTargetSys - } - } - - let vcfg = BucketVersioningSys::get(&cache.info.name).await.ok(); - - let loc = self.get_disk_location(); - // TODO: errors here need to be handled - let disks = store - .get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap()) - .await - .map_err(|e| Error::other(e.to_string()))?; - let disk = Arc::new(LocalDisk::new(&self.endpoint(), false).await?); - let disk_clone = disk.clone(); - cache.info.updates = Some(updates.clone()); - let mut data_usage_info = scan_data_folder( - &disks, - disk, - &cache, - Box::new(move |item: &ScannerItem| { - let mut item = item.clone(); - let disk = disk_clone.clone(); - let vcfg = vcfg.clone(); - Box::pin(async move { - if !item.path.ends_with(&format!("{SLASH_SEPARATOR}{STORAGE_FORMAT_FILE}")) { - return Err(Error::other(ERR_SKIP_FILE).into()); - } - let stop_fn = Metrics::log(Metric::ScanObject); - let mut res = HashMap::new(); - let done_sz = Metrics::time_size(Metric::ReadMetadata); - let buf = match disk.read_metadata(item.path.clone()).await { - Ok(buf) => buf, - Err(err) => { - res.insert("err".to_string(), err.to_string()); - stop_fn(&res); - return Err(Error::other(ERR_SKIP_FILE).into()); - } - }; - done_sz(buf.len() as u64); - res.insert("metasize".to_string(), buf.len().to_string()); - item.transform_meta_dir(); - let meta_cache = MetaCacheEntry { - name: item.object_path().to_string_lossy().to_string(), - metadata: buf, - ..Default::default() - }; - let fivs = match meta_cache.file_info_versions(&item.bucket) { - Ok(fivs) => fivs, - Err(err) => { - res.insert("err".to_string(), err.to_string()); - stop_fn(&res); - return Err(Error::other(ERR_SKIP_FILE).into()); - } - }; - let mut size_s = SizeSummary::default(); - let done =
Metrics::time(Metric::ApplyAll); - let obj_infos = match item.apply_versions_actions(&fivs.versions).await { - Ok(obj_infos) => obj_infos, - Err(err) => { - res.insert("err".to_string(), err.to_string()); - stop_fn(&res); - return Err(Error::other(ERR_SKIP_FILE).into()); - } - }; - - let versioned = if let Some(vcfg) = vcfg.as_ref() { - vcfg.versioned(item.object_path().to_str().unwrap_or_default()) - } else { - false - }; - - let mut obj_deleted = false; - for info in obj_infos.iter() { - let done = Metrics::time(Metric::ApplyVersion); - let sz: i64; - (obj_deleted, sz) = item.apply_actions(info, &mut size_s).await; - done(); - - if obj_deleted { - break; - } - - let actual_sz = match info.get_actual_size() { - Ok(size) => size, - Err(_) => continue, - }; - - if info.delete_marker { - size_s.delete_markers += 1; - } - - if info.version_id.is_some() && sz == actual_sz { - size_s.versions += 1; - } - - size_s.total_size += sz as usize; - - if info.delete_marker { - continue; - } - } - - for free_version in fivs.free_versions.iter() { - let _obj_info = ObjectInfo::from_file_info( - free_version, - &item.bucket, - &item.object_path().to_string_lossy(), - versioned, - ); - let done = Metrics::time(Metric::TierObjSweep); - done(); - } - - // todo: global trace - if obj_deleted { - return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into()); - } - done(); - Ok(size_s) - }) - }), - scan_mode, - we_sleep, - ) - .await?; - data_usage_info.info.last_update = Some(SystemTime::now()); - debug!("ns_scanner completed: {data_usage_info:?}"); - Ok(data_usage_info) - } - - #[tracing::instrument(skip(self))] - async fn healing(&self) -> Option<HealingTracker> { - let healing_file = path_join(&[ - self.path(), - PathBuf::from(RUSTFS_META_BUCKET), - PathBuf::from(BUCKET_META_PREFIX), - PathBuf::from(HEALING_TRACKER_FILENAME), - ]); - let b = match fs::read(healing_file).await { - Ok(b) => b, - Err(_) => return None, - }; - if b.is_empty() { - return None; - } - match HealingTracker::unmarshal_msg(&b) { - Ok(h) => Some(h), - Err(_) => Some(HealingTracker::default()), - } - } } async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInfo, bool)> { diff --git a/crates/ecstore/src/rpc/remote_disk.rs b/crates/ecstore/src/rpc/remote_disk.rs index e45c9e63..5f2f078a 100644 --- a/crates/ecstore/src/rpc/remote_disk.rs +++ b/crates/ecstore/src/rpc/remote_disk.rs @@ -21,9 +21,9 @@ use rustfs_protos::{ node_service_time_out_client, proto_gen::node_service::{ CheckPartsRequest, DeletePathsRequest, DeleteRequest, DeleteVersionRequest, DeleteVersionsRequest, DeleteVolumeRequest, - DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, NsScannerRequest, - ReadAllRequest, ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, - RenameFileRequest, StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, + DiskInfoRequest, ListDirRequest, ListVolumesRequest, MakeVolumeRequest, MakeVolumesRequest, ReadAllRequest, + ReadMultipleRequest, ReadPartsRequest, ReadVersionRequest, ReadXlRequest, RenameDataRequest, RenameFileRequest, + StatVolumeRequest, UpdateMetadataRequest, VerifyFileRequest, WriteAllRequest, WriteMetadataRequest, }, }; @@ -32,26 +32,15 @@ use crate::disk::{ ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, UpdateMetadataOpts, VolumeInfo, WalkDirOptions, endpoint::Endpoint, }; +use crate::disk::{FileReader, FileWriter}; use crate::{ disk::error::{Error, Result}, rpc::build_auth_headers,
}; -use crate::{ - disk::{FileReader, FileWriter}, - heal::{ - data_scanner::ShouldSleepFn, - data_usage_cache::{DataUsageCache, DataUsageEntry}, - heal_commands::{HealScanMode, HealingTracker}, - }, -}; use rustfs_filemeta::{FileInfo, ObjectPartInfo, RawFileInfo}; use rustfs_protos::proto_gen::node_service::RenamePartRequest; use rustfs_rio::{HttpReader, HttpWriter}; -use tokio::{ - io::AsyncWrite, - sync::mpsc::{self, Sender}, -}; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; +use tokio::io::AsyncWrite; use tonic::Request; use tracing::info; use uuid::Uuid; @@ -927,55 +916,6 @@ impl DiskAPI for RemoteDisk { Ok(disk_info) } - - #[tracing::instrument(skip(self, cache, scan_mode, _we_sleep))] - async fn ns_scanner( - &self, - cache: &DataUsageCache, - updates: Sender<DataUsageEntry>, - scan_mode: HealScanMode, - _we_sleep: ShouldSleepFn, - ) -> Result<DataUsageCache> { - info!("ns_scanner"); - let cache = serde_json::to_string(cache)?; - let mut client = node_service_time_out_client(&self.addr) - .await - .map_err(|err| Error::other(format!("can not get client, err: {err}")))?; - - let (tx, rx) = mpsc::channel(10); - let in_stream = ReceiverStream::new(rx); - let mut response = client.ns_scanner(in_stream).await?.into_inner(); - let request = NsScannerRequest { - disk: self.endpoint.to_string(), - cache, - scan_mode: scan_mode as u64, - }; - tx.send(request) - .await - .map_err(|err| Error::other(format!("can not send request, err: {err}")))?; - - loop { - match response.next().await { - Some(Ok(resp)) => { - if !resp.update.is_empty() { - let data_usage_cache = serde_json::from_str::<DataUsageEntry>(&resp.update)?; - let _ = updates.send(data_usage_cache).await; - } else if !resp.data_usage_cache.is_empty() { - let data_usage_cache = serde_json::from_str::<DataUsageCache>(&resp.data_usage_cache)?; - return Ok(data_usage_cache); - } else { - return Err(Error::other("scan was interrupted")); - } - } - _ => return Err(Error::other("scan was interrupted")), - } - } - } - - #[tracing::instrument(skip(self))] - async fn healing(&self) -> Option<HealingTracker> { - None - } } #[cfg(test)] diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index d00ead5c..4519d520 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -17,6 +17,7 @@ use crate::bitrot::{create_bitrot_reader, create_bitrot_writer}; use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE; use crate::client::{object_api_utils::extract_etag, transition_api::ReaderImpl}; +use crate::disk::STORAGE_FORMAT_FILE; use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs}; use crate::disk::{ self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, @@ -26,10 +27,8 @@ use crate::erasure_coding; use crate::erasure_coding::bitrot_verify; use crate::error::{Error, Result}; use crate::error::{ObjectApiError, is_err_object_not_found}; -use crate::global::GLOBAL_MRFState; use crate::global::{GLOBAL_LocalNodeName, GLOBAL_TierConfigMgr}; -use crate::heal::data_usage_cache::DataUsageCache; -use crate::heal::heal_ops::{HealEntryFn, HealSequence}; +use crate::store_api::ListObjectVersionsInfo; use crate::store_api::{ListPartsInfo, ObjectToDelete}; use crate::{ bucket::lifecycle::bucket_lifecycle_ops::{gen_transition_objname, get_transitioned_object_reader, put_restore_opts}, @@ -43,19 +42,7 @@ use crate::{ error::{StorageError, to_object_err}, event::name::EventName, event_notification::{EventArgs, send_event}, - global::{ - GLOBAL_BackgroundHealState,
GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id, - is_dist_erasure, - }, - heal::{ - data_usage::{DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT}, - data_usage_cache::{DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo}, - heal_commands::{ - DRIVE_STATE_CORRUPT, DRIVE_STATE_MISSING, DRIVE_STATE_OFFLINE, DRIVE_STATE_OK, HEAL_DEEP_SCAN, HEAL_ITEM_OBJECT, - HEAL_NORMAL_SCAN, HealOpts, HealScanMode, HealingTracker, - }, - heal_ops::BG_HEALING_UUID, - }, + global::{GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES, get_global_deployment_id, is_dist_erasure}, store_api::{ BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec, ListMultipartsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartInfo, MultipartUploadResult, ObjectIO, ObjectInfo, @@ -63,11 +50,6 @@ }, store_init::load_format_erasure, }; -use crate::{disk::STORAGE_FORMAT_FILE, heal::mrf::PartialOperation}; -use crate::{ - heal::data_scanner::{HEAL_DELETE_DANGLING, globalHealConfig}, - store_api::ListObjectVersionsInfo, -}; use bytes::Bytes; use bytesize::ByteSize; use chrono::Utc; @@ -76,6 +58,7 @@ use glob::Pattern; use http::HeaderMap; use md5::{Digest as Md5Digest, Md5}; use rand::{Rng, seq::SliceRandom}; +use rustfs_common::heal_channel::{DriveState, HealChannelPriority, HealItemType, HealOpts, HealScanMode, send_heal_disk}; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::{ FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo, @@ -1653,21 +1636,14 @@ impl SetDisks { Err(e) => { warn!("connect_endpoint err {:?}", &e); if ep.is_local && e == DiskError::UnformattedDisk { - info!("unformatteddisk will push_heal_local_disks, {:?}", ep); - GLOBAL_BackgroundHealState.push_heal_local_disks(&[ep.clone()]).await; + info!("unformatted disk will trigger heal_disk, {:?}", ep); + let set_disk_id = format!("pool_{}_set_{}", ep.pool_idx, ep.set_idx); + let _ = send_heal_disk(set_disk_id, Some(HealChannelPriority::Normal)).await; } return; } }; - if new_disk.is_local() { - if let Some(h) = new_disk.healing().await { - if !h.finished { - GLOBAL_BackgroundHealState.push_heal_local_disks(&[new_disk.endpoint()]).await; - } - } - } - let (set_idx, disk_idx) = match self.find_disk_index(&fm) { Ok(res) => res, Err(e) => { @@ -2033,23 +2009,16 @@ impl SetDisks { let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?; if errs.iter().any(|err| err.is_some()) { - let _ = rustfs_common::heal_channel::send_heal_request( - rustfs_common::heal_channel::create_heal_request_with_options( - fi.volume.to_string(), // bucket - Some(fi.name.to_string()), // object_prefix - false, // force_start + let _ = + rustfs_common::heal_channel::send_heal_request(rustfs_common::heal_channel::create_heal_request_with_options( + fi.volume.to_string(), // bucket + Some(fi.name.to_string()), // object_prefix + false, // force_start Some(rustfs_common::heal_channel::HealChannelPriority::Normal), // priority - Some(self.pool_index), // pool_index - Some(self.set_index), // set_index - None, // scan_mode - None, // remove_corrupted - None, // recreate_missing - None, // update_parity - None, // recursive - None, // dry_run - None, // timeout_seconds - ) - ).await; + Some(self.pool_index), // pool_index + Some(self.set_index), // set_index + )) + .await; } // debug!("get_object_fileinfo pick fi {:?}", &fi); @@ -2188,15 +2157,9 @@ impl SetDisks {
Some(rustfs_common::heal_channel::HealChannelPriority::Normal), Some(pool_index), Some(set_index), - None, - None, - None, - None, - None, - None, - None, - ) - ).await; + ), + ) + .await; has_err = false; } _ => {} @@ -2275,99 +2238,6 @@ impl SetDisks { Ok(()) } - pub async fn list_and_heal(&self, bucket: &str, prefix: &str, opts: &HealOpts, heal_entry: HealEntryFn) -> Result<()> { - let bucket = bucket.to_string(); - let (disks, _) = self.get_online_disk_with_healing(false).await?; - if disks.is_empty() { - return Err(Error::other("listAndHeal: No non-healing drives found")); - } - - let expected_disks = disks.len() / 2 + 1; - let fallback_disks = &disks[expected_disks..]; - let disks = &disks[..expected_disks]; - let resolver = MetadataResolutionParams { - dir_quorum: 1, - obj_quorum: 1, - bucket: bucket.clone(), - strict: false, - ..Default::default() - }; - let path = Path::new(prefix).parent().map_or("", |p| p.to_str().unwrap()); - let filter_prefix = prefix.trim_start_matches(path).trim_matches('/'); - let opts_clone = *opts; - let bucket_agreed = bucket.clone(); - let bucket_partial = bucket.to_string(); - let (tx, rx) = broadcast::channel(1); - let tx_agreed = tx.clone(); - let tx_partial = tx.clone(); - let func_agreed = heal_entry.clone(); - let func_partial = heal_entry.clone(); - let lopts = ListPathRawOptions { - disks: disks.to_vec(), - fallback_disks: fallback_disks.to_vec(), - bucket: bucket.to_string(), - path: path.to_string(), - filter_prefix: { - if filter_prefix.is_empty() { - None - } else { - Some(filter_prefix.to_string()) - } - }, - recursive: true, - forward_to: None, - min_disks: 1, - report_not_found: false, - per_disk_limit: 0, - agreed: Some(Box::new(move |entry: MetaCacheEntry| { - let heal_entry = func_agreed.clone(); - let tx_agreed = tx_agreed.clone(); - - Box::pin({ - let bucket_agreed = bucket_agreed.clone(); - async move { - if heal_entry(bucket_agreed.clone(), entry.clone(), opts_clone.scan_mode) - .await - .is_err() - { - let _ = tx_agreed.send(true); - } - } - }) - })), - partial: Some(Box::new(move |entries: MetaCacheEntries, _: &[Option]| { - let heal_entry = func_partial.clone(); - let tx_partial = tx_partial.clone(); - - Box::pin({ - let resolver_partial = resolver.clone(); - let bucket_partial = bucket_partial.clone(); - async move { - let entry = match entries.resolve(resolver_partial) { - Some(entry) => entry, - _ => match entries.first_found() { - (Some(entry), _) => entry, - _ => return, - }, - }; - - if heal_entry(bucket_partial.clone(), entry.clone(), opts_clone.scan_mode) - .await - .is_err() - { - let _ = tx_partial.send(true); - } - } - }) - })), - finished: None, - }; - - _ = list_path_raw(rx, lopts) - .await - .map_err(|err| Error::other(format!("listPathRaw returned {err}: bucket: {bucket}, path: {path}"))); - Ok(()) - } async fn get_online_disk_with_healing(&self, incl_healing: bool) -> Result<(Vec>, bool)> { let (new_disks, _, healing) = self.get_online_disk_with_healing_and_info(incl_healing).await?; @@ -2436,7 +2306,7 @@ impl SetDisks { ) -> disk::error::Result<(HealResultItem, Option)> { info!("SetDisks heal_object"); let mut result = HealResultItem { - heal_item_type: HEAL_ITEM_OBJECT.to_string(), + heal_item_type: HealItemType::Object.to_string(), bucket: bucket.to_string(), object: object.to_string(), version_id: version_id.to_string(), @@ -2542,15 +2412,15 @@ impl SetDisks { let drive_state = match reason { Some(err) => match err { - DiskError::DiskNotFound => DRIVE_STATE_OFFLINE, + DiskError::DiskNotFound => 
DriveState::Offline.to_string(), DiskError::FileNotFound | DiskError::FileVersionNotFound | DiskError::VolumeNotFound | DiskError::PartMissingOrCorrupt - | DiskError::OutdatedXLMeta => DRIVE_STATE_MISSING, - _ => DRIVE_STATE_CORRUPT, + | DiskError::OutdatedXLMeta => DriveState::Missing.to_string(), + _ => DriveState::Corrupt.to_string(), }, - None => DRIVE_STATE_OK, + None => DriveState::Ok.to_string(), }; result.before.drives.push(HealDriveInfo { uuid: "".to_string(), @@ -2894,37 +2764,82 @@ impl SetDisks { "rename temp data, src_volume: {}, src_path: {}, dst_volume: {}, dst_path: {}", RUSTFS_META_TMP_BUCKET, tmp_id, bucket, object ); - if let Err(err) = disk + let rename_result = disk .rename_data(RUSTFS_META_TMP_BUCKET, &tmp_id, parts_metadata[index].clone(), bucket, object) - .await - { - info!("rename temp data err: {}", err.to_string()); - // self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_id).await?; - return Ok((result, Some(err))); - } + .await; - info!("remove temp object, volume: {}, path: {}", RUSTFS_META_TMP_BUCKET, tmp_id); - self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_id) - .await - .map_err(DiskError::other)?; - if parts_metadata[index].is_remote() { - let rm_data_dir = parts_metadata[index].data_dir.unwrap().to_string(); - let d_path = Path::new(&encode_dir_object(object)).join(rm_data_dir); - disk.delete( - bucket, - d_path.to_str().unwrap(), - DeleteOptions { - immediate: true, - recursive: true, - ..Default::default() - }, - ) - .await?; - } + if let Err(err) = &rename_result { + info!( + "rename temp data err: {}. Try fallback to direct xl.meta overwrite...", + err.to_string() + ); - for (i, v) in result.before.drives.iter().enumerate() { - if v.endpoint == disk.endpoint().to_string() { - result.after.drives[i].state = DRIVE_STATE_OK.to_string(); + let healthy_index = latest_disks.iter().position(|d| d.is_some()).unwrap_or(0); + + if let Some(healthy_disk) = &latest_disks[healthy_index] { + let xlmeta_path = format!("{object}/xl.meta"); + + match healthy_disk.read_all(bucket, &xlmeta_path).await { + Ok(xlmeta_bytes) => { + if let Err(e) = disk.write_all(bucket, &xlmeta_path, xlmeta_bytes).await { + info!("fallback xl.meta overwrite failed: {}", e.to_string()); + + return Ok(( + result, + Some(DiskError::other(format!("fallback xl.meta overwrite failed: {e}"))), + )); + } else { + info!("fallback xl.meta overwrite succeeded for disk {}", disk.to_string()); + } + } + + Err(e) => { + info!("read healthy xl.meta failed: {}", e.to_string()); + + return Ok(( + result, + Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))), + )); + } + } + } else { + info!("no healthy disk found for xl.meta fallback overwrite"); + + return Ok(( + result, + Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")), + )); + } + } else { + info!("remove temp object, volume: {}, path: {}", RUSTFS_META_TMP_BUCKET, tmp_id); + + self.delete_all(RUSTFS_META_TMP_BUCKET, &tmp_id) + .await + .map_err(DiskError::other)?; + + if parts_metadata[index].is_remote() { + let rm_data_dir = parts_metadata[index].data_dir.unwrap().to_string(); + + let d_path = Path::new(&encode_dir_object(object)).join(rm_data_dir); + + disk.delete( + bucket, + d_path.to_str().unwrap(), + DeleteOptions { + immediate: true, + + recursive: true, + + ..Default::default() + }, + ) + .await?; + } + + for (i, v) in result.before.drives.iter().enumerate() { + if v.endpoint == disk.endpoint().to_string() { + result.after.drives[i].state = DriveState::Ok.to_string(); + } } } } @@ -2984,7 +2899,7 @@ impl 
SetDisks { disks.clone() }; let mut result = HealResultItem { - heal_item_type: HEAL_ITEM_OBJECT.to_string(), + heal_item_type: HealItemType::Object.to_string(), bucket: bucket.to_string(), object: object.to_string(), disk_count: self.disks.read().await.len(), @@ -3028,11 +2943,11 @@ impl SetDisks { let endpoint = drive.to_string(); let drive_state = match err { Some(err) => match err { - DiskError::DiskNotFound => DRIVE_STATE_OFFLINE, - DiskError::FileNotFound | DiskError::VolumeNotFound => DRIVE_STATE_MISSING, - _ => DRIVE_STATE_CORRUPT, + DiskError::DiskNotFound => DriveState::Offline.to_string(), + DiskError::FileNotFound | DiskError::VolumeNotFound => DriveState::Missing.to_string(), + _ => DriveState::Corrupt.to_string(), }, - None => DRIVE_STATE_OK, + None => DriveState::Ok.to_string(), }; result.before.drives.push(HealDriveInfo { uuid: "".to_string(), endpoint: endpoint.clone(), state: drive_state.to_string(), }); result.after.drives.push(HealDriveInfo { uuid: "".to_string(), endpoint, state: drive_state.to_string(), }); } @@ -3059,11 +2974,11 @@ impl SetDisks { if let (Some(DiskError::VolumeNotFound | DiskError::FileNotFound), Some(disk)) = (err, disk) { let vol_path = Path::new(bucket).join(object); let drive_state = match disk.make_volume(vol_path.to_str().unwrap()).await { - Ok(_) => DRIVE_STATE_OK, + Ok(_) => DriveState::Ok.to_string(), Err(merr) => match merr { - DiskError::VolumeExists => DRIVE_STATE_OK, - DiskError::DiskNotFound => DRIVE_STATE_OFFLINE, - _ => DRIVE_STATE_CORRUPT, + DiskError::VolumeExists => DriveState::Ok.to_string(), + DiskError::DiskNotFound => DriveState::Offline.to_string(), + _ => DriveState::Corrupt.to_string(), }, }; result.after.drives[index].state = drive_state.to_string(); @@ -3083,7 +2998,7 @@ impl SetDisks { ) -> HealResultItem { let disk_len = { self.disks.read().await.len() }; let mut result = HealResultItem { - heal_item_type: HEAL_ITEM_OBJECT.to_string(), + heal_item_type: HealItemType::Object.to_string(), bucket: bucket.to_string(), object: object.to_string(), object_size: lfi.size as usize, @@ -3105,23 +3020,23 @@ impl SetDisks { result.before.drives.push(HealDriveInfo { uuid: "".to_string(), endpoint: self.set_endpoints[index].to_string(), - state: DRIVE_STATE_OFFLINE.to_string(), + state: DriveState::Offline.to_string(), }); result.after.drives.push(HealDriveInfo { uuid: "".to_string(), endpoint: self.set_endpoints[index].to_string(), - state: DRIVE_STATE_OFFLINE.to_string(), + state: DriveState::Offline.to_string(), }); } - let mut drive_state = DRIVE_STATE_CORRUPT; + let mut drive_state = DriveState::Corrupt; if let Some(err) = &errs[index] { if err == &DiskError::FileNotFound || err == &DiskError::VolumeNotFound { - drive_state = DRIVE_STATE_MISSING; + drive_state = DriveState::Missing; } } else { - drive_state = DRIVE_STATE_OK; + drive_state = DriveState::Ok; } result.before.drives.push(HealDriveInfo { @@ -3213,661 +3128,6 @@ impl SetDisks { } } - pub async fn ns_scanner( - self: Arc<Self>, - buckets: &[BucketInfo], - want_cycle: u32, - updates: Sender<DataUsageCache>, - heal_scan_mode: HealScanMode, - ) -> Result<()> { - info!("ns_scanner"); - if buckets.is_empty() { - info!("data-scanner: no buckets to scan, skipping scanner cycle"); - return Ok(()); - } - - let (mut disks, healing) = self.get_online_disk_with_healing(false).await?; - if disks.is_empty() { - info!("data-scanner: all drives are offline or being healed, skipping scanner cycle"); - return Ok(()); - } - - let old_cache = DataUsageCache::load(&self, DATA_USAGE_CACHE_NAME).await?; - let mut cache = DataUsageCache { - info: DataUsageCacheInfo { - name: DATA_USAGE_ROOT.to_string(), - next_cycle: old_cache.info.next_cycle, - ..Default::default() - }, - cache:
HashMap::new(), - }; - - // Put all buckets into channel. - let (bucket_tx, bucket_rx) = mpsc::channel(buckets.len()); - // Shuffle buckets to ensure total randomness of buckets, being scanned. - // Otherwise, same set of buckets get scanned across erasure sets always. - // at any given point in time. This allows different buckets to be scanned - // in different order per erasure set, this wider spread is needed when - // there are lots of buckets with different order of objects in them. - let permutes = { - let mut rng = rand::rng(); - let mut permutes: Vec<usize> = (0..buckets.len()).collect(); - permutes.shuffle(&mut rng); - permutes - }; - - // Add new buckets first - for idx in permutes.iter() { - let b = buckets[*idx].clone(); - match old_cache.find(&b.name) { - Some(e) => { - cache.replace(&b.name, DATA_USAGE_ROOT, e); - let _ = bucket_tx.send(b).await; - } - None => { - let _ = bucket_tx.send(b).await; - } - } - } - - let (buckets_results_tx, mut buckets_results_rx) = mpsc::channel::<DataUsageEntryInfo>(disks.len()); - // New: read the base interval from an environment variable, defaulting to 30 seconds - let set_disk_update_interval_secs = std::env::var("RUSTFS_NS_SCANNER_INTERVAL") - .ok() - .and_then(|v| v.parse::<u64>().ok()) - .unwrap_or(30); - let update_time = { - let mut rng = rand::rng(); - Duration::from_secs(set_disk_update_interval_secs) + Duration::from_secs_f64(10.0 * rng.random_range(0.0..1.0)) - }; - let mut ticker = interval(update_time); - - // Check whether the background task needs to run - let skip_background_task = std::env::var("RUSTFS_SKIP_BACKGROUND_TASK") - .ok() - .and_then(|v| v.parse::<bool>().ok()) - .unwrap_or(false); - - let task = if !skip_background_task { - Some(tokio::spawn(async move { - let last_save = Some(SystemTime::now()); - let mut need_loop = true; - while need_loop { - select! { - _ = ticker.tick() => { - if !cache.info.last_update.eq(&last_save) { - let _ = cache.save(DATA_USAGE_CACHE_NAME).await; - let _ = updates.send(cache.clone()).await; - } - } - result = buckets_results_rx.recv() => { - match result { - Some(result) => { - cache.replace(&result.name, &result.parent, result.entry); - cache.info.last_update = Some(SystemTime::now()); - }, - None => { - need_loop = false; - cache.info.next_cycle = want_cycle; - cache.info.last_update = Some(SystemTime::now()); - let _ = cache.save(DATA_USAGE_CACHE_NAME).await; - let _ = updates.send(cache.clone()).await; - } - } - } - } - } - })) - } else { - None - }; - - // Restrict parallelism for disk usage scanner - let max_procs = num_cpus::get(); - if max_procs < disks.len() { - disks = disks[0..max_procs].to_vec(); - } - - let mut futures = Vec::new(); - let bucket_rx = Arc::new(RwLock::new(bucket_rx)); - for disk in disks.iter() { - let disk = match disk { - Some(disk) => disk.clone(), - None => continue, - }; - let self_clone = Arc::clone(&self); - let bucket_rx_clone = bucket_rx.clone(); - let buckets_results_tx_clone = buckets_results_tx.clone(); - futures.push(async move { - loop { - match bucket_rx_clone.write().await.try_recv() { - Err(_) => return, - Ok(bucket_info) => { - let cache_name = Path::new(&bucket_info.name).join(DATA_USAGE_CACHE_NAME); - let mut cache = match DataUsageCache::load(&self_clone, &cache_name.to_string_lossy()).await { - Ok(cache) => cache, - Err(_) => continue, - }; - if cache.info.name.is_empty() { - cache.info.name = bucket_info.name.clone(); - } - cache.info.skip_healing = healing; - cache.info.next_cycle = want_cycle; - if cache.info.name != bucket_info.name { - cache.info = DataUsageCacheInfo { - name: bucket_info.name, - last_update: Some(SystemTime::now()), - next_cycle: want_cycle,
- ..Default::default() - }; - } - - // Collect updates. - let (tx, mut rx) = mpsc::channel(1); - let buckets_results_tx_inner_clone = buckets_results_tx_clone.clone(); - let name = cache.info.name.clone(); - let task = tokio::spawn(async move { - loop { - match rx.recv().await { - Some(entry) => { - let _ = buckets_results_tx_inner_clone - .send(DataUsageEntryInfo { - name: name.clone(), - parent: DATA_USAGE_ROOT.to_string(), - entry, - }) - .await; - } - None => return, - } - } - }); - - // Calc usage - let before = cache.info.last_update; - let mut cache = match disk.ns_scanner(&cache, tx, heal_scan_mode, None).await { - Ok(cache) => cache, - Err(_) => { - if cache.info.last_update > before { - let _ = cache.save(&cache_name.to_string_lossy()).await; - } - let _ = task.await; - continue; - } - }; - - cache.info.updates = None; - let _ = task.await; - let mut root = DataUsageEntry::default(); - if let Some(r) = cache.root() { - root = cache.flatten(&r); - if let Some(r) = &root.replication_stats { - if r.empty() { - root.replication_stats = None; - } - } - } - let _ = buckets_results_tx_clone - .send(DataUsageEntryInfo { - name: cache.info.name.clone(), - parent: DATA_USAGE_ROOT.to_string(), - entry: root, - }) - .await; - let _ = cache.save(&cache_name.to_string_lossy()).await; - } - } - info!("continue scanner"); - } - }); - } - - info!("ns_scanner start"); - let _ = join_all(futures).await; - if let Some(task) = task { - let _ = task.await; - } - info!("ns_scanner completed"); - Ok(()) - } - - pub async fn heal_erasure_set(self: Arc<Self>, buckets: &[String], tracker: Arc<RwLock<HealingTracker>>) -> Result<()> { - let (bg_seq, found) = GLOBAL_BackgroundHealState.get_heal_sequence_by_token(BG_HEALING_UUID).await; - if !found { - return Err(Error::other("no local healing sequence initialized, unable to heal the drive")); - } - let bg_seq = bg_seq.unwrap(); - let scan_mode = HEAL_NORMAL_SCAN; - - let tracker_defer = tracker.clone(); - let defer = async move { - let mut w = tracker_defer.write().await; - w.set_object("").await; - w.set_bucket("").await; - let _ = w.update().await; - }; - - for bucket in buckets.iter() { - if let Err(err) = HealSequence::heal_bucket(bg_seq.clone(), bucket, true).await { - info!("{}", err.to_string()); - } - } - - let info = match tracker - .read() - .await - .disk - .as_ref() - .unwrap() - .disk_info(&DiskInfoOptions::default()) - .await - { - Ok(info) => info, - Err(err) => { - defer.await; - return Err(Error::other(format!("unable to get disk information before healing it: {err}"))); - } - }; - let num_cores = num_cpus::get(); // use num_cpus crate to get the number of cores - let mut num_healers: usize; - - if info.nr_requests as usize > num_cores { - num_healers = num_cores / 4; - } else { - num_healers = (info.nr_requests / 4) as usize; - } - - if num_healers < 4 { - num_healers = 4; - } - - let v = globalHealConfig.read().await.get_workers(); - if v > 0 { - num_healers = v; - } - info!( - "Healing drive '{}' - use {} parallel workers.", - tracker.read().await.disk.as_ref().unwrap().to_string(), - num_healers - ); - - let jt = rustfs_workers::workers::Workers::new(num_healers).map_err(|err| Error::other(err.to_string()))?; - - let heal_entry_done = |name: String| HealEntryResult { - entry_done: true, - name, - ..Default::default() - }; - - let heal_entry_success = |sz: usize| HealEntryResult { - bytes: sz, - success: true, - ..Default::default() - }; - - let heal_entry_failure = |sz: usize| HealEntryResult { - bytes: sz, - ..Default::default() - }; - - let heal_entry_skipped =
|sz: usize| HealEntryResult { - bytes: sz, - skipped: true, - ..Default::default() - }; - - let (result_tx, mut result_rx) = mpsc::channel::<HealEntryResult>(1000); - let tracker_task = tracker.clone(); - let task = tokio::spawn(async move { - loop { - match result_rx.recv().await { - Some(entry) => { - if entry.entry_done { - tracker_task.write().await.set_object(entry.name.as_str()).await; - if let Some(last_update) = tracker_task.read().await.get_last_update().await { - if SystemTime::now().duration_since(last_update).unwrap() > Duration::from_secs(60) { - if let Err(err) = tracker_task.write().await.update().await { - info!("tracker update failed, err: {}", err.to_string()); - } - } - } - continue; - } - - tracker_task - .write() - .await - .update_progress(entry.success, entry.skipped, entry.bytes as u64) - .await; - } - None => { - if let Err(err) = tracker_task.write().await.update().await { - info!("tracker update failed, err: {}", err.to_string()); - } - return; - } - } - } - }); - - let started = tracker.read().await.started; - let mut ret_err = None; - for bucket in buckets.iter() { - if tracker.read().await.is_healed(bucket).await { - info!("bucket{} was healed", bucket); - continue; - } - - let mut forward_to = None; - let b = tracker.read().await.get_bucket().await; - if b == *bucket { - forward_to = Some(tracker.read().await.get_object().await); - } - - if !b.is_empty() { - tracker.write().await.resume().await; - } - - tracker.write().await.set_object("").await; - tracker.write().await.set_bucket("").await; - - if let Err(err) = HealSequence::heal_bucket(bg_seq.clone(), bucket, true).await { - info!("heal bucket failed: {}", err.to_string()); - ret_err = Some(err); - continue; - } - - let (mut disks, _, healing) = self.get_online_disk_with_healing_and_info(true).await?; - if disks.len() == healing { - info!("all drives are in healing state, aborting.."); - defer.await; - return Ok(()); - } - - disks = disks[0..disks.len() - healing].to_vec(); - if disks.len() < self.set_drive_count / 2 { - defer.await; - return Err(Error::other(format!( - "not enough drives (found={}, healing={}, total={}) are available to heal `{}`", - disks.len(), - healing, - self.set_drive_count, - tracker.read().await.disk.as_ref().unwrap().to_string() - ))); - } - - { - let mut rng = rand::rng(); - - // Shuffle the disks randomly - disks.shuffle(&mut rng); - } - - let expected_disk = disks.len() / 2 + 1; - let fallback_disks = disks[expected_disk..].to_vec(); - disks = disks[..expected_disk].to_vec(); - - let result_tx_send = result_tx.clone(); - let bg_seq_send = bg_seq.clone(); - let send = Box::new(move |result: HealEntryResult| { - let result_tx_send = result_tx_send.clone(); - let bg_seq_send = bg_seq_send.clone(); - Box::pin(async move { - let _ = result_tx_send.send(result).await; - bg_seq_send.count_scanned(HEAL_ITEM_OBJECT.to_string()).await; - true - }) - }); - - let jt_clone = jt.clone(); - let self_clone = self.clone(); - let started_clone = started; - let tracker_heal = tracker.clone(); - let bg_seq_clone = bg_seq.clone(); - let send_clone = send.clone(); - let heal_entry = Arc::new(move |bucket: String, entry: MetaCacheEntry| { - info!("heal entry, bucket: {}, entry: {:?}", bucket, entry); - let jt_clone = jt_clone.clone(); - let self_clone = self_clone.clone(); - let started = started_clone; - let tracker_heal = tracker_heal.clone(); - let bg_seq = bg_seq_clone.clone(); - let send = send_clone.clone(); - Box::pin(async move { - let defer = async { - jt_clone.give().await; - }; - if entry.name.is_empty() &&
entry.metadata.is_empty() { - defer.await; - return; - } - if entry.is_dir() { - defer.await; - return; - } - if bucket == RUSTFS_META_BUCKET - && (Pattern::new("buckets/*/.metacache/*") - .map(|p| p.matches(&entry.name)) - .unwrap_or(false) - || Pattern::new("tmp/.trash/*").map(|p| p.matches(&entry.name)).unwrap_or(false) - || Pattern::new("multipart/*").map(|p| p.matches(&entry.name)).unwrap_or(false)) - { - defer.await; - return; - } - let encoded_entry_name = encode_dir_object(entry.name.as_str()); - let mut result: HealEntryResult; - let fivs = match entry.file_info_versions(bucket.as_str()) { - Ok(fivs) => fivs, - Err(err) => { - match self_clone - .heal_object( - &bucket, - &encoded_entry_name, - "", - &HealOpts { - scan_mode, - remove: HEAL_DELETE_DANGLING, - ..Default::default() - }, - ) - .await - { - Ok((res, None)) => { - bg_seq.count_healed(HEAL_ITEM_OBJECT.to_string()).await; - result = heal_entry_success(res.object_size); - } - Ok((_, Some(err))) => { - if DiskError::is_err_object_not_found(&err) || DiskError::is_err_version_not_found(&err) { - defer.await; - return; - } - - result = heal_entry_failure(0); - bg_seq.count_failed(HEAL_ITEM_OBJECT.to_string()).await; - info!("unable to heal object {}/{}: {}", bucket, entry.name, err.to_string()); - } - Err(_) => { - result = heal_entry_failure(0); - bg_seq.count_failed(HEAL_ITEM_OBJECT.to_string()).await; - info!("unable to heal object {}/{}: {}", bucket, entry.name, err.to_string()); - } - } - send(result.clone()).await; - defer.await; - return; - } - }; - let mut version_not_found = 0; - for version in fivs.versions.iter() { - if let (Some(started), Some(mod_time)) = (started, version.mod_time) { - if mod_time > started { - version_not_found += 1; - if send(heal_entry_skipped(version.size as usize)).await { - defer.await; - return; - } - continue; - } - } - - let mut version_healed = false; - match self_clone - .heal_object( - &bucket, - &encoded_entry_name, - version - .version_id - .as_ref() - .map(|v| v.to_string()) - .unwrap_or("".to_string()) - .as_str(), - &HealOpts { - scan_mode, - remove: HEAL_DELETE_DANGLING, - ..Default::default() - }, - ) - .await - { - Ok((res, None)) => { - if res.after.drives[tracker_heal.read().await.disk_index.unwrap()].state == DRIVE_STATE_OK { - version_healed = true; - } - } - Ok((_, Some(err))) => match err { - DiskError::FileNotFound | DiskError::FileVersionNotFound => { - version_not_found += 1; - continue; - } - _ => {} - }, - Err(_) => {} - } - - if version_healed { - bg_seq.count_healed(HEAL_ITEM_OBJECT.to_string()).await; - result = heal_entry_success(version.size as usize); - } else { - bg_seq.count_failed(HEAL_ITEM_OBJECT.to_string()).await; - result = heal_entry_failure(version.size as usize); - match version.version_id { - Some(version_id) => { - info!("unable to heal object {}/{}-v({})", bucket, version.name, version_id); - } - None => { - info!("unable to heal object {}/{}", bucket, version.name); - } - } - } - - if !send(result).await { - defer.await; - return; - } - } - if version_not_found == fivs.versions.len() { - defer.await; - return; - } - send(heal_entry_done(entry.name.clone())).await; - defer.await; - }) - }); - let resolver = MetadataResolutionParams { - dir_quorum: 1, - obj_quorum: 1, - bucket: bucket.clone(), - ..Default::default() - }; - let (_, rx) = broadcast::channel(1); - let jt_agree = jt.clone(); - let jt_partial = jt.clone(); - let bucket_agree = bucket.clone(); - let bucket_partial = bucket.clone(); - let heal_entry_agree = heal_entry.clone(); - let 
@@ -5813,23 +5065,16 @@ impl StorageAPI for SetDisks {
                 .await?;
         }
         if let Some(versions) = versions {
-            let _ = rustfs_common::heal_channel::send_heal_request(
-                rustfs_common::heal_channel::create_heal_request_with_options(
+            let _ =
+                rustfs_common::heal_channel::send_heal_request(rustfs_common::heal_channel::create_heal_request_with_options(
                    bucket.to_string(),
                    Some(object.to_string()),
                    false,
                    Some(rustfs_common::heal_channel::HealChannelPriority::Normal),
                    Some(self.pool_index),
                    Some(self.set_index),
-                    None,
-                    None,
-                    None,
-                    None,
-                    None,
-                    None,
-                    None,
-                )
-            ).await;
+                ))
+                .await;
         }
 
         let upload_id_path = upload_id_path.clone();
@@ -5915,11 +5160,11 @@ impl StorageAPI for SetDisks {
         let (result, err) = self.heal_object(bucket, object, version_id, opts).await?;
         if let Some(err) = err.as_ref() {
             match err {
-                &DiskError::FileCorrupt if opts.scan_mode != HEAL_DEEP_SCAN => {
+                &DiskError::FileCorrupt if opts.scan_mode != HealScanMode::Deep => {
                     // Instead of returning an error when a bitrot error is detected
                     // during a normal heal scan, heal again with bitrot flag enabled.
                     let mut opts = *opts;
-                    opts.scan_mode = HEAL_DEEP_SCAN;
+                    opts.scan_mode = HealScanMode::Deep;
                     let (result, err) = self.heal_object(bucket, object, version_id, &opts).await?;
                     return Ok((result, err.map(|e| e.into())));
                 }
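The hunks above also retire the `HEAL_DEEP_SCAN` constant in favour of a typed scan mode. Since the guard compares with `!=` and `HealOpts` is copied by value (`let mut opts = *opts;`), the enum must be at least `PartialEq + Copy`; only the `Deep` variant appears in this diff, so the rest of this sketch is an assumption:

```rust
/// Plausible shape of the enum referenced as `HealScanMode::Deep`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum HealScanMode {
    /// Assumed name for the ordinary scan (not shown in this diff).
    #[default]
    Normal,
    /// Deep scan: also verify part contents for bitrot, not just `xl.meta`.
    Deep,
}
```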
@@ -5929,18 +5174,6 @@ impl StorageAPI for SetDisks {
         Ok((result, err.map(|e| e.into())))
     }
 
-    #[tracing::instrument(skip(self))]
-    async fn heal_objects(
-        &self,
-        _bucket: &str,
-        _prefix: &str,
-        _opts: &HealOpts,
-        _hs: Arc<HealSequence>,
-        _is_meta: bool,
-    ) -> Result<()> {
-        unimplemented!()
-    }
-
     #[tracing::instrument(skip(self))]
     async fn get_pool_and_set(&self, _id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)> {
         unimplemented!()
@@ -6236,7 +5469,7 @@ async fn disks_with_all_parts(
     let mut verify_resp = CheckPartsResp::default();
     let mut verify_err = None;
     meta.data_dir = latest_meta.data_dir;
-    if scan_mode == HEAL_DEEP_SCAN {
+    if scan_mode == HealScanMode::Deep {
         // disk has a valid xl.meta but may not have all the
         // parts. This is considered an outdated disk, since
         // it needs healing too.
diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs
index 5f1ea4e0..5a582a52 100644
--- a/crates/ecstore/src/store_api.rs
+++ b/crates/ecstore/src/store_api.rs
@@ -970,6 +970,7 @@ pub trait StorageAPI: ObjectIO {
 
     // Walk TODO:
     async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
+    async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()>;
     async fn copy_object(
         &self,
         src_bucket: &str,
diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs
index c6677ed9..0aa2b573 100644
--- a/rustfs/src/main.rs
+++ b/rustfs/src/main.rs
@@ -54,6 +54,7 @@ use rustfs_iam::init_iam_sys;
 use rustfs_obs::{init_obs, set_global_guard};
 use rustfs_utils::net::parse_and_resolve_address;
 use std::io::{Error, Result};
+use std::sync::Arc;
 use tracing::{debug, error, info, instrument, warn};
 
 #[cfg(all(target_os = "linux", target_env = "gnu"))]
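`verify_object_integrity` enters the `StorageAPI` trait here as a bare signature; no implementation is visible in this excerpt. A caller-side sketch, assuming `ObjectOptions` implements `Default` and using placeholder bucket and key names:

```rust
// Hypothetical call site for the new trait method.
async fn spot_check<S: StorageAPI>(store: &S) -> Result<()> {
    // Expected to return Ok(()) only when the object's parts verify cleanly.
    store
        .verify_object_integrity("my-bucket", "my-object", &ObjectOptions::default())
        .await
}
```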