From aed8f5242300a76a9a523bedb40616f759024fbe Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Fri, 18 Jul 2025 12:06:24 +0800 Subject: [PATCH] refactor: integrate disk healing into erasure set healing - Remove HealType::Disk and related disk-specific healing methods - Integrate disk format healing into heal_erasure_set with include_format_heal option - Update auto disk scanner to use ErasureSet heal type instead of Disk heal - Fix disk status change event handling to use ErasureSet heal requests - Add proper bucket list retrieval for auto healing scenarios - Update data scanner to submit ErasureSet heal tasks for offline disks - Remove duplicate healing logic between Disk and ErasureSet types - Ensure all healing operations go through unified ErasureSet healing path --- crates/ahm/src/heal/erasure_healer.rs | 458 ++++++++++++++++ crates/ahm/src/heal/event.rs | 20 +- crates/ahm/src/heal/manager.rs | 30 +- crates/ahm/src/heal/mod.rs | 4 + crates/ahm/src/heal/resume.rs | 696 +++++++++++++++++++++++++ crates/ahm/src/heal/storage.rs | 45 +- crates/ahm/src/heal/task.rs | 227 +++----- crates/ahm/src/scanner/data_scanner.rs | 69 ++- crates/ecstore/src/set_disk.rs | 51 +- 9 files changed, 1381 insertions(+), 219 deletions(-) create mode 100644 crates/ahm/src/heal/erasure_healer.rs create mode 100644 crates/ahm/src/heal/resume.rs diff --git a/crates/ahm/src/heal/erasure_healer.rs b/crates/ahm/src/heal/erasure_healer.rs new file mode 100644 index 00000000..210648c4 --- /dev/null +++ b/crates/ahm/src/heal/erasure_healer.rs @@ -0,0 +1,458 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::heal::{ + progress::HealProgress, + resume::{CheckpointManager, ResumeManager, ResumeUtils}, + storage::HealStorageAPI, +}; +use futures::future::join_all; +use rustfs_ecstore::{ + disk::DiskStore, + heal::heal_commands::{HealOpts, HEAL_NORMAL_SCAN}, +}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{error, info, warn}; + +/// Erasure Set Healer +pub struct ErasureSetHealer { + storage: Arc, + progress: Arc>, + cancel_token: tokio_util::sync::CancellationToken, + disk: DiskStore, +} + +impl ErasureSetHealer { + pub fn new( + storage: Arc, + progress: Arc>, + cancel_token: tokio_util::sync::CancellationToken, + disk: DiskStore, + ) -> Self { + Self { + storage, + progress, + cancel_token, + disk, + } + } + + /// execute erasure set heal with resume + pub async fn heal_erasure_set(&self, buckets: &[String], set_disk_id: &str) -> Result<()> { + info!("Starting erasure set heal for {} buckets on set disk {}", buckets.len(), set_disk_id); + + // 1. generate or get task id + let task_id = self.get_or_create_task_id(set_disk_id).await?; + + // 2. initialize or resume resume state + let (resume_manager, checkpoint_manager) = self.initialize_resume_state(&task_id, buckets).await?; + + // 3. execute heal with resume + let result = self + .execute_heal_with_resume(buckets, &resume_manager, &checkpoint_manager) + .await; + + // 4. cleanup resume state + if result.is_ok() { + if let Err(e) = resume_manager.cleanup().await { + warn!("Failed to cleanup resume state: {}", e); + } + if let Err(e) = checkpoint_manager.cleanup().await { + warn!("Failed to cleanup checkpoint: {}", e); + } + } + + result + } + + /// get or create task id + async fn get_or_create_task_id(&self, _set_disk_id: &str) -> Result { + // check if there are resumable tasks + let resumable_tasks = ResumeUtils::get_resumable_tasks(&self.disk).await?; + + for task_id in resumable_tasks { + if ResumeUtils::can_resume_task(&self.disk, &task_id).await { + info!("Found resumable task: {}", task_id); + return Ok(task_id); + } + } + + // create new task id + let task_id = ResumeUtils::generate_task_id(); + info!("Created new heal task: {}", task_id); + Ok(task_id) + } + + /// initialize or resume resume state + async fn initialize_resume_state(&self, task_id: &str, buckets: &[String]) -> Result<(ResumeManager, CheckpointManager)> { + // check if resume state exists + if ResumeManager::has_resume_state(&self.disk, task_id).await { + info!("Loading existing resume state for task: {}", task_id); + + let resume_manager = ResumeManager::load_from_disk(self.disk.clone(), task_id).await?; + let checkpoint_manager = if CheckpointManager::has_checkpoint(&self.disk, task_id).await { + CheckpointManager::load_from_disk(self.disk.clone(), task_id).await? + } else { + CheckpointManager::new(self.disk.clone(), task_id.to_string()).await? + }; + + Ok((resume_manager, checkpoint_manager)) + } else { + info!("Creating new resume state for task: {}", task_id); + + let resume_manager = + ResumeManager::new(self.disk.clone(), task_id.to_string(), "erasure_set".to_string(), buckets.to_vec()).await?; + + let checkpoint_manager = CheckpointManager::new(self.disk.clone(), task_id.to_string()).await?; + + Ok((resume_manager, checkpoint_manager)) + } + } + + /// execute heal with resume + async fn execute_heal_with_resume( + &self, + buckets: &[String], + resume_manager: &ResumeManager, + checkpoint_manager: &CheckpointManager, + ) -> Result<()> { + // 1. get current state + let state = resume_manager.get_state().await; + let checkpoint = checkpoint_manager.get_checkpoint().await; + + info!( + "Resuming from bucket {} object {}", + checkpoint.current_bucket_index, checkpoint.current_object_index + ); + + // 2. initialize progress + self.initialize_progress(buckets, &state).await; + + // 3. continue from checkpoint + let current_bucket_index = checkpoint.current_bucket_index; + let mut current_object_index = checkpoint.current_object_index; + + let mut processed_objects = state.processed_objects; + let mut successful_objects = state.successful_objects; + let mut failed_objects = state.failed_objects; + let mut skipped_objects = state.skipped_objects; + + // 4. process remaining buckets + for (bucket_idx, bucket) in buckets.iter().enumerate().skip(current_bucket_index) { + // check if completed + if state.completed_buckets.contains(bucket) { + continue; + } + + // update current bucket + resume_manager.set_current_item(Some(bucket.clone()), None).await?; + + // process objects in bucket + let bucket_result = self + .heal_bucket_with_resume( + bucket, + &mut current_object_index, + &mut processed_objects, + &mut successful_objects, + &mut failed_objects, + &mut skipped_objects, + resume_manager, + checkpoint_manager, + ) + .await; + + // update checkpoint position + checkpoint_manager.update_position(bucket_idx, current_object_index).await?; + + // update progress + resume_manager + .update_progress(processed_objects, successful_objects, failed_objects, skipped_objects) + .await?; + + // check cancel status + if self.cancel_token.is_cancelled() { + info!("Heal task cancelled"); + return Err(Error::TaskCancelled); + } + + // process bucket result + match bucket_result { + Ok(_) => { + resume_manager.complete_bucket(bucket).await?; + info!("Completed heal for bucket: {}", bucket); + } + Err(e) => { + error!("Failed to heal bucket {}: {}", bucket, e); + // continue to next bucket, do not interrupt the whole process + } + } + + // reset object index + current_object_index = 0; + } + + // 5. mark task completed + resume_manager.mark_completed().await?; + + info!("Erasure set heal completed successfully"); + Ok(()) + } + + /// heal single bucket with resume + #[allow(clippy::too_many_arguments)] + async fn heal_bucket_with_resume( + &self, + bucket: &str, + current_object_index: &mut usize, + processed_objects: &mut u64, + successful_objects: &mut u64, + failed_objects: &mut u64, + _skipped_objects: &mut u64, + resume_manager: &ResumeManager, + checkpoint_manager: &CheckpointManager, + ) -> Result<()> { + info!("Starting heal for bucket: {} from object index {}", bucket, current_object_index); + + // 1. get bucket info + let _bucket_info = match self.storage.get_bucket_info(bucket).await? { + Some(info) => info, + None => { + warn!("Bucket {} not found, skipping", bucket); + return Ok(()); + } + }; + + // 2. get objects to heal + let objects = self.storage.list_objects_for_heal(bucket, "").await?; + + // 3. continue from checkpoint + for (obj_idx, object) in objects.iter().enumerate().skip(*current_object_index) { + // check if already processed + if checkpoint_manager.get_checkpoint().await.processed_objects.contains(object) { + continue; + } + + // update current object + resume_manager + .set_current_item(Some(bucket.to_string()), Some(object.clone())) + .await?; + + // heal object + let heal_opts = HealOpts { + scan_mode: HEAL_NORMAL_SCAN, + remove: true, + recreate: true, + ..Default::default() + }; + + match self.storage.heal_object(bucket, object, None, &heal_opts).await { + Ok((_result, None)) => { + *successful_objects += 1; + checkpoint_manager.add_processed_object(object.clone()).await?; + info!("Successfully healed object {}/{}", bucket, object); + } + Ok((_, Some(err))) => { + *failed_objects += 1; + checkpoint_manager.add_failed_object(object.clone()).await?; + warn!("Failed to heal object {}/{}: {}", bucket, object, err); + } + Err(err) => { + *failed_objects += 1; + checkpoint_manager.add_failed_object(object.clone()).await?; + warn!("Error healing object {}/{}: {}", bucket, object, err); + } + } + + *processed_objects += 1; + *current_object_index = obj_idx + 1; + + // check cancel status + if self.cancel_token.is_cancelled() { + info!("Heal task cancelled during object processing"); + return Err(Error::TaskCancelled); + } + + // save checkpoint periodically + if obj_idx % 100 == 0 { + checkpoint_manager.update_position(0, *current_object_index).await?; + } + } + + Ok(()) + } + + /// initialize progress tracking + async fn initialize_progress(&self, _buckets: &[String], state: &crate::heal::resume::ResumeState) { + let mut progress = self.progress.write().await; + progress.objects_scanned = state.total_objects; + progress.objects_healed = state.successful_objects; + progress.objects_failed = state.failed_objects; + progress.bytes_processed = 0; // set to 0 for now, can be extended later + progress.set_current_object(state.current_object.clone()); + } + + /// heal all buckets concurrently + #[allow(dead_code)] + async fn heal_buckets_concurrently(&self, buckets: &[String]) -> Vec> { + // use semaphore to control concurrency, avoid too many concurrent healings + let semaphore = Arc::new(tokio::sync::Semaphore::new(4)); // max 4 concurrent healings + + let heal_futures = buckets.iter().map(|bucket| { + let bucket = bucket.clone(); + let storage = self.storage.clone(); + let progress = self.progress.clone(); + let semaphore = semaphore.clone(); + let cancel_token = self.cancel_token.clone(); + + async move { + let _permit = semaphore.acquire().await.unwrap(); + + if cancel_token.is_cancelled() { + return Err(Error::TaskCancelled); + } + + Self::heal_single_bucket(&storage, &bucket, &progress).await + } + }); + + // use join_all to process concurrently + join_all(heal_futures).await + } + + /// heal single bucket + #[allow(dead_code)] + async fn heal_single_bucket( + storage: &Arc, + bucket: &str, + progress: &Arc>, + ) -> Result<()> { + info!("Starting heal for bucket: {}", bucket); + + // 1. get bucket info + let _bucket_info = match storage.get_bucket_info(bucket).await? { + Some(info) => info, + None => { + warn!("Bucket {} not found, skipping", bucket); + return Ok(()); + } + }; + + // 2. get objects to heal + let objects = storage.list_objects_for_heal(bucket, "").await?; + + // 3. update progress + { + let mut p = progress.write().await; + p.objects_scanned += objects.len() as u64; + } + + // 4. heal objects concurrently + let heal_opts = HealOpts { + scan_mode: HEAL_NORMAL_SCAN, + remove: true, // remove corrupted data + recreate: true, // recreate missing data + ..Default::default() + }; + + let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await; + + // 5. count results + let (success_count, failure_count) = object_results + .into_iter() + .fold((0, 0), |(success, failure), result| match result { + Ok(_) => (success + 1, failure), + Err(_) => (success, failure + 1), + }); + + // 6. update progress + { + let mut p = progress.write().await; + p.objects_healed += success_count; + p.objects_failed += failure_count; + p.set_current_object(Some(format!("completed bucket: {bucket}"))); + } + + info!( + "Completed heal for bucket {}: {} success, {} failures", + bucket, success_count, failure_count + ); + + Ok(()) + } + + /// heal objects concurrently + #[allow(dead_code)] + async fn heal_objects_concurrently( + storage: &Arc, + bucket: &str, + objects: &[String], + heal_opts: &HealOpts, + _progress: &Arc>, + ) -> Vec> { + // use semaphore to control object healing concurrency + let semaphore = Arc::new(tokio::sync::Semaphore::new(8)); // max 8 concurrent object healings + + let heal_futures = objects.iter().map(|object| { + let object = object.clone(); + let bucket = bucket.to_string(); + let storage = storage.clone(); + let heal_opts = *heal_opts; + let semaphore = semaphore.clone(); + + async move { + let _permit = semaphore.acquire().await.unwrap(); + + match storage.heal_object(&bucket, &object, None, &heal_opts).await { + Ok((_result, None)) => { + info!("Successfully healed object {}/{}", bucket, object); + Ok(()) + } + Ok((_, Some(err))) => { + warn!("Failed to heal object {}/{}: {}", bucket, object, err); + Err(Error::other(err)) + } + Err(err) => { + warn!("Error healing object {}/{}: {}", bucket, object, err); + Err(err) + } + } + } + }); + + join_all(heal_futures).await + } + + /// process results + #[allow(dead_code)] + async fn process_results(&self, results: Vec>) -> Result<()> { + let (success_count, failure_count): (usize, usize) = + results.into_iter().fold((0, 0), |(success, failure), result| match result { + Ok(_) => (success + 1, failure), + Err(_) => (success, failure + 1), + }); + + let total = success_count + failure_count; + + info!("Erasure set heal completed: {}/{} buckets successful", success_count, total); + + if failure_count > 0 { + warn!("{} buckets failed to heal", failure_count); + return Err(Error::other(format!("{failure_count} buckets failed to heal"))); + } + + Ok(()) + } +} diff --git a/crates/ahm/src/heal/event.rs b/crates/ahm/src/heal/event.rs index 11705385..4e773d1d 100644 --- a/crates/ahm/src/heal/event.rs +++ b/crates/ahm/src/heal/event.rs @@ -143,13 +143,19 @@ impl HealEvent { HealOptions::default(), HealPriority::High, ), - HealEvent::DiskStatusChange { endpoint, .. } => HealRequest::new( - HealType::Disk { - endpoint: endpoint.clone(), - }, - HealOptions::default(), - HealPriority::High, - ), + HealEvent::DiskStatusChange { endpoint, .. } => { + // Convert disk status change to erasure set heal + // Note: This requires access to storage to get bucket list, which is not available here + // The actual bucket list will need to be provided by the caller or retrieved differently + HealRequest::new( + HealType::ErasureSet { + buckets: vec![], // Empty bucket list - caller should populate this + set_disk_id: format!("{}_{}", endpoint.pool_idx, endpoint.set_idx), + }, + HealOptions::default(), + HealPriority::High, + ) + } HealEvent::ECDecodeFailure { bucket, object, diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index 3a267839..0fd8c7ab 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -259,12 +259,13 @@ impl HealManager { Ok(()) } - /// Start background task to auto scan local disks and enqueue disk heal requests + /// Start background task to auto scan local disks and enqueue erasure set heal requests async fn start_auto_disk_scanner(&self) -> Result<()> { let config = self.config.clone(); let heal_queue = self.heal_queue.clone(); let active_heals = self.active_heals.clone(); let cancel_token = self.cancel_token.clone(); + let storage = self.storage.clone(); tokio::spawn(async move { let mut interval = interval(config.read().await.heal_interval); @@ -300,18 +301,28 @@ impl HealManager { continue; } + // Get bucket list for erasure set healing + let buckets = match storage.list_buckets().await { + Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), + Err(e) => { + error!("Failed to get bucket list for auto healing: {}", e); + continue; + } + }; + + // Create erasure set heal requests for each endpoint for ep in endpoints { // skip if already queued or healing let mut skip = false; { let queue = heal_queue.lock().await; - if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) { + if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) { skip = true; } } if !skip { let active = active_heals.lock().await; - if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) { + if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) { skip = true; } } @@ -320,15 +331,19 @@ impl HealManager { continue; } - // enqueue heal request for this disk + // enqueue erasure set heal request for this disk + let set_disk_id = format!("{}_{}", ep.pool_idx, ep.set_idx); let req = crate::heal::task::HealRequest::new( - crate::heal::task::HealType::Disk { endpoint: ep.clone() }, + crate::heal::task::HealType::ErasureSet { + buckets: buckets.clone(), + set_disk_id: set_disk_id.clone() + }, crate::heal::task::HealOptions::default(), crate::heal::task::HealPriority::Normal, ); let mut queue = heal_queue.lock().await; queue.push_back(req); - info!("Enqueued auto disk heal for endpoint: {}", ep); + info!("Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id); } } } @@ -365,7 +380,8 @@ impl HealManager { // start heal task tokio::spawn(async move { info!("Starting heal task: {}", task_id); - match task.execute().await { + let result = task.execute().await; + match result { Ok(_) => { info!("Heal task completed successfully: {}", task_id); } diff --git a/crates/ahm/src/heal/mod.rs b/crates/ahm/src/heal/mod.rs index f209d450..aaa57d7a 100644 --- a/crates/ahm/src/heal/mod.rs +++ b/crates/ahm/src/heal/mod.rs @@ -13,11 +13,15 @@ // limitations under the License. pub mod channel; +pub mod erasure_healer; pub mod event; pub mod manager; pub mod progress; +pub mod resume; pub mod storage; pub mod task; +pub use erasure_healer::ErasureSetHealer; pub use manager::HealManager; +pub use resume::{CheckpointManager, ResumeCheckpoint, ResumeManager, ResumeState, ResumeUtils}; pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType}; diff --git a/crates/ahm/src/heal/resume.rs b/crates/ahm/src/heal/resume.rs new file mode 100644 index 00000000..260a2243 --- /dev/null +++ b/crates/ahm/src/heal/resume.rs @@ -0,0 +1,696 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use rustfs_ecstore::disk::{DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; +use serde::{Deserialize, Serialize}; +use std::path::Path; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +/// resume state file constants +const RESUME_STATE_FILE: &str = "ahm_resume_state.json"; +const RESUME_PROGRESS_FILE: &str = "ahm_progress.json"; +const RESUME_CHECKPOINT_FILE: &str = "ahm_checkpoint.json"; + +/// resume state +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResumeState { + /// task id + pub task_id: String, + /// task type + pub task_type: String, + /// start time + pub start_time: u64, + /// last update time + pub last_update: u64, + /// completed + pub completed: bool, + /// total objects + pub total_objects: u64, + /// processed objects + pub processed_objects: u64, + /// successful objects + pub successful_objects: u64, + /// failed objects + pub failed_objects: u64, + /// skipped objects + pub skipped_objects: u64, + /// current bucket + pub current_bucket: Option, + /// current object + pub current_object: Option, + /// completed buckets + pub completed_buckets: Vec, + /// pending buckets + pub pending_buckets: Vec, + /// error message + pub error_message: Option, + /// retry count + pub retry_count: u32, + /// max retries + pub max_retries: u32, +} + +impl ResumeState { + pub fn new(task_id: String, task_type: String, buckets: Vec) -> Self { + Self { + task_id, + task_type, + start_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), + last_update: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), + completed: false, + total_objects: 0, + processed_objects: 0, + successful_objects: 0, + failed_objects: 0, + skipped_objects: 0, + current_bucket: None, + current_object: None, + completed_buckets: Vec::new(), + pending_buckets: buckets, + error_message: None, + retry_count: 0, + max_retries: 3, + } + } + + pub fn update_progress(&mut self, processed: u64, successful: u64, failed: u64, skipped: u64) { + self.processed_objects = processed; + self.successful_objects = successful; + self.failed_objects = failed; + self.skipped_objects = skipped; + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn set_current_item(&mut self, bucket: Option, object: Option) { + self.current_bucket = bucket; + self.current_object = object; + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn complete_bucket(&mut self, bucket: &str) { + if !self.completed_buckets.contains(&bucket.to_string()) { + self.completed_buckets.push(bucket.to_string()); + } + if let Some(pos) = self.pending_buckets.iter().position(|b| b == bucket) { + self.pending_buckets.remove(pos); + } + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn mark_completed(&mut self) { + self.completed = true; + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn set_error(&mut self, error: String) { + self.error_message = Some(error); + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn increment_retry(&mut self) { + self.retry_count += 1; + self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn can_retry(&self) -> bool { + self.retry_count < self.max_retries + } + + pub fn get_progress_percentage(&self) -> f64 { + if self.total_objects == 0 { + return 0.0; + } + (self.processed_objects as f64 / self.total_objects as f64) * 100.0 + } + + pub fn get_success_rate(&self) -> f64 { + let total = self.successful_objects + self.failed_objects; + if total == 0 { + return 0.0; + } + (self.successful_objects as f64 / total as f64) * 100.0 + } +} + +/// resume manager +pub struct ResumeManager { + disk: DiskStore, + state: Arc>, +} + +impl ResumeManager { + /// create new resume manager + pub async fn new(disk: DiskStore, task_id: String, task_type: String, buckets: Vec) -> Result { + let state = ResumeState::new(task_id, task_type, buckets); + let manager = Self { + disk, + state: Arc::new(RwLock::new(state)), + }; + + // save initial state + manager.save_state().await?; + Ok(manager) + } + + /// load resume state from disk + pub async fn load_from_disk(disk: DiskStore, task_id: &str) -> Result { + let state_data = Self::read_state_file(&disk, task_id).await?; + let state: ResumeState = serde_json::from_slice(&state_data).map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to deserialize resume state: {e}"), + })?; + + Ok(Self { + disk, + state: Arc::new(RwLock::new(state)), + }) + } + + /// check if resume state exists + pub async fn has_resume_state(disk: &DiskStore, task_id: &str) -> bool { + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}")); + match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await { + Ok(data) => !data.is_empty(), + Err(_) => false, + } + } + + /// get current state + pub async fn get_state(&self) -> ResumeState { + self.state.read().await.clone() + } + + /// update progress + pub async fn update_progress(&self, processed: u64, successful: u64, failed: u64, skipped: u64) -> Result<()> { + let mut state = self.state.write().await; + state.update_progress(processed, successful, failed, skipped); + drop(state); + self.save_state().await + } + + /// set current item + pub async fn set_current_item(&self, bucket: Option, object: Option) -> Result<()> { + let mut state = self.state.write().await; + state.set_current_item(bucket, object); + drop(state); + self.save_state().await + } + + /// complete bucket + pub async fn complete_bucket(&self, bucket: &str) -> Result<()> { + let mut state = self.state.write().await; + state.complete_bucket(bucket); + drop(state); + self.save_state().await + } + + /// mark task completed + pub async fn mark_completed(&self) -> Result<()> { + let mut state = self.state.write().await; + state.mark_completed(); + drop(state); + self.save_state().await + } + + /// set error message + pub async fn set_error(&self, error: String) -> Result<()> { + let mut state = self.state.write().await; + state.set_error(error); + drop(state); + self.save_state().await + } + + /// increment retry count + pub async fn increment_retry(&self) -> Result<()> { + let mut state = self.state.write().await; + state.increment_retry(); + drop(state); + self.save_state().await + } + + /// cleanup resume state + pub async fn cleanup(&self) -> Result<()> { + let state = self.state.read().await; + let task_id = &state.task_id; + + // delete state files + let state_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}")); + let progress_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_PROGRESS_FILE}")); + let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}")); + + // ignore delete errors, files may not exist + let _ = self + .disk + .delete(RUSTFS_META_BUCKET, state_file.to_str().unwrap(), Default::default()) + .await; + let _ = self + .disk + .delete(RUSTFS_META_BUCKET, progress_file.to_str().unwrap(), Default::default()) + .await; + let _ = self + .disk + .delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default()) + .await; + + info!("Cleaned up resume state for task: {}", task_id); + Ok(()) + } + + /// save state to disk + async fn save_state(&self) -> Result<()> { + let state = self.state.read().await; + let state_data = serde_json::to_vec(&*state).map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to serialize resume state: {e}"), + })?; + + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", state.task_id, RESUME_STATE_FILE)); + + self.disk + .write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), state_data.into()) + .await + .map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to save resume state: {e}"), + })?; + + debug!("Saved resume state for task: {}", state.task_id); + Ok(()) + } + + /// read state file from disk + async fn read_state_file(disk: &DiskStore, task_id: &str) -> Result> { + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}")); + + disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()) + .await + .map(|bytes| bytes.to_vec()) + .map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to read resume state file: {e}"), + }) + } +} + +/// resume checkpoint +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResumeCheckpoint { + /// task id + pub task_id: String, + /// checkpoint time + pub checkpoint_time: u64, + /// current bucket index + pub current_bucket_index: usize, + /// current object index + pub current_object_index: usize, + /// processed objects + pub processed_objects: Vec, + /// failed objects + pub failed_objects: Vec, + /// skipped objects + pub skipped_objects: Vec, +} + +impl ResumeCheckpoint { + pub fn new(task_id: String) -> Self { + Self { + task_id, + checkpoint_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), + current_bucket_index: 0, + current_object_index: 0, + processed_objects: Vec::new(), + failed_objects: Vec::new(), + skipped_objects: Vec::new(), + } + } + + pub fn update_position(&mut self, bucket_index: usize, object_index: usize) { + self.current_bucket_index = bucket_index; + self.current_object_index = object_index; + self.checkpoint_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + } + + pub fn add_processed_object(&mut self, object: String) { + if !self.processed_objects.contains(&object) { + self.processed_objects.push(object); + } + } + + pub fn add_failed_object(&mut self, object: String) { + if !self.failed_objects.contains(&object) { + self.failed_objects.push(object); + } + } + + pub fn add_skipped_object(&mut self, object: String) { + if !self.skipped_objects.contains(&object) { + self.skipped_objects.push(object); + } + } +} + +/// resume checkpoint manager +pub struct CheckpointManager { + disk: DiskStore, + checkpoint: Arc>, +} + +impl CheckpointManager { + /// create new checkpoint manager + pub async fn new(disk: DiskStore, task_id: String) -> Result { + let checkpoint = ResumeCheckpoint::new(task_id); + let manager = Self { + disk, + checkpoint: Arc::new(RwLock::new(checkpoint)), + }; + + // save initial checkpoint + manager.save_checkpoint().await?; + Ok(manager) + } + + /// load checkpoint from disk + pub async fn load_from_disk(disk: DiskStore, task_id: &str) -> Result { + let checkpoint_data = Self::read_checkpoint_file(&disk, task_id).await?; + let checkpoint: ResumeCheckpoint = serde_json::from_slice(&checkpoint_data).map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to deserialize checkpoint: {e}"), + })?; + + Ok(Self { + disk, + checkpoint: Arc::new(RwLock::new(checkpoint)), + }) + } + + /// check if checkpoint exists + pub async fn has_checkpoint(disk: &DiskStore, task_id: &str) -> bool { + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}")); + match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await { + Ok(data) => !data.is_empty(), + Err(_) => false, + } + } + + /// get current checkpoint + pub async fn get_checkpoint(&self) -> ResumeCheckpoint { + self.checkpoint.read().await.clone() + } + + /// update position + pub async fn update_position(&self, bucket_index: usize, object_index: usize) -> Result<()> { + let mut checkpoint = self.checkpoint.write().await; + checkpoint.update_position(bucket_index, object_index); + drop(checkpoint); + self.save_checkpoint().await + } + + /// add processed object + pub async fn add_processed_object(&self, object: String) -> Result<()> { + let mut checkpoint = self.checkpoint.write().await; + checkpoint.add_processed_object(object); + drop(checkpoint); + self.save_checkpoint().await + } + + /// add failed object + pub async fn add_failed_object(&self, object: String) -> Result<()> { + let mut checkpoint = self.checkpoint.write().await; + checkpoint.add_failed_object(object); + drop(checkpoint); + self.save_checkpoint().await + } + + /// add skipped object + pub async fn add_skipped_object(&self, object: String) -> Result<()> { + let mut checkpoint = self.checkpoint.write().await; + checkpoint.add_skipped_object(object); + drop(checkpoint); + self.save_checkpoint().await + } + + /// cleanup checkpoint + pub async fn cleanup(&self) -> Result<()> { + let checkpoint = self.checkpoint.read().await; + let task_id = &checkpoint.task_id; + + let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}")); + let _ = self + .disk + .delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default()) + .await; + + info!("Cleaned up checkpoint for task: {}", task_id); + Ok(()) + } + + /// save checkpoint to disk + async fn save_checkpoint(&self) -> Result<()> { + let checkpoint = self.checkpoint.read().await; + let checkpoint_data = serde_json::to_vec(&*checkpoint).map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to serialize checkpoint: {e}"), + })?; + + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", checkpoint.task_id, RESUME_CHECKPOINT_FILE)); + + self.disk + .write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), checkpoint_data.into()) + .await + .map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to save checkpoint: {e}"), + })?; + + debug!("Saved checkpoint for task: {}", checkpoint.task_id); + Ok(()) + } + + /// read checkpoint file from disk + async fn read_checkpoint_file(disk: &DiskStore, task_id: &str) -> Result> { + let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}")); + + disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()) + .await + .map(|bytes| bytes.to_vec()) + .map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to read checkpoint file: {e}"), + }) + } +} + +/// resume utils +pub struct ResumeUtils; + +impl ResumeUtils { + /// generate unique task id + pub fn generate_task_id() -> String { + Uuid::new_v4().to_string() + } + + /// check if task can be resumed + pub async fn can_resume_task(disk: &DiskStore, task_id: &str) -> bool { + ResumeManager::has_resume_state(disk, task_id).await + } + + /// get all resumable task ids + pub async fn get_resumable_tasks(disk: &DiskStore) -> Result> { + // List all files in the buckets metadata directory + let entries = match disk.list_dir("", RUSTFS_META_BUCKET, BUCKET_META_PREFIX, -1).await { + Ok(entries) => entries, + Err(e) => { + debug!("Failed to list resume state files: {}", e); + return Ok(Vec::new()); + } + }; + + let mut task_ids = Vec::new(); + + // Filter files that end with ahm_resume_state.json and extract task IDs + for entry in entries { + if entry.ends_with(&format!("_{RESUME_STATE_FILE}")) { + // Extract task ID from filename: {task_id}_ahm_resume_state.json + if let Some(task_id) = entry.strip_suffix(&format!("_{RESUME_STATE_FILE}")) { + if !task_id.is_empty() { + task_ids.push(task_id.to_string()); + } + } + } + } + + debug!("Found {} resumable tasks: {:?}", task_ids.len(), task_ids); + Ok(task_ids) + } + + /// cleanup expired resume states + pub async fn cleanup_expired_states(disk: &DiskStore, max_age_hours: u64) -> Result<()> { + let task_ids = Self::get_resumable_tasks(disk).await?; + let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + + for task_id in task_ids { + if let Ok(resume_manager) = ResumeManager::load_from_disk(disk.clone(), &task_id).await { + let state = resume_manager.get_state().await; + let age_hours = (current_time - state.last_update) / 3600; + + if age_hours > max_age_hours { + info!("Cleaning up expired resume state for task: {} (age: {} hours)", task_id, age_hours); + if let Err(e) = resume_manager.cleanup().await { + warn!("Failed to cleanup expired resume state for task {}: {}", task_id, e); + } + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_resume_state_creation() { + let task_id = ResumeUtils::generate_task_id(); + let buckets = vec!["bucket1".to_string(), "bucket2".to_string()]; + let state = ResumeState::new(task_id.clone(), "erasure_set".to_string(), buckets); + + assert_eq!(state.task_id, task_id); + assert_eq!(state.task_type, "erasure_set"); + assert!(!state.completed); + assert_eq!(state.processed_objects, 0); + assert_eq!(state.pending_buckets.len(), 2); + } + + #[tokio::test] + async fn test_resume_state_progress() { + let task_id = ResumeUtils::generate_task_id(); + let buckets = vec!["bucket1".to_string()]; + let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets); + + state.update_progress(10, 8, 1, 1); + assert_eq!(state.processed_objects, 10); + assert_eq!(state.successful_objects, 8); + assert_eq!(state.failed_objects, 1); + assert_eq!(state.skipped_objects, 1); + + let progress = state.get_progress_percentage(); + assert_eq!(progress, 0.0); // total_objects is 0 + + state.total_objects = 100; + let progress = state.get_progress_percentage(); + assert_eq!(progress, 10.0); + } + + #[tokio::test] + async fn test_resume_state_bucket_completion() { + let task_id = ResumeUtils::generate_task_id(); + let buckets = vec!["bucket1".to_string(), "bucket2".to_string()]; + let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets); + + assert_eq!(state.pending_buckets.len(), 2); + assert_eq!(state.completed_buckets.len(), 0); + + state.complete_bucket("bucket1"); + assert_eq!(state.pending_buckets.len(), 1); + assert_eq!(state.completed_buckets.len(), 1); + assert!(state.completed_buckets.contains(&"bucket1".to_string())); + } + + #[tokio::test] + async fn test_resume_utils() { + let task_id1 = ResumeUtils::generate_task_id(); + let task_id2 = ResumeUtils::generate_task_id(); + + assert_ne!(task_id1, task_id2); + assert_eq!(task_id1.len(), 36); // UUID length + assert_eq!(task_id2.len(), 36); + } + + #[tokio::test] + async fn test_get_resumable_tasks_integration() { + use rustfs_ecstore::disk::{endpoint::Endpoint, new_disk, DiskOption}; + use tempfile::TempDir; + + // Create a temporary directory for testing + let temp_dir = TempDir::new().unwrap(); + let disk_path = temp_dir.path().join("test_disk"); + std::fs::create_dir_all(&disk_path).unwrap(); + + // Create a local disk for testing + let endpoint = Endpoint::try_from(disk_path.to_string_lossy().as_ref()).unwrap(); + let disk_option = DiskOption { + cleanup: false, + health_check: false, + }; + let disk = new_disk(&endpoint, &disk_option).await.unwrap(); + + // Create necessary directories first (ignore if already exist) + let _ = disk.make_volume(RUSTFS_META_BUCKET).await; + let _ = disk.make_volume(&format!("{RUSTFS_META_BUCKET}/{BUCKET_META_PREFIX}")).await; + + // Create some test resume state files + let task_ids = vec![ + "test-task-1".to_string(), + "test-task-2".to_string(), + "test-task-3".to_string(), + ]; + + // Save resume state files for each task + for task_id in &task_ids { + let state = ResumeState::new( + task_id.clone(), + "erasure_set".to_string(), + vec!["bucket1".to_string(), "bucket2".to_string()], + ); + + let state_data = serde_json::to_vec(&state).unwrap(); + let file_path = format!("{BUCKET_META_PREFIX}/{task_id}_{RESUME_STATE_FILE}"); + + disk.write_all(RUSTFS_META_BUCKET, &file_path, state_data.into()) + .await + .unwrap(); + } + + // Also create some non-resume state files to test filtering + let non_resume_files = vec![ + "other_file.txt", + "task4_ahm_checkpoint.json", + "task5_ahm_progress.json", + "_ahm_resume_state.json", // Invalid: empty task ID + ]; + + for file_name in non_resume_files { + let file_path = format!("{BUCKET_META_PREFIX}/{file_name}"); + disk.write_all(RUSTFS_META_BUCKET, &file_path, b"test data".to_vec().into()) + .await + .unwrap(); + } + + // Now call get_resumable_tasks to see if it finds the correct files + let found_task_ids = ResumeUtils::get_resumable_tasks(&disk).await.unwrap(); + + // Verify that only the valid resume state files are found + assert_eq!(found_task_ids.len(), 3); + for task_id in &task_ids { + assert!(found_task_ids.contains(task_id), "Task ID {task_id} not found"); + } + + // Verify that invalid files are not included + assert!(!found_task_ids.contains(&"".to_string())); + assert!(!found_task_ids.contains(&"task4".to_string())); + assert!(!found_task_ids.contains(&"task5".to_string())); + + // Clean up + temp_dir.close().unwrap(); + } +} diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs index d04270c8..aa204307 100644 --- a/crates/ahm/src/heal/storage.rs +++ b/crates/ahm/src/heal/storage.rs @@ -15,7 +15,7 @@ use crate::error::{Error, Result}; use async_trait::async_trait; use rustfs_ecstore::{ - disk::endpoint::Endpoint, + disk::{endpoint::Endpoint, DiskStore}, heal::heal_commands::{HealOpts, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN}, store::ECStore, store_api::{BucketInfo, ObjectIO, StorageAPI}, @@ -109,6 +109,9 @@ pub trait HealStorageAPI: Send + Sync { /// List objects for healing async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result>; + + /// Get disk for resume functionality + async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result; } /// ECStore Heal storage layer implementation @@ -460,4 +463,44 @@ impl HealStorageAPI for ECStoreHealStorage { } } } + + async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result { + debug!("Getting disk for resume: {}", set_disk_id); + + // Parse set_disk_id to extract pool and set indices + // Format: "pool_{pool_idx}_set_{set_idx}" + let parts: Vec<&str> = set_disk_id.split('_').collect(); + if parts.len() != 4 || parts[0] != "pool" || parts[2] != "set" { + return Err(Error::TaskExecutionFailed { + message: format!("Invalid set_disk_id format: {set_disk_id}"), + }); + } + + let pool_idx: usize = parts[1].parse().map_err(|_| Error::TaskExecutionFailed { + message: format!("Invalid pool index in set_disk_id: {set_disk_id}"), + })?; + + let set_idx: usize = parts[3].parse().map_err(|_| Error::TaskExecutionFailed { + message: format!("Invalid set index in set_disk_id: {set_disk_id}"), + })?; + + // Get the first available disk from the set + let disks = self + .ecstore + .get_disks(pool_idx, set_idx) + .await + .map_err(|e| Error::TaskExecutionFailed { + message: format!("Failed to get disks for pool {pool_idx} set {set_idx}: {e}"), + })?; + + // Find the first available disk + if let Some(disk_store) = disks.into_iter().flatten().next() { + info!("Found disk for resume: {:?}", disk_store); + return Ok(disk_store); + } + + Err(Error::TaskExecutionFailed { + message: format!("No available disk found for set_disk_id: {set_disk_id}"), + }) + } } diff --git a/crates/ahm/src/heal/task.rs b/crates/ahm/src/heal/task.rs index ea07786d..44f12dac 100644 --- a/crates/ahm/src/heal/task.rs +++ b/crates/ahm/src/heal/task.rs @@ -13,20 +13,10 @@ // limitations under the License. use crate::error::{Error, Result}; -use crate::heal::{progress::HealProgress, storage::HealStorageAPI}; -use rustfs_ecstore::config::RUSTFS_CONFIG_PREFIX; -use rustfs_ecstore::disk::endpoint::Endpoint; -use rustfs_ecstore::disk::error::DiskError; -use rustfs_ecstore::disk::{DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET}; +use crate::heal::{erasure_healer::ErasureSetHealer, progress::HealProgress, storage::HealStorageAPI}; +use rustfs_ecstore::heal::heal_commands::HealScanMode; use rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN; -use rustfs_ecstore::heal::heal_commands::{init_healing_tracker, load_healing_tracker, HealScanMode}; -use rustfs_ecstore::new_object_layer_fn; -use rustfs_ecstore::store::get_disk_via_endpoint; -use rustfs_ecstore::store_api::BucketInfo; -use rustfs_utils::path::path_join; use serde::{Deserialize, Serialize}; -use std::cmp::Ordering; -use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; @@ -44,8 +34,8 @@ pub enum HealType { }, /// Bucket heal Bucket { bucket: String }, - /// Disk heal - Disk { endpoint: Endpoint }, + /// Erasure Set heal (includes disk format repair) + ErasureSet { buckets: Vec, set_disk_id: String }, /// Metadata heal Metadata { bucket: String, object: String }, /// MRF heal @@ -175,10 +165,6 @@ impl HealRequest { Self::new(HealType::Bucket { bucket }, HealOptions::default(), HealPriority::Normal) } - pub fn disk(endpoint: Endpoint) -> Self { - Self::new(HealType::Disk { endpoint }, HealOptions::default(), HealPriority::High) - } - pub fn metadata(bucket: String, object: String) -> Self { Self::new(HealType::Metadata { bucket, object }, HealOptions::default(), HealPriority::High) } @@ -256,7 +242,7 @@ impl HealTask { version_id, } => self.heal_object(bucket, object, version_id.as_deref()).await, HealType::Bucket { bucket } => self.heal_bucket(bucket).await, - HealType::Disk { endpoint } => self.heal_disk(endpoint).await, + HealType::Metadata { bucket, object } => self.heal_metadata(bucket, object).await, HealType::MRF { meta_path } => self.heal_mrf(meta_path).await, HealType::ECDecode { @@ -264,6 +250,7 @@ impl HealTask { object, version_id, } => self.heal_ec_decode(bucket, object, version_id.as_deref()).await, + HealType::ErasureSet { buckets, set_disk_id } => self.heal_erasure_set(buckets.clone(), set_disk_id.clone()).await, }; // update completed time and status @@ -524,62 +511,6 @@ impl HealTask { } } - async fn heal_disk(&self, endpoint: &Endpoint) -> Result<()> { - info!("Healing disk: {:?}", endpoint); - - // update progress - { - let mut progress = self.progress.write().await; - progress.set_current_object(Some(format!("disk: {endpoint:?}"))); - progress.update_progress(0, 3, 0, 0); - } - - // Step 1: Perform disk format heal using ecstore - info!("Step 1: Performing disk format heal using ecstore"); - - match self.storage.heal_format(self.options.dry_run).await { - Ok((result, error)) => { - if let Some(e) = error { - error!("Disk heal failed: {:?} - {}", endpoint, e); - { - let mut progress = self.progress.write().await; - progress.update_progress(3, 3, 0, 0); - } - return Err(Error::TaskExecutionFailed { - message: format!("Failed to heal disk {endpoint:?}: {e}"), - }); - } - - info!("Disk heal completed successfully: {:?} ({} drives)", endpoint, result.after.drives.len()); - - { - let mut progress = self.progress.write().await; - progress.update_progress(2, 3, 0, 0); - } - - // Step 2: Synchronize data/buckets on the fresh disk - info!("Step 2: Healing buckets on fresh disk"); - self.heal_fresh_disk(endpoint).await?; - - { - let mut progress = self.progress.write().await; - progress.update_progress(3, 3, 0, 0); - } - Ok(()) - } - Err(e) => { - error!("Disk heal failed: {:?} - {}", endpoint, e); - { - let mut progress = self.progress.write().await; - progress.update_progress(3, 3, 0, 0); - } - Err(Error::TaskExecutionFailed { - message: format!("Failed to heal disk {endpoint:?}: {e}"), - }) - } - } - } - async fn heal_metadata(&self, bucket: &str, object: &str) -> Result<()> { info!("Healing metadata: {}/{}", bucket, object); @@ -807,79 +738,93 @@ impl HealTask { } } - async fn heal_fresh_disk(&self, endpoint: &Endpoint) -> Result<()> { - // Locate disk via endpoint - let disk = get_disk_via_endpoint(endpoint) - .await - .ok_or_else(|| Error::other(format!("Disk not found for endpoint: {endpoint}")))?; + async fn heal_erasure_set(&self, buckets: Vec, set_disk_id: String) -> Result<()> { + info!("Healing Erasure Set: {} ({} buckets)", set_disk_id, buckets.len()); - // Skip if drive is root or other fatal errors - if let Err(e) = disk.disk_info(&DiskInfoOptions::default()).await { - match e { - DiskError::DriveIsRoot => return Ok(()), - DiskError::UnformattedDisk => { /* continue healing */ } - _ => return Err(Error::other(e)), + // update progress + { + let mut progress = self.progress.write().await; + progress.set_current_object(Some(format!("erasure_set: {} ({} buckets)", set_disk_id, buckets.len()))); + progress.update_progress(0, 4, 0, 0); + } + + // Step 1: Perform disk format heal using ecstore + info!("Step 1: Performing disk format heal using ecstore"); + match self.storage.heal_format(self.options.dry_run).await { + Ok((result, error)) => { + if let Some(e) = error { + error!("Disk format heal failed: {} - {}", set_disk_id, e); + { + let mut progress = self.progress.write().await; + progress.update_progress(4, 4, 0, 0); + } + return Err(Error::TaskExecutionFailed { + message: format!("Failed to heal disk format for {set_disk_id}: {e}"), + }); + } + + info!( + "Disk format heal completed successfully: {} ({} drives)", + set_disk_id, + result.after.drives.len() + ); + } + Err(e) => { + error!("Disk format heal failed: {} - {}", set_disk_id, e); + { + let mut progress = self.progress.write().await; + progress.update_progress(4, 4, 0, 0); + } + return Err(Error::TaskExecutionFailed { + message: format!("Failed to heal disk format for {set_disk_id}: {e}"), + }); } } - // Load or init HealingTracker - let mut tracker = match load_healing_tracker(&Some(disk.clone())).await { - Ok(t) => t, - Err(err) => match err { - DiskError::FileNotFound => init_healing_tracker(disk.clone(), &Uuid::new_v4().to_string()) - .await - .map_err(Error::other)?, - _ => return Err(Error::other(err)), - }, - }; + { + let mut progress = self.progress.write().await; + progress.update_progress(1, 4, 0, 0); + } - // Build bucket list - let mut buckets = self.storage.list_buckets().await.map_err(Error::other)?; - buckets.push(BucketInfo { - name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)]) - .to_string_lossy() - .to_string(), - ..Default::default() - }); - buckets.push(BucketInfo { - name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)]) - .to_string_lossy() - .to_string(), - ..Default::default() - }); + // Step 2: Get disk for resume functionality + info!("Step 2: Getting disk for resume functionality"); + let disk = self.storage.get_disk_for_resume(&set_disk_id).await?; - // Sort: system buckets first, others by creation time desc - buckets.sort_by(|a, b| { - let a_sys = a.name.starts_with(RUSTFS_META_BUCKET); - let b_sys = b.name.starts_with(RUSTFS_META_BUCKET); - match (a_sys, b_sys) { - (true, false) => Ordering::Less, - (false, true) => Ordering::Greater, - _ => b.created.cmp(&a.created), + { + let mut progress = self.progress.write().await; + progress.update_progress(2, 4, 0, 0); + } + + // Step 3: Create erasure set healer with resume support + info!("Step 3: Creating erasure set healer with resume support"); + let erasure_healer = ErasureSetHealer::new(self.storage.clone(), self.progress.clone(), self.cancel_token.clone(), disk); + + { + let mut progress = self.progress.write().await; + progress.update_progress(3, 4, 0, 0); + } + + // Step 4: Execute erasure set heal with resume + info!("Step 4: Executing erasure set heal with resume"); + let result = erasure_healer.heal_erasure_set(&buckets, &set_disk_id).await; + + { + let mut progress = self.progress.write().await; + progress.update_progress(4, 4, 0, 0); + } + + match result { + Ok(_) => { + info!("Erasure set heal completed successfully: {} ({} buckets)", set_disk_id, buckets.len()); + Ok(()) } - }); - - // Update tracker queue and persist - tracker.set_queue_buckets(&buckets).await; - tracker.save().await.map_err(Error::other)?; - - // Prepare bucket names list - let bucket_names: Vec = buckets.iter().map(|b| b.name.clone()).collect(); - - // Run heal_erasure_set using underlying SetDisk - let (pool_idx, set_idx) = (endpoint.pool_idx as usize, endpoint.set_idx as usize); - let Some(store) = new_object_layer_fn() else { - return Err(Error::other("errServerNotInitialized")); - }; - let set_disk = store.pools[pool_idx].disk_set[set_idx].clone(); - - let tracker_arc = Arc::new(RwLock::new(tracker)); - set_disk - .heal_erasure_set(&bucket_names, tracker_arc) - .await - .map_err(Error::other)?; - - Ok(()) + Err(e) => { + error!("Erasure set heal failed: {} - {}", set_disk_id, e); + Err(Error::TaskExecutionFailed { + message: format!("Failed to heal erasure set {set_disk_id}: {e}"), + }) + } + } } } diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index aed19e87..9c5e1aa1 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -22,7 +22,7 @@ use ecstore::{ disk::{DiskAPI, DiskStore, WalkDirOptions}, set_disk::SetDisks, }; -use rustfs_ecstore as ecstore; +use rustfs_ecstore::{self as ecstore, StorageAPI}; use rustfs_filemeta::MetacacheReader; use tokio::sync::{Mutex, RwLock}; use tokio_util::sync::CancellationToken; @@ -502,18 +502,43 @@ impl Scanner { metrics.free_space = disk_info.free; metrics.is_online = disk.is_online().await; - // 检查磁盘状态,如果离线则提交heal任务 + // check disk status, if offline, submit erasure set heal task if !metrics.is_online { let enable_healing = self.config.read().await.enable_healing; if enable_healing { if let Some(heal_manager) = &self.heal_manager { - let req = HealRequest::disk(disk.endpoint().clone()); + // Get bucket list for erasure set healing + let buckets = match rustfs_ecstore::new_object_layer_fn() { + Some(ecstore) => { + match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await { + Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), + Err(e) => { + error!("Failed to get bucket list for disk healing: {}", e); + return Err(Error::Storage(e.into())); + } + } + } + None => { + error!("No ECStore available for getting bucket list"); + return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available"))); + } + }; + + let set_disk_id = format!("{}_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx); + let req = HealRequest::new( + crate::heal::task::HealType::ErasureSet { + buckets, + set_disk_id, + }, + crate::heal::task::HealOptions::default(), + crate::heal::task::HealPriority::High, + ); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { - warn!("disk offline, submit heal task: {} {}", task_id, disk_path); + warn!("disk offline, submit erasure set heal task: {} {}", task_id, disk_path); } Err(e) => { - error!("disk offline, submit heal task failed: {} {}", disk_path, e); + error!("disk offline, submit erasure set heal task failed: {} {}", disk_path, e); } } } @@ -540,24 +565,42 @@ impl Scanner { Err(e) => { error!("Failed to list volumes on disk {}: {}", disk_path, e); - // 磁盘访问失败,提交磁盘heal任务 + // disk access failed, submit erasure set heal task let enable_healing = self.config.read().await.enable_healing; if enable_healing { if let Some(heal_manager) = &self.heal_manager { - use crate::heal::{HealPriority, HealRequest}; + // Get bucket list for erasure set healing + let buckets = match rustfs_ecstore::new_object_layer_fn() { + Some(ecstore) => { + match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await { + Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), + Err(e) => { + error!("Failed to get bucket list for disk healing: {}", e); + return Err(Error::Storage(e.into())); + } + } + } + None => { + error!("No ECStore available for getting bucket list"); + return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available"))); + } + }; + + let set_disk_id = format!("{}_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx); let req = HealRequest::new( - crate::heal::HealType::Disk { - endpoint: disk.endpoint().clone(), + crate::heal::task::HealType::ErasureSet { + buckets, + set_disk_id, }, - crate::heal::HealOptions::default(), - HealPriority::Urgent, + crate::heal::task::HealOptions::default(), + crate::heal::task::HealPriority::Urgent, ); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { - warn!("disk access failed, submit heal task: {} {}", task_id, disk_path); + warn!("disk access failed, submit erasure set heal task: {} {}", task_id, disk_path); } Err(heal_err) => { - error!("disk access failed, submit heal task failed: {} {}", disk_path, heal_err); + error!("disk access failed, submit erasure set heal task failed: {} {}", disk_path, heal_err); } } } diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 6edd46f0..678480d4 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -3468,7 +3468,7 @@ impl SetDisks { return Err(Error::other(format!("unable to get disk information before healing it: {err}"))); } }; - let num_cores = num_cpus::get(); // 使用 num_cpus crate 获取核心数 + let num_cores = num_cpus::get(); // use num_cpus crate to get the number of cores let mut num_healers: usize; if info.nr_requests as usize > num_cores { @@ -3577,44 +3577,6 @@ impl SetDisks { continue; } - // let vc: VersioningConfiguration; - // let lc: BucketLifecycleConfiguration; - // let lr: ObjectLockConfiguration; - // let rcfg: ReplicationConfiguration; - // if !is_rustfs_meta_bucket_name(bucket) { - // vc = match get_versioning_config(bucket).await { - // Ok((r, _)) => r, - // Err(err) => { - // ret_err = Some(err); - // info!("get versioning config failed, err: {}", err.to_string()); - // continue; - // } - // }; - // lc = match get_lifecycle_config(bucket).await { - // Ok((r, _)) => r, - // Err(err) => { - // ret_err = Some(err); - // info!("get lifecycle config failed, err: {}", err.to_string()); - // continue; - // } - // }; - // lr = match get_object_lock_config(bucket).await { - // Ok((r, _)) => r, - // Err(err) => { - // ret_err = Some(err); - // info!("get object lock config failed, err: {}", err.to_string()); - // continue; - // } - // }; - // rcfg = match get_replication_config(bucket).await { - // Ok((r, _)) => r, - // Err(err) => { - // ret_err = Some(err); - // info!("get replication config failed, err: {}", err.to_string()); - // continue; - // } - // }; - // } let (mut disks, _, healing) = self.get_online_disk_with_healing_and_info(true).await?; if disks.len() == healing { info!("all drives are in healing state, aborting.."); @@ -3645,17 +3607,6 @@ impl SetDisks { let fallback_disks = disks[expected_disk..].to_vec(); disks = disks[..expected_disk].to_vec(); - //todo - // let filter_life_cycle = |bucket: &str, object: &str, fi: FileInfo| { - // if lc.rules.is_empty() { - // return false; - // } - // // todo: versioning - // let versioned = false; - // let obj_info = fi.to_object_info(bucket, object, versioned); - // - // }; - let result_tx_send = result_tx.clone(); let bg_seq_send = bg_seq.clone(); let send = Box::new(move |result: HealEntryResult| {