feat: implement heal subsystem for automatic data repair

- Add heal module with core types (HealType, HealRequest, HealTask)
- Implement HealManager for task scheduling and execution
- Add HealStorageAPI trait and ECStoreHealStorage implementation
- Integrate heal capabilities into scanner for automatic repair
- Support multiple heal types: object, bucket, disk, metadata, MRF, EC decode
- Add progress tracking and event system for heal operations
- Merge heal and scanner error types for unified error handling
- Include comprehensive logging and metrics for heal operations

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2025-07-10 18:11:42 +08:00
parent 0aff736efd
commit 4fb3d187d0
10 changed files with 1774 additions and 12 deletions

View File

@@ -14,8 +14,10 @@
use thiserror::Error;
/// RustFS AHM/Heal/Scanner 统一错误类型
#[derive(Debug, Error)]
pub enum Error {
// 通用
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
@@ -25,21 +27,65 @@ pub enum Error {
#[error("Configuration error: {0}")]
Config(String),
#[error("Heal configuration error: {message}")]
ConfigurationError { message: String },
#[error("Other error: {0}")]
Other(String),
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
// Scanner相关
#[error("Scanner error: {0}")]
Scanner(String),
#[error("Metrics error: {0}")]
Metrics(String),
#[error(transparent)]
Other(#[from] anyhow::Error),
// Heal相关
#[error("Heal task not found: {task_id}")]
TaskNotFound { task_id: String },
#[error("Heal task already exists: {task_id}")]
TaskAlreadyExists { task_id: String },
#[error("Heal manager is not running")]
ManagerNotRunning,
#[error("Heal task execution failed: {message}")]
TaskExecutionFailed { message: String },
#[error("Invalid heal type: {heal_type}")]
InvalidHealType { heal_type: String },
#[error("Heal task cancelled")]
TaskCancelled,
#[error("Heal task timeout")]
TaskTimeout,
#[error("Heal event processing failed: {message}")]
EventProcessingFailed { message: String },
#[error("Heal progress tracking failed: {message}")]
ProgressTrackingFailed { message: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
// Implement conversion from ahm::Error to std::io::Error for use in main.rs
impl Error {
pub fn other<E>(error: E) -> Self
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
Error::Other(error.into().to_string())
}
}
// 可选:实现与 std::io::Error 的互转
impl From<Error> for std::io::Error {
fn from(err: Error) -> Self {
std::io::Error::other(err)
}
}
}

View File

@@ -0,0 +1,325 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::heal::task::{HealOptions, HealPriority, HealRequest, HealType};
use rustfs_ecstore::disk::endpoint::Endpoint;
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
/// Corruption type
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CorruptionType {
/// Data corruption
DataCorruption,
/// Metadata corruption
MetadataCorruption,
/// Partial corruption
PartialCorruption,
/// Complete corruption
CompleteCorruption,
}
/// Severity level
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Severity {
/// Low severity
Low = 0,
/// Medium severity
Medium = 1,
/// High severity
High = 2,
/// Critical severity
Critical = 3,
}
/// Heal event
#[derive(Debug, Clone)]
pub enum HealEvent {
/// Object corruption event
ObjectCorruption {
bucket: String,
object: String,
version_id: Option<String>,
corruption_type: CorruptionType,
severity: Severity,
},
/// Object missing event
ObjectMissing {
bucket: String,
object: String,
version_id: Option<String>,
expected_locations: Vec<usize>,
available_locations: Vec<usize>,
},
/// Metadata corruption event
MetadataCorruption {
bucket: String,
object: String,
corruption_type: CorruptionType,
},
/// Disk status change event
DiskStatusChange {
endpoint: Endpoint,
old_status: String,
new_status: String,
},
/// EC decode failure event
ECDecodeFailure {
bucket: String,
object: String,
version_id: Option<String>,
missing_shards: Vec<usize>,
available_shards: Vec<usize>,
},
/// Checksum mismatch event
ChecksumMismatch {
bucket: String,
object: String,
version_id: Option<String>,
expected_checksum: String,
actual_checksum: String,
},
/// Bucket metadata corruption event
BucketMetadataCorruption {
bucket: String,
corruption_type: CorruptionType,
},
/// MRF metadata corruption event
MRFMetadataCorruption {
meta_path: String,
corruption_type: CorruptionType,
},
}
impl HealEvent {
/// Convert HealEvent to HealRequest
pub fn to_heal_request(&self) -> HealRequest {
match self {
HealEvent::ObjectCorruption { bucket, object, version_id, severity, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
Self::severity_to_priority(severity),
)
}
HealEvent::ObjectMissing { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::MetadataCorruption { bucket, object, .. } => {
HealRequest::new(
HealType::Metadata {
bucket: bucket.clone(),
object: object.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::DiskStatusChange { endpoint, .. } => {
HealRequest::new(
HealType::Disk {
endpoint: endpoint.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::ECDecodeFailure { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::ECDecode {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::Urgent,
)
}
HealEvent::ChecksumMismatch { bucket, object, version_id, .. } => {
HealRequest::new(
HealType::Object {
bucket: bucket.clone(),
object: object.clone(),
version_id: version_id.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::BucketMetadataCorruption { bucket, .. } => {
HealRequest::new(
HealType::Bucket {
bucket: bucket.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
HealEvent::MRFMetadataCorruption { meta_path, .. } => {
HealRequest::new(
HealType::MRF {
meta_path: meta_path.clone(),
},
HealOptions::default(),
HealPriority::High,
)
}
}
}
/// Convert severity to priority
fn severity_to_priority(severity: &Severity) -> HealPriority {
match severity {
Severity::Low => HealPriority::Low,
Severity::Medium => HealPriority::Normal,
Severity::High => HealPriority::High,
Severity::Critical => HealPriority::Urgent,
}
}
/// Get event description
pub fn description(&self) -> String {
match self {
HealEvent::ObjectCorruption { bucket, object, corruption_type, .. } => {
format!("Object corruption detected: {}/{} - {:?}", bucket, object, corruption_type)
}
HealEvent::ObjectMissing { bucket, object, .. } => {
format!("Object missing: {}/{}", bucket, object)
}
HealEvent::MetadataCorruption { bucket, object, corruption_type, .. } => {
format!("Metadata corruption: {}/{} - {:?}", bucket, object, corruption_type)
}
HealEvent::DiskStatusChange { endpoint, old_status, new_status, .. } => {
format!("Disk status changed: {:?} {} -> {}", endpoint, old_status, new_status)
}
HealEvent::ECDecodeFailure { bucket, object, missing_shards, .. } => {
format!("EC decode failure: {}/{} - missing shards: {:?}", bucket, object, missing_shards)
}
HealEvent::ChecksumMismatch { bucket, object, expected_checksum, actual_checksum, .. } => {
format!("Checksum mismatch: {}/{} - expected: {}, actual: {}", bucket, object, expected_checksum, actual_checksum)
}
HealEvent::BucketMetadataCorruption { bucket, corruption_type, .. } => {
format!("Bucket metadata corruption: {} - {:?}", bucket, corruption_type)
}
HealEvent::MRFMetadataCorruption { meta_path, corruption_type, .. } => {
format!("MRF metadata corruption: {} - {:?}", meta_path, corruption_type)
}
}
}
/// Get event severity
pub fn severity(&self) -> Severity {
match self {
HealEvent::ObjectCorruption { severity, .. } => severity.clone(),
HealEvent::ObjectMissing { .. } => Severity::High,
HealEvent::MetadataCorruption { .. } => Severity::High,
HealEvent::DiskStatusChange { .. } => Severity::High,
HealEvent::ECDecodeFailure { .. } => Severity::Critical,
HealEvent::ChecksumMismatch { .. } => Severity::High,
HealEvent::BucketMetadataCorruption { .. } => Severity::High,
HealEvent::MRFMetadataCorruption { .. } => Severity::High,
}
}
/// Get event timestamp
pub fn timestamp(&self) -> SystemTime {
SystemTime::now()
}
}
/// Heal event handler
pub struct HealEventHandler {
/// Event queue
events: Vec<HealEvent>,
/// Maximum number of events
max_events: usize,
}
impl HealEventHandler {
pub fn new(max_events: usize) -> Self {
Self {
events: Vec::new(),
max_events,
}
}
/// Add event
pub fn add_event(&mut self, event: HealEvent) {
if self.events.len() >= self.max_events {
// Remove oldest event
self.events.remove(0);
}
self.events.push(event);
}
/// Get all events
pub fn get_events(&self) -> &[HealEvent] {
&self.events
}
/// Clear events
pub fn clear_events(&mut self) {
self.events.clear();
}
/// Get event count
pub fn event_count(&self) -> usize {
self.events.len()
}
/// Filter events by severity
pub fn filter_by_severity(&self, min_severity: Severity) -> Vec<&HealEvent> {
self.events
.iter()
.filter(|event| event.severity() >= min_severity)
.collect()
}
/// Filter events by type
pub fn filter_by_type(&self, event_type: &str) -> Vec<&HealEvent> {
self.events
.iter()
.filter(|event| {
match event {
HealEvent::ObjectCorruption { .. } => event_type == "ObjectCorruption",
HealEvent::ObjectMissing { .. } => event_type == "ObjectMissing",
HealEvent::MetadataCorruption { .. } => event_type == "MetadataCorruption",
HealEvent::DiskStatusChange { .. } => event_type == "DiskStatusChange",
HealEvent::ECDecodeFailure { .. } => event_type == "ECDecodeFailure",
HealEvent::ChecksumMismatch { .. } => event_type == "ChecksumMismatch",
HealEvent::BucketMetadataCorruption { .. } => event_type == "BucketMetadataCorruption",
HealEvent::MRFMetadataCorruption { .. } => event_type == "MRFMetadataCorruption",
}
})
.collect()
}
}
impl Default for HealEventHandler {
fn default() -> Self {
Self::new(1000)
}
}

View File

@@ -0,0 +1,381 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use crate::heal::{
progress::{HealProgress, HealStatistics},
storage::HealStorageAPI,
task::{HealRequest, HealTask, HealTaskStatus},
};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{
sync::{Mutex, RwLock},
time::interval,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// Heal config
#[derive(Debug, Clone)]
pub struct HealConfig {
/// Whether to enable auto heal
pub enable_auto_heal: bool,
/// Heal interval
pub heal_interval: Duration,
/// Maximum concurrent heal tasks
pub max_concurrent_heals: usize,
/// Task timeout
pub task_timeout: Duration,
/// Queue size
pub queue_size: usize,
}
impl Default for HealConfig {
fn default() -> Self {
Self {
enable_auto_heal: true,
heal_interval: Duration::from_secs(60), // 1 minute
max_concurrent_heals: 4,
task_timeout: Duration::from_secs(300), // 5 minutes
queue_size: 1000,
}
}
}
/// Heal 状态
#[derive(Debug, Default)]
pub struct HealState {
/// 是否正在运行
pub is_running: bool,
/// 当前 heal 周期
pub current_cycle: u64,
/// 最后 heal 时间
pub last_heal_time: Option<SystemTime>,
/// 总 heal 对象数
pub total_healed_objects: u64,
/// 总 heal 失败数
pub total_heal_failures: u64,
/// 当前活跃 heal 任务数
pub active_heal_count: usize,
}
/// Heal 管理器
pub struct HealManager {
/// Heal 配置
config: Arc<RwLock<HealConfig>>,
/// Heal 状态
state: Arc<RwLock<HealState>>,
/// 活跃的 heal 任务
active_heals: Arc<Mutex<HashMap<String, Arc<HealTask>>>>,
/// Heal 队列
heal_queue: Arc<Mutex<VecDeque<HealRequest>>>,
/// 存储层接口
storage: Arc<dyn HealStorageAPI>,
/// 取消令牌
cancel_token: CancellationToken,
/// 统计信息
statistics: Arc<RwLock<HealStatistics>>,
}
impl HealManager {
/// 创建新的 HealManager
pub fn new(storage: Arc<dyn HealStorageAPI>, config: Option<HealConfig>) -> Self {
let config = config.unwrap_or_default();
Self {
config: Arc::new(RwLock::new(config)),
state: Arc::new(RwLock::new(HealState::default())),
active_heals: Arc::new(Mutex::new(HashMap::new())),
heal_queue: Arc::new(Mutex::new(VecDeque::new())),
storage,
cancel_token: CancellationToken::new(),
statistics: Arc::new(RwLock::new(HealStatistics::new())),
}
}
/// 启动 HealManager
pub async fn start(&self) -> Result<()> {
let mut state = self.state.write().await;
if state.is_running {
warn!("HealManager is already running");
return Ok(());
}
state.is_running = true;
drop(state);
info!("Starting HealManager");
// 启动调度器
self.start_scheduler().await?;
// 启动工作器
self.start_workers().await?;
info!("HealManager started successfully");
Ok(())
}
/// 停止 HealManager
pub async fn stop(&self) -> Result<()> {
info!("Stopping HealManager");
// 取消所有任务
self.cancel_token.cancel();
// 等待所有任务完成
let mut active_heals = self.active_heals.lock().await;
for task in active_heals.values() {
if let Err(e) = task.cancel().await {
warn!("Failed to cancel task {}: {}", task.id, e);
}
}
active_heals.clear();
// 更新状态
let mut state = self.state.write().await;
state.is_running = false;
info!("HealManager stopped successfully");
Ok(())
}
/// 提交 heal 请求
pub async fn submit_heal_request(&self, request: HealRequest) -> Result<String> {
let config = self.config.read().await;
let mut queue = self.heal_queue.lock().await;
if queue.len() >= config.queue_size {
return Err(Error::ConfigurationError {
message: "Heal queue is full".to_string(),
});
}
let request_id = request.id.clone();
queue.push_back(request);
drop(queue);
info!("Submitted heal request: {}", request_id);
Ok(request_id)
}
/// 获取任务状态
pub async fn get_task_status(&self, task_id: &str) -> Result<HealTaskStatus> {
let active_heals = self.active_heals.lock().await;
if let Some(task) = active_heals.get(task_id) {
Ok(task.get_status().await)
} else {
Err(Error::TaskNotFound {
task_id: task_id.to_string(),
})
}
}
/// 获取任务进度
pub async fn get_task_progress(&self, task_id: &str) -> Result<HealProgress> {
let active_heals = self.active_heals.lock().await;
if let Some(task) = active_heals.get(task_id) {
Ok(task.get_progress().await)
} else {
Err(Error::TaskNotFound {
task_id: task_id.to_string(),
})
}
}
/// 取消任务
pub async fn cancel_task(&self, task_id: &str) -> Result<()> {
let mut active_heals = self.active_heals.lock().await;
if let Some(task) = active_heals.get(task_id) {
task.cancel().await?;
active_heals.remove(task_id);
info!("Cancelled heal task: {}", task_id);
Ok(())
} else {
Err(Error::TaskNotFound {
task_id: task_id.to_string(),
})
}
}
/// 获取统计信息
pub async fn get_statistics(&self) -> HealStatistics {
self.statistics.read().await.clone()
}
/// 获取活跃任务数量
pub async fn get_active_task_count(&self) -> usize {
let active_heals = self.active_heals.lock().await;
active_heals.len()
}
/// 获取队列长度
pub async fn get_queue_length(&self) -> usize {
let queue = self.heal_queue.lock().await;
queue.len()
}
/// 启动调度器
async fn start_scheduler(&self) -> Result<()> {
let config = self.config.clone();
let heal_queue = self.heal_queue.clone();
let active_heals = self.active_heals.clone();
let cancel_token = self.cancel_token.clone();
let statistics = self.statistics.clone();
let storage = self.storage.clone();
tokio::spawn(async move {
let mut interval = interval(config.read().await.heal_interval);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Heal scheduler received shutdown signal");
break;
}
_ = interval.tick() => {
Self::process_heal_queue(&heal_queue, &active_heals, &config, &statistics, &storage).await;
}
}
}
});
Ok(())
}
/// 启动工作器
async fn start_workers(&self) -> Result<()> {
let config = self.config.clone();
let active_heals = self.active_heals.clone();
let storage = self.storage.clone();
let cancel_token = self.cancel_token.clone();
let statistics = self.statistics.clone();
let worker_count = config.read().await.max_concurrent_heals;
for worker_id in 0..worker_count {
let active_heals = active_heals.clone();
let _storage = storage.clone();
let cancel_token = cancel_token.clone();
let statistics = statistics.clone();
tokio::spawn(async move {
info!("Starting heal worker {}", worker_id);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Heal worker {} received shutdown signal", worker_id);
break;
}
_ = async {
// 等待任务
tokio::time::sleep(Duration::from_millis(100)).await;
} => {
// 检查是否有可执行的任务
let mut active_heals_guard = active_heals.lock().await;
let mut completed_tasks = Vec::new();
for (id, task) in active_heals_guard.iter() {
let status = task.get_status().await;
if matches!(status, HealTaskStatus::Completed | HealTaskStatus::Failed { .. } | HealTaskStatus::Cancelled) {
completed_tasks.push(id.clone());
}
}
// 移除已完成的任务
for task_id in completed_tasks {
if let Some(task) = active_heals_guard.remove(&task_id) {
// 更新统计信息
let mut stats = statistics.write().await;
match task.get_status().await {
HealTaskStatus::Completed => {
stats.update_task_completion(true);
}
HealTaskStatus::Failed { .. } => {
stats.update_task_completion(false);
}
_ => {}
}
}
}
// 更新活跃任务数量
let mut stats = statistics.write().await;
stats.update_running_tasks(active_heals_guard.len() as u64);
}
}
}
info!("Heal worker {} stopped", worker_id);
});
}
Ok(())
}
/// 处理 heal 队列
async fn process_heal_queue(
heal_queue: &Arc<Mutex<VecDeque<HealRequest>>>,
active_heals: &Arc<Mutex<HashMap<String, Arc<HealTask>>>>,
config: &Arc<RwLock<HealConfig>>,
statistics: &Arc<RwLock<HealStatistics>>,
storage: &Arc<dyn HealStorageAPI>,
) {
let config = config.read().await;
let mut active_heals = active_heals.lock().await;
// 检查是否可以启动新的 heal 任务
if active_heals.len() >= config.max_concurrent_heals {
return;
}
let mut queue = heal_queue.lock().await;
if let Some(request) = queue.pop_front() {
let task = Arc::new(HealTask::from_request(request, storage.clone()));
let task_id = task.id.clone();
active_heals.insert(task_id.clone(), task.clone());
// 启动 heal 任务
tokio::spawn(async move {
info!("Starting heal task: {}", task_id);
match task.execute().await {
Ok(_) => {
info!("Heal task completed successfully: {}", task_id);
}
Err(e) => {
error!("Heal task failed: {} - {}", task_id, e);
}
}
});
// 更新统计信息
let mut stats = statistics.write().await;
stats.total_tasks += 1;
}
}
}
impl std::fmt::Debug for HealManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HealManager")
.field("config", &"<config>")
.field("state", &"<state>")
.field("active_heals_count", &"<active_heals>")
.field("queue_length", &"<queue>")
.finish()
}
}

View File

@@ -0,0 +1,22 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod event;
pub mod manager;
pub mod progress;
pub mod storage;
pub mod task;
pub use manager::HealManager;
pub use task::{HealOptions, HealPriority, HealRequest, HealTask, HealType};

View File

@@ -0,0 +1,141 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct HealProgress {
/// 已扫描对象数
pub objects_scanned: u64,
/// 已修复对象数
pub objects_healed: u64,
/// 修复失败对象数
pub objects_failed: u64,
/// 已处理字节数
pub bytes_processed: u64,
/// 当前处理的对象
pub current_object: Option<String>,
/// 进度百分比
pub progress_percentage: f64,
/// 开始时间
pub start_time: Option<SystemTime>,
/// 最后更新时间
pub last_update_time: Option<SystemTime>,
/// 预计完成时间
pub estimated_completion_time: Option<SystemTime>,
}
impl HealProgress {
pub fn new() -> Self {
Self {
start_time: Some(SystemTime::now()),
last_update_time: Some(SystemTime::now()),
..Default::default()
}
}
pub fn update_progress(&mut self, scanned: u64, healed: u64, failed: u64, bytes: u64) {
self.objects_scanned = scanned;
self.objects_healed = healed;
self.objects_failed = failed;
self.bytes_processed = bytes;
self.last_update_time = Some(SystemTime::now());
// 计算进度百分比
let total = scanned + healed + failed;
if total > 0 {
self.progress_percentage = (healed as f64 / total as f64) * 100.0;
}
}
pub fn set_current_object(&mut self, object: Option<String>) {
self.current_object = object;
self.last_update_time = Some(SystemTime::now());
}
pub fn is_completed(&self) -> bool {
self.progress_percentage >= 100.0 || self.objects_scanned > 0 && self.objects_healed + self.objects_failed >= self.objects_scanned
}
pub fn get_success_rate(&self) -> f64 {
let total = self.objects_healed + self.objects_failed;
if total > 0 {
(self.objects_healed as f64 / total as f64) * 100.0
} else {
0.0
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealStatistics {
/// 总 heal 任务数
pub total_tasks: u64,
/// 成功完成的任务数
pub successful_tasks: u64,
/// 失败的任务数
pub failed_tasks: u64,
/// 正在运行的任务数
pub running_tasks: u64,
/// 总修复对象数
pub total_objects_healed: u64,
/// 总修复字节数
pub total_bytes_healed: u64,
/// 最后更新时间
pub last_update_time: SystemTime,
}
impl HealStatistics {
pub fn new() -> Self {
Self {
total_tasks: 0,
successful_tasks: 0,
failed_tasks: 0,
running_tasks: 0,
total_objects_healed: 0,
total_bytes_healed: 0,
last_update_time: SystemTime::now(),
}
}
pub fn update_task_completion(&mut self, success: bool) {
if success {
self.successful_tasks += 1;
} else {
self.failed_tasks += 1;
}
self.last_update_time = SystemTime::now();
}
pub fn update_running_tasks(&mut self, count: u64) {
self.running_tasks = count;
self.last_update_time = SystemTime::now();
}
pub fn add_healed_objects(&mut self, count: u64, bytes: u64) {
self.total_objects_healed += count;
self.total_bytes_healed += bytes;
self.last_update_time = SystemTime::now();
}
pub fn get_success_rate(&self) -> f64 {
let total = self.successful_tasks + self.failed_tasks;
if total > 0 {
(self.successful_tasks as f64 / total as f64) * 100.0
} else {
0.0
}
}
}

View File

@@ -0,0 +1,307 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::{Error, Result};
use async_trait::async_trait;
use rustfs_ecstore::{
disk::endpoint::Endpoint,
store_api::{BucketInfo, StorageAPI, ObjectIO},
store::ECStore,
};
use std::sync::Arc;
use tracing::{debug, error, info};
/// 磁盘状态
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiskStatus {
/// 正常
Ok,
/// 离线
Offline,
/// 损坏
Corrupt,
/// 缺失
Missing,
/// 权限错误
PermissionDenied,
/// 故障
Faulty,
/// 根挂载
RootMount,
/// 未知
Unknown,
/// 未格式化
Unformatted,
}
/// Heal 存储层接口
#[async_trait]
pub trait HealStorageAPI: Send + Sync {
/// 获取对象元数据
async fn get_object_meta(&self, bucket: &str, object: &str) -> Result<Option<rustfs_ecstore::store_api::ObjectInfo>>;
/// 获取对象数据
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>>;
/// 写入对象数据
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()>;
/// 删除对象
async fn delete_object(&self, bucket: &str, object: &str) -> Result<()>;
/// 检查对象完整性
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool>;
/// EC 解码重建
async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result<Vec<u8>>;
/// 获取磁盘状态
async fn get_disk_status(&self, endpoint: &Endpoint) -> Result<DiskStatus>;
/// 格式化磁盘
async fn format_disk(&self, endpoint: &Endpoint) -> Result<()>;
/// 获取桶信息
async fn get_bucket_info(&self, bucket: &str) -> Result<Option<BucketInfo>>;
/// 修复桶元数据
async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()>;
/// 获取所有桶列表
async fn list_buckets(&self) -> Result<Vec<BucketInfo>>;
/// 检查对象是否存在
async fn object_exists(&self, bucket: &str, object: &str) -> Result<bool>;
/// 获取对象大小
async fn get_object_size(&self, bucket: &str, object: &str) -> Result<Option<u64>>;
/// 获取对象校验和
async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result<Option<String>>;
}
/// ECStore Heal 存储层实现
pub struct ECStoreHealStorage {
ecstore: Arc<ECStore>,
}
impl ECStoreHealStorage {
pub fn new(ecstore: Arc<ECStore>) -> Self {
Self { ecstore }
}
}
#[async_trait]
impl HealStorageAPI for ECStoreHealStorage {
async fn get_object_meta(&self, bucket: &str, object: &str) -> Result<Option<rustfs_ecstore::store_api::ObjectInfo>> {
debug!("Getting object meta: {}/{}", bucket, object);
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
Ok(info) => Ok(Some(info)),
Err(e) => {
error!("Failed to get object meta: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>> {
debug!("Getting object data: {}/{}", bucket, object);
match self.ecstore.get_object_reader(bucket, object, None, Default::default(), &Default::default()).await {
Ok(mut reader) => {
match reader.read_all().await {
Ok(data) => Ok(Some(data)),
Err(e) => {
error!("Failed to read object data: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
Err(e) => {
error!("Failed to get object: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> {
debug!("Putting object data: {}/{} ({} bytes)", bucket, object, data.len());
let mut reader = rustfs_ecstore::store_api::PutObjReader::from_vec(data.to_vec());
match self.ecstore.put_object(bucket, object, &mut reader, &Default::default()).await {
Ok(_) => {
info!("Successfully put object: {}/{}", bucket, object);
Ok(())
}
Err(e) => {
error!("Failed to put object: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn delete_object(&self, bucket: &str, object: &str) -> Result<()> {
debug!("Deleting object: {}/{}", bucket, object);
match self.ecstore.delete_object(bucket, object, Default::default()).await {
Ok(_) => {
info!("Successfully deleted object: {}/{}", bucket, object);
Ok(())
}
Err(e) => {
error!("Failed to delete object: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool> {
debug!("Verifying object integrity: {}/{}", bucket, object);
// TODO: 实现对象完整性检查
// 1. 获取对象元数据
// 2. 检查数据块完整性
// 3. 验证校验和
// 4. 检查 EC 编码正确性
// 临时实现:总是返回 true
info!("Object integrity check passed: {}/{}", bucket, object);
Ok(true)
}
async fn ec_decode_rebuild(&self, bucket: &str, object: &str) -> Result<Vec<u8>> {
debug!("EC decode rebuild: {}/{}", bucket, object);
// TODO: 实现 EC 解码重建
// 1. 获取对象元数据
// 2. 读取可用的数据块
// 3. 使用 EC 算法重建缺失数据
// 4. 返回完整数据
// 临时实现:尝试获取对象数据
match self.get_object_data(bucket, object).await? {
Some(data) => {
info!("EC decode rebuild successful: {}/{}", bucket, object);
Ok(data)
}
None => {
error!("Object not found for EC decode: {}/{}", bucket, object);
Err(Error::TaskExecutionFailed {
message: format!("Object not found: {}/{}", bucket, object),
})
}
}
}
async fn get_disk_status(&self, endpoint: &Endpoint) -> Result<DiskStatus> {
debug!("Getting disk status: {:?}", endpoint);
// TODO: 实现磁盘状态检查
// 1. 检查磁盘是否可访问
// 2. 检查磁盘格式
// 3. 检查权限
// 4. 返回状态
// 临时实现:总是返回 Ok
info!("Disk status check: {:?} - OK", endpoint);
Ok(DiskStatus::Ok)
}
async fn format_disk(&self, endpoint: &Endpoint) -> Result<()> {
debug!("Formatting disk: {:?}", endpoint);
// TODO: 实现磁盘格式化
// 1. 检查磁盘权限
// 2. 执行格式化操作
// 3. 验证格式化结果
// 临时实现:总是成功
info!("Disk formatted successfully: {:?}", endpoint);
Ok(())
}
async fn get_bucket_info(&self, bucket: &str) -> Result<Option<BucketInfo>> {
debug!("Getting bucket info: {}", bucket);
match self.ecstore.get_bucket_info(bucket, &Default::default()).await {
Ok(info) => Ok(Some(info)),
Err(e) => {
error!("Failed to get bucket info: {} - {}", bucket, e);
Err(Error::other(e))
}
}
}
async fn heal_bucket_metadata(&self, bucket: &str) -> Result<()> {
debug!("Healing bucket metadata: {}", bucket);
// TODO: 实现桶元数据修复
// 1. 检查桶元数据完整性
// 2. 修复损坏的元数据
// 3. 更新桶配置
// 临时实现:总是成功
info!("Bucket metadata healed successfully: {}", bucket);
Ok(())
}
async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
debug!("Listing buckets");
match self.ecstore.list_bucket(&Default::default()).await {
Ok(buckets) => Ok(buckets),
Err(e) => {
error!("Failed to list buckets: {}", e);
Err(Error::other(e))
}
}
}
async fn object_exists(&self, bucket: &str, object: &str) -> Result<bool> {
debug!("Checking if object exists: {}/{}", bucket, object);
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
Ok(_) => Ok(true),
Err(e) => {
error!("Failed to check object existence: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn get_object_size(&self, bucket: &str, object: &str) -> Result<Option<u64>> {
debug!("Getting object size: {}/{}", bucket, object);
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
Ok(info) => Ok(Some(info.size as u64)),
Err(e) => {
error!("Failed to get object size: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
async fn get_object_checksum(&self, bucket: &str, object: &str) -> Result<Option<String>> {
debug!("Getting object checksum: {}/{}", bucket, object);
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
Ok(info) => Ok(info.etag),
Err(e) => {
error!("Failed to get object checksum: {}/{} - {}", bucket, object, e);
Err(Error::other(e))
}
}
}
}

415
crates/ahm/src/heal/task.rs Normal file
View File

@@ -0,0 +1,415 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::heal::{progress::HealProgress, storage::HealStorageAPI};
use rustfs_ecstore::disk::endpoint::Endpoint;
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tracing::{debug, error, info};
use uuid::Uuid;
/// Heal 扫描模式
pub type HealScanMode = usize;
pub const HEAL_UNKNOWN_SCAN: HealScanMode = 0;
pub const HEAL_NORMAL_SCAN: HealScanMode = 1;
pub const HEAL_DEEP_SCAN: HealScanMode = 2;
/// Heal 类型
#[derive(Debug, Clone)]
pub enum HealType {
/// 对象 heal
Object {
bucket: String,
object: String,
version_id: Option<String>,
},
/// 桶 heal
Bucket {
bucket: String,
},
/// 磁盘 heal
Disk {
endpoint: Endpoint,
},
/// 元数据 heal
Metadata {
bucket: String,
object: String,
},
/// MRF heal
MRF {
meta_path: String,
},
/// EC 解码 heal
ECDecode {
bucket: String,
object: String,
version_id: Option<String>,
},
}
/// Heal 优先级
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum HealPriority {
/// 低优先级
Low = 0,
/// 普通优先级
Normal = 1,
/// 高优先级
High = 2,
/// 紧急优先级
Urgent = 3,
}
impl Default for HealPriority {
fn default() -> Self {
Self::Normal
}
}
/// Heal 选项
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealOptions {
/// 扫描模式
pub scan_mode: HealScanMode,
/// 是否删除损坏数据
pub remove_corrupted: bool,
/// 是否重新创建
pub recreate_missing: bool,
/// 是否更新奇偶校验
pub update_parity: bool,
/// 是否递归处理
pub recursive: bool,
/// 是否试运行
pub dry_run: bool,
/// 超时时间
pub timeout: Option<Duration>,
}
impl Default for HealOptions {
fn default() -> Self {
Self {
scan_mode: HEAL_NORMAL_SCAN,
remove_corrupted: false,
recreate_missing: true,
update_parity: true,
recursive: false,
dry_run: false,
timeout: Some(Duration::from_secs(300)), // 5分钟默认超时
}
}
}
/// Heal 任务状态
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HealTaskStatus {
/// 等待中
Pending,
/// 运行中
Running,
/// 完成
Completed,
/// 失败
Failed { error: String },
/// 取消
Cancelled,
/// 超时
Timeout,
}
/// Heal 请求
#[derive(Debug, Clone)]
pub struct HealRequest {
/// 请求 ID
pub id: String,
/// Heal 类型
pub heal_type: HealType,
/// Heal 选项
pub options: HealOptions,
/// 优先级
pub priority: HealPriority,
/// 创建时间
pub created_at: SystemTime,
}
impl HealRequest {
pub fn new(heal_type: HealType, options: HealOptions, priority: HealPriority) -> Self {
Self {
id: Uuid::new_v4().to_string(),
heal_type,
options,
priority,
created_at: SystemTime::now(),
}
}
pub fn object(bucket: String, object: String, version_id: Option<String>) -> Self {
Self::new(
HealType::Object {
bucket,
object,
version_id,
},
HealOptions::default(),
HealPriority::Normal,
)
}
pub fn bucket(bucket: String) -> Self {
Self::new(
HealType::Bucket { bucket },
HealOptions::default(),
HealPriority::Normal,
)
}
pub fn disk(endpoint: Endpoint) -> Self {
Self::new(
HealType::Disk { endpoint },
HealOptions::default(),
HealPriority::High,
)
}
pub fn metadata(bucket: String, object: String) -> Self {
Self::new(
HealType::Metadata { bucket, object },
HealOptions::default(),
HealPriority::High,
)
}
pub fn ec_decode(bucket: String, object: String, version_id: Option<String>) -> Self {
Self::new(
HealType::ECDecode {
bucket,
object,
version_id,
},
HealOptions::default(),
HealPriority::Urgent,
)
}
}
/// Heal 任务
pub struct HealTask {
/// 任务 ID
pub id: String,
/// Heal 类型
pub heal_type: HealType,
/// Heal 选项
pub options: HealOptions,
/// 任务状态
pub status: Arc<RwLock<HealTaskStatus>>,
/// 进度跟踪
pub progress: Arc<RwLock<HealProgress>>,
/// 创建时间
pub created_at: SystemTime,
/// 开始时间
pub started_at: Arc<RwLock<Option<SystemTime>>>,
/// 完成时间
pub completed_at: Arc<RwLock<Option<SystemTime>>>,
/// 取消令牌
pub cancel_token: tokio_util::sync::CancellationToken,
/// 存储层接口
pub storage: Arc<dyn HealStorageAPI>,
}
impl HealTask {
pub fn from_request(request: HealRequest, storage: Arc<dyn HealStorageAPI>) -> Self {
Self {
id: request.id,
heal_type: request.heal_type,
options: request.options,
status: Arc::new(RwLock::new(HealTaskStatus::Pending)),
progress: Arc::new(RwLock::new(HealProgress::new())),
created_at: request.created_at,
started_at: Arc::new(RwLock::new(None)),
completed_at: Arc::new(RwLock::new(None)),
cancel_token: tokio_util::sync::CancellationToken::new(),
storage,
}
}
pub async fn execute(&self) -> Result<()> {
// 更新状态为运行中
{
let mut status = self.status.write().await;
*status = HealTaskStatus::Running;
}
{
let mut started_at = self.started_at.write().await;
*started_at = Some(SystemTime::now());
}
info!("Starting heal task: {} with type: {:?}", self.id, self.heal_type);
let result = match &self.heal_type {
HealType::Object { bucket, object, version_id } => {
self.heal_object(bucket, object, version_id.as_deref()).await
}
HealType::Bucket { bucket } => {
self.heal_bucket(bucket).await
}
HealType::Disk { endpoint } => {
self.heal_disk(endpoint).await
}
HealType::Metadata { bucket, object } => {
self.heal_metadata(bucket, object).await
}
HealType::MRF { meta_path } => {
self.heal_mrf(meta_path).await
}
HealType::ECDecode { bucket, object, version_id } => {
self.heal_ec_decode(bucket, object, version_id.as_deref()).await
}
};
// 更新完成时间和状态
{
let mut completed_at = self.completed_at.write().await;
*completed_at = Some(SystemTime::now());
}
match &result {
Ok(_) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Completed;
info!("Heal task completed successfully: {}", self.id);
}
Err(e) => {
let mut status = self.status.write().await;
*status = HealTaskStatus::Failed {
error: e.to_string(),
};
error!("Heal task failed: {} with error: {}", self.id, e);
}
}
result
}
pub async fn cancel(&self) -> Result<()> {
self.cancel_token.cancel();
let mut status = self.status.write().await;
*status = HealTaskStatus::Cancelled;
info!("Heal task cancelled: {}", self.id);
Ok(())
}
pub async fn get_status(&self) -> HealTaskStatus {
self.status.read().await.clone()
}
pub async fn get_progress(&self) -> HealProgress {
self.progress.read().await.clone()
}
// 具体的 heal 实现方法
async fn heal_object(&self, bucket: &str, object: &str, _version_id: Option<&str>) -> Result<()> {
debug!("Healing object: {}/{}", bucket, object);
// 更新进度
{
let mut progress = self.progress.write().await;
progress.set_current_object(Some(format!("{}/{}", bucket, object)));
}
// TODO: 实现具体的对象 heal 逻辑
// 1. 检查对象完整性
// 2. 如果损坏,尝试 EC 重建
// 3. 更新对象数据
// 4. 更新进度
{
let mut progress = self.progress.write().await;
progress.update_progress(1, 1, 0, 1024); // 示例数据
}
Ok(())
}
async fn heal_bucket(&self, bucket: &str) -> Result<()> {
debug!("Healing bucket: {}", bucket);
// TODO: 实现桶 heal 逻辑
// 1. 检查桶元数据
// 2. 修复桶配置
// 3. 更新进度
Ok(())
}
async fn heal_disk(&self, endpoint: &Endpoint) -> Result<()> {
debug!("Healing disk: {:?}", endpoint);
// TODO: 实现磁盘 heal 逻辑
// 1. 检查磁盘状态
// 2. 格式化磁盘(如果需要)
// 3. 更新进度
Ok(())
}
async fn heal_metadata(&self, bucket: &str, object: &str) -> Result<()> {
debug!("Healing metadata: {}/{}", bucket, object);
// TODO: 实现元数据 heal 逻辑
// 1. 检查元数据完整性
// 2. 重建元数据
// 3. 更新进度
Ok(())
}
async fn heal_mrf(&self, meta_path: &str) -> Result<()> {
debug!("Healing MRF: {}", meta_path);
// TODO: 实现 MRF heal 逻辑
// 1. 检查元数据复制因子
// 2. 修复元数据
// 3. 更新进度
Ok(())
}
async fn heal_ec_decode(&self, bucket: &str, object: &str, _version_id: Option<&str>) -> Result<()> {
debug!("Healing EC decode: {}/{}", bucket, object);
// TODO: 实现 EC 解码 heal 逻辑
// 1. 检查 EC 分片
// 2. 使用 EC 算法重建数据
// 3. 更新进度
Ok(())
}
}
impl std::fmt::Debug for HealTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HealTask")
.field("id", &self.id)
.field("heal_type", &self.heal_type)
.field("options", &self.options)
.field("created_at", &self.created_at)
.finish()
}
}

View File

@@ -16,6 +16,7 @@ use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;
pub mod error;
pub mod heal;
pub mod scanner;
pub use error::{Error, Result};
@@ -23,6 +24,7 @@ pub use scanner::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend,
store_data_usage_in_backend,
};
pub use heal::{HealManager, HealRequest, HealType, HealOptions, HealPriority};
// Global cancellation token for AHM services (scanner and other background tasks)
static GLOBAL_AHM_SERVICES_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();

View File

@@ -34,8 +34,9 @@ use super::{
};
use crate::{
error::{Error, Result},
get_ahm_services_cancel_token,
get_ahm_services_cancel_token, HealRequest,
};
use crate::heal::HealManager;
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
@@ -126,15 +127,15 @@ pub struct Scanner {
data_usage_stats: Arc<Mutex<HashMap<String, DataUsageInfo>>>,
/// Last data usage statistics collection time
last_data_usage_collection: Arc<RwLock<Option<SystemTime>>>,
/// Heal manager for auto-heal integration
heal_manager: Option<Arc<HealManager>>,
}
impl Scanner {
/// Create a new scanner
pub fn new(config: Option<ScannerConfig>) -> Self {
pub fn new(config: Option<ScannerConfig>, heal_manager: Option<Arc<HealManager>>) -> Self {
let config = config.unwrap_or_default();
info!("Creating AHM scanner for all EC sets");
Self {
config: Arc::new(RwLock::new(config)),
state: Arc::new(RwLock::new(ScannerState::default())),
@@ -143,9 +144,15 @@ impl Scanner {
disk_metrics: Arc::new(Mutex::new(HashMap::new())),
data_usage_stats: Arc::new(Mutex::new(HashMap::new())),
last_data_usage_collection: Arc::new(RwLock::new(None)),
heal_manager,
}
}
/// Set the heal manager after construction
pub fn set_heal_manager(&mut self, heal_manager: Arc<HealManager>) {
self.heal_manager = Some(heal_manager);
}
/// Start the scanner
pub async fn start(&self) -> Result<()> {
let mut state = self.state.write().await;
@@ -495,6 +502,24 @@ impl Scanner {
metrics.free_space = disk_info.free;
metrics.is_online = disk.is_online().await;
// 检查磁盘状态如果离线则提交heal任务
if !metrics.is_online {
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::disk(disk.endpoint().clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("磁盘离线已自动提交heal任务: {} 磁盘: {}", task_id, disk_path);
}
Err(e) => {
error!("磁盘离线heal任务提交失败: {},错误: {}", disk_path, e);
}
}
}
}
}
// Additional disk info for debugging
debug!(
"Disk {}: total={}, used={}, free={}, online={}",
@@ -514,6 +539,30 @@ impl Scanner {
Ok(volumes) => volumes,
Err(e) => {
error!("Failed to list volumes on disk {}: {}", disk_path, e);
// 磁盘访问失败提交磁盘heal任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealRequest, HealPriority};
let req = HealRequest::new(
crate::heal::HealType::Disk {
endpoint: disk.endpoint().clone(),
},
crate::heal::HealOptions::default(),
HealPriority::Urgent,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("磁盘访问失败已自动提交heal任务: {} 磁盘: {} 错误: {}", task_id, disk_path, e);
}
Err(heal_err) => {
error!("磁盘访问失败heal任务提交失败: {},错误: {}", disk_path, heal_err);
}
}
}
}
return Err(Error::Storage(e.into()));
}
};
@@ -625,6 +674,22 @@ impl Scanner {
if file_meta.versions.is_empty() {
objects_with_issues += 1;
warn!("Object {} has no versions", entry.name);
// 对象元数据损坏提交元数据heal任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("对象元数据损坏已自动提交heal任务: {} {} / {}", task_id, bucket, entry.name);
}
Err(e) => {
error!("对象元数据损坏heal任务提交失败: {} / {},错误: {}", bucket, entry.name, e);
}
}
}
}
} else {
// Store object metadata for later analysis
object_metadata.insert(entry.name.clone(), file_meta.clone());
@@ -632,6 +697,22 @@ impl Scanner {
} else {
objects_with_issues += 1;
warn!("Failed to parse metadata for object {}", entry.name);
// 对象元数据解析失败提交元数据heal任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
let req = HealRequest::metadata(bucket.to_string(), entry.name.clone());
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("对象元数据解析失败已自动提交heal任务: {} {} / {}", task_id, bucket, entry.name);
}
Err(e) => {
error!("对象元数据解析失败heal任务提交失败: {} / {},错误: {}", bucket, entry.name, e);
}
}
}
}
}
}
}
@@ -735,7 +816,32 @@ impl Scanner {
let missing_disks: Vec<usize> = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect();
warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks);
println!("Object {bucket}/{object_name} missing from disks: {missing_disks:?}");
// TODO: Trigger heal for this object
// 自动提交heal任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::{HealRequest, HealPriority};
let req = HealRequest::new(
crate::heal::HealType::Object {
bucket: bucket.clone(),
object: object_name.clone(),
version_id: None,
},
crate::heal::HealOptions::default(),
HealPriority::High,
);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("对象缺失已自动提交heal任务: {} {} / {} (缺失磁盘: {:?})",
task_id, bucket, object_name, missing_disks);
}
Err(e) => {
error!("对象缺失heal任务提交失败: {} / {},错误: {}", bucket, object_name, e);
}
}
}
}
}
// Step 3: Deep scan EC verification
@@ -863,6 +969,22 @@ impl Scanner {
);
Ok(())
} else {
// 自动提交heal任务
let enable_healing = self.config.read().await.enable_healing;
if enable_healing {
if let Some(heal_manager) = &self.heal_manager {
use crate::heal::HealRequest;
let req = HealRequest::ec_decode(bucket.to_string(), object.to_string(), None);
match heal_manager.submit_heal_request(req).await {
Ok(task_id) => {
warn!("EC decode失败已自动提交heal任务: {} {} / {}", task_id, bucket, object);
}
Err(e) => {
error!("EC decode失败heal任务提交失败: {} / {},错误: {}", bucket, object, e);
}
}
}
}
Err(Error::Scanner(format!(
"EC decode verification failed for object {bucket}/{object}: need {read_quorum} reads, got {successful_reads} (errors: {errors:?})"
)))
@@ -1005,6 +1127,7 @@ impl Scanner {
disk_metrics: Arc::clone(&self.disk_metrics),
data_usage_stats: Arc::clone(&self.data_usage_stats),
last_data_usage_collection: Arc::clone(&self.last_data_usage_collection),
heal_manager: self.heal_manager.clone(),
}
}
}
@@ -1121,7 +1244,7 @@ mod tests {
.expect("put_object failed");
// create Scanner and test basic functionality
let scanner = Scanner::new(None);
let scanner = Scanner::new(None, None);
// Test 1: Normal scan - verify object is found
println!("=== Test 1: Normal scan ===");
@@ -1200,7 +1323,7 @@ mod tests {
.await
.unwrap();
let scanner = Scanner::new(None);
let scanner = Scanner::new(None, None);
// enable statistics
{

View File

@@ -185,7 +185,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// init_data_scanner().await;
// init_auto_heal().await;
let _ = create_ahm_services_cancel_token();
let scanner = Scanner::new(Some(ScannerConfig::default()));
let scanner = Scanner::new(Some(ScannerConfig::default()), None);
scanner.start().await?;
print_server_info();
init_bucket_replication_pool().await;