feat(ahm): add HealingTracker support & complete fresh-disk healing

• Introduce ecstore HealingTracker into ahm crate; load/init/save tracker
• Re-implement heal_fresh_disk to use heal_erasure_set with tracker
• Enhance auto-disk scanner: detect unformatted disks via get_disk_id()
• Remove DataUsageCache handling for now
• Refactor imports & types, clean up duplicate constants
This commit is contained in:
junxiang Mu
2025-07-11 18:42:12 +08:00
parent f4973a681c
commit 3409cd8dff
11 changed files with 731 additions and 515 deletions

View File

@@ -24,6 +24,9 @@ pub enum Error {
#[error("Storage error: {0}")]
Storage(#[from] rustfs_ecstore::error::Error),
#[error("Disk error: {0}")]
Disk(#[from] rustfs_ecstore::disk::error::DiskError),
#[error("Configuration error: {0}")]
Config(String),
@@ -88,4 +91,4 @@ impl From<Error> for std::io::Error {
fn from(err: Error) -> Self {
std::io::Error::other(err)
}
}
}

View File

@@ -106,87 +106,88 @@ impl HealEvent {
/// Convert HealEvent to HealRequest
pub fn to_heal_request(&self) -> HealRequest {
match self {
HealEvent::ObjectCorruption { bucket, object, version_id, severity, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
Self::severity_to_priority(severity),
)
}
HealEvent::ObjectMissing { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::MetadataCorruption { bucket, object, .. } => {
HealRequest::new(
HealType::Metadata {
bucket: bucket.clone(),
object: object.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::DiskStatusChange { endpoint, .. } => {
HealRequest::new(
HealType::Disk {
endpoint: endpoint.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::ECDecodeFailure { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::ECDecode {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::Urgent,
)
}
HealEvent::ChecksumMismatch { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::ObjectCorruption {
bucket,
object,
version_id,
severity,
..
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
Self::severity_to_priority(severity),
),
HealEvent::ObjectMissing {
bucket,
object,
version_id,
..
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
),
HealEvent::MetadataCorruption { bucket, object, .. } => HealRequest::new(
HealType::Metadata {
bucket: bucket.clone(),
object: object.clone(),
},
HealOptions::default(),
HealPriority::High,
),
HealEvent::DiskStatusChange { endpoint, .. } => HealRequest::new(
HealType::Disk {
endpoint: endpoint.clone(),
},
HealOptions::default(),
HealPriority::High,
),
HealEvent::ECDecodeFailure {
bucket,
object,
version_id,
..
} => HealRequest::new(
HealType::ECDecode {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::Urgent,
),
HealEvent::ChecksumMismatch {
bucket,
object,
version_id,
..
} => HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
),
HealEvent::BucketMetadataCorruption { bucket, .. } => {
HealRequest::new(
HealType::Bucket {
bucket: bucket.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::MRFMetadataCorruption { meta_path, .. } => {
HealRequest::new(
HealType::MRF {
meta_path: meta_path.clone(),
},
HealOptions::default(),
HealPriority::High,
)
HealRequest::new(HealType::Bucket { bucket: bucket.clone() }, HealOptions::default(), HealPriority::High)
}
HealEvent::MRFMetadataCorruption { meta_path, .. } => HealRequest::new(
HealType::MRF {
meta_path: meta_path.clone(),
},
HealOptions::default(),
HealPriority::High,
),
}
}
@@ -203,29 +204,63 @@ impl HealEvent {
/// Get event description
pub fn description(&self) -> String {
match self {
HealEvent::ObjectCorruption { bucket, object, corruption_type, .. } => {
format!("Object corruption detected: {}/{} - {:?}", bucket, object, corruption_type)
HealEvent::ObjectCorruption {
bucket,
object,
corruption_type,
..
} => {
format!("Object corruption detected: {bucket}/{object} - {corruption_type:?}")
}
HealEvent::ObjectMissing { bucket, object, .. } => {
format!("Object missing: {}/{}", bucket, object)
format!("Object missing: {bucket}/{object}")
}
HealEvent::MetadataCorruption { bucket, object, corruption_type, .. } => {
format!("Metadata corruption: {}/{} - {:?}", bucket, object, corruption_type)
HealEvent::MetadataCorruption {
bucket,
object,
corruption_type,
..
} => {
format!("Metadata corruption: {bucket}/{object} - {corruption_type:?}")
}
HealEvent::DiskStatusChange { endpoint, old_status, new_status, .. } => {
format!("Disk status changed: {:?} {} -> {}", endpoint, old_status, new_status)
HealEvent::DiskStatusChange {
endpoint,
old_status,
new_status,
..
} => {
format!("Disk status changed: {endpoint:?} {old_status} -> {new_status}")
}
HealEvent::ECDecodeFailure { bucket, object, missing_shards, .. } => {
format!("EC decode failure: {}/{} - missing shards: {:?}", bucket, object, missing_shards)
HealEvent::ECDecodeFailure {
bucket,
object,
missing_shards,
..
} => {
format!("EC decode failure: {bucket}/{object} - missing shards: {missing_shards:?}")
}
HealEvent::ChecksumMismatch { bucket, object, expected_checksum, actual_checksum, .. } => {
format!("Checksum mismatch: {}/{} - expected: {}, actual: {}", bucket, object, expected_checksum, actual_checksum)
HealEvent::ChecksumMismatch {
bucket,
object,
expected_checksum,
actual_checksum,
..
} => {
format!(
"Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}"
)
}
HealEvent::BucketMetadataCorruption { bucket, corruption_type, .. } => {
format!("Bucket metadata corruption: {} - {:?}", bucket, corruption_type)
HealEvent::BucketMetadataCorruption {
bucket, corruption_type, ..
} => {
format!("Bucket metadata corruption: {bucket} - {corruption_type:?}")
}
HealEvent::MRFMetadataCorruption { meta_path, corruption_type, .. } => {
format!("MRF metadata corruption: {} - {:?}", meta_path, corruption_type)
HealEvent::MRFMetadataCorruption {
meta_path,
corruption_type,
..
} => {
format!("MRF metadata corruption: {meta_path} - {corruption_type:?}")
}
}
}
@@ -292,27 +327,22 @@ impl HealEventHandler {
/// Filter events by severity
pub fn filter_by_severity(&self, min_severity: Severity) -> Vec<&HealEvent> {
self.events
.iter()
.filter(|event| event.severity() >= min_severity)
.collect()
self.events.iter().filter(|event| event.severity() >= min_severity).collect()
}
/// Filter events by type
pub fn filter_by_type(&self, event_type: &str) -> Vec<&HealEvent> {
self.events
.iter()
.filter(|event| {
match event {
HealEvent::ObjectCorruption { .. } => event_type == "ObjectCorruption",
HealEvent::ObjectMissing { .. } => event_type == "ObjectMissing",
HealEvent::MetadataCorruption { .. } => event_type == "MetadataCorruption",
HealEvent::DiskStatusChange { .. } => event_type == "DiskStatusChange",
HealEvent::ECDecodeFailure { .. } => event_type == "ECDecodeFailure",
HealEvent::ChecksumMismatch { .. } => event_type == "ChecksumMismatch",
HealEvent::BucketMetadataCorruption { .. } => event_type == "BucketMetadataCorruption",
HealEvent::MRFMetadataCorruption { .. } => event_type == "MRFMetadataCorruption",
}
.filter(|event| match event {
HealEvent::ObjectCorruption { .. } => event_type == "ObjectCorruption",
HealEvent::ObjectMissing { .. } => event_type == "ObjectMissing",
HealEvent::MetadataCorruption { .. } => event_type == "MetadataCorruption",
HealEvent::DiskStatusChange { .. } => event_type == "DiskStatusChange",
HealEvent::ECDecodeFailure { .. } => event_type == "ECDecodeFailure",
HealEvent::ChecksumMismatch { .. } => event_type == "ChecksumMismatch",
HealEvent::BucketMetadataCorruption { .. } => event_type == "BucketMetadataCorruption",
HealEvent::MRFMetadataCorruption { .. } => event_type == "MRFMetadataCorruption",
})
.collect()
}
@@ -322,4 +352,4 @@ impl Default for HealEventHandler {
fn default() -> Self {
Self::new(1000)
}
}
}

View File

@@ -18,6 +18,9 @@ use crate::heal::{
storage::HealStorageAPI,
task::{HealRequest, HealTask, HealTaskStatus},
};
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::disk::DiskAPI;
use rustfs_ecstore::global::GLOBAL_LOCAL_DISK_MAP;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
@@ -122,6 +125,9 @@ impl HealManager {
// start scheduler
self.start_scheduler().await?;
// start auto disk scanner
self.start_auto_disk_scanner().await?;
info!("HealManager started successfully");
Ok(())
}
@@ -253,6 +259,86 @@ impl HealManager {
Ok(())
}
/// Start the background task that periodically scans local disks and enqueues disk heal requests.
///
/// The scan period is `heal_interval`, read once from the config when the task starts.
/// On every tick the task walks `GLOBAL_LOCAL_DISK_MAP` and selects a disk's endpoint when either
/// - `get_disk_id()` fails with `DiskError::UnformattedDisk`, or
/// - `healing()` returns a tracker whose `finished` flag is `false`.
///
/// An endpoint that already has a `HealType::Disk` request in the heal queue or among the
/// active heals is skipped; every other selected endpoint gets a new request with
/// `HealPriority::Normal` and default options. The task stops when the cancel token fires.
///
/// # Errors
/// Always returns `Ok(())` at present; the spawned task is detached and its handle is dropped.
async fn start_auto_disk_scanner(&self) -> Result<()> {
    let config = self.config.clone();
    let heal_queue = self.heal_queue.clone();
    let active_heals = self.active_heals.clone();
    let cancel_token = self.cancel_token.clone();

    tokio::spawn(async move {
        let mut interval = interval(config.read().await.heal_interval);

        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    info!("Auto disk scanner received shutdown signal");
                    break;
                }
                _ = interval.tick() => {
                    // Build the list of endpoints that need healing. The disk-map guard is
                    // acquired once per tick and released at the end of this block, before
                    // any heal-queue lock is taken.
                    let endpoints = {
                        let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
                        tracing::debug!("GLOBAL_LOCAL_DISK_MAP length: {}", disk_map.len());

                        let mut endpoints = Vec::new();
                        // `flatten()` skips map slots that hold `None`.
                        for disk in disk_map.values().flatten() {
                            // detect unformatted disk via get_disk_id()
                            if let Err(err) = disk.get_disk_id().await {
                                if err == DiskError::UnformattedDisk {
                                    endpoints.push(disk.endpoint());
                                    continue;
                                }
                            }

                            // disk currently healing and not finished
                            if let Some(tracker) = disk.healing().await {
                                if !tracker.finished {
                                    endpoints.push(disk.endpoint());
                                }
                            }
                        }
                        endpoints
                    };

                    if endpoints.is_empty() {
                        continue;
                    }
                    tracing::debug!("endpoints length: {}", endpoints.len());

                    for ep in endpoints {
                        let targets_endpoint = |heal_type: &crate::heal::task::HealType| {
                            matches!(heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)
                        };

                        // Each guard below is a temporary dropped at the end of its statement,
                        // so the queue lock and the active-heals lock are never held together.
                        let already_queued = heal_queue
                            .lock()
                            .await
                            .iter()
                            .any(|req| targets_endpoint(&req.heal_type));
                        // The active set is only consulted when the queue had no match.
                        let already_active = !already_queued
                            && active_heals
                                .lock()
                                .await
                                .values()
                                .any(|task| targets_endpoint(&task.heal_type));

                        if already_queued || already_active {
                            continue;
                        }

                        // enqueue heal request for this disk
                        let req = HealRequest::new(
                            crate::heal::task::HealType::Disk { endpoint: ep.clone() },
                            crate::heal::task::HealOptions::default(),
                            crate::heal::task::HealPriority::Normal,
                        );
                        heal_queue.lock().await.push_back(req);
                        info!("Enqueued auto disk heal for endpoint: {}", ep);
                    }
                }
            }
        }
    });

    Ok(())
}
/// Process heal queue
async fn process_heal_queue(
heal_queue: &Arc<Mutex<VecDeque<HealRequest>>>,
@@ -321,4 +407,4 @@ impl std::fmt::Debug for HealManager {
.field("queue_length", &"<queue>")
.finish()
}
}
}

View File

@@ -19,4 +19,4 @@ pub mod storage;
pub mod task;
pub use manager::HealManager;
pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType};
pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType};

View File

@@ -66,7 +66,8 @@ impl HealProgress {
}
pub fn is_completed(&self) -> bool {
self.progress_percentage >= 100.0 || self.objects_scanned > 0 && self.objects_healed + self.objects_failed >= self.objects_scanned
self.progress_percentage >= 100.0
|| self.objects_scanned > 0 && self.objects_healed + self.objects_failed >= self.objects_scanned
}
pub fn get_success_rate(&self) -> f64 {
@@ -97,6 +98,12 @@ pub struct HealStatistics {
pub last_update_time: SystemTime,
}
impl Default for HealStatistics {
fn default() -> Self {
Self::new()
}
}
impl HealStatistics {
pub fn new() -> Self {
Self {
@@ -138,4 +145,4 @@ impl HealStatistics {
0.0
}
}
}
}

View File

@@ -17,8 +17,8 @@ use async_trait::async_trait;
use rustfs_ecstore::{
disk::endpoint::Endpoint,
heal::heal_commands::{HealOpts, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
store_api::{BucketInfo, StorageAPI, ObjectIO},
store::ECStore,
store_api::{BucketInfo, ObjectIO, StorageAPI},
};
use rustfs_madmin::heal_commands::HealResultItem;
use std::sync::Arc;
@@ -52,55 +52,61 @@ pub enum DiskStatus {
pub trait HealStorageAPI: Send + Sync {
/// Get object meta
async fn get_object_meta(&self, bucket: &str, object: &str) -> Result<Option<rustfs_ecstore::store_api::ObjectInfo>>;
/// Get object data
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>>;
/// Put object data
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()>;
/// Delete object
async fn delete_object(&self, bucket: &str, object: &str) -> Result<()>;
/// Check object integrity
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool>;
/// EC decode rebuild
async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result<Vec<u8>>;
/// Get disk status
async fn get_disk_status(&self, endpoint: &Endpoint) -> Result<DiskStatus>;
/// Format disk
async fn format_disk(&self, endpoint: &Endpoint) -> Result<()>;
/// Get bucket info
async fn get_bucket_info(&self, bucket: &str) -> Result<Option<BucketInfo>>;
/// Fix bucket metadata
async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()>;
/// Get all buckets
async fn list_buckets(&self) -> Result<Vec<BucketInfo>>;
/// Check object exists
async fn object_exists(&self, bucket: &str, object: &str) -> Result<bool>;
/// Get object size
async fn get_object_size(&self, bucket: &str, object: &str) -> Result<Option<u64>>;
/// Get object checksum
async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result<Option<String>>;
/// Heal object using ecstore
async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_object(
&self,
bucket: &str,
object: &str,
version_id: Option<&str>,
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)>;
/// Heal bucket using ecstore
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem>;
/// Heal format using ecstore
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
/// List objects for healing
async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>>;
}
@@ -120,7 +126,7 @@ impl ECStoreHealStorage {
impl HealStorageAPI for ECStoreHealStorage {
async fn get_object_meta(&self, bucket: &str, object: &str) -> Result<Option<rustfs_ecstore::store_api::ObjectInfo>> {
debug!("Getting object meta: {}/{}", bucket, object);
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
Ok(info) => Ok(Some(info)),
Err(e) => {
@@ -129,32 +135,36 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>> {
debug!("Getting object data: {}/{}", bucket, object);
match (&*self.ecstore).get_object_reader(bucket, object, None, Default::default(), &Default::default()).await {
Ok(mut reader) => {
match reader.read_all().await {
Ok(data) => Ok(Some(data)),
Err(e) => {
error!("Failed to read object data: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
match (*self.ecstore)
.get_object_reader(bucket, object, None, Default::default(), &Default::default())
.await
{
Ok(mut reader) => match reader.read_all().await {
Ok(data) => Ok(Some(data)),
Err(e) => {
error!("Failed to read object data: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
},
Err(e) => {
error!("Failed to get object: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> {
debug!("Putting object data: {}/{} ({} bytes)", bucket, object, data.len());
let mut reader = rustfs_ecstore::store_api::PutObjReader::from_vec(data.to_vec());
match (&*self.ecstore).put_object(bucket, object, &mut reader, &Default::default()).await {
match (*self.ecstore)
.put_object(bucket, object, &mut reader, &Default::default())
.await
{
Ok(_) => {
info!("Successfully put object: {}/{}", bucket, object);
Ok(())
@@ -165,10 +175,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn delete_object(&self, bucket: &str, object: &str) -> Result<()> {
debug!("Deleting object: {}/{}", bucket, object);
match self.ecstore.delete_object(bucket, object, Default::default()).await {
Ok(_) => {
info!("Successfully deleted object: {}/{}", bucket, object);
@@ -180,10 +190,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool> {
debug!("Verifying object integrity: {}/{}", bucket, object);
// Try to get object info and data to verify integrity
match self.get_object_meta(bucket, object).await? {
Some(obj_info) => {
@@ -192,7 +202,7 @@ impl HealStorageAPI for ECStoreHealStorage {
warn!("Object has invalid size: {}/{}", bucket, object);
return Ok(false);
}
// Try to read object data to verify it's accessible
match self.get_object_data(bucket, object).await {
Ok(Some(_)) => {
@@ -215,10 +225,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result<Vec<u8>> {
debug!("EC decode rebuild: {}/{}", bucket, object);
// Use ecstore's heal_object to rebuild the object
let heal_opts = HealOpts {
recursive: false,
@@ -231,15 +241,15 @@ impl HealStorageAPI for ECStoreHealStorage {
pool: None,
set: None,
};
match self.heal_object(bucket, object, None, &heal_opts).await {
Ok((_result, error)) => {
if error.is_some() {
return Err(Error::TaskExecutionFailed {
message: format!("Heal failed: {:?}", error),
message: format!("Heal failed: {error:?}"),
});
}
// After healing, try to read the object data
match self.get_object_data(bucket, object).await? {
Some(data) => {
@@ -249,7 +259,7 @@ impl HealStorageAPI for ECStoreHealStorage {
None => {
error!("Object not found after heal: {}/{}", bucket, object);
Err(Error::TaskExecutionFailed {
message: format!("Object not found after heal: {}/{}", bucket, object),
message: format!("Object not found after heal: {bucket}/{object}"),
})
}
}
@@ -260,24 +270,24 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
/// Placeholder disk status check: logs the endpoint and reports `DiskStatus::Ok`
/// without inspecting the disk.
async fn get_disk_status(&self, endpoint: &Endpoint) -> Result<DiskStatus> {
    debug!("Getting disk status: {:?}", endpoint);

    // TODO: implement disk status check using ecstore
    // Until then every endpoint is reported as healthy.
    let status = DiskStatus::Ok;
    info!("Disk status check: {:?} - OK", endpoint);
    Ok(status)
}
async fn format_disk(&self, endpoint: &Endpoint) -> Result<()> {
debug!("Formatting disk: {:?}", endpoint);
// Use ecstore's heal_format
match self.heal_format(false).await {
Ok((_, error)) => {
if error.is_some() {
return Err(Error::other(format!("Format failed: {:?}", error)));
return Err(Error::other(format!("Format failed: {error:?}")));
}
info!("Successfully formatted disk: {:?}", endpoint);
Ok(())
@@ -288,10 +298,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn get_bucket_info(&self, bucket: &str) -> Result<Option<BucketInfo>> {
debug!("Getting bucket info: {}", bucket);
match self.ecstore.get_bucket_info(bucket, &Default::default()).await {
Ok(info) => Ok(Some(info)),
Err(e) => {
@@ -300,10 +310,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()> {
debug!("Healing bucket metadata: {}", bucket);
let heal_opts = HealOpts {
recursive: true,
dry_run: false,
@@ -315,7 +325,7 @@ impl HealStorageAPI for ECStoreHealStorage {
pool: None,
set: None,
};
match self.heal_bucket(bucket, &heal_opts).await {
Ok(_) => {
info!("Successfully healed bucket metadata: {}", bucket);
@@ -327,10 +337,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
debug!("Listing buckets");
match self.ecstore.list_bucket(&Default::default()).await {
Ok(buckets) => Ok(buckets),
Err(e) => {
@@ -339,36 +349,34 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
/// Report whether object metadata can be fetched for `bucket`/`object`.
///
/// Returns `Ok(true)` only when `get_object_meta` yields `Ok(Some(_))`. A missing object
/// and a failed metadata lookup are both reported as `Ok(false)`; the lookup error is
/// not propagated.
async fn object_exists(&self, bucket: &str, object: &str) -> Result<bool> {
    debug!("Checking object exists: {}/{}", bucket, object);

    let lookup = self.get_object_meta(bucket, object).await;
    Ok(matches!(lookup, Ok(Some(_))))
}
/// Return the object's size taken from its metadata.
///
/// Yields `Ok(None)` when `get_object_meta` returns no metadata and forwards any
/// error from that lookup unchanged.
async fn get_object_size(&self, bucket: &str, object: &str) -> Result<Option<u64>> {
    debug!("Getting object size: {}/{}", bucket, object);

    let meta = self.get_object_meta(bucket, object).await?;
    // NOTE(review): `as u64` would wrap if `size` is a signed type holding a negative
    // value — confirm the field type in `ObjectInfo`.
    Ok(meta.map(|obj_info| obj_info.size as u64))
}
async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result<Option<String>> {
debug!("Getting object checksum: {}/{}", bucket, object);
match self.get_object_meta(bucket, object).await {
Ok(Some(obj_info)) => {
// Convert checksum bytes to hex string
let checksum = obj_info.checksum.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>();
let checksum = obj_info.checksum.iter().map(|b| format!("{b:02x}")).collect::<String>();
Ok(Some(checksum))
}
Ok(None) => Ok(None),
@@ -376,14 +384,20 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>, opts: &HealOpts) -> Result<(HealResultItem, Option<Error>)> {
async fn heal_object(
&self,
bucket: &str,
object: &str,
version_id: Option<&str>,
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)> {
debug!("Healing object: {}/{}", bucket, object);
let version_id_str = version_id.unwrap_or("");
match self.ecstore.heal_object(bucket, object, version_id_str, opts).await {
Ok((result, ecstore_error)) => {
let error = ecstore_error.map(|e| Error::other(e));
let error = ecstore_error.map(Error::other);
info!("Heal object completed: {}/{} - result: {:?}, error: {:?}", bucket, object, result, error);
Ok((result, error))
}
@@ -393,10 +407,10 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn heal_bucket(&self, bucket: &str, opts: &HealOpts) -> Result<HealResultItem> {
debug!("Healing bucket: {}", bucket);
match self.ecstore.heal_bucket(bucket, opts).await {
Ok(result) => {
info!("Heal bucket completed: {} - result: {:?}", bucket, result);
@@ -408,13 +422,13 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)> {
debug!("Healing format (dry_run: {})", dry_run);
match self.ecstore.heal_format(dry_run).await {
Ok((result, ecstore_error)) => {
let error = ecstore_error.map(|e| Error::other(e));
let error = ecstore_error.map(Error::other);
info!("Heal format completed - result: {:?}, error: {:?}", result, error);
Ok((result, error))
}
@@ -424,18 +438,19 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>> {
debug!("Listing objects for heal: {}/{}", bucket, prefix);
// Use list_objects_v2 to get objects
match self.ecstore.clone().list_objects_v2(
bucket, prefix, None, None, 1000, false, None
).await {
match self
.ecstore
.clone()
.list_objects_v2(bucket, prefix, None, None, 1000, false, None)
.await
{
Ok(list_info) => {
let objects: Vec<String> = list_info.objects.into_iter()
.map(|obj| obj.name)
.collect();
let objects: Vec<String> = list_info.objects.into_iter().map(|obj| obj.name).collect();
info!("Found {} objects for heal in {}/{}", objects.len(), bucket, prefix);
Ok(objects)
}
@@ -445,4 +460,4 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
}
}

View File

@@ -14,24 +14,25 @@
use crate::error::{Error, Result};
use crate::heal::{progress::HealProgress, storage::HealStorageAPI};
use rustfs_ecstore::config::RUSTFS_CONFIG_PREFIX;
use rustfs_ecstore::disk::endpoint::Endpoint;
use crate::heal::storage::DiskStatus;
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::disk::{DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN;
use rustfs_ecstore::heal::heal_commands::{init_healing_tracker, load_healing_tracker, HealScanMode};
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::store::get_disk_via_endpoint;
use rustfs_ecstore::store_api::BucketInfo;
use rustfs_utils::path::path_join;
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use std::cmp::Ordering;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;
/// Heal scan mode
pub type HealScanMode = usize;
pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0;
pub const HEAL_NORMAL_SCAN: HealScanMode = 1;
pub const HEAL_DEEP_SCAN: HealScanMode = 2;
/// Heal type
#[derive(Debug, Clone)]
pub enum HealType {
@@ -42,22 +43,13 @@ pub enum HealType {
version_id: Option<String>,
},
/// Bucket heal
Bucket {
bucket: String,
},
Bucket { bucket: String },
/// Disk heal
Disk {
endpoint: Endpoint,
},
Disk { endpoint: Endpoint },
/// Metadata heal
Metadata {
bucket: String,
object: String,
},
Metadata { bucket: String, object: String },
/// MRF heal
MRF {
meta_path: String,
},
MRF { meta_path: String },
/// EC decode heal
ECDecode {
bucket: String,
@@ -119,7 +111,7 @@ impl Default for HealOptions {
}
/// Heal task status
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum HealTaskStatus {
/// Pending
Pending,
@@ -174,27 +166,15 @@ impl HealRequest {
}
pub fn bucket(bucket: String) -> Self {
Self::new(
HealType::Bucket { bucket },
HealOptions::default(),
HealPriority::Normal,
)
Self::new(HealType::Bucket { bucket }, HealOptions::default(), HealPriority::Normal)
}
pub fn disk(endpoint: Endpoint) -> Self {
Self::new(
HealType::Disk { endpoint },
HealOptions::default(),
HealPriority::High,
)
Self::new(HealType::Disk { endpoint }, HealOptions::default(), HealPriority::High)
}
pub fn metadata(bucket: String, object: String) -> Self {
Self::new(
HealType::Metadata { bucket, object },
HealOptions::default(),
HealPriority::High,
)
Self::new(HealType::Metadata { bucket, object }, HealOptions::default(), HealPriority::High)
}
pub fn ec_decode(bucket: String, object: String, version_id: Option<String>) -> Self {
@@ -264,24 +244,20 @@ impl HealTask {
info!("Starting heal task: {} with type: {:?}", self.id, self.heal_type);
let result = match &self.heal_type {
HealType::Object { bucket, object, version_id } => {
self.heal_object(bucket, object, version_id.as_deref()).await
}
HealType::Bucket { bucket } => {
self.heal_bucket(bucket).await
}
HealType::Disk { endpoint } => {
self.heal_disk(endpoint).await
}
HealType::Metadata { bucket, object } => {
self.heal_metadata(bucket, object).await
}
HealType::MRF { meta_path } => {
self.heal_mrf(meta_path).await
}
HealType::ECDecode { bucket, object, version_id } => {
self.heal_ec_decode(bucket, object, version_id.as_deref()).await
}
HealType::Object {
bucket,
object,
version_id,
} => self.heal_object(bucket, object, version_id.as_deref()).await,
HealType::Bucket { bucket } => self.heal_bucket(bucket).await,
HealType::Disk { endpoint } => self.heal_disk(endpoint).await,
HealType::Metadata { bucket, object } => self.heal_metadata(bucket, object).await,
HealType::MRF { meta_path } => self.heal_mrf(meta_path).await,
HealType::ECDecode {
bucket,
object,
version_id,
} => self.heal_ec_decode(bucket, object, version_id.as_deref()).await,
};
// update completed time and status
@@ -298,9 +274,7 @@ impl HealTask {
}
Err(e) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Failed {
error: e.to_string(),
};
*status = HealTaskStatus::Failed { error: e.to_string() };
error!("Heal task failed: {} with error: {}", self.id, e);
}
}
@@ -327,11 +301,11 @@ impl HealTask {
// specific heal implementation method
async fn heal_object(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> {
info!("Healing object: {}/{}", bucket, object);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("{}/{}", bucket, object)));
progress.set_current_object(Some(format!("{bucket}/{object}")));
progress.update_progress(0, 4, 0, 0); // 开始heal总共4个步骤
}
@@ -345,47 +319,24 @@ impl HealTask {
return self.recreate_missing_object(bucket, object, version_id).await;
} else {
return Err(Error::TaskExecutionFailed {
message: format!("Object not found: {}/{}", bucket, object),
message: format!("Object not found: {bucket}/{object}"),
});
}
}
{
let mut progress = self.progress.write().await;
progress.update_progress(1, 4, 0, 0);
progress.update_progress(1, 3, 0, 0);
}
// Step 2: Verify object integrity
info!("Step 2: Verifying object integrity");
let integrity_ok = self.storage.verify_object_integrity(bucket, object).await?;
if integrity_ok {
info!("Object integrity check passed: {}/{}", bucket, object);
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
}
return Ok(());
}
warn!("Object integrity check failed: {}/{}", bucket, object);
{
let mut progress = self.progress.write().await;
progress.update_progress(2, 4, 0, 0);
}
// Step 3: Perform actual heal using ecstore
info!("Step 3: Performing heal using ecstore");
// Step 2: directly call ecstore to perform heal
info!("Step 2: Performing heal using ecstore");
let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts {
recursive: self.options.recursive,
dry_run: self.options.dry_run,
remove: self.options.remove_corrupted,
recreate: self.options.recreate_missing,
scan_mode: match self.options.scan_mode {
crate::heal::task::HEAL_UNKNOWN_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_UNKNOWN_SCAN,
crate::heal::task::HEAL_NORMAL_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
crate::heal::task::HEAL_DEEP_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN,
_ => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
},
scan_mode: self.options.scan_mode,
update_parity: self.options.update_parity,
no_lock: false,
pool: None,
@@ -396,7 +347,7 @@ impl HealTask {
Ok((result, error)) => {
if let Some(e) = error {
error!("Heal operation failed: {}/{} - {}", bucket, object, e);
// If heal failed and remove_corrupted is enabled, delete the corrupted object
if self.options.remove_corrupted {
warn!("Removing corrupted object: {}/{}", bucket, object);
@@ -410,29 +361,34 @@ impl HealTask {
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
progress.update_progress(3, 3, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal object {}/{}: {}", bucket, object, e),
message: format!("Failed to heal object {bucket}/{object}: {e}"),
});
}
// Step 4: Verify heal result
info!("Step 4: Verifying heal result");
// Step 3: Verify heal result
info!("Step 3: Verifying heal result");
let object_size = result.object_size as u64;
info!("Heal completed successfully: {}/{} ({} bytes, {} drives healed)",
bucket, object, object_size, result.after.drives.len());
info!(
"Heal completed successfully: {}/{} ({} bytes, {} drives healed)",
bucket,
object,
object_size,
result.after.drives.len()
);
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, object_size, object_size);
progress.update_progress(3, 3, object_size, object_size);
}
Ok(())
}
Err(e) => {
error!("Heal operation failed: {}/{} - {}", bucket, object, e);
// If heal failed and remove_corrupted is enabled, delete the corrupted object
if self.options.remove_corrupted {
warn!("Removing corrupted object: {}/{}", bucket, object);
@@ -446,11 +402,11 @@ impl HealTask {
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal object {}/{}: {}", bucket, object, e),
message: format!("Failed to heal object {bucket}/{object}: {e}"),
})
}
}
@@ -459,7 +415,7 @@ impl HealTask {
/// Recreate missing object (for EC decode scenarios)
async fn recreate_missing_object(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> {
info!("Attempting to recreate missing object: {}/{}", bucket, object);
// Use ecstore's heal_object with recreate option
let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts {
recursive: false,
@@ -478,7 +434,7 @@ impl HealTask {
if let Some(e) = error {
error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e);
return Err(Error::TaskExecutionFailed {
message: format!("Failed to recreate missing object {}/{}: {}", bucket, object, e),
message: format!("Failed to recreate missing object {bucket}/{object}: {e}"),
});
}
@@ -494,7 +450,7 @@ impl HealTask {
Err(e) => {
error!("Failed to recreate missing object: {}/{} - {}", bucket, object, e);
Err(Error::TaskExecutionFailed {
message: format!("Failed to recreate missing object {}/{}: {}", bucket, object, e),
message: format!("Failed to recreate missing object {bucket}/{object}: {e}"),
})
}
}
@@ -502,11 +458,11 @@ impl HealTask {
async fn heal_bucket(&self, bucket: &str) -> Result<()> {
info!("Healing bucket: {}", bucket);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("bucket: {}", bucket)));
progress.set_current_object(Some(format!("bucket: {bucket}")));
progress.update_progress(0, 3, 0, 0);
}
@@ -516,7 +472,7 @@ impl HealTask {
if !bucket_exists {
warn!("Bucket does not exist: {}", bucket);
return Err(Error::TaskExecutionFailed {
message: format!("Bucket not found: {}", bucket),
message: format!("Bucket not found: {bucket}"),
});
}
@@ -532,12 +488,7 @@ impl HealTask {
dry_run: self.options.dry_run,
remove: self.options.remove_corrupted,
recreate: self.options.recreate_missing,
scan_mode: match self.options.scan_mode {
crate::heal::task::HEAL_UNKNOWN_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_UNKNOWN_SCAN,
crate::heal::task::HEAL_NORMAL_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
crate::heal::task::HEAL_DEEP_SCAN => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN,
_ => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
},
scan_mode: self.options.scan_mode,
update_parity: self.options.update_parity,
no_lock: false,
pool: None,
@@ -547,7 +498,7 @@ impl HealTask {
match self.storage.heal_bucket(bucket, &heal_opts).await {
Ok(result) => {
info!("Bucket heal completed successfully: {} ({} drives)", bucket, result.after.drives.len());
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
@@ -561,7 +512,7 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal bucket {}: {}", bucket, e),
message: format!("Failed to heal bucket {bucket}: {e}"),
})
}
}
@@ -569,33 +520,17 @@ impl HealTask {
async fn heal_disk(&self, endpoint: &Endpoint) -> Result<()> {
info!("Healing disk: {:?}", endpoint);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("disk: {:?}", endpoint)));
progress.set_current_object(Some(format!("disk: {endpoint:?}")));
progress.update_progress(0, 3, 0, 0);
}
// Step 1: Check disk status
info!("Step 1: Checking disk status");
let disk_status = self.storage.get_disk_status(endpoint).await?;
if disk_status == DiskStatus::Ok {
info!("Disk is already healthy: {:?}", endpoint);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
}
return Ok(());
}
// Step 1: Perform disk format heal using ecstore
info!("Step 1: Performing disk format heal using ecstore");
{
let mut progress = self.progress.write().await;
progress.update_progress(1, 3, 0, 0);
}
// Step 2: Perform disk heal using ecstore
info!("Step 2: Performing disk heal using ecstore");
match self.storage.heal_format(self.options.dry_run).await {
Ok((result, error)) => {
if let Some(e) = error {
@@ -605,12 +540,21 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk {:?}: {}", endpoint, e),
message: format!("Failed to heal disk {endpoint:?}: {e}"),
});
}
info!("Disk heal completed successfully: {:?} ({} drives)", endpoint, result.after.drives.len());
{
let mut progress = self.progress.write().await;
progress.update_progress(2, 3, 0, 0);
}
// Step 2: Synchronize data/buckets on the fresh disk
info!("Step 2: Healing buckets on fresh disk");
self.heal_fresh_disk(endpoint).await?;
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
@@ -624,7 +568,7 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk {:?}: {}", endpoint, e),
message: format!("Failed to heal disk {endpoint:?}: {e}"),
})
}
}
@@ -632,11 +576,11 @@ impl HealTask {
async fn heal_metadata(&self, bucket: &str, object: &str) -> Result<()> {
info!("Healing metadata: {}/{}", bucket, object);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("metadata: {}/{}", bucket, object)));
progress.set_current_object(Some(format!("metadata: {bucket}/{object}")));
progress.update_progress(0, 3, 0, 0);
}
@@ -646,7 +590,7 @@ impl HealTask {
if !object_exists {
warn!("Object does not exist: {}/{}", bucket, object);
return Err(Error::TaskExecutionFailed {
message: format!("Object not found: {}/{}", bucket, object),
message: format!("Object not found: {bucket}/{object}"),
});
}
@@ -678,12 +622,17 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal metadata {}/{}: {}", bucket, object, e),
message: format!("Failed to heal metadata {bucket}/{object}: {e}"),
});
}
info!("Metadata heal completed successfully: {}/{} ({} drives)", bucket, object, result.after.drives.len());
info!(
"Metadata heal completed successfully: {}/{} ({} drives)",
bucket,
object,
result.after.drives.len()
);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
@@ -697,7 +646,7 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal metadata {}/{}: {}", bucket, object, e),
message: format!("Failed to heal metadata {bucket}/{object}: {e}"),
})
}
}
@@ -705,11 +654,11 @@ impl HealTask {
async fn heal_mrf(&self, meta_path: &str) -> Result<()> {
info!("Healing MRF: {}", meta_path);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("mrf: {}", meta_path)));
progress.set_current_object(Some(format!("mrf: {meta_path}")));
progress.update_progress(0, 2, 0, 0);
}
@@ -717,7 +666,7 @@ impl HealTask {
let parts: Vec<&str> = meta_path.split('/').collect();
if parts.len() < 2 {
return Err(Error::TaskExecutionFailed {
message: format!("Invalid meta path format: {}", meta_path),
message: format!("Invalid meta path format: {meta_path}"),
});
}
@@ -747,12 +696,12 @@ impl HealTask {
progress.update_progress(2, 2, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal MRF {}: {}", meta_path, e),
message: format!("Failed to heal MRF {meta_path}: {e}"),
});
}
info!("MRF heal completed successfully: {} ({} drives)", meta_path, result.after.drives.len());
{
let mut progress = self.progress.write().await;
progress.update_progress(2, 2, 0, 0);
@@ -766,7 +715,7 @@ impl HealTask {
progress.update_progress(2, 2, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal MRF {}: {}", meta_path, e),
message: format!("Failed to heal MRF {meta_path}: {e}"),
})
}
}
@@ -774,11 +723,11 @@ impl HealTask {
async fn heal_ec_decode(&self, bucket: &str, object: &str, version_id: Option<&str>) -> Result<()> {
info!("Healing EC decode: {}/{}", bucket, object);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("ec_decode: {}/{}", bucket, object)));
progress.set_current_object(Some(format!("ec_decode: {bucket}/{object}")));
progress.update_progress(0, 3, 0, 0);
}
@@ -788,7 +737,7 @@ impl HealTask {
if !object_exists {
warn!("Object does not exist: {}/{}", bucket, object);
return Err(Error::TaskExecutionFailed {
message: format!("Object not found: {}/{}", bucket, object),
message: format!("Object not found: {bucket}/{object}"),
});
}
@@ -820,14 +769,19 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal EC decode {}/{}: {}", bucket, object, e),
message: format!("Failed to heal EC decode {bucket}/{object}: {e}"),
});
}
let object_size = result.object_size as u64;
info!("EC decode heal completed successfully: {}/{} ({} bytes, {} drives)",
bucket, object, object_size, result.after.drives.len());
info!(
"EC decode heal completed successfully: {}/{} ({} bytes, {} drives)",
bucket,
object,
object_size,
result.after.drives.len()
);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, object_size, object_size);
@@ -841,11 +795,86 @@ impl HealTask {
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal EC decode {}/{}: {}", bucket, object, e),
message: format!("Failed to heal EC decode {bucket}/{object}: {e}"),
})
}
}
}
async fn heal_fresh_disk(&self, endpoint: &Endpoint) -> Result<()> {
// Locate disk via endpoint
let disk = get_disk_via_endpoint(endpoint)
.await
.ok_or_else(|| Error::other(format!("Disk not found for endpoint: {endpoint}")))?;
// Skip if drive is root or other fatal errors
if let Err(e) = disk.disk_info(&DiskInfoOptions::default()).await {
match e {
DiskError::DriveIsRoot => return Ok(()),
DiskError::UnformattedDisk => { /* continue healing */ }
_ => return Err(Error::other(e)),
}
}
// Load or init HealingTracker
let mut tracker = match load_healing_tracker(&Some(disk.clone())).await {
Ok(t) => t,
Err(err) => match err {
DiskError::FileNotFound => init_healing_tracker(disk.clone(), &Uuid::new_v4().to_string())
.await
.map_err(Error::other)?,
_ => return Err(Error::other(err)),
},
};
// Build bucket list
let mut buckets = self.storage.list_buckets().await.map_err(Error::other)?;
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
// Sort: system buckets first, others by creation time desc
buckets.sort_by(|a, b| {
let a_sys = a.name.starts_with(RUSTFS_META_BUCKET);
let b_sys = b.name.starts_with(RUSTFS_META_BUCKET);
match (a_sys, b_sys) {
(true, false) => Ordering::Less,
(false, true) => Ordering::Greater,
_ => b.created.cmp(&a.created),
}
});
// Update tracker queue and persist
tracker.set_queue_buckets(&buckets).await;
tracker.save().await.map_err(Error::other)?;
// Prepare bucket names list
let bucket_names: Vec<String> = buckets.iter().map(|b| b.name.clone()).collect();
// Run heal_erasure_set using underlying SetDisk
let (pool_idx, set_idx) = (endpoint.pool_idx as usize, endpoint.set_idx as usize);
let Some(store) = new_object_layer_fn() else {
return Err(Error::other("errServerNotInitialized"));
};
let set_disk = store.pools[pool_idx].disk_set[set_idx].clone();
let tracker_arc = Arc::new(RwLock::new(tracker));
set_disk
.heal_erasure_set(&bucket_names, tracker_arc)
.await
.map_err(Error::other)?;
Ok(())
}
}
impl std::fmt::Debug for HealTask {
@@ -857,4 +886,4 @@ impl std::fmt::Debug for HealTask {
.field("created_at", &self.created_at)
.finish()
}
}
}

View File

@@ -20,11 +20,11 @@ pub mod heal;
pub mod scanner;
pub use error::{Error, Result};
pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType};
pub use scanner::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend,
store_data_usage_in_backend,
};
pub use heal::{HealManager, HealRequest, HealType, HealOptions, HealPriority};
// Global cancellation token for AHM services (scanner and other background tasks)
static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();

View File

@@ -32,11 +32,11 @@ use super::{
data_usage::DataUsageInfo,
metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics},
};
use crate::heal::HealManager;
use crate::{
error::{Error, Result},
get_ahm_services_cancel_token, HealRequest,
};
use crate::heal::HealManager;
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
@@ -539,12 +539,12 @@ impl Scanner {
Ok(volumes) => volumes,
Err(e) => {
error!("Failed to list volumes on disk {}: {}", disk_path, e);
// Disk access failed: submit a disk heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealRequest, HealPriority};
use crate::heal::{HealPriority, HealRequest};
let req = HealRequest::new(
crate::heal::HealType::Disk {
endpoint: disk.endpoint().clone(),
@@ -562,7 +562,7 @@ impl Scanner {
}
}
}
return Err(Error::Storage(e.into()));
}
};
@@ -674,7 +674,7 @@ impl Scanner {
if file_meta.versions.is_empty() {
objects_with_issues += 1;
warn!("Object {} has no versions", entry.name);
// Object metadata is corrupted: submit a metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
@@ -682,10 +682,16 @@ impl Scanner {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("object metadata damaged, submit heal task: {} {} / {}", task_id, bucket, entry.name);
warn!(
"object metadata damaged, submit heal task: {} {} / {}",
task_id, bucket, entry.name
);
}
Err(e) => {
error!("object metadata damaged, submit heal task failed: {} / {} {}", bucket, entry.name, e);
error!(
"object metadata damaged, submit heal task failed: {} / {} {}",
bucket, entry.name, e
);
}
}
}
@@ -697,7 +703,7 @@ impl Scanner {
} else {
objects_with_issues += 1;
warn!("Failed to parse metadata for object {}", entry.name);
// Failed to parse object metadata: submit a metadata heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
@@ -705,10 +711,16 @@ impl Scanner {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("object metadata parse failed, submit heal task: {} {} / {}", task_id, bucket, entry.name);
warn!(
"object metadata parse failed, submit heal task: {} {} / {}",
task_id, bucket, entry.name
);
}
Err(e) => {
error!("object metadata parse failed, submit heal task failed: {} / {} {}", bucket, entry.name, e);
error!(
"object metadata parse failed, submit heal task failed: {} / {} {}",
bucket, entry.name, e
);
}
}
}
@@ -816,12 +828,12 @@ impl Scanner {
let missing_disks: Vec<usize> = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect();
warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks);
println!("Object {bucket}/{object_name} missing from disks: {missing_disks:?}");
// submit heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealRequest, HealPriority};
use crate::heal::{HealPriority, HealRequest};
let req = HealRequest::new(
crate::heal::HealType::Object {
bucket: bucket.clone(),
@@ -833,8 +845,10 @@ impl Scanner {
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("object missing, submit heal task: {} {} / {} (missing disks: {:?})",
task_id, bucket, object_name, missing_disks);
warn!(
"object missing, submit heal task: {} {} / {} (missing disks: {:?})",
task_id, bucket, object_name, missing_disks
);
}
Err(e) => {
error!("object missing, submit heal task failed: {} / {} {}", bucket, object_name, e);
@@ -1142,6 +1156,7 @@ mod tests {
StorageAPI,
store_api::{MakeBucketOptions, ObjectIO, PutObjReader},
};
use serial_test::serial;
use std::fs;
use std::net::SocketAddr;
@@ -1211,7 +1226,7 @@ mod tests {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore]
#[serial]
async fn test_scanner_basic_functionality() {
const TEST_DIR_BASIC: &str = "/tmp/rustfs_ahm_test_basic";
let (disk_paths, ecstore) = prepare_test_env(Some(TEST_DIR_BASIC), Some(9001)).await;
@@ -1309,7 +1324,7 @@ mod tests {
// test data usage statistics collection and validation
#[tokio::test(flavor = "multi_thread")]
#[ignore]
#[serial]
async fn test_scanner_usage_stats() {
const TEST_DIR_USAGE_STATS: &str = "/tmp/rustfs_ahm_test_usage_stats";
let (_, ecstore) = prepare_test_env(Some(TEST_DIR_USAGE_STATS), Some(9002)).await;

View File

@@ -1,25 +1,44 @@
use rustfs_ahm::heal::{
manager::HealManager,
manager::{HealConfig, HealManager},
storage::{ECStoreHealStorage, HealStorageAPI},
task::{HealOptions, HealPriority, HealRequest, HealType, HEAL_NORMAL_SCAN},
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
};
use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
heal::heal_commands::HEAL_NORMAL_SCAN,
store::ECStore,
store_api::{PutObjReader, ObjectOptions, StorageAPI, ObjectIO},
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
};
use serial_test::serial;
use std::sync::Once;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tracing::info;
use walkdir::WalkDir;
static INIT: Once = Once::new();
/// Calls `tracing_subscriber::fmt::try_init` at most once per test process.
///
/// `INIT` guards the call so repeated invocations from different tests are no-ops; the
/// `Result` of `try_init` is intentionally discarded.
fn init_tracing() {
    INIT.call_once(|| drop(tracing_subscriber::fmt::try_init()));
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>) {
// create temp dir as 4 disks
let test_base_dir = "/tmp/rustfs_ahm_heal_test";
let temp_dir = std::path::PathBuf::from(test_base_dir);
use std::sync::OnceLock;
init_tracing();
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore, heal_storage)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone(), heal_storage.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_ahm_heal_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.unwrap();
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
@@ -57,11 +76,11 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port
let port = 9001;
// create ECStore on the fixed port 9001 (no OS-assigned dynamic port is used here)
let port = 9001; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools).await.unwrap();
@@ -78,13 +97,16 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage
// Create heal storage layer
let heal_storage = Arc::new(ECStoreHealStorage::new(ecstore.clone()));
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone(), heal_storage.clone()));
(disk_paths, ecstore, heal_storage)
}
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(&**ecstore)
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
@@ -92,22 +114,14 @@ async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
}
/// Test helper: Upload test object
async fn upload_test_object(
ecstore: &Arc<ECStore>,
bucket: &str,
object: &str,
data: &[u8],
) {
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (&**ecstore)
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
info!(
"Uploaded test object: {}/{} ({} bytes)",
bucket, object, object_info.size
);
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Cleanup test environment
@@ -117,31 +131,54 @@ async fn cleanup_test_env(disk_paths: &[PathBuf]) {
fs::remove_dir_all(disk_path).await.expect("Failed to cleanup disk path");
}
}
// Clean up test base directory
let test_base_dir = PathBuf::from("/tmp/rustfs_ahm_heal_test");
if test_base_dir.exists() {
fs::remove_dir_all(&test_base_dir).await.expect("Failed to cleanup test base directory");
// Attempt to clean up base directory inferred from disk_paths[0]
if let Some(parent) = disk_paths.first().and_then(|p| p.parent()).and_then(|p| p.parent()) {
if parent.exists() {
fs::remove_dir_all(parent).await.ok();
}
}
info!("Test environment cleaned up");
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_object_basic() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
// Create test bucket and object
let bucket_name = "test-bucket";
let object_name = "test-object.txt";
let test_data = b"Hello, this is test data for healing!";
create_test_bucket(&ecstore, bucket_name).await;
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
// Create heal manager
let heal_manager = HealManager::new(heal_storage.clone(), Default::default());
// ─── 1⃣ delete single data shard file ─────────────────────────────────────
let obj_dir = disk_paths[0].join(bucket_name).join(object_name);
// find part file at depth 2, e.g. .../<uuid>/part.1
let target_part = WalkDir::new(&obj_dir)
.min_depth(2)
.max_depth(2)
.into_iter()
.filter_map(Result::ok)
.find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false))
.map(|e| e.into_path())
.expect("Failed to locate part file to delete");
std::fs::remove_file(&target_part).expect("failed to delete part file");
assert!(!target_part.exists());
println!("✅ Deleted shard part file: {target_part:?}");
// Create heal manager with faster interval
let cfg = HealConfig {
heal_interval: Duration::from_millis(1),
..Default::default()
};
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));
heal_manager.start().await.unwrap();
// Submit heal request for the object
let heal_request = HealRequest::new(
HealType::Object {
@@ -160,46 +197,56 @@ async fn test_heal_object_basic() {
},
HealPriority::Normal,
);
let task_id = heal_manager
.submit_heal_request(heal_request)
.await
.expect("Failed to submit heal request");
info!("Submitted heal request with task ID: {}", task_id);
// Wait for task completion
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check task status
let task_status = heal_manager.get_task_status(&task_id).await;
assert!(task_status.is_ok());
let status = task_status.unwrap();
info!("Task status: {:?}", status);
// Verify object still exists and is accessible
let object_exists = heal_storage.object_exists(bucket_name, object_name).await;
assert!(object_exists.is_ok());
assert!(object_exists.unwrap());
tokio::time::sleep(tokio::time::Duration::from_secs(8)).await;
// Attempt to fetch task status (might be removed if finished)
match heal_manager.get_task_status(&task_id).await {
Ok(status) => info!("Task status: {:?}", status),
Err(e) => info!("Task status not found (likely completed): {}", e),
}
// ─── 2⃣ verify the deleted part file is restored ───────
assert!(target_part.exists());
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal object basic test passed");
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_bucket_basic() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
// Create test bucket
let bucket_name = "test-bucket-heal";
create_test_bucket(&ecstore, bucket_name).await;
// Create heal manager
let heal_manager = HealManager::new(heal_storage.clone(), Default::default());
// ─── 1⃣ delete bucket dir on disk ──────────────
let broken_bucket_path = disk_paths[0].join(bucket_name);
assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk");
std::fs::remove_dir_all(&broken_bucket_path).expect("failed to delete bucket dir on disk");
assert!(!broken_bucket_path.exists(), "bucket dir still exists after deletion");
println!("✅ Deleted bucket directory on disk: {broken_bucket_path:?}");
// Create heal manager with faster interval
let cfg = HealConfig {
heal_interval: Duration::from_millis(1),
..Default::default()
};
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));
heal_manager.start().await.unwrap();
// Submit heal request for the bucket
let heal_request = HealRequest::new(
HealType::Bucket {
@@ -216,99 +263,83 @@ async fn test_heal_bucket_basic() {
},
HealPriority::Normal,
);
let task_id = heal_manager
.submit_heal_request(heal_request)
.await
.expect("Failed to submit bucket heal request");
info!("Submitted bucket heal request with task ID: {}", task_id);
// Wait for task completion
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Check task status
let task_status = heal_manager.get_task_status(&task_id).await;
assert!(task_status.is_ok());
let status = task_status.unwrap();
info!("Bucket heal task status: {:?}", status);
// Verify bucket still exists
let bucket_exists = heal_storage.get_bucket_info(bucket_name).await;
assert!(bucket_exists.is_ok());
assert!(bucket_exists.unwrap().is_some());
// Attempt to fetch task status (optional)
if let Ok(status) = heal_manager.get_task_status(&task_id).await {
if status == HealTaskStatus::Completed {
info!("Bucket heal task status: {:?}", status);
} else {
panic!("Bucket heal task status: {status:?}");
}
}
// ─── 3⃣ Verify bucket directory is restored on the disk it was deleted from ───────
assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal bucket basic test passed");
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_format_basic() {
let (disk_paths, _ecstore, heal_storage) = setup_test_env().await;
// Create heal manager
let heal_manager = HealManager::new(heal_storage.clone(), Default::default());
// Get disk endpoint for testing
let disk_endpoint = Endpoint::try_from(disk_paths[0].to_str().unwrap())
.expect("Failed to create disk endpoint");
// Submit disk heal request (format heal)
let heal_request = HealRequest::new(
HealType::Disk { endpoint: disk_endpoint },
HealOptions {
dry_run: true, // Use dry run for format heal test
recursive: false,
remove_corrupted: false,
recreate_missing: false,
scan_mode: HEAL_NORMAL_SCAN,
update_parity: false,
timeout: Some(Duration::from_secs(300)),
},
HealPriority::Normal,
);
let task_id = heal_manager
.submit_heal_request(heal_request)
.await
.expect("Failed to submit disk heal request");
info!("Submitted disk heal request with task ID: {}", task_id);
// ─── 1⃣ delete format.json on one disk ──────────────
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");
assert!(format_path.exists(), "format.json does not exist on disk");
std::fs::remove_file(&format_path).expect("failed to delete format.json on disk");
assert!(!format_path.exists(), "format.json still exists after deletion");
println!("✅ Deleted format.json on disk: {format_path:?}");
// Create heal manager with faster interval
let cfg = HealConfig {
heal_interval: Duration::from_secs(2),
..Default::default()
};
let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg));
heal_manager.start().await.unwrap();
// Wait for task completion
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
// Check task status
let task_status = heal_manager.get_task_status(&task_id).await;
assert!(task_status.is_ok());
let status = task_status.unwrap();
info!("Disk heal task status: {:?}", status);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// ─── 2⃣ verify format.json is restored ───────
assert!(format_path.exists(), "format.json does not exist on disk after heal");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal format basic test passed");
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_storage_api_direct() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
// Test direct heal storage API calls
// Test heal_format
let format_result = heal_storage.heal_format(true).await; // dry run
assert!(format_result.is_ok());
info!("Direct heal_format test passed");
// Test heal_bucket
let bucket_name = "test-bucket-direct";
create_test_bucket(&ecstore, bucket_name).await;
let heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts {
recursive: true,
dry_run: true,
@@ -320,16 +351,16 @@ async fn test_heal_storage_api_direct() {
pool: None,
set: None,
};
let bucket_result = heal_storage.heal_bucket(bucket_name, &heal_opts).await;
assert!(bucket_result.is_ok());
info!("Direct heal_bucket test passed");
// Test heal_object
let object_name = "test-object-direct.txt";
let test_data = b"Test data for direct heal API";
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
let object_heal_opts = rustfs_ecstore::heal::heal_commands::HealOpts {
recursive: false,
dry_run: true,
@@ -341,13 +372,15 @@ async fn test_heal_storage_api_direct() {
pool: None,
set: None,
};
let object_result = heal_storage.heal_object(bucket_name, object_name, None, &object_heal_opts).await;
let object_result = heal_storage
.heal_object(bucket_name, object_name, None, &object_heal_opts)
.await;
assert!(object_result.is_ok());
info!("Direct heal_object test passed");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Direct heal storage API test passed");
}
}

View File

@@ -2757,8 +2757,8 @@ impl SetDisks {
false
}
};
for disk in out_dated_disks.iter() {
// write to all disks
for disk in self.disks.read().await.iter() {
let writer = create_bitrot_writer(
is_inline_buffer,
disk.as_ref(),
@@ -2821,7 +2821,6 @@ impl SetDisks {
// writers.push(None);
// }
}
// Heal each part. erasure.Heal() will write the healed
// part to .rustfs/tmp/uuid/ which needs to be renamed
// later to the final location.
@@ -2872,7 +2871,6 @@ impl SetDisks {
}
}
}
// Rename from tmp location to the actual location.
for (index, disk) in out_dated_disks.iter().enumerate() {
if let Some(disk) = disk {