refactor: integrate disk healing into erasure set healing

- Remove HealType::Disk and related disk-specific healing methods
- Integrate disk format healing into heal_erasure_set with include_format_heal option
- Update auto disk scanner to use ErasureSet heal type instead of Disk heal
- Fix disk status change event handling to use ErasureSet heal requests
- Add proper bucket list retrieval for auto healing scenarios
- Update data scanner to submit ErasureSet heal tasks for offline disks
- Remove duplicate healing logic between Disk and ErasureSet types
- Ensure all healing operations go through the unified ErasureSet healing path (a request-construction sketch follows below)
junxiang Mu
2025-07-18 12:06:24 +08:00
parent c49414f6ac
commit aed8f52423
9 changed files with 1381 additions and 219 deletions
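The sketch below is editorial, not part of the commit: it illustrates the request shape that replaces the removed HealType::Disk, assuming the HealRequest, HealType, HealOptions, and HealPriority types re-exported from crate::heal, and using the "{pool_idx}_{set_idx}" id format that the scanner and event handler in this commit construct.

use crate::heal::{HealOptions, HealPriority, HealRequest, HealType};

// Hypothetical helper: build the unified ErasureSet heal request that replaces
// the removed HealType::Disk. Callers are expected to supply the bucket list.
fn erasure_set_heal_request(pool_idx: usize, set_idx: usize, buckets: Vec<String>) -> HealRequest {
    let set_disk_id = format!("{pool_idx}_{set_idx}"); // id format used by the scanner below
    HealRequest::new(
        HealType::ErasureSet { buckets, set_disk_id },
        HealOptions::default(),
        HealPriority::High,
    )
}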

View File

@@ -0,0 +1,458 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use crate::heal::{
progress::HealProgress,
resume::{CheckpointManager, ResumeManager, ResumeUtils},
storage::HealStorageAPI,
};
use futures::future::join_all;
use rustfs_ecstore::{
disk::DiskStore,
heal::heal_commands::{HealOpts, HEAL_NORMAL_SCAN},
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
/// Erasure Set Healer
pub struct ErasureSetHealer {
storage: Arc<dyn HealStorageAPI>,
progress: Arc<RwLock<HealProgress>>,
cancel_token: tokio_util::sync::CancellationToken,
disk: DiskStore,
}
impl ErasureSetHealer {
pub fn new(
storage: Arc<dyn HealStorageAPI>,
progress: Arc<RwLock<HealProgress>>,
cancel_token: tokio_util::sync::CancellationToken,
disk: DiskStore,
) -> Self {
Self {
storage,
progress,
cancel_token,
disk,
}
}
/// execute erasure set heal with resume
pub async fn heal_erasure_set(&self, buckets: &[String], set_disk_id: &str) -> Result<()> {
info!("Starting erasure set heal for {} buckets on set disk {}", buckets.len(), set_disk_id);
// 1. generate or get task id
let task_id = self.get_or_create_task_id(set_disk_id).await?;
// 2. initialize a new resume state or load an existing one
let (resume_manager, checkpoint_manager) = self.initialize_resume_state(&task_id, buckets).await?;
// 3. execute heal with resume
let result = self
.execute_heal_with_resume(buckets, &resume_manager, &checkpoint_manager)
.await;
// 4. cleanup resume state
if result.is_ok() {
if let Err(e) = resume_manager.cleanup().await {
warn!("Failed to cleanup resume state: {}", e);
}
if let Err(e) = checkpoint_manager.cleanup().await {
warn!("Failed to cleanup checkpoint: {}", e);
}
}
result
}
/// get or create task id
async fn get_or_create_task_id(&self, _set_disk_id: &str) -> Result<String> {
// check if there are resumable tasks
let resumable_tasks = ResumeUtils::get_resumable_tasks(&self.disk).await?;
for task_id in resumable_tasks {
if ResumeUtils::can_resume_task(&self.disk, &task_id).await {
info!("Found resumable task: {}", task_id);
return Ok(task_id);
}
}
// create new task id
let task_id = ResumeUtils::generate_task_id();
info!("Created new heal task: {}", task_id);
Ok(task_id)
}
/// initialize a new resume state or load an existing one
async fn initialize_resume_state(&self, task_id: &str, buckets: &[String]) -> Result<(ResumeManager, CheckpointManager)> {
// check if resume state exists
if ResumeManager::has_resume_state(&self.disk, task_id).await {
info!("Loading existing resume state for task: {}", task_id);
let resume_manager = ResumeManager::load_from_disk(self.disk.clone(), task_id).await?;
let checkpoint_manager = if CheckpointManager::has_checkpoint(&self.disk, task_id).await {
CheckpointManager::load_from_disk(self.disk.clone(), task_id).await?
} else {
CheckpointManager::new(self.disk.clone(), task_id.to_string()).await?
};
Ok((resume_manager, checkpoint_manager))
} else {
info!("Creating new resume state for task: {}", task_id);
let resume_manager =
ResumeManager::new(self.disk.clone(), task_id.to_string(), "erasure_set".to_string(), buckets.to_vec()).await?;
let checkpoint_manager = CheckpointManager::new(self.disk.clone(), task_id.to_string()).await?;
Ok((resume_manager, checkpoint_manager))
}
}
/// execute heal with resume
async fn execute_heal_with_resume(
&self,
buckets: &[String],
resume_manager: &ResumeManager,
checkpoint_manager: &CheckpointManager,
) -> Result<()> {
// 1. get current state
let state = resume_manager.get_state().await;
let checkpoint = checkpoint_manager.get_checkpoint().await;
info!(
"Resuming from bucket {} object {}",
checkpoint.current_bucket_index, checkpoint.current_object_index
);
// 2. initialize progress
self.initialize_progress(buckets, &state).await;
// 3. continue from checkpoint
let current_bucket_index = checkpoint.current_bucket_index;
let mut current_object_index = checkpoint.current_object_index;
let mut processed_objects = state.processed_objects;
let mut successful_objects = state.successful_objects;
let mut failed_objects = state.failed_objects;
let mut skipped_objects = state.skipped_objects;
// 4. process remaining buckets
for (bucket_idx, bucket) in buckets.iter().enumerate().skip(current_bucket_index) {
// check if completed
if state.completed_buckets.contains(bucket) {
continue;
}
// update current bucket
resume_manager.set_current_item(Some(bucket.clone()), None).await?;
// process objects in bucket
let bucket_result = self
.heal_bucket_with_resume(
bucket,
&mut current_object_index,
&mut processed_objects,
&mut successful_objects,
&mut failed_objects,
&mut skipped_objects,
resume_manager,
checkpoint_manager,
)
.await;
// update checkpoint position
checkpoint_manager.update_position(bucket_idx, current_object_index).await?;
// update progress
resume_manager
.update_progress(processed_objects, successful_objects, failed_objects, skipped_objects)
.await?;
// check cancel status
if self.cancel_token.is_cancelled() {
info!("Heal task cancelled");
return Err(Error::TaskCancelled);
}
// process bucket result
match bucket_result {
Ok(_) => {
resume_manager.complete_bucket(bucket).await?;
info!("Completed heal for bucket: {}", bucket);
}
Err(e) => {
error!("Failed to heal bucket {}: {}", bucket, e);
// continue to next bucket, do not interrupt the whole process
}
}
// reset object index
current_object_index = 0;
}
// 5. mark task completed
resume_manager.mark_completed().await?;
info!("Erasure set heal completed successfully");
Ok(())
}
/// heal single bucket with resume
#[allow(clippy::too_many_arguments)]
async fn heal_bucket_with_resume(
&self,
bucket: &str,
current_object_index: &mut usize,
processed_objects: &mut u64,
successful_objects: &mut u64,
failed_objects: &mut u64,
_skipped_objects: &mut u64,
resume_manager: &ResumeManager,
checkpoint_manager: &CheckpointManager,
) -> Result<()> {
info!("Starting heal for bucket: {} from object index {}", bucket, current_object_index);
// 1. get bucket info
let _bucket_info = match self.storage.get_bucket_info(bucket).await? {
Some(info) => info,
None => {
warn!("Bucket {} not found, skipping", bucket);
return Ok(());
}
};
// 2. get objects to heal
let objects = self.storage.list_objects_for_heal(bucket, "").await?;
// 3. continue from checkpoint
for (obj_idx, object) in objects.iter().enumerate().skip(*current_object_index) {
// check if already processed
if checkpoint_manager.get_checkpoint().await.processed_objects.contains(object) {
continue;
}
// update current object
resume_manager
.set_current_item(Some(bucket.to_string()), Some(object.clone()))
.await?;
// heal object
let heal_opts = HealOpts {
scan_mode: HEAL_NORMAL_SCAN,
remove: true,
recreate: true,
..Default::default()
};
match self.storage.heal_object(bucket, object, None, &heal_opts).await {
Ok((_result, None)) => {
*successful_objects += 1;
checkpoint_manager.add_processed_object(object.clone()).await?;
info!("Successfully healed object {}/{}", bucket, object);
}
Ok((_, Some(err))) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Failed to heal object {}/{}: {}", bucket, object, err);
}
Err(err) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Error healing object {}/{}: {}", bucket, object, err);
}
}
*processed_objects += 1;
*current_object_index = obj_idx + 1;
// check cancel status
if self.cancel_token.is_cancelled() {
info!("Heal task cancelled during object processing");
return Err(Error::TaskCancelled);
}
// save checkpoint periodically
if obj_idx % 100 == 0 {
checkpoint_manager.update_position(0, *current_object_index).await?;
}
}
Ok(())
}
/// initialize progress tracking
async fn initialize_progress(&self, _buckets: &[String], state: &crate::heal::resume::ResumeState) {
let mut progress = self.progress.write().await;
progress.objects_scanned = state.total_objects;
progress.objects_healed = state.successful_objects;
progress.objects_failed = state.failed_objects;
progress.bytes_processed = 0; // set to 0 for now, can be extended later
progress.set_current_object(state.current_object.clone());
}
/// heal all buckets concurrently
#[allow(dead_code)]
async fn heal_buckets_concurrently(&self, buckets: &[String]) -> Vec<Result<()>> {
// use a semaphore to bound concurrency and avoid too many concurrent heals
let semaphore = Arc::new(tokio::sync::Semaphore::new(4)); // max 4 concurrent healings
let heal_futures = buckets.iter().map(|bucket| {
let bucket = bucket.clone();
let storage = self.storage.clone();
let progress = self.progress.clone();
let semaphore = semaphore.clone();
let cancel_token = self.cancel_token.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
if cancel_token.is_cancelled() {
return Err(Error::TaskCancelled);
}
Self::heal_single_bucket(&storage, &bucket, &progress).await
}
});
// use join_all to process concurrently
join_all(heal_futures).await
}
/// heal single bucket
#[allow(dead_code)]
async fn heal_single_bucket(
storage: &Arc<dyn HealStorageAPI>,
bucket: &str,
progress: &Arc<RwLock<HealProgress>>,
) -> Result<()> {
info!("Starting heal for bucket: {}", bucket);
// 1. get bucket info
let _bucket_info = match storage.get_bucket_info(bucket).await? {
Some(info) => info,
None => {
warn!("Bucket {} not found, skipping", bucket);
return Ok(());
}
};
// 2. get objects to heal
let objects = storage.list_objects_for_heal(bucket, "").await?;
// 3. update progress
{
let mut p = progress.write().await;
p.objects_scanned += objects.len() as u64;
}
// 4. heal objects concurrently
let heal_opts = HealOpts {
scan_mode: HEAL_NORMAL_SCAN,
remove: true, // remove corrupted data
recreate: true, // recreate missing data
..Default::default()
};
let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await;
// 5. count results
let (success_count, failure_count) = object_results
.into_iter()
.fold((0, 0), |(success, failure), result| match result {
Ok(_) => (success + 1, failure),
Err(_) => (success, failure + 1),
});
// 6. update progress
{
let mut p = progress.write().await;
p.objects_healed += success_count;
p.objects_failed += failure_count;
p.set_current_object(Some(format!("completed bucket: {bucket}")));
}
info!(
"Completed heal for bucket {}: {} success, {} failures",
bucket, success_count, failure_count
);
Ok(())
}
/// heal objects concurrently
#[allow(dead_code)]
async fn heal_objects_concurrently(
storage: &Arc<dyn HealStorageAPI>,
bucket: &str,
objects: &[String],
heal_opts: &HealOpts,
_progress: &Arc<RwLock<HealProgress>>,
) -> Vec<Result<()>> {
// use semaphore to control object healing concurrency
let semaphore = Arc::new(tokio::sync::Semaphore::new(8)); // max 8 concurrent object healings
let heal_futures = objects.iter().map(|object| {
let object = object.clone();
let bucket = bucket.to_string();
let storage = storage.clone();
let heal_opts = *heal_opts;
let semaphore = semaphore.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
match storage.heal_object(&bucket, &object, None, &heal_opts).await {
Ok((_result, None)) => {
info!("Successfully healed object {}/{}", bucket, object);
Ok(())
}
Ok((_, Some(err))) => {
warn!("Failed to heal object {}/{}: {}", bucket, object, err);
Err(Error::other(err))
}
Err(err) => {
warn!("Error healing object {}/{}: {}", bucket, object, err);
Err(err)
}
}
}
});
join_all(heal_futures).await
}
/// process results
#[allow(dead_code)]
async fn process_results(&self, results: Vec<Result<()>>) -> Result<()> {
let (success_count, failure_count): (usize, usize) =
results.into_iter().fold((0, 0), |(success, failure), result| match result {
Ok(_) => (success + 1, failure),
Err(_) => (success, failure + 1),
});
let total = success_count + failure_count;
info!("Erasure set heal completed: {}/{} buckets successful", success_count, total);
if failure_count > 0 {
warn!("{} buckets failed to heal", failure_count);
return Err(Error::other(format!("{failure_count} buckets failed to heal")));
}
Ok(())
}
}
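A minimal caller sketch for the healer above (editorial; it assumes HealProgress implements Default, and that storage and disk are obtained the way heal_erasure_set in task.rs below obtains them):

async fn run_set_heal(
    storage: Arc<dyn HealStorageAPI>,
    disk: DiskStore,
    buckets: Vec<String>,
    set_disk_id: &str,
) -> Result<()> {
    // Assumption: HealProgress implements Default; any shared progress handle works.
    let progress = Arc::new(RwLock::new(HealProgress::default()));
    let cancel = tokio_util::sync::CancellationToken::new();
    let healer = ErasureSetHealer::new(storage, progress, cancel, disk);
    // Picks up on-disk resume state automatically if a prior run was interrupted.
    healer.heal_erasure_set(&buckets, set_disk_id).await
}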

View File

@@ -143,13 +143,19 @@ impl HealEvent {
HealOptions::default(),
HealPriority::High,
),
HealEvent::DiskStatusChange { endpoint, .. } => HealRequest::new(
HealType::Disk {
endpoint: endpoint.clone(),
},
HealOptions::default(),
HealPriority::High,
),
HealEvent::DiskStatusChange { endpoint, .. } => {
// Convert disk status change to erasure set heal
// Note: This requires access to storage to get the bucket list, which is not available here.
// The actual bucket list must be provided by the caller or retrieved separately.
HealRequest::new(
HealType::ErasureSet {
buckets: vec![], // Empty bucket list - caller should populate this
set_disk_id: format!("{}_{}", endpoint.pool_idx, endpoint.set_idx),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::ECDecodeFailure {
bucket,
object,

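Because the event-to-request conversion above has no storage handle, the ErasureSet request it builds carries an empty bucket list. A hedged sketch of how a caller that does hold the storage handle might fill it in (populate_buckets is hypothetical; it assumes HealRequest exposes its heal_type field and that list_buckets returns this crate's Result<Vec<BucketInfo>>):

async fn populate_buckets(storage: &Arc<dyn HealStorageAPI>, req: &mut HealRequest) -> Result<()> {
    if let HealType::ErasureSet { buckets, .. } = &mut req.heal_type {
        if buckets.is_empty() {
            // Fill in the bucket list the event conversion could not provide.
            *buckets = storage.list_buckets().await?.into_iter().map(|b| b.name).collect();
        }
    }
    Ok(())
}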
View File

@@ -259,12 +259,13 @@ impl HealManager {
Ok(())
}
/// Start background task to auto scan local disks and enqueue disk heal requests
/// Start background task to auto scan local disks and enqueue erasure set heal requests
async fn start_auto_disk_scanner(&self) -> Result<()> {
let config = self.config.clone();
let heal_queue = self.heal_queue.clone();
let active_heals = self.active_heals.clone();
let cancel_token = self.cancel_token.clone();
let storage = self.storage.clone();
tokio::spawn(async move {
let mut interval = interval(config.read().await.heal_interval);
@@ -300,18 +301,28 @@ impl HealManager {
continue;
}
// Get bucket list for erasure set healing
let buckets = match storage.list_buckets().await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for auto healing: {}", e);
continue;
}
};
// Create erasure set heal requests for each endpoint
for ep in endpoints {
// skip if already queued or healing
let mut skip = false;
{
let queue = heal_queue.lock().await;
if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) {
if queue.iter().any(|req| matches!(&req.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) {
skip = true;
}
}
if !skip {
let active = active_heals.lock().await;
if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::Disk { endpoint } if endpoint == &ep)) {
if active.values().any(|task| matches!(&task.heal_type, crate::heal::task::HealType::ErasureSet { set_disk_id, .. } if set_disk_id == &format!("{}_{}", ep.pool_idx, ep.set_idx))) {
skip = true;
}
}
@@ -320,15 +331,19 @@ impl HealManager {
continue;
}
// enqueue heal request for this disk
// enqueue erasure set heal request for this disk
let set_disk_id = format!("{}_{}", ep.pool_idx, ep.set_idx);
let req = crate::heal::task::HealRequest::new(
crate::heal::task::HealType::Disk { endpoint: ep.clone() },
crate::heal::task::HealType::ErasureSet {
buckets: buckets.clone(),
set_disk_id: set_disk_id.clone()
},
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::Normal,
);
let mut queue = heal_queue.lock().await;
queue.push_back(req);
info!("Enqueued auto disk heal for endpoint: {}", ep);
info!("Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id);
}
}
}
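The duplicate checks above repeat the same set_disk_id comparison against both the queue and the active heals; a small extraction sketch (hypothetical, not part of the commit):

// Hypothetical predicate shared by the queue and active-heals duplicate checks.
fn targets_set(heal_type: &crate::heal::task::HealType, set_disk_id: &str) -> bool {
    matches!(heal_type,
        crate::heal::task::HealType::ErasureSet { set_disk_id: id, .. } if id.as_str() == set_disk_id)
}
// Usage: queue.iter().any(|req| targets_set(&req.heal_type, &candidate_id))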
@@ -365,7 +380,8 @@ impl HealManager {
// start heal task
tokio::spawn(async move {
info!("Starting heal task: {}", task_id);
match task.execute().await {
let result = task.execute().await;
match result {
Ok(_) => {
info!("Heal task completed successfully: {}", task_id);
}

View File

@@ -13,11 +13,15 @@
// limitations under the License.
pub mod channel;
pub mod erasure_healer;
pub mod event;
pub mod manager;
pub mod progress;
pub mod resume;
pub mod storage;
pub mod task;
pub use erasure_healer::ErasureSetHealer;
pub use manager::HealManager;
pub use resume::{CheckpointManager, ResumeCheckpoint, ResumeManager, ResumeState, ResumeUtils};
pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType};

View File

@@ -0,0 +1,696 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use rustfs_ecstore::disk::{DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// resume state file constants
const RESUME_STATE_FILE: &str = "ahm_resume_state.json";
const RESUME_PROGRESS_FILE: &str = "ahm_progress.json";
const RESUME_CHECKPOINT_FILE: &str = "ahm_checkpoint.json";
/// resume state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeState {
/// task id
pub task_id: String,
/// task type
pub task_type: String,
/// start time
pub start_time: u64,
/// last update time
pub last_update: u64,
/// completed
pub completed: bool,
/// total objects
pub total_objects: u64,
/// processed objects
pub processed_objects: u64,
/// successful objects
pub successful_objects: u64,
/// failed objects
pub failed_objects: u64,
/// skipped objects
pub skipped_objects: u64,
/// current bucket
pub current_bucket: Option<String>,
/// current object
pub current_object: Option<String>,
/// completed buckets
pub completed_buckets: Vec<String>,
/// pending buckets
pub pending_buckets: Vec<String>,
/// error message
pub error_message: Option<String>,
/// retry count
pub retry_count: u32,
/// max retries
pub max_retries: u32,
}
impl ResumeState {
pub fn new(task_id: String, task_type: String, buckets: Vec<String>) -> Self {
Self {
task_id,
task_type,
start_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
last_update: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
completed: false,
total_objects: 0,
processed_objects: 0,
successful_objects: 0,
failed_objects: 0,
skipped_objects: 0,
current_bucket: None,
current_object: None,
completed_buckets: Vec::new(),
pending_buckets: buckets,
error_message: None,
retry_count: 0,
max_retries: 3,
}
}
pub fn update_progress(&mut self, processed: u64, successful: u64, failed: u64, skipped: u64) {
self.processed_objects = processed;
self.successful_objects = successful;
self.failed_objects = failed;
self.skipped_objects = skipped;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn set_current_item(&mut self, bucket: Option<String>, object: Option<String>) {
self.current_bucket = bucket;
self.current_object = object;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn complete_bucket(&mut self, bucket: &str) {
if !self.completed_buckets.contains(&bucket.to_string()) {
self.completed_buckets.push(bucket.to_string());
}
if let Some(pos) = self.pending_buckets.iter().position(|b| b == bucket) {
self.pending_buckets.remove(pos);
}
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn mark_completed(&mut self) {
self.completed = true;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn set_error(&mut self, error: String) {
self.error_message = Some(error);
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn increment_retry(&mut self) {
self.retry_count += 1;
self.last_update = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn can_retry(&self) -> bool {
self.retry_count < self.max_retries
}
pub fn get_progress_percentage(&self) -> f64 {
if self.total_objects == 0 {
return 0.0;
}
(self.processed_objects as f64 / self.total_objects as f64) * 100.0
}
pub fn get_success_rate(&self) -> f64 {
let total = self.successful_objects + self.failed_objects;
if total == 0 {
return 0.0;
}
(self.successful_objects as f64 / total as f64) * 100.0
}
}
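A worked example of the two rate helpers above (values chosen so the ratios are exact):

#[test]
fn progress_math_example() {
    let mut state = ResumeState::new("t".into(), "erasure_set".into(), vec!["b".into()]);
    state.total_objects = 200;
    state.update_progress(50, 45, 5, 0); // processed, successful, failed, skipped
    assert_eq!(state.get_progress_percentage(), 25.0); // 50 / 200 * 100
    assert_eq!(state.get_success_rate(), 90.0); // 45 / (45 + 5) * 100
}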
/// resume manager
pub struct ResumeManager {
disk: DiskStore,
state: Arc<RwLock<ResumeState>>,
}
impl ResumeManager {
/// create new resume manager
pub async fn new(disk: DiskStore, task_id: String, task_type: String, buckets: Vec<String>) -> Result<Self> {
let state = ResumeState::new(task_id, task_type, buckets);
let manager = Self {
disk,
state: Arc::new(RwLock::new(state)),
};
// save initial state
manager.save_state().await?;
Ok(manager)
}
/// load resume state from disk
pub async fn load_from_disk(disk: DiskStore, task_id: &str) -> Result<Self> {
let state_data = Self::read_state_file(&disk, task_id).await?;
let state: ResumeState = serde_json::from_slice(&state_data).map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to deserialize resume state: {e}"),
})?;
Ok(Self {
disk,
state: Arc::new(RwLock::new(state)),
})
}
/// check if resume state exists
pub async fn has_resume_state(disk: &DiskStore, task_id: &str) -> bool {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"));
match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
}
}
/// get current state
pub async fn get_state(&self) -> ResumeState {
self.state.read().await.clone()
}
/// update progress
pub async fn update_progress(&self, processed: u64, successful: u64, failed: u64, skipped: u64) -> Result<()> {
let mut state = self.state.write().await;
state.update_progress(processed, successful, failed, skipped);
drop(state);
self.save_state().await
}
/// set current item
pub async fn set_current_item(&self, bucket: Option<String>, object: Option<String>) -> Result<()> {
let mut state = self.state.write().await;
state.set_current_item(bucket, object);
drop(state);
self.save_state().await
}
/// complete bucket
pub async fn complete_bucket(&self, bucket: &str) -> Result<()> {
let mut state = self.state.write().await;
state.complete_bucket(bucket);
drop(state);
self.save_state().await
}
/// mark task completed
pub async fn mark_completed(&self) -> Result<()> {
let mut state = self.state.write().await;
state.mark_completed();
drop(state);
self.save_state().await
}
/// set error message
pub async fn set_error(&self, error: String) -> Result<()> {
let mut state = self.state.write().await;
state.set_error(error);
drop(state);
self.save_state().await
}
/// increment retry count
pub async fn increment_retry(&self) -> Result<()> {
let mut state = self.state.write().await;
state.increment_retry();
drop(state);
self.save_state().await
}
/// cleanup resume state
pub async fn cleanup(&self) -> Result<()> {
let state = self.state.read().await;
let task_id = &state.task_id;
// delete state files
let state_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"));
let progress_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_PROGRESS_FILE}"));
let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
// ignore delete errors, files may not exist
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, state_file.to_str().unwrap(), Default::default())
.await;
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, progress_file.to_str().unwrap(), Default::default())
.await;
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default())
.await;
info!("Cleaned up resume state for task: {}", task_id);
Ok(())
}
/// save state to disk
async fn save_state(&self) -> Result<()> {
let state = self.state.read().await;
let state_data = serde_json::to_vec(&*state).map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to serialize resume state: {e}"),
})?;
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", state.task_id, RESUME_STATE_FILE));
self.disk
.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), state_data.into())
.await
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to save resume state: {e}"),
})?;
debug!("Saved resume state for task: {}", state.task_id);
Ok(())
}
/// read state file from disk
async fn read_state_file(disk: &DiskStore, task_id: &str) -> Result<Vec<u8>> {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"));
disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap())
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to read resume state file: {e}"),
})
}
}
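For reference, the persistence layout the manager above uses is one flat JSON file per task under the metadata bucket; a short sketch of the path construction and serialization round-trip, using only the constants defined in this module and serde_json:

// Sketch: on-disk layout is RUSTFS_META_BUCKET / BUCKET_META_PREFIX /
// "{task_id}_ahm_resume_state.json", with the state JSON-encoded.
fn state_file_path(task_id: &str) -> std::path::PathBuf {
    std::path::Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_STATE_FILE}"))
}

fn roundtrip(state: &ResumeState) -> ResumeState {
    let bytes = serde_json::to_vec(state).expect("serialize");
    serde_json::from_slice(&bytes).expect("deserialize")
}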
/// resume checkpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeCheckpoint {
/// task id
pub task_id: String,
/// checkpoint time
pub checkpoint_time: u64,
/// current bucket index
pub current_bucket_index: usize,
/// current object index
pub current_object_index: usize,
/// processed objects
pub processed_objects: Vec<String>,
/// failed objects
pub failed_objects: Vec<String>,
/// skipped objects
pub skipped_objects: Vec<String>,
}
impl ResumeCheckpoint {
pub fn new(task_id: String) -> Self {
Self {
task_id,
checkpoint_time: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
current_bucket_index: 0,
current_object_index: 0,
processed_objects: Vec::new(),
failed_objects: Vec::new(),
skipped_objects: Vec::new(),
}
}
pub fn update_position(&mut self, bucket_index: usize, object_index: usize) {
self.current_bucket_index = bucket_index;
self.current_object_index = object_index;
self.checkpoint_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
}
pub fn add_processed_object(&mut self, object: String) {
if !self.processed_objects.contains(&object) {
self.processed_objects.push(object);
}
}
pub fn add_failed_object(&mut self, object: String) {
if !self.failed_objects.contains(&object) {
self.failed_objects.push(object);
}
}
pub fn add_skipped_object(&mut self, object: String) {
if !self.skipped_objects.contains(&object) {
self.skipped_objects.push(object);
}
}
}
/// resume checkpoint manager
pub struct CheckpointManager {
disk: DiskStore,
checkpoint: Arc<RwLock<ResumeCheckpoint>>,
}
impl CheckpointManager {
/// create new checkpoint manager
pub async fn new(disk: DiskStore, task_id: String) -> Result<Self> {
let checkpoint = ResumeCheckpoint::new(task_id);
let manager = Self {
disk,
checkpoint: Arc::new(RwLock::new(checkpoint)),
};
// save initial checkpoint
manager.save_checkpoint().await?;
Ok(manager)
}
/// load checkpoint from disk
pub async fn load_from_disk(disk: DiskStore, task_id: &str) -> Result<Self> {
let checkpoint_data = Self::read_checkpoint_file(&disk, task_id).await?;
let checkpoint: ResumeCheckpoint = serde_json::from_slice(&checkpoint_data).map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to deserialize checkpoint: {e}"),
})?;
Ok(Self {
disk,
checkpoint: Arc::new(RwLock::new(checkpoint)),
})
}
/// check if checkpoint exists
pub async fn has_checkpoint(disk: &DiskStore, task_id: &str) -> bool {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
match disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap()).await {
Ok(data) => !data.is_empty(),
Err(_) => false,
}
}
/// get current checkpoint
pub async fn get_checkpoint(&self) -> ResumeCheckpoint {
self.checkpoint.read().await.clone()
}
/// update position
pub async fn update_position(&self, bucket_index: usize, object_index: usize) -> Result<()> {
let mut checkpoint = self.checkpoint.write().await;
checkpoint.update_position(bucket_index, object_index);
drop(checkpoint);
self.save_checkpoint().await
}
/// add processed object
pub async fn add_processed_object(&self, object: String) -> Result<()> {
let mut checkpoint = self.checkpoint.write().await;
checkpoint.add_processed_object(object);
drop(checkpoint);
self.save_checkpoint().await
}
/// add failed object
pub async fn add_failed_object(&self, object: String) -> Result<()> {
let mut checkpoint = self.checkpoint.write().await;
checkpoint.add_failed_object(object);
drop(checkpoint);
self.save_checkpoint().await
}
/// add skipped object
pub async fn add_skipped_object(&self, object: String) -> Result<()> {
let mut checkpoint = self.checkpoint.write().await;
checkpoint.add_skipped_object(object);
drop(checkpoint);
self.save_checkpoint().await
}
/// cleanup checkpoint
pub async fn cleanup(&self) -> Result<()> {
let checkpoint = self.checkpoint.read().await;
let task_id = &checkpoint.task_id;
let checkpoint_file = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
let _ = self
.disk
.delete(RUSTFS_META_BUCKET, checkpoint_file.to_str().unwrap(), Default::default())
.await;
info!("Cleaned up checkpoint for task: {}", task_id);
Ok(())
}
/// save checkpoint to disk
async fn save_checkpoint(&self) -> Result<()> {
let checkpoint = self.checkpoint.read().await;
let checkpoint_data = serde_json::to_vec(&*checkpoint).map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to serialize checkpoint: {e}"),
})?;
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{}_{}", checkpoint.task_id, RESUME_CHECKPOINT_FILE));
self.disk
.write_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap(), checkpoint_data.into())
.await
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to save checkpoint: {e}"),
})?;
debug!("Saved checkpoint for task: {}", checkpoint.task_id);
Ok(())
}
/// read checkpoint file from disk
async fn read_checkpoint_file(disk: &DiskStore, task_id: &str) -> Result<Vec<u8>> {
let file_path = Path::new(BUCKET_META_PREFIX).join(format!("{task_id}_{RESUME_CHECKPOINT_FILE}"));
disk.read_all(RUSTFS_META_BUCKET, file_path.to_str().unwrap())
.await
.map(|bytes| bytes.to_vec())
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to read checkpoint file: {e}"),
})
}
}
/// resume utils
pub struct ResumeUtils;
impl ResumeUtils {
/// generate unique task id
pub fn generate_task_id() -> String {
Uuid::new_v4().to_string()
}
/// check if task can be resumed
pub async fn can_resume_task(disk: &DiskStore, task_id: &str) -> bool {
ResumeManager::has_resume_state(disk, task_id).await
}
/// get all resumable task ids
pub async fn get_resumable_tasks(disk: &DiskStore) -> Result<Vec<String>> {
// List all files in the buckets metadata directory
let entries = match disk.list_dir("", RUSTFS_META_BUCKET, BUCKET_META_PREFIX, -1).await {
Ok(entries) => entries,
Err(e) => {
debug!("Failed to list resume state files: {}", e);
return Ok(Vec::new());
}
};
let mut task_ids = Vec::new();
// Filter files that end with ahm_resume_state.json and extract task IDs
for entry in entries {
if entry.ends_with(&format!("_{RESUME_STATE_FILE}")) {
// Extract task ID from filename: {task_id}_ahm_resume_state.json
if let Some(task_id) = entry.strip_suffix(&format!("_{RESUME_STATE_FILE}")) {
if !task_id.is_empty() {
task_ids.push(task_id.to_string());
}
}
}
}
debug!("Found {} resumable tasks: {:?}", task_ids.len(), task_ids);
Ok(task_ids)
}
/// cleanup expired resume states
pub async fn cleanup_expired_states(disk: &DiskStore, max_age_hours: u64) -> Result<()> {
let task_ids = Self::get_resumable_tasks(disk).await?;
let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
for task_id in task_ids {
if let Ok(resume_manager) = ResumeManager::load_from_disk(disk.clone(), &task_id).await {
let state = resume_manager.get_state().await;
let age_hours = (current_time - state.last_update) / 3600;
if age_hours > max_age_hours {
info!("Cleaning up expired resume state for task: {} (age: {} hours)", task_id, age_hours);
if let Err(e) = resume_manager.cleanup().await {
warn!("Failed to cleanup expired resume state for task {}: {}", task_id, e);
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_resume_state_creation() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string(), "bucket2".to_string()];
let state = ResumeState::new(task_id.clone(), "erasure_set".to_string(), buckets);
assert_eq!(state.task_id, task_id);
assert_eq!(state.task_type, "erasure_set");
assert!(!state.completed);
assert_eq!(state.processed_objects, 0);
assert_eq!(state.pending_buckets.len(), 2);
}
#[tokio::test]
async fn test_resume_state_progress() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string()];
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets);
state.update_progress(10, 8, 1, 1);
assert_eq!(state.processed_objects, 10);
assert_eq!(state.successful_objects, 8);
assert_eq!(state.failed_objects, 1);
assert_eq!(state.skipped_objects, 1);
let progress = state.get_progress_percentage();
assert_eq!(progress, 0.0); // total_objects is 0
state.total_objects = 100;
let progress = state.get_progress_percentage();
assert_eq!(progress, 10.0);
}
#[tokio::test]
async fn test_resume_state_bucket_completion() {
let task_id = ResumeUtils::generate_task_id();
let buckets = vec!["bucket1".to_string(), "bucket2".to_string()];
let mut state = ResumeState::new(task_id, "erasure_set".to_string(), buckets);
assert_eq!(state.pending_buckets.len(), 2);
assert_eq!(state.completed_buckets.len(), 0);
state.complete_bucket("bucket1");
assert_eq!(state.pending_buckets.len(), 1);
assert_eq!(state.completed_buckets.len(), 1);
assert!(state.completed_buckets.contains(&"bucket1".to_string()));
}
#[tokio::test]
async fn test_resume_utils() {
let task_id1 = ResumeUtils::generate_task_id();
let task_id2 = ResumeUtils::generate_task_id();
assert_ne!(task_id1, task_id2);
assert_eq!(task_id1.len(), 36); // UUID length
assert_eq!(task_id2.len(), 36);
}
#[tokio::test]
async fn test_get_resumable_tasks_integration() {
use rustfs_ecstore::disk::{endpoint::Endpoint, new_disk, DiskOption};
use tempfile::TempDir;
// Create a temporary directory for testing
let temp_dir = TempDir::new().unwrap();
let disk_path = temp_dir.path().join("test_disk");
std::fs::create_dir_all(&disk_path).unwrap();
// Create a local disk for testing
let endpoint = Endpoint::try_from(disk_path.to_string_lossy().as_ref()).unwrap();
let disk_option = DiskOption {
cleanup: false,
health_check: false,
};
let disk = new_disk(&endpoint, &disk_option).await.unwrap();
// Create necessary directories first (ignore if already exist)
let _ = disk.make_volume(RUSTFS_META_BUCKET).await;
let _ = disk.make_volume(&format!("{RUSTFS_META_BUCKET}/{BUCKET_META_PREFIX}")).await;
// Create some test resume state files
let task_ids = vec![
"test-task-1".to_string(),
"test-task-2".to_string(),
"test-task-3".to_string(),
];
// Save resume state files for each task
for task_id in &task_ids {
let state = ResumeState::new(
task_id.clone(),
"erasure_set".to_string(),
vec!["bucket1".to_string(), "bucket2".to_string()],
);
let state_data = serde_json::to_vec(&state).unwrap();
let file_path = format!("{BUCKET_META_PREFIX}/{task_id}_{RESUME_STATE_FILE}");
disk.write_all(RUSTFS_META_BUCKET, &file_path, state_data.into())
.await
.unwrap();
}
// Also create some non-resume state files to test filtering
let non_resume_files = vec![
"other_file.txt",
"task4_ahm_checkpoint.json",
"task5_ahm_progress.json",
"_ahm_resume_state.json", // Invalid: empty task ID
];
for file_name in non_resume_files {
let file_path = format!("{BUCKET_META_PREFIX}/{file_name}");
disk.write_all(RUSTFS_META_BUCKET, &file_path, b"test data".to_vec().into())
.await
.unwrap();
}
// Now call get_resumable_tasks to see if it finds the correct files
let found_task_ids = ResumeUtils::get_resumable_tasks(&disk).await.unwrap();
// Verify that only the valid resume state files are found
assert_eq!(found_task_ids.len(), 3);
for task_id in &task_ids {
assert!(found_task_ids.contains(task_id), "Task ID {task_id} not found");
}
// Verify that invalid files are not included
assert!(!found_task_ids.contains(&"".to_string()));
assert!(!found_task_ids.contains(&"task4".to_string()));
assert!(!found_task_ids.contains(&"task5".to_string()));
// Clean up
temp_dir.close().unwrap();
}
}

View File

@@ -15,7 +15,7 @@
use crate::error::{Error, Result};
use async_trait::async_trait;
use rustfs_ecstore::{
disk::endpoint::Endpoint,
disk::{endpoint::Endpoint, DiskStore},
heal::heal_commands::{HealOpts, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
store::ECStore,
store_api::{BucketInfo, ObjectIO, StorageAPI},
@@ -109,6 +109,9 @@ pub trait HealStorageAPI: Send + Sync {
/// List objects for healing
async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>>;
/// Get disk for resume functionality
async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore>;
}
/// ECStore Heal storage layer implementation
@@ -460,4 +463,44 @@ impl HealStorageAPI for ECStoreHealStorage {
}
}
}
async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore> {
debug!("Getting disk for resume: {}", set_disk_id);
// Parse set_disk_id to extract pool and set indices
// Format: "pool_{pool_idx}_set_{set_idx}"
let parts: Vec<&str> = set_disk_id.split('_').collect();
if parts.len() != 4 || parts[0] != "pool" || parts[2] != "set" {
return Err(Error::TaskExecutionFailed {
message: format!("Invalid set_disk_id format: {set_disk_id}"),
});
}
let pool_idx: usize = parts[1].parse().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid pool index in set_disk_id: {set_disk_id}"),
})?;
let set_idx: usize = parts[3].parse().map_err(|_| Error::TaskExecutionFailed {
message: format!("Invalid set index in set_disk_id: {set_disk_id}"),
})?;
// Get the first available disk from the set
let disks = self
.ecstore
.get_disks(pool_idx, set_idx)
.await
.map_err(|e| Error::TaskExecutionFailed {
message: format!("Failed to get disks for pool {pool_idx} set {set_idx}: {e}"),
})?;
// Find the first available disk
if let Some(disk_store) = disks.into_iter().flatten().next() {
info!("Found disk for resume: {:?}", disk_store);
return Ok(disk_store);
}
Err(Error::TaskExecutionFailed {
message: format!("No available disk found for set_disk_id: {set_disk_id}"),
})
}
}
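Note the id grammar: the parser above expects "pool_{pool_idx}_set_{set_idx}" (four '_'-separated parts), while the scanner, event handler, and auto disk scanner in this commit build ids as "{pool_idx}_{set_idx}". A parsing sketch for the form expected here (parse_set_disk_id is hypothetical):

// Hypothetical: parse the "pool_{pool_idx}_set_{set_idx}" form expected above.
fn parse_set_disk_id(id: &str) -> Option<(usize, usize)> {
    let parts: Vec<&str> = id.split('_').collect();
    match parts.as_slice() {
        ["pool", p, "set", s] => Some((p.parse().ok()?, s.parse().ok()?)),
        _ => None,
    }
}
// parse_set_disk_id("pool_0_set_1") == Some((0, 1))
// parse_set_disk_id("0_1") == None (the "{pool}_{set}" ids built elsewhere would not parse here)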

View File

@@ -13,20 +13,10 @@
// limitations under the License.
use crate::error::{Error, Result};
use crate::heal::{progress::HealProgress, storage::HealStorageAPI};
use rustfs_ecstore::config::RUSTFS_CONFIG_PREFIX;
use rustfs_ecstore::disk::endpoint::Endpoint;
use rustfs_ecstore::disk::error::DiskError;
use rustfs_ecstore::disk::{DiskAPI, DiskInfoOptions, BUCKET_META_PREFIX, RUSTFS_META_BUCKET};
use crate::heal::{erasure_healer::ErasureSetHealer, progress::HealProgress, storage::HealStorageAPI};
use rustfs_ecstore::heal::heal_commands::HealScanMode;
use rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN;
use rustfs_ecstore::heal::heal_commands::{init_healing_tracker, load_healing_tracker, HealScanMode};
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::store::get_disk_via_endpoint;
use rustfs_ecstore::store_api::BucketInfo;
use rustfs_utils::path::path_join;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
@@ -44,8 +34,8 @@ pub enum HealType {
},
/// Bucket heal
Bucket { bucket: String },
/// Disk heal
Disk { endpoint: Endpoint },
/// Erasure Set heal (includes disk format repair)
ErasureSet { buckets: Vec<String>, set_disk_id: String },
/// Metadata heal
Metadata { bucket: String, object: String },
/// MRF heal
@@ -175,10 +165,6 @@ impl HealRequest {
Self::new(HealType::Bucket { bucket }, HealOptions::default(), HealPriority::Normal)
}
pub fn disk(endpoint: Endpoint) -> Self {
Self::new(HealType::Disk { endpoint }, HealOptions::default(), HealPriority::High)
}
pub fn metadata(bucket: String, object: String) -> Self {
Self::new(HealType::Metadata { bucket, object }, HealOptions::default(), HealPriority::High)
}
@@ -256,7 +242,7 @@ impl HealTask {
version_id,
} => self.heal_object(bucket, object, version_id.as_deref()).await,
HealType::Bucket { bucket } => self.heal_bucket(bucket).await,
HealType::Disk { endpoint } => self.heal_disk(endpoint).await,
HealType::Metadata { bucket, object } => self.heal_metadata(bucket, object).await,
HealType::MRF { meta_path } => self.heal_mrf(meta_path).await,
HealType::ECDecode {
@@ -264,6 +250,7 @@ impl HealTask {
object,
version_id,
} => self.heal_ec_decode(bucket, object, version_id.as_deref()).await,
HealType::ErasureSet { buckets, set_disk_id } => self.heal_erasure_set(buckets.clone(), set_disk_id.clone()).await,
};
// update completed time and status
@@ -524,62 +511,6 @@ impl HealTask {
}
}
async fn heal_disk(&self, endpoint: &Endpoint) -> Result<()> {
info!("Healing disk: {:?}", endpoint);
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("disk: {endpoint:?}")));
progress.update_progress(0, 3, 0, 0);
}
// Step 1: Perform disk format heal using ecstore
info!("Step 1: Performing disk format heal using ecstore");
match self.storage.heal_format(self.options.dry_run).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("Disk heal failed: {:?} - {}", endpoint, e);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk {endpoint:?}: {e}"),
});
}
info!("Disk heal completed successfully: {:?} ({} drives)", endpoint, result.after.drives.len());
{
let mut progress = self.progress.write().await;
progress.update_progress(2, 3, 0, 0);
}
// Step 2: Synchronize data/buckets on the fresh disk
info!("Step 2: Healing buckets on fresh disk");
self.heal_fresh_disk(endpoint).await?;
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
}
Ok(())
}
Err(e) => {
error!("Disk heal failed: {:?} - {}", endpoint, e);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 3, 0, 0);
}
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk {endpoint:?}: {e}"),
})
}
}
}
async fn heal_metadata(&self, bucket: &str, object: &str) -> Result<()> {
info!("Healing metadata: {}/{}", bucket, object);
@@ -807,79 +738,93 @@ impl HealTask {
}
}
async fn heal_fresh_disk(&self, endpoint: &Endpoint) -> Result<()> {
// Locate disk via endpoint
let disk = get_disk_via_endpoint(endpoint)
.await
.ok_or_else(|| Error::other(format!("Disk not found for endpoint: {endpoint}")))?;
async fn heal_erasure_set(&self, buckets: Vec<String>, set_disk_id: String) -> Result<()> {
info!("Healing Erasure Set: {} ({} buckets)", set_disk_id, buckets.len());
// Skip if drive is root or other fatal errors
if let Err(e) = disk.disk_info(&DiskInfoOptions::default()).await {
match e {
DiskError::DriveIsRoot => return Ok(()),
DiskError::UnformattedDisk => { /* continue healing */ }
_ => return Err(Error::other(e)),
// update progress
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("erasure_set: {} ({} buckets)", set_disk_id, buckets.len())));
progress.update_progress(0, 4, 0, 0);
}
// Step 1: Perform disk format heal using ecstore
info!("Step 1: Performing disk format heal using ecstore");
match self.storage.heal_format(self.options.dry_run).await {
Ok((result, error)) => {
if let Some(e) = error {
error!("Disk format heal failed: {} - {}", set_disk_id, e);
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk format for {set_disk_id}: {e}"),
});
}
info!(
"Disk format heal completed successfully: {} ({} drives)",
set_disk_id,
result.after.drives.len()
);
}
Err(e) => {
error!("Disk format heal failed: {} - {}", set_disk_id, e);
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
}
return Err(Error::TaskExecutionFailed {
message: format!("Failed to heal disk format for {set_disk_id}: {e}"),
});
}
}
// Load or init HealingTracker
let mut tracker = match load_healing_tracker(&Some(disk.clone())).await {
Ok(t) => t,
Err(err) => match err {
DiskError::FileNotFound => init_healing_tracker(disk.clone(), &Uuid::new_v4().to_string())
.await
.map_err(Error::other)?,
_ => return Err(Error::other(err)),
},
};
{
let mut progress = self.progress.write().await;
progress.update_progress(1, 4, 0, 0);
}
// Build bucket list
let mut buckets = self.storage.list_buckets().await.map_err(Error::other)?;
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(RUSTFS_CONFIG_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
buckets.push(BucketInfo {
name: path_join(&[PathBuf::from(RUSTFS_META_BUCKET), PathBuf::from(BUCKET_META_PREFIX)])
.to_string_lossy()
.to_string(),
..Default::default()
});
// Step 2: Get disk for resume functionality
info!("Step 2: Getting disk for resume functionality");
let disk = self.storage.get_disk_for_resume(&set_disk_id).await?;
// Sort: system buckets first, others by creation time desc
buckets.sort_by(|a, b| {
let a_sys = a.name.starts_with(RUSTFS_META_BUCKET);
let b_sys = b.name.starts_with(RUSTFS_META_BUCKET);
match (a_sys, b_sys) {
(true, false) => Ordering::Less,
(false, true) => Ordering::Greater,
_ => b.created.cmp(&a.created),
{
let mut progress = self.progress.write().await;
progress.update_progress(2, 4, 0, 0);
}
// Step 3: Create erasure set healer with resume support
info!("Step 3: Creating erasure set healer with resume support");
let erasure_healer = ErasureSetHealer::new(self.storage.clone(), self.progress.clone(), self.cancel_token.clone(), disk);
{
let mut progress = self.progress.write().await;
progress.update_progress(3, 4, 0, 0);
}
// Step 4: Execute erasure set heal with resume
info!("Step 4: Executing erasure set heal with resume");
let result = erasure_healer.heal_erasure_set(&buckets, &set_disk_id).await;
{
let mut progress = self.progress.write().await;
progress.update_progress(4, 4, 0, 0);
}
match result {
Ok(_) => {
info!("Erasure set heal completed successfully: {} ({} buckets)", set_disk_id, buckets.len());
Ok(())
}
});
// Update tracker queue and persist
tracker.set_queue_buckets(&buckets).await;
tracker.save().await.map_err(Error::other)?;
// Prepare bucket names list
let bucket_names: Vec<String> = buckets.iter().map(|b| b.name.clone()).collect();
// Run heal_erasure_set using underlying SetDisk
let (pool_idx, set_idx) = (endpoint.pool_idx as usize, endpoint.set_idx as usize);
let Some(store) = new_object_layer_fn() else {
return Err(Error::other("errServerNotInitialized"));
};
let set_disk = store.pools[pool_idx].disk_set[set_idx].clone();
let tracker_arc = Arc::new(RwLock::new(tracker));
set_disk
.heal_erasure_set(&bucket_names, tracker_arc)
.await
.map_err(Error::other)?;
Ok(())
Err(e) => {
error!("Erasure set heal failed: {} - {}", set_disk_id, e);
Err(Error::TaskExecutionFailed {
message: format!("Failed to heal erasure set {set_disk_id}: {e}"),
})
}
}
}
}

View File

@@ -22,7 +22,7 @@ use ecstore::{
disk::{DiskAPI, DiskStore, WalkDirOptions},
set_disk::SetDisks,
};
use rustfs_ecstore as ecstore;
use rustfs_ecstore::{self as ecstore, StorageAPI};
use rustfs_filemeta::MetacacheReader;
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
@@ -502,18 +502,43 @@ impl Scanner {
metrics.free_space = disk_info.free;
metrics.is_online = disk.is_online().await;
// check disk status: if offline, submit a heal task
// check disk status; if offline, submit an erasure set heal task
if !metrics.is_online {
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::disk(disk.endpoint().clone());
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => {
match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e.into()));
}
}
}
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
};
let set_disk_id = format!("{}_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::task::HealType::ErasureSet {
buckets,
set_disk_id,
},
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk offline, submit heal task: {} {}", task_id, disk_path);
warn!("disk offline, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(e) => {
error!("disk offline, submit heal task failed: {} {}", disk_path, e);
error!("disk offline, submit erasure set heal task failed: {} {}", disk_path, e);
}
}
}
@@ -540,24 +565,42 @@ impl Scanner {
Err(e) => {
error!("Failed to list volumes on disk {}: {}", disk_path, e);
// disk access failed, submit a disk heal task
// disk access failed, submit erasure set heal task
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealPriority, HealRequest};
// Get bucket list for erasure set healing
let buckets = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => {
match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for disk healing: {}", e);
return Err(Error::Storage(e.into()));
}
}
}
None => {
error!("No ECStore available for getting bucket list");
return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available")));
}
};
let set_disk_id = format!("{}_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx);
let req = HealRequest::new(
crate::heal::HealType::Disk {
endpoint: disk.endpoint().clone(),
crate::heal::task::HealType::ErasureSet {
buckets,
set_disk_id,
},
crate::heal::HealOptions::default(),
HealPriority::Urgent,
crate::heal::task::HealOptions::default(),
crate::heal::task::HealPriority::Urgent,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("disk access failed, submit heal task: {} {}", task_id, disk_path);
warn!("disk access failed, submit erasure set heal task: {} {}", task_id, disk_path);
}
Err(heal_err) => {
error!("disk access failed, submit heal task failed: {} {}", disk_path, heal_err);
error!("disk access failed, submit erasure set heal task failed: {} {}", disk_path, heal_err);
}
}
}

View File

@@ -3468,7 +3468,7 @@ impl SetDisks {
return Err(Error::other(format!("unable to get disk information before healing it: {err}")));
}
};
let num_cores = num_cpus::get(); // use the num_cpus crate to get the number of cores
let num_cores = num_cpus::get(); // use num_cpus crate to get the number of cores
let mut num_healers: usize;
if info.nr_requests as usize > num_cores {
@@ -3577,44 +3577,6 @@ impl SetDisks {
continue;
}
// let vc: VersioningConfiguration;
// let lc: BucketLifecycleConfiguration;
// let lr: ObjectLockConfiguration;
// let rcfg: ReplicationConfiguration;
// if !is_rustfs_meta_bucket_name(bucket) {
// vc = match get_versioning_config(bucket).await {
// Ok((r, _)) => r,
// Err(err) => {
// ret_err = Some(err);
// info!("get versioning config failed, err: {}", err.to_string());
// continue;
// }
// };
// lc = match get_lifecycle_config(bucket).await {
// Ok((r, _)) => r,
// Err(err) => {
// ret_err = Some(err);
// info!("get lifecycle config failed, err: {}", err.to_string());
// continue;
// }
// };
// lr = match get_object_lock_config(bucket).await {
// Ok((r, _)) => r,
// Err(err) => {
// ret_err = Some(err);
// info!("get object lock config failed, err: {}", err.to_string());
// continue;
// }
// };
// rcfg = match get_replication_config(bucket).await {
// Ok((r, _)) => r,
// Err(err) => {
// ret_err = Some(err);
// info!("get replication config failed, err: {}", err.to_string());
// continue;
// }
// };
// }
let (mut disks, _, healing) = self.get_online_disk_with_healing_and_info(true).await?;
if disks.len() == healing {
info!("all drives are in healing state, aborting..");
@@ -3645,17 +3607,6 @@ impl SetDisks {
let fallback_disks = disks[expected_disk..].to_vec();
disks = disks[..expected_disk].to_vec();
//todo
// let filter_life_cycle = |bucket: &str, object: &str, fi: FileInfo| {
// if lc.rules.is_empty() {
// return false;
// }
// // todo: versioning
// let versioned = false;
// let obj_info = fi.to_object_info(bucket, object, versioned);
//
// };
let result_tx_send = result_tx.clone();
let bg_seq_send = bg_seq.clone();
let send = Box::new(move |result: HealEntryResult| {