From 3409cd8dfffa2ce72b1a7d543ee3288fc99c0b75 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Fri, 11 Jul 2025 18:42:12 +0800 Subject: [PATCH] feat(ahm): add HealingTracker support & complete fresh-disk healing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Introduce ecstore HealingTracker into ahm crate; load/init/save tracker • Re-implement heal_fresh_disk to use heal_erasure_set with tracker • Enhance auto-disk scanner: detect unformatted disks via get_disk_id() • Remove DataUsageCache handling for now • Refactor imports & types, clean up duplicate constants --- crates/ahm/src/error.rs | 5 +- crates/ahm/src/heal/event.rs | 250 ++++++++------- crates/ahm/src/heal/manager.rs | 88 +++++- crates/ahm/src/heal/mod.rs | 2 +- crates/ahm/src/heal/progress.rs | 11 +- crates/ahm/src/heal/storage.rs | 181 ++++++----- crates/ahm/src/heal/task.rs | 357 ++++++++++++---------- crates/ahm/src/lib.rs | 2 +- crates/ahm/src/scanner/data_scanner.rs | 47 ++- crates/ahm/tests/heal_integration_test.rs | 297 ++++++++++-------- crates/ecstore/src/set_disk.rs | 6 +- 11 files changed, 731 insertions(+), 515 deletions(-) diff --git a/crates/ahm/src/error.rs b/crates/ahm/src/error.rs index a8938f9a..aca50383 100644 --- a/crates/ahm/src/error.rs +++ b/crates/ahm/src/error.rs @@ -24,6 +24,9 @@ pub enum Error { #[error("Storage error: {0}")] Storage(#[from] rustfs_ecstore::error::Error), + #[error("Disk error: {0}")] + Disk(#[from] rustfs_ecstore::disk::error::DiskError), + #[error("Configuration error: {0}")] Config(String), @@ -88,4 +91,4 @@ impl From for std::io::Error { fn from(err: Error) -> Self { std::io::Error::other(err) } -} \ No newline at end of file +} diff --git a/crates/ahm/src/heal/event.rs b/crates/ahm/src/heal/event.rs index 0e59452e..85ecb002 100644 --- a/crates/ahm/src/heal/event.rs +++ b/crates/ahm/src/heal/event.rs @@ -106,87 +106,88 @@ impl HealEvent { /// Convert HealEvent to HealRequest pub fn to_heal_request(&self) -> HealRequest { match self { - HealEvent::ObjectCorruption { bucket, object, version_id, severity, .. } => { - HealRequest::new( - HealType::Object { - bucket: bucket.clone(), - object: object.clone(), - version_id: version_id.clone(), - }, - HealOptions::default(), - Self::severity_to_priority(severity), - ) - } - HealEvent::ObjectMissing { bucket, object, version_id, .. } => { - HealRequest::new( - HealType::Object { - bucket: bucket.clone(), - object: object.clone(), - version_id: version_id.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) - } - HealEvent::MetadataCorruption { bucket, object, .. } => { - HealRequest::new( - HealType::Metadata { - bucket: bucket.clone(), - object: object.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) - } - HealEvent::DiskStatusChange { endpoint, .. } => { - HealRequest::new( - HealType::Disk { - endpoint: endpoint.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) - } - HealEvent::ECDecodeFailure { bucket, object, version_id, .. } => { - HealRequest::new( - HealType::ECDecode { - bucket: bucket.clone(), - object: object.clone(), - version_id: version_id.clone(), - }, - HealOptions::default(), - HealPriority::Urgent, - ) - } - HealEvent::ChecksumMismatch { bucket, object, version_id, .. } => { - HealRequest::new( - HealType::Object { - bucket: bucket.clone(), - object: object.clone(), - version_id: version_id.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) - } + HealEvent::ObjectCorruption { + bucket, + object, + version_id, + severity, + .. + } => HealRequest::new( + HealType::Object { + bucket: bucket.clone(), + object: object.clone(), + version_id: version_id.clone(), + }, + HealOptions::default(), + Self::severity_to_priority(severity), + ), + HealEvent::ObjectMissing { + bucket, + object, + version_id, + .. + } => HealRequest::new( + HealType::Object { + bucket: bucket.clone(), + object: object.clone(), + version_id: version_id.clone(), + }, + HealOptions::default(), + HealPriority::High, + ), + HealEvent::MetadataCorruption { bucket, object, .. } => HealRequest::new( + HealType::Metadata { + bucket: bucket.clone(), + object: object.clone(), + }, + HealOptions::default(), + HealPriority::High, + ), + HealEvent::DiskStatusChange { endpoint, .. } => HealRequest::new( + HealType::Disk { + endpoint: endpoint.clone(), + }, + HealOptions::default(), + HealPriority::High, + ), + HealEvent::ECDecodeFailure { + bucket, + object, + version_id, + .. + } => HealRequest::new( + HealType::ECDecode { + bucket: bucket.clone(), + object: object.clone(), + version_id: version_id.clone(), + }, + HealOptions::default(), + HealPriority::Urgent, + ), + HealEvent::ChecksumMismatch { + bucket, + object, + version_id, + .. + } => HealRequest::new( + HealType::Object { + bucket: bucket.clone(), + object: object.clone(), + version_id: version_id.clone(), + }, + HealOptions::default(), + HealPriority::High, + ), HealEvent::BucketMetadataCorruption { bucket, .. } => { - HealRequest::new( - HealType::Bucket { - bucket: bucket.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) - } - HealEvent::MRFMetadataCorruption { meta_path, .. } => { - HealRequest::new( - HealType::MRF { - meta_path: meta_path.clone(), - }, - HealOptions::default(), - HealPriority::High, - ) + HealRequest::new(HealType::Bucket { bucket: bucket.clone() }, HealOptions::default(), HealPriority::High) } + HealEvent::MRFMetadataCorruption { meta_path, .. } => HealRequest::new( + HealType::MRF { + meta_path: meta_path.clone(), + }, + HealOptions::default(), + HealPriority::High, + ), } } @@ -203,29 +204,63 @@ impl HealEvent { /// Get event description pub fn description(&self) -> String { match self { - HealEvent::ObjectCorruption { bucket, object, corruption_type, .. } => { - format!("Object corruption detected: {}/{} - {:?}", bucket, object, corruption_type) + HealEvent::ObjectCorruption { + bucket, + object, + corruption_type, + .. + } => { + format!("Object corruption detected: {bucket}/{object} - {corruption_type:?}") } HealEvent::ObjectMissing { bucket, object, .. } => { - format!("Object missing: {}/{}", bucket, object) + format!("Object missing: {bucket}/{object}") } - HealEvent::MetadataCorruption { bucket, object, corruption_type, .. } => { - format!("Metadata corruption: {}/{} - {:?}", bucket, object, corruption_type) + HealEvent::MetadataCorruption { + bucket, + object, + corruption_type, + .. + } => { + format!("Metadata corruption: {bucket}/{object} - {corruption_type:?}") } - HealEvent::DiskStatusChange { endpoint, old_status, new_status, .. } => { - format!("Disk status changed: {:?} {} -> {}", endpoint, old_status, new_status) + HealEvent::DiskStatusChange { + endpoint, + old_status, + new_status, + .. + } => { + format!("Disk status changed: {endpoint:?} {old_status} -> {new_status}") } - HealEvent::ECDecodeFailure { bucket, object, missing_shards, .. } => { - format!("EC decode failure: {}/{} - missing shards: {:?}", bucket, object, missing_shards) + HealEvent::ECDecodeFailure { + bucket, + object, + missing_shards, + .. + } => { + format!("EC decode failure: {bucket}/{object} - missing shards: {missing_shards:?}") } - HealEvent::ChecksumMismatch { bucket, object, expected_checksum, actual_checksum, .. } => { - format!("Checksum mismatch: {}/{} - expected: {}, actual: {}", bucket, object, expected_checksum, actual_checksum) + HealEvent::ChecksumMismatch { + bucket, + object, + expected_checksum, + actual_checksum, + .. + } => { + format!( + "Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}" + ) } - HealEvent::BucketMetadataCorruption { bucket, corruption_type, .. } => { - format!("Bucket metadata corruption: {} - {:?}", bucket, corruption_type) + HealEvent::BucketMetadataCorruption { + bucket, corruption_type, .. + } => { + format!("Bucket metadata corruption: {bucket} - {corruption_type:?}") } - HealEvent::MRFMetadataCorruption { meta_path, corruption_type, .. } => { - format!("MRF metadata corruption: {} - {:?}", meta_path, corruption_type) + HealEvent::MRFMetadataCorruption { + meta_path, + corruption_type, + .. + } => { + format!("MRF metadata corruption: {meta_path} - {corruption_type:?}") } } } @@ -292,27 +327,22 @@ impl HealEventHandler { /// Filter events by severity pub fn filter_by_severity(&self, min_severity: Severity) -> Vec<&HealEvent> { - self.events - .iter() - .filter(|event| event.severity() >= min_severity) - .collect() + self.events.iter().filter(|event| event.severity() >= min_severity).collect() } /// Filter events by type pub fn filter_by_type(&self, event_type: &str) -> Vec<&HealEvent> { self.events .iter() - .filter(|event| { - match event { - HealEvent::ObjectCorruption { .. } => event_type == "ObjectCorruption", - HealEvent::ObjectMissing { .. } => event_type == "ObjectMissing", - HealEvent::MetadataCorruption { .. } => event_type == "MetadataCorruption", - HealEvent::DiskStatusChange { .. } => event_type == "DiskStatusChange", - HealEvent::ECDecodeFailure { .. } => event_type == "ECDecodeFailure", - HealEvent::ChecksumMismatch { .. } => event_type == "ChecksumMismatch", - HealEvent::BucketMetadataCorruption { .. } => event_type == "BucketMetadataCorruption", - HealEvent::MRFMetadataCorruption { .. } => event_type == "MRFMetadataCorruption", - } + .filter(|event| match event { + HealEvent::ObjectCorruption { .. } => event_type == "ObjectCorruption", + HealEvent::ObjectMissing { .. } => event_type == "ObjectMissing", + HealEvent::MetadataCorruption { .. } => event_type == "MetadataCorruption", + HealEvent::DiskStatusChange { .. } => event_type == "DiskStatusChange", + HealEvent::ECDecodeFailure { .. } => event_type == "ECDecodeFailure", + HealEvent::ChecksumMismatch { .. } => event_type == "ChecksumMismatch", + HealEvent::BucketMetadataCorruption { .. } => event_type == "BucketMetadataCorruption", + HealEvent::MRFMetadataCorruption { .. } => event_type == "MRFMetadataCorruption", }) .collect() } @@ -322,4 +352,4 @@ impl Default for HealEventHandler { fn default() -> Self { Self::new(1000) } -} \ No newline at end of file +} diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index 9676bea8..694b340b 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -18,6 +18,9 @@ use crate::heal::{ storage::HealStorageAPI, task::{HealRequest, HealTask, HealTaskStatus}, }; +use rustfs_ecstore::disk::error::DiskError; +use rustfs_ecstore::disk::DiskAPI; +use rustfs_ecstore::global::GLOBAL_LOCAL_DISK_MAP; use std::{ collections::{HashMap, VecDeque}, sync::Arc, @@ -122,6 +125,9 @@ impl HealManager { // start scheduler self.start_scheduler().await?; + // start auto disk scanner + self.start_auto_disk_scanner().await?; + info!("HealManager started successfully"); Ok(()) } @@ -253,6 +259,86 @@ impl HealManager { Ok(()) } + /// Start background task to auto scan local disks and enqueue disk heal requests + async fn start_auto_disk_scanner(&self) -> Result<()> { + let config = self.config.clone(); + let heal_queue = self.heal_queue.clone(); + let active_heals = self.active_heals.clone(); + let cancel_token = self.cancel_token.clone(); + + tokio::spawn(async move { + let mut interval = interval(config.read().await.heal_interval); + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + info!("Auto disk scanner received shutdown signal"); + break; + } + _ = interval.tick() => { + // Build list of endpoints that need healing + let mut endpoints = Vec::new(); + println!("GLOBAL_LOCAL_DISK_MAP length: {:?}", GLOBAL_LOCAL_DISK_MAP.read().await.len()); + for (_, disk_opt) in GLOBAL_LOCAL_DISK_MAP.read().await.iter() { + if let Some(disk) = disk_opt { + // detect unformatted disk via get_disk_id() + if let Err(err) = disk.get_disk_id().await { + if err == DiskError::UnformattedDisk { + endpoints.push(disk.endpoint()); + continue; + } + } + // disk currently healing and not finished + if let Some(h) = disk.healing().await { + if !h.finished { + endpoints.push(disk.endpoint()); + } + } + } + } + + if endpoints.is_empty() { + continue; + } + println!("endpoints length: {:?}", endpoints.len()); + + for ep in endpoints { + // skip if already queued or healing + let mut skip = false; + { + let queue = heal_queue.lock().await; + if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) { + skip = true; + } + } + if !skip { + let active = active_heals.lock().await; + if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) { + skip = true; + } + } + + if skip { + continue; + } + + // enqueue heal request for this disk + let req = crate::heal::task::HealRequest::new( + crate::heal::task::HealType::Disk { endpoint: ep.clone() }, + crate::heal::task::HealOptions::default(), + crate::heal::task::HealPriority::Normal, + ); + let mut queue = heal_queue.lock().await; + queue.push_back(req); + info!("Enqueued auto disk heal for endpoint: {}", ep); + } + } + } + } + }); + Ok(()) + } + /// Process heal queue async fn process_heal_queue( heal_queue: &Arc>>, @@ -321,4 +407,4 @@ impl std::fmt::Debug for HealManager { .field("queue_length", &"") .finish() } -} \ No newline at end of file +} diff --git a/crates/ahm/src/heal/mod.rs b/crates/ahm/src/heal/mod.rs index 290a99a5..742c2766 100644 --- a/crates/ahm/src/heal/mod.rs +++ b/crates/ahm/src/heal/mod.rs @@ -19,4 +19,4 @@ pub mod storage; pub mod task; pub use manager::HealManager; -pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType}; \ No newline at end of file +pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType}; diff --git a/crates/ahm/src/heal/progress.rs b/crates/ahm/src/heal/progress.rs index aea4aa47..f590a5a5 100644 --- a/crates/ahm/src/heal/progress.rs +++ b/crates/ahm/src/heal/progress.rs @@ -66,7 +66,8 @@ impl HealProgress { } pub fn is_completed(&self) -> bool { - self.progress_percentage >= 100.0 || self.objects_scanned > 0 && self.objects_healed + self.objects_failed >= self.objects_scanned + self.progress_percentage >= 100.0 + || self.objects_scanned > 0 && self.objects_healed + self.objects_failed >= self.objects_scanned } pub fn get_success_rate(&self) -> f64 { @@ -97,6 +98,12 @@ pub struct HealStatistics { pub last_update_time: SystemTime, } +impl Default for HealStatistics { + fn default() -> Self { + Self::new() + } +} + impl HealStatistics { pub fn new() -> Self { Self { @@ -138,4 +145,4 @@ impl HealStatistics { 0.0 } } -} \ No newline at end of file +} diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index 4a440f00..d04270c8 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -17,8 +17,8 @@ use async_trait::async_trait; use rustfs_ecstore::{ disk::endpoint::Endpoint, heal::heal_commands::{HealOpts, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}, - store_api::{BucketInfo, StorageAPI, ObjectIO}, store::ECStore, + store_api::{BucketInfo, ObjectIO, StorageAPI}, }; use rustfs_madmin::heal_commands::HealResultItem; use std::sync::Arc; @@ -52,55 +52,61 @@ pub enum DiskStatus { pub trait HealStorageAPI: Send + Sync { /// Get object meta async fn get_object_meta(&self, bucket: &str, object: &str) -> Result>; - + /// Get object data async fn get_object_data(&self, bucket: &str, object: &str) -> Result>>; - + /// Put object data async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()>; - + /// Delete object async fn delete_object(&self, bucket: &str, object: &str) -> Result<()>; - + /// Check object integrity async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result; - + /// EC decode rebuild async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result>; - + /// Get disk status async fn get_disk_status(&self, endpoint: &Endpoint) -> Result; - + /// Format disk async fn format_disk(&self, endpoint: &Endpoint) -> Result<()>; - + /// Get bucket info async fn get_bucket_info(&self, bucket: &str) -> Result>; - + /// Fix bucket metadata async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()>; - + /// Get all buckets async fn list_buckets(&self) -> Result>; - + /// Check object exists async fn object_exists(&self, bucket: &str, object: &str) -> Result; - + /// Get object size async fn get_object_size(&self, bucket: &str, object: &str) -> Result>; - + /// Get object checksum async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result>; /// Heal object using ecstore - async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>, opts: &HealOpts) -> Result<(HealResultItem, Option)>; - + async fn heal_object( + &self, + bucket: &str, + object: &str, + version_id: Option<&str>, + opts: &HealOpts, + ) -> Result<(HealResultItem, Option)>; + /// Heal bucket using ecstore async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result; - + /// Heal format using ecstore async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option)>; - + /// List objects for healing async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result>; } @@ -120,7 +126,7 @@ impl ECStoreHealStorage { impl HealStorageAPI for ECStoreHealStorage { async fn get_object_meta(&self, bucket: &str, object: &str) -> Result> { debug!("Getting object meta: {}/{}", bucket, object); - + match self.ecstore.get_object_info(bucket, object, &Default::default()).await { Ok(info) => Ok(Some(info)), Err(e) => { @@ -129,32 +135,36 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn get_object_data(&self, bucket: &str, object: &str) -> Result>> { debug!("Getting object data: {}/{}", bucket, object); - - match (&*self.ecstore).get_object_reader(bucket, object, None, Default::default(), &Default::default()).await { - Ok(mut reader) => { - match reader.read_all().await { - Ok(data) => Ok(Some(data)), - Err(e) => { - error!("Failed to read object data: {}/{} - {}", bucket, object, e); - Err(Error::other(e)) - } + + match (*self.ecstore) + .get_object_reader(bucket, object, None, Default::default(), &Default::default()) + .await + { + Ok(mut reader) => match reader.read_all().await { + Ok(data) => Ok(Some(data)), + Err(e) => { + error!("Failed to read object data: {}/{} - {}", bucket, object, e); + Err(Error::other(e)) } - } + }, Err(e) => { error!("Failed to get object: {}/{} - {}", bucket, object, e); Err(Error::other(e)) } } } - + async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> { debug!("Putting object data: {}/{} ({} bytes)", bucket, object, data.len()); - + let mut reader = rustfs_ecstore::store_api::PutObjReader::from_vec(data.to_vec()); - match (&*self.ecstore).put_object(bucket, object, &mut reader, &Default::default()).await { + match (*self.ecstore) + .put_object(bucket, object, &mut reader, &Default::default()) + .await + { Ok(_) => { info!("Successfully put object: {}/{}", bucket, object); Ok(()) @@ -165,10 +175,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn delete_object(&self, bucket: &str, object: &str) -> Result<()> { debug!("Deleting object: {}/{}", bucket, object); - + match self.ecstore.delete_object(bucket, object, Default::default()).await { Ok(_) => { info!("Successfully deleted object: {}/{}", bucket, object); @@ -180,10 +190,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result { debug!("Verifying object integrity: {}/{}", bucket, object); - + // Try to get object info and data to verify integrity match self.get_object_meta(bucket, object).await? { Some(obj_info) => { @@ -192,7 +202,7 @@ impl HealStorageAPI for ECStoreHealStorage { warn!("Object has invalid size: {}/{}", bucket, object); return Ok(false); } - + // Try to read object data to verify it's accessible match self.get_object_data(bucket, object).await { Ok(Some(_)) => { @@ -215,10 +225,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result> { debug!("EC decode rebuild: {}/{}", bucket, object); - + // Use ecstore's heal_object to rebuild the object let heal_opts = HealOpts { recursive: false, @@ -231,15 +241,15 @@ impl HealStorageAPI for ECStoreHealStorage { pool: None, set: None, }; - + match self.heal_object(bucket, object, None, &heal_opts).await { Ok((_result, error)) => { if error.is_some() { return Err(Error::TaskExecutionFailed { - message: format!("Heal failed: {:?}", error), + message: format!("Heal failed: {error:?}"), }); } - + // After healing, try to read the object data match self.get_object_data(bucket, object).await? { Some(data) => { @@ -249,7 +259,7 @@ impl HealStorageAPI for ECStoreHealStorage { None => { error!("Object not found after heal: {}/{}", bucket, object); Err(Error::TaskExecutionFailed { - message: format!("Object not found after heal: {}/{}", bucket, object), + message: format!("Object not found after heal: {bucket}/{object}"), }) } } @@ -260,24 +270,24 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn get_disk_status(&self, endpoint: &Endpoint) -> Result { debug!("Getting disk status: {:?}", endpoint); - + // TODO: implement disk status check using ecstore // For now, return Ok status info!("Disk status check: {:?} - OK", endpoint); Ok(DiskStatus::Ok) } - + async fn format_disk(&self, endpoint: &Endpoint) -> Result<()> { debug!("Formatting disk: {:?}", endpoint); - + // Use ecstore's heal_format match self.heal_format(false).await { Ok((_, error)) => { if error.is_some() { - return Err(Error::other(format!("Format failed: {:?}", error))); + return Err(Error::other(format!("Format failed: {error:?}"))); } info!("Successfully formatted disk: {:?}", endpoint); Ok(()) @@ -288,10 +298,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn get_bucket_info(&self, bucket: &str) -> Result> { debug!("Getting bucket info: {}", bucket); - + match self.ecstore.get_bucket_info(bucket, &Default::default()).await { Ok(info) => Ok(Some(info)), Err(e) => { @@ -300,10 +310,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()> { debug!("Healing bucket metadata: {}", bucket); - + let heal_opts = HealOpts { recursive: true, dry_run: false, @@ -315,7 +325,7 @@ impl HealStorageAPI for ECStoreHealStorage { pool: None, set: None, }; - + match self.heal_bucket(bucket, &heal_opts).await { Ok(_) => { info!("Successfully healed bucket metadata: {}", bucket); @@ -327,10 +337,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn list_buckets(&self) -> Result> { debug!("Listing buckets"); - + match self.ecstore.list_bucket(&Default::default()).await { Ok(buckets) => Ok(buckets), Err(e) => { @@ -339,36 +349,34 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn object_exists(&self, bucket: &str, object: &str) -> Result { debug!("Checking object exists: {}/{}", bucket, object); - + match self.get_object_meta(bucket, object).await { Ok(Some(_)) => Ok(true), Ok(None) => Ok(false), Err(_) => Ok(false), } } - + async fn get_object_size(&self, bucket: &str, object: &str) -> Result> { debug!("Getting object size: {}/{}", bucket, object); - + match self.get_object_meta(bucket, object).await { Ok(Some(obj_info)) => Ok(Some(obj_info.size as u64)), Ok(None) => Ok(None), Err(e) => Err(e), } } - + async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result> { debug!("Getting object checksum: {}/{}", bucket, object); - + match self.get_object_meta(bucket, object).await { Ok(Some(obj_info)) => { // Convert checksum bytes to hex string - let checksum = obj_info.checksum.iter() - .map(|b| format!("{:02x}", b)) - .collect::(); + let checksum = obj_info.checksum.iter().map(|b| format!("{b:02x}")).collect::(); Ok(Some(checksum)) } Ok(None) => Ok(None), @@ -376,14 +384,20 @@ impl HealStorageAPI for ECStoreHealStorage { } } - async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>, opts: &HealOpts) -> Result<(HealResultItem, Option)> { + async fn heal_object( + &self, + bucket: &str, + object: &str, + version_id: Option<&str>, + opts: &HealOpts, + ) -> Result<(HealResultItem, Option)> { debug!("Healing object: {}/{}", bucket, object); - + let version_id_str = version_id.unwrap_or(""); - + match self.ecstore.heal_object(bucket, object, version_id_str, opts).await { Ok((result, ecstore_error)) => { - let error = ecstore_error.map(|e| Error::other(e)); + let error = ecstore_error.map(Error::other); info!("Heal object completed: {}/{} - result: {:?}, error: {:?}", bucket, object, result, error); Ok((result, error)) } @@ -393,10 +407,10 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result { debug!("Healing bucket: {}", bucket); - + match self.ecstore.heal_bucket(bucket, opts).await { Ok(result) => { info!("Heal bucket completed: {} - result: {:?}", bucket, result); @@ -408,13 +422,13 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option)> { debug!("Healing format (dry_run: {})", dry_run); - + match self.ecstore.heal_format(dry_run).await { Ok((result, ecstore_error)) => { - let error = ecstore_error.map(|e| Error::other(e)); + let error = ecstore_error.map(Error::other); info!("Heal format completed - result: {:?}, error: {:?}", result, error); Ok((result, error)) } @@ -424,18 +438,19 @@ impl HealStorageAPI for ECStoreHealStorage { } } } - + async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result> { debug!("Listing objects for heal: {}/{}", bucket, prefix); - + // Use list_objects_v2 to get objects - match self.ecstore.clone().list_objects_v2( - bucket, prefix, None, None, 1000, false, None - ).await { + match self + .ecstore + .clone() + .list_objects_v2(bucket, prefix, None, None, 1000, false, None) + .await + { Ok(list_info) => { - let objects: Vec = list_info.objects.into_iter() - .map(|obj| obj.name) - .collect(); + let objects: Vec = list_info.objects.into_iter().map(|obj| obj.name).collect(); info!("Found {} objects for heal in {}/{}", objects.len(), bucket, prefix); Ok(objects) } @@ -445,4 +460,4 @@ impl HealStorageAPI for ECStoreHealStorage { } } } -} \ No newline at end of file +} diff --git a/crates/ahm/src/heal/task.rs b/crates/ahm/src/heal/task.rs index 0d531def..8650e571 100644 --- a/crates/ahm/src/heal/task.rs +++ b/crates/ahm/src/heal/task.rs @@ -14,24 +14,25 @@ use crate::error::{Error, Result}; use crate::heal::{progress::HealProgress, storage::HealStorageAPI}; +use rustfs_ecstore::config::RUSTFS_CONFIG_PREFIX; use rustfs_ecstore::disk::endpoint::Endpoint; -use crate::heal::storage::DiskStatus; +use rustfs_ecstore::disk::error::DiskError; +use rustfs_ecstore::disk::{DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; +use rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN; +use rustfs_ecstore::heal::heal_commands::{init_healing_tracker, load_healing_tracker, HealScanMode}; +use rustfs_ecstore::new_object_layer_fn; +use rustfs_ecstore::store::get_disk_via_endpoint; +use rustfs_ecstore::store_api::BucketInfo; +use rustfs_utils::path::path_join; use serde::{Deserialize, Serialize}; -use std::{ - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::cmp::Ordering; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; use tracing::{error, info, warn}; use uuid::Uuid; -/// Heal scan mode -pub type HealScanMode = usize; - -pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0; -pub const HEAL_NORMAL_SCAN: HealScanMode = 1; -pub const HEAL_DEEP_SCAN: HealScanMode = 2; - /// Heal type #[derive(Debug, Clone)] pub enum HealType { @@ -42,22 +43,13 @@ pub enum HealType { version_id: Option, }, /// Bucket heal - Bucket { - bucket: String, - }, + Bucket { bucket: String }, /// Disk heal - Disk { - endpoint: Endpoint, - }, + Disk { endpoint: Endpoint }, /// Metadata heal - Metadata { - bucket: String, - object: String, - }, + Metadata { bucket: String, object: String }, /// MRF heal - MRF { - meta_path: String, - }, + MRF { meta_path: String }, /// EC decode heal ECDecode { bucket: String, @@ -119,7 +111,7 @@ impl Default for HealOptions { } /// Heal task status -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum HealTaskStatus { /// Pending Pending, @@ -174,27 +166,15 @@ impl HealRequest { } pub fn bucket(bucket: String) -> Self { - Self::new( - HealType::Bucket { bucket }, - HealOptions::default(), - HealPriority::Normal, - ) + Self::new(HealType::Bucket { bucket }, HealOptions::default(), HealPriority::Normal) } pub fn disk(endpoint: Endpoint) -> Self { - Self::new( - HealType::Disk { endpoint }, - HealOptions::default(), - HealPriority::High, - ) + Self::new(HealType::Disk { endpoint }, HealOptions::default(), HealPriority::High) } pub fn metadata(bucket: String, object: String) -> Self { - Self::new( - HealType::Metadata { bucket, object }, - HealOptions::default(), - HealPriority::High, - ) + Self::new(HealType::Metadata { bucket, object }, HealOptions::default(), HealPriority::High) } pub fn ec_decode(bucket: String, object: String, version_id: Option) -> Self { @@ -264,24 +244,20 @@ impl HealTask { info!("Starting heal task: {} with type: {:?}", self.id, self.heal_type); let result = match &self.heal_type { - HealType::Object { bucket, object, version_id } => { - self.heal_object(bucket, object, version_id.as_deref()).await - } - HealType::Bucket { bucket } => { - self.heal_bucket(bucket).await - } - HealType::Disk { endpoint } => { - self.heal_disk(endpoint).await - } - HealType::Metadata { bucket, object } => { - self.heal_metadata(bucket, object).await - } - HealType::MRF { meta_path } => { - self.heal_mrf(meta_path).await - } - HealType::ECDecode { bucket, object, version_id } => { - self.heal_ec_decode(bucket, object, version_id.as_deref()).await - } + HealType::Object { + bucket, + object, + version_id, + } => self.heal_object(bucket, object, version_id.as_deref()).await, + HealType::Bucket { bucket } => self.heal_bucket(bucket).await, + HealType::Disk { endpoint } => self.heal_disk(endpoint).await, + HealType::Metadata { bucket, object } => self.heal_metadata(bucket, object).await, + HealType::MRF { meta_path } => self.heal_mrf(meta_path).await, + HealType::ECDecode { + bucket, + object, + version_id, + } => self.heal_ec_decode(bucket, object, version_id.as_deref()).await, }; // update completed time and status @@ -298,9 +274,7 @@ impl HealTask { } Err(e) => { let mut status = self.status.write().await; - *status = HealTaskStatus::Failed { - error: e.to_string(), - }; + *status = HealTaskStatus::Failed { error: e.to_string() }; error!("Heal task failed: {} with error: {}", self.id, e); } } @@ -327,11 +301,11 @@ impl HealTask { // specific heal implementation method async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> { info!("Healing object: {}/{}", bucket, object); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("{}/{}", bucket, object))); + progress.set_current_object(Some(format!("{bucket}/{object}"))); progress.update_progress(0, 4, 0, 0); // 开始heal,总共4个步骤 } @@ -345,47 +319,24 @@ impl HealTask { return self.recreate_missing_object(bucket, object, version_id).await; } else { return Err(Error::TaskExecutionFailed { - message: format!("Object not found: {}/{}", bucket, object), + message: format!("Object not found: {bucket}/{object}"), }); } } { let mut progress = self.progress.write().await; - progress.update_progress(1, 4, 0, 0); + progress.update_progress(1, 3, 0, 0); } - // Step 2: Verify object integrity - info!("Step 2: Verifying object integrity"); - let integrity_ok = self.storage.verify_object_integrity(bucket, object).await?; - if integrity_ok { - info!("Object integrity check passed: {}/{}", bucket, object); - { - let mut progress = self.progress.write().await; - progress.update_progress(4, 4, 0, 0); - } - return Ok(()); - } - - warn!("Object integrity check failed: {}/{}", bucket, object); - { - let mut progress = self.progress.write().await; - progress.update_progress(2, 4, 0, 0); - } - - // Step 3: Perform actual heal using ecstore - info!("Step 3: Performing heal using ecstore"); + // Step 2: directly call ecstore to perform heal + info!("Step 2: Performing heal using ecstore"); let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts { recursive: self.options.recursive, dry_run: self.options.dry_run, remove: self.options.remove_corrupted, recreate: self.options.recreate_missing, - scan_mode: match self.options.scan_mode { - crate::heal::task::HEAL_UNKNOWN_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_UNKNOWN_SCAN, - crate::heal::task::HEAL_NORMAL_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, - crate::heal::task::HEAL_DEEP_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN, - _ => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, - }, + scan_mode: self.options.scan_mode, update_parity: self.options.update_parity, no_lock: false, pool: None, @@ -396,7 +347,7 @@ impl HealTask { Ok((result, error)) => { if let Some(e) = error { error!("Heal operation failed: {}/{} - {}", bucket, object, e); - + // If heal failed and remove_corrupted is enabled, delete the corrupted object if self.options.remove_corrupted { warn!("Removing corrupted object: {}/{}", bucket, object); @@ -410,29 +361,34 @@ impl HealTask { { let mut progress = self.progress.write().await; - progress.update_progress(4, 4, 0, 0); + progress.update_progress(3, 3, 0, 0); } - + return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal object {}/{}: {}", bucket, object, e), + message: format!("Failed to heal object {bucket}/{object}: {e}"), }); } - // Step 4: Verify heal result - info!("Step 4: Verifying heal result"); + // Step 3: Verify heal result + info!("Step 3: Verifying heal result"); let object_size = result.object_size as u64; - info!("Heal completed successfully: {}/{} ({} bytes, {} drives healed)", - bucket, object, object_size, result.after.drives.len()); + info!( + "Heal completed successfully: {}/{} ({} bytes, {} drives healed)", + bucket, + object, + object_size, + result.after.drives.len() + ); { let mut progress = self.progress.write().await; - progress.update_progress(4, 4, object_size, object_size); + progress.update_progress(3, 3, object_size, object_size); } Ok(()) } Err(e) => { error!("Heal operation failed: {}/{} - {}", bucket, object, e); - + // If heal failed and remove_corrupted is enabled, delete the corrupted object if self.options.remove_corrupted { warn!("Removing corrupted object: {}/{}", bucket, object); @@ -446,11 +402,11 @@ impl HealTask { { let mut progress = self.progress.write().await; - progress.update_progress(4, 4, 0, 0); + progress.update_progress(3, 3, 0, 0); } - + Err(Error::TaskExecutionFailed { - message: format!("Failed to heal object {}/{}: {}", bucket, object, e), + message: format!("Failed to heal object {bucket}/{object}: {e}"), }) } } @@ -459,7 +415,7 @@ impl HealTask { /// Recreate missing object (for EC decode scenarios) async fn recreate_missing_object(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> { info!("Attempting to recreate missing object: {}/{}", bucket, object); - + // Use ecstore's heal_object with recreate option let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts { recursive: false, @@ -478,7 +434,7 @@ impl HealTask { if let Some(e) = error { error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e); return Err(Error::TaskExecutionFailed { - message: format!("Failed to recreate missing object {}/{}: {}", bucket, object, e), + message: format!("Failed to recreate missing object {bucket}/{object}: {e}"), }); } @@ -494,7 +450,7 @@ impl HealTask { Err(e) => { error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e); Err(Error::TaskExecutionFailed { - message: format!("Failed to recreate missing object {}/{}: {}", bucket, object, e), + message: format!("Failed to recreate missing object {bucket}/{object}: {e}"), }) } } @@ -502,11 +458,11 @@ impl HealTask { async fn heal_bucket(&self, bucket: &str) -> Result<()> { info!("Healing bucket: {}", bucket); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("bucket: {}", bucket))); + progress.set_current_object(Some(format!("bucket: {bucket}"))); progress.update_progress(0, 3, 0, 0); } @@ -516,7 +472,7 @@ impl HealTask { if !bucket_exists { warn!("Bucket does not exist: {}", bucket); return Err(Error::TaskExecutionFailed { - message: format!("Bucket not found: {}", bucket), + message: format!("Bucket not found: {bucket}"), }); } @@ -532,12 +488,7 @@ impl HealTask { dry_run: self.options.dry_run, remove: self.options.remove_corrupted, recreate: self.options.recreate_missing, - scan_mode: match self.options.scan_mode { - crate::heal::task::HEAL_UNKNOWN_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_UNKNOWN_SCAN, - crate::heal::task::HEAL_NORMAL_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, - crate::heal::task::HEAL_DEEP_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN, - _ => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, - }, + scan_mode: self.options.scan_mode, update_parity: self.options.update_parity, no_lock: false, pool: None, @@ -547,7 +498,7 @@ impl HealTask { match self.storage.heal_bucket(bucket, &heal_opts).await { Ok(result) => { info!("Bucket heal completed successfully: {} ({} drives)", bucket, result.after.drives.len()); - + { let mut progress = self.progress.write().await; progress.update_progress(3, 3, 0, 0); @@ -561,7 +512,7 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } Err(Error::TaskExecutionFailed { - message: format!("Failed to heal bucket {}: {}", bucket, e), + message: format!("Failed to heal bucket {bucket}: {e}"), }) } } @@ -569,33 +520,17 @@ impl HealTask { async fn heal_disk(&self, endpoint: &Endpoint) -> Result<()> { info!("Healing disk: {:?}", endpoint); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("disk: {:?}", endpoint))); + progress.set_current_object(Some(format!("disk: {endpoint:?}"))); progress.update_progress(0, 3, 0, 0); } - // Step 1: Check disk status - info!("Step 1: Checking disk status"); - let disk_status = self.storage.get_disk_status(endpoint).await?; - if disk_status == DiskStatus::Ok { - info!("Disk is already healthy: {:?}", endpoint); - { - let mut progress = self.progress.write().await; - progress.update_progress(3, 3, 0, 0); - } - return Ok(()); - } + // Step 1: Perform disk format heal using ecstore + info!("Step 1: Performing disk format heal using ecstore"); - { - let mut progress = self.progress.write().await; - progress.update_progress(1, 3, 0, 0); - } - - // Step 2: Perform disk heal using ecstore - info!("Step 2: Performing disk heal using ecstore"); match self.storage.heal_format(self.options.dry_run).await { Ok((result, error)) => { if let Some(e) = error { @@ -605,12 +540,21 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal disk {:?}: {}", endpoint, e), + message: format!("Failed to heal disk {endpoint:?}: {e}"), }); } info!("Disk heal completed successfully: {:?} ({} drives)", endpoint, result.after.drives.len()); - + + { + let mut progress = self.progress.write().await; + progress.update_progress(2, 3, 0, 0); + } + + // Step 2: Synchronize data/buckets on the fresh disk + info!("Step 2: Healing buckets on fresh disk"); + self.heal_fresh_disk(endpoint).await?; + { let mut progress = self.progress.write().await; progress.update_progress(3, 3, 0, 0); @@ -624,7 +568,7 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } Err(Error::TaskExecutionFailed { - message: format!("Failed to heal disk {:?}: {}", endpoint, e), + message: format!("Failed to heal disk {endpoint:?}: {e}"), }) } } @@ -632,11 +576,11 @@ impl HealTask { async fn heal_metadata(&self, bucket: &str, object: &str) -> Result<()> { info!("Healing metadata: {}/{}", bucket, object); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("metadata: {}/{}", bucket, object))); + progress.set_current_object(Some(format!("metadata: {bucket}/{object}"))); progress.update_progress(0, 3, 0, 0); } @@ -646,7 +590,7 @@ impl HealTask { if !object_exists { warn!("Object does not exist: {}/{}", bucket, object); return Err(Error::TaskExecutionFailed { - message: format!("Object not found: {}/{}", bucket, object), + message: format!("Object not found: {bucket}/{object}"), }); } @@ -678,12 +622,17 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal metadata {}/{}: {}", bucket, object, e), + message: format!("Failed to heal metadata {bucket}/{object}: {e}"), }); } - info!("Metadata heal completed successfully: {}/{} ({} drives)", bucket, object, result.after.drives.len()); - + info!( + "Metadata heal completed successfully: {}/{} ({} drives)", + bucket, + object, + result.after.drives.len() + ); + { let mut progress = self.progress.write().await; progress.update_progress(3, 3, 0, 0); @@ -697,7 +646,7 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } Err(Error::TaskExecutionFailed { - message: format!("Failed to heal metadata {}/{}: {}", bucket, object, e), + message: format!("Failed to heal metadata {bucket}/{object}: {e}"), }) } } @@ -705,11 +654,11 @@ impl HealTask { async fn heal_mrf(&self, meta_path: &str) -> Result<()> { info!("Healing MRF: {}", meta_path); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("mrf: {}", meta_path))); + progress.set_current_object(Some(format!("mrf: {meta_path}"))); progress.update_progress(0, 2, 0, 0); } @@ -717,7 +666,7 @@ impl HealTask { let parts: Vec<&str> = meta_path.split('/').collect(); if parts.len() < 2 { return Err(Error::TaskExecutionFailed { - message: format!("Invalid meta path format: {}", meta_path), + message: format!("Invalid meta path format: {meta_path}"), }); } @@ -747,12 +696,12 @@ impl HealTask { progress.update_progress(2, 2, 0, 0); } return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal MRF {}: {}", meta_path, e), + message: format!("Failed to heal MRF {meta_path}: {e}"), }); } info!("MRF heal completed successfully: {} ({} drives)", meta_path, result.after.drives.len()); - + { let mut progress = self.progress.write().await; progress.update_progress(2, 2, 0, 0); @@ -766,7 +715,7 @@ impl HealTask { progress.update_progress(2, 2, 0, 0); } Err(Error::TaskExecutionFailed { - message: format!("Failed to heal MRF {}: {}", meta_path, e), + message: format!("Failed to heal MRF {meta_path}: {e}"), }) } } @@ -774,11 +723,11 @@ impl HealTask { async fn heal_ec_decode(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> { info!("Healing EC decode: {}/{}", bucket, object); - + // update progress { let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("ec_decode: {}/{}", bucket, object))); + progress.set_current_object(Some(format!("ec_decode: {bucket}/{object}"))); progress.update_progress(0, 3, 0, 0); } @@ -788,7 +737,7 @@ impl HealTask { if !object_exists { warn!("Object does not exist: {}/{}", bucket, object); return Err(Error::TaskExecutionFailed { - message: format!("Object not found: {}/{}", bucket, object), + message: format!("Object not found: {bucket}/{object}"), }); } @@ -820,14 +769,19 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal EC decode {}/{}: {}", bucket, object, e), + message: format!("Failed to heal EC decode {bucket}/{object}: {e}"), }); } let object_size = result.object_size as u64; - info!("EC decode heal completed successfully: {}/{} ({} bytes, {} drives)", - bucket, object, object_size, result.after.drives.len()); - + info!( + "EC decode heal completed successfully: {}/{} ({} bytes, {} drives)", + bucket, + object, + object_size, + result.after.drives.len() + ); + { let mut progress = self.progress.write().await; progress.update_progress(3, 3, object_size, object_size); @@ -841,11 +795,86 @@ impl HealTask { progress.update_progress(3, 3, 0, 0); } Err(Error::TaskExecutionFailed { - message: format!("Failed to heal EC decode {}/{}: {}", bucket, object, e), + message: format!("Failed to heal EC decode {bucket}/{object}: {e}"), }) } } } + + async fn heal_fresh_disk(&self, endpoint: &Endpoint) -> Result<()> { + // Locate disk via endpoint + let disk = get_disk_via_endpoint(endpoint) + .await + .ok_or_else(|| Error::other(format!("Disk not found for endpoint: {endpoint}")))?; + + // Skip if drive is root or other fatal errors + if let Err(e) = disk.disk_info(&DiskInfoOptions::default()).await { + match e { + DiskError::DriveIsRoot => return Ok(()), + DiskError::UnformattedDisk => { /* continue healing */ } + _ => return Err(Error::other(e)), + } + } + + // Load or init HealingTracker + let mut tracker = match load_healing_tracker(&Some(disk.clone())).await { + Ok(t) => t, + Err(err) => match err { + DiskError::FileNotFound => init_healing_tracker(disk.clone(), &Uuid::new_v4().to_string()) + .await + .map_err(Error::other)?, + _ => return Err(Error::other(err)), + }, + }; + + // Build bucket list + let mut buckets = self.storage.list_buckets().await.map_err(Error::other)?; + buckets.push(BucketInfo { + name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)]) + .to_string_lossy() + .to_string(), + ..Default::default() + }); + buckets.push(BucketInfo { + name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)]) + .to_string_lossy() + .to_string(), + ..Default::default() + }); + + // Sort: system buckets first, others by creation time desc + buckets.sort_by(|a, b| { + let a_sys = a.name.starts_with(RUSTFS_META_BUCKET); + let b_sys = b.name.starts_with(RUSTFS_META_BUCKET); + match (a_sys, b_sys) { + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + _ => b.created.cmp(&a.created), + } + }); + + // Update tracker queue and persist + tracker.set_queue_buckets(&buckets).await; + tracker.save().await.map_err(Error::other)?; + + // Prepare bucket names list + let bucket_names: Vec = buckets.iter().map(|b| b.name.clone()).collect(); + + // Run heal_erasure_set using underlying SetDisk + let (pool_idx, set_idx) = (endpoint.pool_idx as usize, endpoint.set_idx as usize); + let Some(store) = new_object_layer_fn() else { + return Err(Error::other("errServerNotInitialized")); + }; + let set_disk = store.pools[pool_idx].disk_set[set_idx].clone(); + + let tracker_arc = Arc::new(RwLock::new(tracker)); + set_disk + .heal_erasure_set(&bucket_names, tracker_arc) + .await + .map_err(Error::other)?; + + Ok(()) + } } impl std::fmt::Debug for HealTask { @@ -857,4 +886,4 @@ impl std::fmt::Debug for HealTask { .field("created_at", &self.created_at) .finish() } -} \ No newline at end of file +} diff --git a/crates/ahm/src/lib.rs b/crates/ahm/src/lib.rs index 0e288d51..3a5117f7 100644 --- a/crates/ahm/src/lib.rs +++ b/crates/ahm/src/lib.rs @@ -20,11 +20,11 @@ pub mod heal; pub mod scanner; pub use error::{Error, Result}; +pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType}; pub use scanner::{ BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend, store_data_usage_in_backend, }; -pub use heal::{HealManager, HealRequest, HealType, HealOptions, HealPriority}; // Global cancellation token for AHM services (scanner and other background tasks) static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock = OnceLock::new(); diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 243296b3..b2d1103a 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -32,11 +32,11 @@ use super::{ data_usage::DataUsageInfo, metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics}, }; +use crate::heal::HealManager; use crate::{ error::{Error, Result}, get_ahm_services_cancel_token, HealRequest, }; -use crate::heal::HealManager; use rustfs_ecstore::disk::RUSTFS_META_BUCKET; @@ -539,12 +539,12 @@ impl Scanner { Ok(volumes) => volumes, Err(e) => { error!("Failed to list volumes on disk {}: {}", disk_path, e); - + // 磁盘访问失败,提交磁盘heal任务 let enable_healing = self.config.read().await.enable_healing; if enable_healing { if let Some(heal_manager) = &self.heal_manager { - use crate::heal::{HealRequest, HealPriority}; + use crate::heal::{HealPriority, HealRequest}; let req = HealRequest::new( crate::heal::HealType::Disk { endpoint: disk.endpoint().clone(), @@ -562,7 +562,7 @@ impl Scanner { } } } - + return Err(Error::Storage(e.into())); } }; @@ -674,7 +674,7 @@ impl Scanner { if file_meta.versions.is_empty() { objects_with_issues += 1; warn!("Object {} has no versions", entry.name); - + // 对象元数据损坏,提交元数据heal任务 let enable_healing = self.config.read().await.enable_healing; if enable_healing { @@ -682,10 +682,16 @@ impl Scanner { let req = HealRequest::metadata(bucket.to_string(), entry.name.clone()); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { - warn!("object metadata damaged, submit heal task: {} {} / {}", task_id, bucket, entry.name); + warn!( + "object metadata damaged, submit heal task: {} {} / {}", + task_id, bucket, entry.name + ); } Err(e) => { - error!("object metadata damaged, submit heal task failed: {} / {} {}", bucket, entry.name, e); + error!( + "object metadata damaged, submit heal task failed: {} / {} {}", + bucket, entry.name, e + ); } } } @@ -697,7 +703,7 @@ impl Scanner { } else { objects_with_issues += 1; warn!("Failed to parse metadata for object {}", entry.name); - + // 对象元数据解析失败,提交元数据heal任务 let enable_healing = self.config.read().await.enable_healing; if enable_healing { @@ -705,10 +711,16 @@ impl Scanner { let req = HealRequest::metadata(bucket.to_string(), entry.name.clone()); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { - warn!("object metadata parse failed, submit heal task: {} {} / {}", task_id, bucket, entry.name); + warn!( + "object metadata parse failed, submit heal task: {} {} / {}", + task_id, bucket, entry.name + ); } Err(e) => { - error!("object metadata parse failed, submit heal task failed: {} / {} {}", bucket, entry.name, e); + error!( + "object metadata parse failed, submit heal task failed: {} / {} {}", + bucket, entry.name, e + ); } } } @@ -816,12 +828,12 @@ impl Scanner { let missing_disks: Vec = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect(); warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks); println!("Object {bucket}/{object_name} missing from disks: {missing_disks:?}"); - + // submit heal task let enable_healing = self.config.read().await.enable_healing; if enable_healing { if let Some(heal_manager) = &self.heal_manager { - use crate::heal::{HealRequest, HealPriority}; + use crate::heal::{HealPriority, HealRequest}; let req = HealRequest::new( crate::heal::HealType::Object { bucket: bucket.clone(), @@ -833,8 +845,10 @@ impl Scanner { ); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { - warn!("object missing, submit heal task: {} {} / {} (missing disks: {:?})", - task_id, bucket, object_name, missing_disks); + warn!( + "object missing, submit heal task: {} {} / {} (missing disks: {:?})", + task_id, bucket, object_name, missing_disks + ); } Err(e) => { error!("object missing, submit heal task failed: {} / {} {}", bucket, object_name, e); @@ -1142,6 +1156,7 @@ mod tests { StorageAPI, store_api::{MakeBucketOptions, ObjectIO, PutObjReader}, }; + use serial_test::serial; use std::fs; use std::net::SocketAddr; @@ -1211,7 +1226,7 @@ mod tests { } #[tokio::test(flavor = "multi_thread")] - #[ignore] + #[serial] async fn test_scanner_basic_functionality() { const TEST_DIR_BASIC: &str = "/tmp/rustfs_ahm_test_basic"; let (disk_paths, ecstore) = prepare_test_env(Some(TEST_DIR_BASIC), Some(9001)).await; @@ -1309,7 +1324,7 @@ mod tests { // test data usage statistics collection and validation #[tokio::test(flavor = "multi_thread")] - #[ignore] + #[serial] async fn test_scanner_usage_stats() { const TEST_DIR_USAGE_STATS: &str = "/tmp/rustfs_ahm_test_usage_stats"; let (_, ecstore) = prepare_test_env(Some(TEST_DIR_USAGE_STATS), Some(9002)).await; diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index ac5aae85..c161af15 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -1,25 +1,44 @@ use rustfs_ahm::heal::{ - manager::HealManager, + manager::{HealConfig, HealManager}, storage::{ECStoreHealStorage, HealStorageAPI}, - task::{HealOptions, HealPriority, HealRequest, HealType, HEAL_NORMAL_SCAN}, + task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType}, }; use rustfs_ecstore::{ disk::endpoint::Endpoint, endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + heal::heal_commands::HEAL_NORMAL_SCAN, store::ECStore, - store_api::{PutObjReader, ObjectOptions, StorageAPI, ObjectIO}, + store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, }; +use serial_test::serial; +use std::sync::Once; use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::fs; use tracing::info; +use walkdir::WalkDir; +static INIT: Once = Once::new(); +fn init_tracing() { + INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} /// Test helper: Create test environment with ECStore async fn setup_test_env() -> (Vec, Arc, Arc) { - // create temp dir as 4 disks - let test_base_dir = "/tmp/rustfs_ahm_heal_test"; - let temp_dir = std::path::PathBuf::from(test_base_dir); + use std::sync::OnceLock; + init_tracing(); + static GLOBAL_ENV: OnceLock<(Vec, Arc, Arc)> = OnceLock::new(); + + // Fast path: already initialized, just clone and return + if let Some((paths, ecstore, heal_storage)) = GLOBAL_ENV.get() { + return (paths.clone(), ecstore.clone(), heal_storage.clone()); + } + + // create temp dir as 4 disks with unique base dir + let test_base_dir = format!("/tmp/rustfs_ahm_heal_test_{}", uuid::Uuid::new_v4()); + let temp_dir = std::path::PathBuf::from(&test_base_dir); if temp_dir.exists() { - fs::remove_dir_all(&temp_dir).await.unwrap(); + fs::remove_dir_all(&temp_dir).await.ok(); } fs::create_dir_all(&temp_dir).await.unwrap(); @@ -57,11 +76,11 @@ async fn setup_test_env() -> (Vec, Arc, Arc (Vec, Arc, Arc, bucket_name: &str) { - (&**ecstore) + (**ecstore) .make_bucket(bucket_name, &Default::default()) .await .expect("Failed to create test bucket"); @@ -92,22 +114,14 @@ async fn create_test_bucket(ecstore: &Arc, bucket_name: &str) { } /// Test helper: Upload test object -async fn upload_test_object( - ecstore: &Arc, - bucket: &str, - object: &str, - data: &[u8], -) { +async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, data: &[u8]) { let mut reader = PutObjReader::from_vec(data.to_vec()); - let object_info = (&**ecstore) + let object_info = (**ecstore) .put_object(bucket, object, &mut reader, &ObjectOptions::default()) .await .expect("Failed to upload test object"); - - info!( - "Uploaded test object: {}/{} ({} bytes)", - bucket, object, object_info.size - ); + + info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); } /// Test helper: Cleanup test environment @@ -117,31 +131,54 @@ async fn cleanup_test_env(disk_paths: &[PathBuf]) { fs::remove_dir_all(disk_path).await.expect("Failed to cleanup disk path"); } } - - // Clean up test base directory - let test_base_dir = PathBuf::from("/tmp/rustfs_ahm_heal_test"); - if test_base_dir.exists() { - fs::remove_dir_all(&test_base_dir).await.expect("Failed to cleanup test base directory"); + + // Attempt to clean up base directory inferred from disk_paths[0] + if let Some(parent) = disk_paths.first().and_then(|p| p.parent()).and_then(|p| p.parent()) { + if parent.exists() { + fs::remove_dir_all(parent).await.ok(); + } } - + info!("Test environment cleaned up"); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] async fn test_heal_object_basic() { let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - + // Create test bucket and object let bucket_name = "test-bucket"; let object_name = "test-object.txt"; let test_data = b"Hello, this is test data for healing!"; - + create_test_bucket(&ecstore, bucket_name).await; upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - - // Create heal manager - let heal_manager = HealManager::new(heal_storage.clone(), Default::default()); - + + // ─── 1️⃣ delete single data shard file ───────────────────────────────────── + let obj_dir = disk_paths[0].join(bucket_name).join(object_name); + // find part file at depth 2, e.g. ...//part.1 + let target_part = WalkDir::new(&obj_dir) + .min_depth(2) + .max_depth(2) + .into_iter() + .filter_map(Result::ok) + .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) + .map(|e| e.into_path()) + .expect("Failed to locate part file to delete"); + + std::fs::remove_file(&target_part).expect("failed to delete part file"); + assert!(!target_part.exists()); + println!("✅ Deleted shard part file: {target_part:?}"); + + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_millis(1), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); + // Submit heal request for the object let heal_request = HealRequest::new( HealType::Object { @@ -160,46 +197,56 @@ async fn test_heal_object_basic() { }, HealPriority::Normal, ); - + let task_id = heal_manager .submit_heal_request(heal_request) .await .expect("Failed to submit heal request"); - + info!("Submitted heal request with task ID: {}", task_id); - + // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - // Check task status - let task_status = heal_manager.get_task_status(&task_id).await; - assert!(task_status.is_ok()); - - let status = task_status.unwrap(); - info!("Task status: {:?}", status); - - // Verify object still exists and is accessible - let object_exists = heal_storage.object_exists(bucket_name, object_name).await; - assert!(object_exists.is_ok()); - assert!(object_exists.unwrap()); - + tokio::time::sleep(tokio::time::Duration::from_secs(8)).await; + + // Attempt to fetch task status (might be removed if finished) + match heal_manager.get_task_status(&task_id).await { + Ok(status) => info!("Task status: {:?}", status), + Err(e) => info!("Task status not found (likely completed): {}", e), + } + + // ─── 2️⃣ verify each part file is restored ─────── + assert!(target_part.exists()); + // Cleanup cleanup_test_env(&disk_paths).await; - + info!("Heal object basic test passed"); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] async fn test_heal_bucket_basic() { let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - + // Create test bucket let bucket_name = "test-bucket-heal"; create_test_bucket(&ecstore, bucket_name).await; - - // Create heal manager - let heal_manager = HealManager::new(heal_storage.clone(), Default::default()); - + + // ─── 1️⃣ delete bucket dir on disk ────────────── + let broken_bucket_path = disk_paths[0].join(bucket_name); + assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); + std::fs::remove_dir_all(&broken_bucket_path).expect("failed to delete bucket dir on disk"); + assert!(!broken_bucket_path.exists(), "bucket dir still exists after deletion"); + println!("✅ Deleted bucket directory on disk: {broken_bucket_path:?}"); + + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_millis(1), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); + // Submit heal request for the bucket let heal_request = HealRequest::new( HealType::Bucket { @@ -216,99 +263,83 @@ async fn test_heal_bucket_basic() { }, HealPriority::Normal, ); - + let task_id = heal_manager .submit_heal_request(heal_request) .await .expect("Failed to submit bucket heal request"); - + info!("Submitted bucket heal request with task ID: {}", task_id); - + // Wait for task completion tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - // Check task status - let task_status = heal_manager.get_task_status(&task_id).await; - assert!(task_status.is_ok()); - - let status = task_status.unwrap(); - info!("Bucket heal task status: {:?}", status); - - // Verify bucket still exists - let bucket_exists = heal_storage.get_bucket_info(bucket_name).await; - assert!(bucket_exists.is_ok()); - assert!(bucket_exists.unwrap().is_some()); - + + // Attempt to fetch task status (optional) + if let Ok(status) = heal_manager.get_task_status(&task_id).await { + if status == HealTaskStatus::Completed { + info!("Bucket heal task status: {:?}", status); + } else { + panic!("Bucket heal task status: {status:?}"); + } + } + + // ─── 3️⃣ Verify bucket directory is restored on every disk ─────── + assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); + // Cleanup cleanup_test_env(&disk_paths).await; - + info!("Heal bucket basic test passed"); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] async fn test_heal_format_basic() { let (disk_paths, _ecstore, heal_storage) = setup_test_env().await; - - // Create heal manager - let heal_manager = HealManager::new(heal_storage.clone(), Default::default()); - - // Get disk endpoint for testing - let disk_endpoint = Endpoint::try_from(disk_paths[0].to_str().unwrap()) - .expect("Failed to create disk endpoint"); - - // Submit disk heal request (format heal) - let heal_request = HealRequest::new( - HealType::Disk { endpoint: disk_endpoint }, - HealOptions { - dry_run: true, // Use dry run for format heal test - recursive: false, - remove_corrupted: false, - recreate_missing: false, - scan_mode: HEAL_NORMAL_SCAN, - update_parity: false, - timeout: Some(Duration::from_secs(300)), - }, - HealPriority::Normal, - ); - - let task_id = heal_manager - .submit_heal_request(heal_request) - .await - .expect("Failed to submit disk heal request"); - - info!("Submitted disk heal request with task ID: {}", task_id); - + + // ─── 1️⃣ delete format.json on one disk ────────────── + let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); + assert!(format_path.exists(), "format.json does not exist on disk"); + std::fs::remove_file(&format_path).expect("failed to delete format.json on disk"); + assert!(!format_path.exists(), "format.json still exists after deletion"); + println!("✅ Deleted format.json on disk: {format_path:?}"); + + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_secs(2), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); + // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - - // Check task status - let task_status = heal_manager.get_task_status(&task_id).await; - assert!(task_status.is_ok()); - - let status = task_status.unwrap(); - info!("Disk heal task status: {:?}", status); - + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // ─── 2️⃣ verify format.json is restored ─────── + assert!(format_path.exists(), "format.json does not exist on disk after heal"); + // Cleanup cleanup_test_env(&disk_paths).await; - + info!("Heal format basic test passed"); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] async fn test_heal_storage_api_direct() { let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - + // Test direct heal storage API calls - + // Test heal_format let format_result = heal_storage.heal_format(true).await; // dry run assert!(format_result.is_ok()); info!("Direct heal_format test passed"); - + // Test heal_bucket let bucket_name = "test-bucket-direct"; create_test_bucket(&ecstore, bucket_name).await; - + let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts { recursive: true, dry_run: true, @@ -320,16 +351,16 @@ async fn test_heal_storage_api_direct() { pool: None, set: None, }; - + let bucket_result = heal_storage.heal_bucket(bucket_name, &heal_opts).await; assert!(bucket_result.is_ok()); info!("Direct heal_bucket test passed"); - + // Test heal_object let object_name = "test-object-direct.txt"; let test_data = b"Test data for direct heal API"; upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - + let object_heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts { recursive: false, dry_run: true, @@ -341,13 +372,15 @@ async fn test_heal_storage_api_direct() { pool: None, set: None, }; - - let object_result = heal_storage.heal_object(bucket_name, object_name, None, &object_heal_opts).await; + + let object_result = heal_storage + .heal_object(bucket_name, object_name, None, &object_heal_opts) + .await; assert!(object_result.is_ok()); info!("Direct heal_object test passed"); - + // Cleanup cleanup_test_env(&disk_paths).await; - + info!("Direct heal storage API test passed"); -} \ No newline at end of file +} diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 87b79eca..851bbf27 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -2757,8 +2757,8 @@ impl SetDisks { false } }; - - for disk in out_dated_disks.iter() { + // write to all disks + for disk in self.disks.read().await.iter() { let writer = create_bitrot_writer( is_inline_buffer, disk.as_ref(), @@ -2821,7 +2821,6 @@ impl SetDisks { // writers.push(None); // } } - // Heal each part. erasure.Heal() will write the healed // part to .rustfs/tmp/uuid/ which needs to be renamed // later to the final location. @@ -2872,7 +2871,6 @@ impl SetDisks { } } } - // Rename from tmp location to the actual location. for (index, disk) in out_dated_disks.iter().enumerate() { if let Some(disk) = disk {