From 8e766b90cd0c5791a6b7b5650b1c8274853f66af Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 15 Jul 2025 17:40:51 +0800 Subject: [PATCH] feat: implement heal channel mechanism for admin-ahm communication - Add global unbounded channel in common crate for heal requests - Implement channel processor in ahm to handle heal commands - Add Start/Query/Cancel commands support via channel - Integrate heal manager initialization in main.rs - Replace direct MRF calls with channel-based heal requests in ecstore - Support advanced heal options including pool_index and set_index - Enable admin handlers to send heal requests via channel --- Cargo.lock | 1 + crates/ahm/src/heal/channel.rs | 232 ++++++++++++++++++ crates/ahm/src/heal/event.rs | 4 +- crates/ahm/src/heal/manager.rs | 2 +- crates/ahm/src/heal/mod.rs | 1 + crates/ahm/src/heal/task.rs | 18 +- crates/ahm/src/lib.rs | 63 ++++- crates/ahm/tests/heal_integration_test.rs | 4 + crates/common/Cargo.toml | 1 + crates/common/src/heal_channel.rs | 226 +++++++++++++++++ crates/common/src/lib.rs | 1 + crates/ecstore/src/global.rs | 4 +- .../ecstore/src/heal/background_heal_ops.rs | 10 +- crates/ecstore/src/set_disk.rs | 113 +++++---- rustfs/src/admin/handlers.rs | 85 ++++--- rustfs/src/admin/mod.rs | 2 +- rustfs/src/main.rs | 10 +- 17 files changed, 679 insertions(+), 98 deletions(-) create mode 100644 crates/ahm/src/heal/channel.rs create mode 100644 crates/common/src/heal_channel.rs diff --git a/Cargo.lock b/Cargo.lock index 0613b2f8..c1c1608c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7967,6 +7967,7 @@ version = "0.0.5" dependencies = [ "tokio", "tonic", + "uuid", ] [[package]] diff --git a/crates/ahm/src/heal/channel.rs b/crates/ahm/src/heal/channel.rs new file mode 100644 index 00000000..6ffc11e5 --- /dev/null +++ b/crates/ahm/src/heal/channel.rs @@ -0,0 +1,232 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Result; +use crate::heal::{ + manager::HealManager, + task::{HealOptions, HealPriority, HealRequest, HealType}, +}; + +use rustfs_common::heal_channel::{ + HealChannelCommand, HealChannelPriority, HealChannelReceiver, HealChannelRequest, HealChannelResponse, +}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::{error, info}; + +/// Heal channel processor +pub struct HealChannelProcessor { + /// Heal manager + heal_manager: Arc, + /// Response sender + response_sender: mpsc::UnboundedSender, + /// Response receiver + response_receiver: mpsc::UnboundedReceiver, +} + +impl HealChannelProcessor { + /// Create new HealChannelProcessor + pub fn new(heal_manager: Arc) -> Self { + let (response_tx, response_rx) = mpsc::unbounded_channel(); + Self { + heal_manager, + response_sender: response_tx, + response_receiver: response_rx, + } + } + + /// Start processing heal channel requests + pub async fn start(&mut self, mut receiver: HealChannelReceiver) -> Result<()> { + info!("Starting heal channel processor"); + + loop { + tokio::select! { + command = receiver.recv() => { + match command { + Some(command) => { + if let Err(e) = self.process_command(command).await { + error!("Failed to process heal command: {}", e); + } + } + None => { + info!("Heal channel receiver closed, stopping processor"); + break; + } + } + } + response = self.response_receiver.recv() => { + if let Some(response) = response { + // Handle response if needed + info!("Received heal response for request: {}", response.request_id); + } + } + } + } + + info!("Heal channel processor stopped"); + Ok(()) + } + + /// Process heal command + async fn process_command(&self, command: HealChannelCommand) -> Result<()> { + match command { + HealChannelCommand::Start(request) => self.process_start_request(request).await, + HealChannelCommand::Query { heal_path, client_token } => self.process_query_request(heal_path, client_token).await, + HealChannelCommand::Cancel { heal_path } => self.process_cancel_request(heal_path).await, + } + } + + /// Process start request + async fn process_start_request(&self, request: HealChannelRequest) -> Result<()> { + info!("Processing heal start request: {} for bucket: {}", request.id, request.bucket); + + // Convert channel request to heal request + let heal_request = self.convert_to_heal_request(request.clone())?; + + // Submit to heal manager + match self.heal_manager.submit_heal_request(heal_request).await { + Ok(task_id) => { + info!("Successfully submitted heal request: {} as task: {}", request.id, task_id); + + // Send success response + let response = HealChannelResponse { + request_id: request.id, + success: true, + data: Some(format!("Task ID: {}", task_id).into_bytes()), + error: None, + }; + + if let Err(e) = self.response_sender.send(response) { + error!("Failed to send heal response: {}", e); + } + } + Err(e) => { + error!("Failed to submit heal request: {} - {}", request.id, e); + + // Send error response + let response = HealChannelResponse { + request_id: request.id, + success: false, + data: None, + error: Some(e.to_string()), + }; + + if let Err(e) = self.response_sender.send(response) { + error!("Failed to send heal error response: {}", e); + } + } + } + + Ok(()) + } + + /// Process query request + async fn process_query_request(&self, heal_path: String, client_token: String) -> Result<()> { + info!("Processing heal query request for path: {}", heal_path); + + // TODO: Implement query logic based on heal_path and client_token + // For now, return a placeholder response + let response = HealChannelResponse { + request_id: client_token, + success: true, + data: Some(format!("Query result for path: {}", heal_path).into_bytes()), + error: None, + }; + + if let Err(e) = self.response_sender.send(response) { + error!("Failed to send query response: {}", e); + } + + Ok(()) + } + + /// Process cancel request + async fn process_cancel_request(&self, heal_path: String) -> Result<()> { + info!("Processing heal cancel request for path: {}", heal_path); + + // TODO: Implement cancel logic based on heal_path + // For now, return a placeholder response + let response = HealChannelResponse { + request_id: heal_path.clone(), + success: true, + data: Some(format!("Cancel request for path: {}", heal_path).into_bytes()), + error: None, + }; + + if let Err(e) = self.response_sender.send(response) { + error!("Failed to send cancel response: {}", e); + } + + Ok(()) + } + + /// Convert channel request to heal request + fn convert_to_heal_request(&self, request: HealChannelRequest) -> Result { + let heal_type = match &request.object_prefix { + Some(prefix) if !prefix.is_empty() => HealType::Object { + bucket: request.bucket.clone(), + object: prefix.clone(), + version_id: None, + }, + _ => HealType::Bucket { + bucket: request.bucket.clone(), + }, + }; + + let priority = match request.priority { + HealChannelPriority::Low => HealPriority::Low, + HealChannelPriority::Normal => HealPriority::Normal, + HealChannelPriority::High => HealPriority::High, + HealChannelPriority::Critical => HealPriority::Urgent, + }; + + // Convert scan mode + let scan_mode = match request.scan_mode { + Some(rustfs_common::heal_channel::HealChannelScanMode::Normal) => { + rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN + } + Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => { + rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN + } + None => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, + }; + + // Build HealOptions with all available fields + let mut options = HealOptions { + scan_mode, + remove_corrupted: request.remove_corrupted.unwrap_or(false), + recreate_missing: request.recreate_missing.unwrap_or(true), + update_parity: request.update_parity.unwrap_or(true), + recursive: request.recursive.unwrap_or(false), + dry_run: request.dry_run.unwrap_or(false), + timeout: request.timeout_seconds.map(|secs| std::time::Duration::from_secs(secs)), + pool_index: request.pool_index, + set_index: request.set_index, + }; + + // Apply force_start overrides + if request.force_start { + options.remove_corrupted = true; + options.recreate_missing = true; + options.update_parity = true; + } + + Ok(HealRequest::new(heal_type, options, priority)) + } + + /// Get response sender for external use + pub fn get_response_sender(&self) -> mpsc::UnboundedSender { + self.response_sender.clone() + } +} diff --git a/crates/ahm/src/heal/event.rs b/crates/ahm/src/heal/event.rs index 85ecb002..11705385 100644 --- a/crates/ahm/src/heal/event.rs +++ b/crates/ahm/src/heal/event.rs @@ -246,9 +246,7 @@ impl HealEvent { actual_checksum, .. } => { - format!( - "Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}" - ) + format!("Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}") } HealEvent::BucketMetadataCorruption { bucket, corruption_type, .. diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index 694b340b..d0581620 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -52,7 +52,7 @@ impl Default for HealConfig { fn default() -> Self { Self { enable_auto_heal: true, - heal_interval: Duration::from_secs(60), // 1 minute + heal_interval: Duration::from_secs(10), // 10 seconds max_concurrent_heals: 4, task_timeout: Duration::from_secs(300), // 5 minutes queue_size: 1000, diff --git a/crates/ahm/src/heal/mod.rs b/crates/ahm/src/heal/mod.rs index 742c2766..f209d450 100644 --- a/crates/ahm/src/heal/mod.rs +++ b/crates/ahm/src/heal/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod channel; pub mod event; pub mod manager; pub mod progress; diff --git a/crates/ahm/src/heal/task.rs b/crates/ahm/src/heal/task.rs index 8650e571..ea07786d 100644 --- a/crates/ahm/src/heal/task.rs +++ b/crates/ahm/src/heal/task.rs @@ -94,6 +94,10 @@ pub struct HealOptions { pub dry_run: bool, /// Timeout pub timeout: Option, + /// pool index + pub pool_index: Option, + /// set index + pub set_index: Option, } impl Default for HealOptions { @@ -106,6 +110,8 @@ impl Default for HealOptions { recursive: false, dry_run: false, timeout: Some(Duration::from_secs(300)), // 5 minutes default timeout + pool_index: None, + set_index: None, } } } @@ -339,8 +345,8 @@ impl HealTask { scan_mode: self.options.scan_mode, update_parity: self.options.update_parity, no_lock: false, - pool: None, - set: None, + pool: self.options.pool_index, + set: self.options.set_index, }; match self.storage.heal_object(bucket, object, version_id, &heal_opts).await { @@ -491,8 +497,8 @@ impl HealTask { scan_mode: self.options.scan_mode, update_parity: self.options.update_parity, no_lock: false, - pool: None, - set: None, + pool: self.options.pool_index, + set: self.options.set_index, }; match self.storage.heal_bucket(bucket, &heal_opts).await { @@ -609,8 +615,8 @@ impl HealTask { scan_mode: rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN, update_parity: false, no_lock: false, - pool: None, - set: None, + pool: self.options.pool_index, + set: self.options.set_index, }; match self.storage.heal_object(bucket, object, None, &heal_opts).await { diff --git a/crates/ahm/src/lib.rs b/crates/ahm/src/lib.rs index 3a5117f7..207973e9 100644 --- a/crates/ahm/src/lib.rs +++ b/crates/ahm/src/lib.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::OnceLock; +use std::sync::{Arc, OnceLock}; use tokio_util::sync::CancellationToken; +use tracing::{error, info}; pub mod error; pub mod heal; pub mod scanner; pub use error::{Error, Result}; -pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType}; +pub use heal::{channel::HealChannelProcessor, HealManager, HealOptions, HealPriority, HealRequest, HealType}; pub use scanner::{ BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend, store_data_usage_in_backend, @@ -54,3 +55,61 @@ pub fn shutdown_ahm_services() { cancel_token.cancel(); } } + +/// Global heal manager instance +static GLOBAL_HEAL_MANAGER: OnceLock> = OnceLock::new(); + +/// Global heal channel processor instance +static GLOBAL_HEAL_CHANNEL_PROCESSOR: OnceLock>> = OnceLock::new(); + +/// Initialize and start heal manager with channel processor +pub async fn init_heal_manager_with_channel( + storage: Arc, + config: Option, +) -> Result<()> { + // Create heal manager + let heal_manager = Arc::new(HealManager::new(storage, config)); + + // Start heal manager + heal_manager.start().await?; + + // Store global instance + GLOBAL_HEAL_MANAGER + .set(heal_manager.clone()) + .map_err(|_| Error::Config("Heal manager already initialized".to_string()))?; + + // Initialize heal channel + let channel_receiver = rustfs_common::heal_channel::init_heal_channel(); + + // Create channel processor + let channel_processor = HealChannelProcessor::new(heal_manager); + + // Store channel processor instance first + GLOBAL_HEAL_CHANNEL_PROCESSOR + .set(Arc::new(tokio::sync::Mutex::new(channel_processor))) + .map_err(|_| Error::Config("Heal channel processor already initialized".to_string()))?; + + // Start channel processor in background + let receiver = channel_receiver; + tokio::spawn(async move { + if let Some(processor_guard) = GLOBAL_HEAL_CHANNEL_PROCESSOR.get() { + let mut processor = processor_guard.lock().await; + if let Err(e) = processor.start(receiver).await { + error!("Heal channel processor failed: {}", e); + } + } + }); + + info!("Heal manager with channel processor initialized successfully"); + Ok(()) +} + +/// Get global heal manager instance +pub fn get_heal_manager() -> Option<&'static Arc> { + GLOBAL_HEAL_MANAGER.get() +} + +/// Get global heal channel processor instance +pub fn get_heal_channel_processor() -> Option<&'static Arc>> { + GLOBAL_HEAL_CHANNEL_PROCESSOR.get() +} diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index c161af15..905115ed 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -194,6 +194,8 @@ async fn test_heal_object_basic() { scan_mode: HEAL_NORMAL_SCAN, update_parity: true, timeout: Some(Duration::from_secs(300)), + pool_index: None, + set_index: None, }, HealPriority::Normal, ); @@ -260,6 +262,8 @@ async fn test_heal_bucket_basic() { scan_mode: HEAL_NORMAL_SCAN, update_parity: false, timeout: Some(Duration::from_secs(300)), + pool_index: None, + set_index: None, }, HealPriority::Normal, ); diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index e8bcb370..85d8bf97 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -30,3 +30,4 @@ workspace = true [dependencies] tokio.workspace = true tonic = { workspace = true } +uuid = { workspace = true } diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs new file mode 100644 index 00000000..22943852 --- /dev/null +++ b/crates/common/src/heal_channel.rs @@ -0,0 +1,226 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::OnceLock; +use tokio::sync::mpsc; +use uuid::Uuid; + +/// Heal channel command type +#[derive(Debug, Clone)] +pub enum HealChannelCommand { + /// Start a new heal task + Start(HealChannelRequest), + /// Query heal task status + Query { heal_path: String, client_token: String }, + /// Cancel heal task + Cancel { heal_path: String }, +} + +/// Heal request from admin to ahm +#[derive(Debug, Clone)] +pub struct HealChannelRequest { + /// Unique request ID + pub id: String, + /// Bucket name + pub bucket: String, + /// Object prefix (optional) + pub object_prefix: Option, + /// Force start heal + pub force_start: bool, + /// Priority + pub priority: HealChannelPriority, + /// Pool index (optional) + pub pool_index: Option, + /// Set index (optional) + pub set_index: Option, + /// Scan mode (optional) + pub scan_mode: Option, + /// Whether to remove corrupted data + pub remove_corrupted: Option, + /// Whether to recreate missing data + pub recreate_missing: Option, + /// Whether to update parity + pub update_parity: Option, + /// Whether to recursively process + pub recursive: Option, + /// Whether to dry run + pub dry_run: Option, + /// Timeout in seconds (optional) + pub timeout_seconds: Option, +} + +/// Heal response from ahm to admin +#[derive(Debug, Clone)] +pub struct HealChannelResponse { + /// Request ID + pub request_id: String, + /// Success status + pub success: bool, + /// Response data (if successful) + pub data: Option>, + /// Error message (if failed) + pub error: Option, +} + +/// Heal priority +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HealChannelPriority { + /// Low priority + Low, + /// Normal priority + Normal, + /// High priority + High, + /// Critical priority + Critical, +} + +impl Default for HealChannelPriority { + fn default() -> Self { + Self::Normal + } +} + +/// Heal channel sender +pub type HealChannelSender = mpsc::UnboundedSender; + +/// Heal channel receiver +pub type HealChannelReceiver = mpsc::UnboundedReceiver; + +/// Global heal channel sender +static GLOBAL_HEAL_CHANNEL_SENDER: OnceLock = OnceLock::new(); + +/// Initialize global heal channel +pub fn init_heal_channel() -> HealChannelReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + GLOBAL_HEAL_CHANNEL_SENDER + .set(tx) + .expect("Heal channel sender already initialized"); + rx +} + +/// Get global heal channel sender +pub fn get_heal_channel_sender() -> Option<&'static HealChannelSender> { + GLOBAL_HEAL_CHANNEL_SENDER.get() +} + +/// Send heal command through global channel +pub async fn send_heal_command(command: HealChannelCommand) -> Result<(), String> { + if let Some(sender) = get_heal_channel_sender() { + sender + .send(command) + .map_err(|e| format!("Failed to send heal command: {}", e))?; + Ok(()) + } else { + Err("Heal channel not initialized".to_string()) + } +} + +/// Send heal start request +pub async fn send_heal_request(request: HealChannelRequest) -> Result<(), String> { + send_heal_command(HealChannelCommand::Start(request)).await +} + +/// Send heal query request +pub async fn query_heal_status(heal_path: String, client_token: String) -> Result<(), String> { + send_heal_command(HealChannelCommand::Query { heal_path, client_token }).await +} + +/// Send heal cancel request +pub async fn cancel_heal_task(heal_path: String) -> Result<(), String> { + send_heal_command(HealChannelCommand::Cancel { heal_path }).await +} + +/// Create a new heal request +pub fn create_heal_request( + bucket: String, + object_prefix: Option, + force_start: bool, + priority: Option, +) -> HealChannelRequest { + HealChannelRequest { + id: Uuid::new_v4().to_string(), + bucket, + object_prefix, + force_start, + priority: priority.unwrap_or_default(), + pool_index: None, + set_index: None, + scan_mode: None, + remove_corrupted: None, + recreate_missing: None, + update_parity: None, + recursive: None, + dry_run: None, + timeout_seconds: None, + } +} + +/// Create a new heal request with advanced options +pub fn create_heal_request_with_options( + bucket: String, + object_prefix: Option, + force_start: bool, + priority: Option, + pool_index: Option, + set_index: Option, + scan_mode: Option, + remove_corrupted: Option, + recreate_missing: Option, + update_parity: Option, + recursive: Option, + dry_run: Option, + timeout_seconds: Option, +) -> HealChannelRequest { + HealChannelRequest { + id: Uuid::new_v4().to_string(), + bucket, + object_prefix, + force_start, + priority: priority.unwrap_or_default(), + pool_index, + set_index, + scan_mode, + remove_corrupted, + recreate_missing, + update_parity, + recursive, + dry_run, + timeout_seconds, + } +} + +/// Create a heal response +pub fn create_heal_response( + request_id: String, + success: bool, + data: Option>, + error: Option, +) -> HealChannelResponse { + HealChannelResponse { + request_id, + success, + data, + error, + } +} + +/// Heal scan mode +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HealChannelScanMode { + /// Normal scan + Normal, + /// Deep scan + Deep, +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 16de991d..7c163b34 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -15,6 +15,7 @@ pub mod bucket_stats; // pub mod error; pub mod globals; +pub mod heal_channel; pub mod last_minute; // is ',' diff --git a/crates/ecstore/src/global.rs b/crates/ecstore/src/global.rs index 93c38fe0..d8411fba 100644 --- a/crates/ecstore/src/global.rs +++ b/crates/ecstore/src/global.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::heal::mrf::MRFState; use crate::{ bucket::lifecycle::bucket_lifecycle_ops::LifecycleSys, disk::DiskStore, @@ -53,12 +52,11 @@ pub static ref GLOBAL_Endpoints: OnceLock = OnceLock::new() pub static ref GLOBAL_RootDiskThreshold: RwLock = RwLock::new(0); pub static ref GLOBAL_BackgroundHealRoutine: Arc = HealRoutine::new(); pub static ref GLOBAL_BackgroundHealState: Arc = AllHealState::new(false); +// pub static ref GLOBAL_MRFState: Arc = Arc::new(MRFState::new()); pub static ref GLOBAL_TierConfigMgr: Arc> = TierConfigMgr::new(); pub static ref GLOBAL_LifecycleSys: Arc = LifecycleSys::new(); pub static ref GLOBAL_EventNotifier: Arc> = EventNotifier::new(); //pub static ref GLOBAL_RemoteTargetTransport -pub static ref GLOBAL_ALlHealState: Arc = AllHealState::new(false); -pub static ref GLOBAL_MRFState: Arc = Arc::new(MRFState::new()); static ref globalDeploymentIDPtr: OnceLock = OnceLock::new(); pub static ref GLOBAL_BOOT_TIME: OnceCell = OnceCell::new(); pub static ref GLOBAL_LocalNodeName: String = "127.0.0.1:9000".to_string(); diff --git a/crates/ecstore/src/heal/background_heal_ops.rs b/crates/ecstore/src/heal/background_heal_ops.rs index 0bc3e259..59abc3db 100644 --- a/crates/ecstore/src/heal/background_heal_ops.rs +++ b/crates/ecstore/src/heal/background_heal_ops.rs @@ -33,7 +33,7 @@ use super::{ heal_ops::{HealSequence, new_bg_heal_sequence}, }; use crate::error::{Error, Result}; -use crate::global::{GLOBAL_MRFState, get_background_services_cancel_token}; +use crate::global::get_background_services_cancel_token; use crate::heal::error::ERR_RETRY_HEALING; use crate::heal::heal_commands::{HEAL_ITEM_BUCKET, HealScanMode}; use crate::heal::heal_ops::{BG_HEALING_UUID, HealSource}; @@ -76,10 +76,10 @@ pub async fn init_auto_heal() { }); } - let cancel_clone = cancel_token.clone(); - spawn(async move { - GLOBAL_MRFState.heal_routine_with_cancel(cancel_clone).await; - }); + // let cancel_clone = cancel_token.clone(); + // spawn(async move { + // GLOBAL_MRFState.heal_routine_with_cancel(cancel_clone).await; + // }); } async fn init_background_healing() { diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 851bbf27..6edd46f0 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -2033,17 +2033,23 @@ impl SetDisks { let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?; if errs.iter().any(|err| err.is_some()) { - GLOBAL_MRFState - .add_partial(PartialOperation { - bucket: fi.volume.to_string(), - object: fi.name.to_string(), - queued: Utc::now(), - version_id: fi.version_id.map(|v| v.to_string()), - set_index: self.set_index, - pool_index: self.pool_index, - ..Default::default() - }) - .await; + let _ = rustfs_common::heal_channel::send_heal_request( + rustfs_common::heal_channel::create_heal_request_with_options( + fi.volume.to_string(), // bucket + Some(fi.name.to_string()), // object_prefix + false, // force_start + Some(rustfs_common::heal_channel::HealChannelPriority::Normal), // priority + Some(self.pool_index), // pool_index + Some(self.set_index), // set_index + None, // scan_mode + None, // remove_corrupted + None, // recreate_missing + None, // update_parity + None, // recursive + None, // dry_run + None, // timeout_seconds + ) + ).await; } // debug!("get_object_fileinfo pick fi {:?}", &fi); @@ -2174,18 +2180,23 @@ impl SetDisks { match de_err { DiskError::FileNotFound | DiskError::FileCorrupt => { error!("erasure.decode err 111 {:?}", &de_err); - GLOBAL_MRFState - .add_partial(PartialOperation { - bucket: bucket.to_string(), - object: object.to_string(), - queued: Utc::now(), - version_id: fi.version_id.map(|v| v.to_string()), - set_index, - pool_index, - bitrot_scan: de_err == DiskError::FileCorrupt, - ..Default::default() - }) - .await; + let _ = rustfs_common::heal_channel::send_heal_request( + rustfs_common::heal_channel::create_heal_request_with_options( + bucket.to_string(), + Some(object.to_string()), + false, + Some(rustfs_common::heal_channel::HealChannelPriority::Normal), + Some(pool_index), + Some(set_index), + None, + None, + None, + None, + None, + None, + None, + ) + ).await; has_err = false; } _ => {} @@ -4693,17 +4704,23 @@ impl StorageAPI for SetDisks { #[tracing::instrument(skip(self))] async fn add_partial(&self, bucket: &str, object: &str, version_id: &str) -> Result<()> { - GLOBAL_MRFState - .add_partial(PartialOperation { - bucket: bucket.to_string(), - object: object.to_string(), - version_id: Some(version_id.to_string()), - queued: Utc::now(), - set_index: self.set_index, - pool_index: self.pool_index, - ..Default::default() - }) - .await; + let _ = rustfs_common::heal_channel::send_heal_request( + rustfs_common::heal_channel::create_heal_request_with_options( + bucket.to_string(), + Some(object.to_string()), + false, + Some(rustfs_common::heal_channel::HealChannelPriority::Normal), + Some(self.pool_index), + Some(self.set_index), + None, + None, + None, + None, + None, + None, + None, + ) + ).await; Ok(()) } @@ -5845,17 +5862,23 @@ impl StorageAPI for SetDisks { .await?; } if let Some(versions) = versions { - GLOBAL_MRFState - .add_partial(PartialOperation { - bucket: bucket.to_string(), - object: object.to_string(), - queued: Utc::now(), - versions, - set_index: self.set_index, - pool_index: self.pool_index, - ..Default::default() - }) - .await; + let _ = rustfs_common::heal_channel::send_heal_request( + rustfs_common::heal_channel::create_heal_request_with_options( + bucket.to_string(), + Some(object.to_string()), + false, + Some(rustfs_common::heal_channel::HealChannelPriority::Normal), + Some(self.pool_index), + Some(self.set_index), + None, + None, + None, + None, + None, + None, + None, + ) + ).await; } let upload_id_path = upload_id_path.clone(); diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 5061a609..ab3a083c 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -29,12 +29,9 @@ use rustfs_ecstore::bucket::target::BucketTarget; use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys}; use rustfs_ecstore::error::StorageError; -use rustfs_ecstore::global::GLOBAL_ALlHealState; use rustfs_ecstore::global::get_global_action_cred; -// use rustfs_ecstore::heal::data_usage::load_data_usage_from_backend; use rustfs_ecstore::heal::data_usage::load_data_usage_from_backend; use rustfs_ecstore::heal::heal_commands::HealOpts; -use rustfs_ecstore::heal::heal_ops::new_heal_sequence; use rustfs_ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics}; use rustfs_ecstore::new_object_layer_fn; use rustfs_ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free}; @@ -689,33 +686,20 @@ impl Operation for HealHandler { } let heal_path = path_join(&[PathBuf::from(hip.bucket.clone()), PathBuf::from(hip.obj_prefix.clone())]); - if !hip.client_token.is_empty() && !hip.force_start && !hip.force_stop { - match GLOBAL_ALlHealState - .pop_heal_status_json(heal_path.to_str().unwrap_or_default(), &hip.client_token) - .await - { - Ok(b) => { - info!("pop_heal_status_json success"); - return Ok(S3Response::new((StatusCode::OK, Body::from(b)))); - } - Err(_e) => { - info!("pop_heal_status_json failed"); - return Ok(S3Response::new((StatusCode::INTERNAL_SERVER_ERROR, Body::from(vec![])))); - } - } - } let (tx, mut rx) = mpsc::channel(1); - if hip.force_stop { + + if !hip.client_token.is_empty() && !hip.force_start && !hip.force_stop { + // Query heal status let tx_clone = tx.clone(); + let heal_path_str = heal_path.to_str().unwrap_or_default().to_string(); + let client_token = hip.client_token.clone(); spawn(async move { - match GLOBAL_ALlHealState - .stop_heal_sequence(heal_path.to_str().unwrap_or_default()) - .await - { - Ok(b) => { + match rustfs_common::heal_channel::query_heal_status(heal_path_str, client_token).await { + Ok(_) => { + // TODO: Get actual response from channel let _ = tx_clone .send(HealResp { - resp_bytes: b, + resp_bytes: vec![], ..Default::default() }) .await; @@ -723,7 +707,32 @@ impl Operation for HealHandler { Err(e) => { let _ = tx_clone .send(HealResp { - _api_err: Some(e), + _api_err: Some(StorageError::other(e)), + ..Default::default() + }) + .await; + } + } + }); + } else if hip.force_stop { + // Cancel heal task + let tx_clone = tx.clone(); + let heal_path_str = heal_path.to_str().unwrap_or_default().to_string(); + spawn(async move { + match rustfs_common::heal_channel::cancel_heal_task(heal_path_str).await { + Ok(_) => { + // TODO: Get actual response from channel + let _ = tx_clone + .send(HealResp { + resp_bytes: vec![], + ..Default::default() + }) + .await; + } + Err(e) => { + let _ = tx_clone + .send(HealResp { + _api_err: Some(StorageError::other(e)), ..Default::default() }) .await; @@ -731,22 +740,36 @@ impl Operation for HealHandler { } }); } else if hip.client_token.is_empty() { - let nh = Arc::new(new_heal_sequence(&hip.bucket, &hip.obj_prefix, "", hip.hs, hip.force_start)); + // Use new heal channel mechanism let tx_clone = tx.clone(); spawn(async move { - match GLOBAL_ALlHealState.launch_new_heal_sequence(nh).await { - Ok(b) => { + // Create heal request through channel + let heal_request = rustfs_common::heal_channel::create_heal_request( + hip.bucket.clone(), + if hip.obj_prefix.is_empty() { + None + } else { + Some(hip.obj_prefix.clone()) + }, + hip.force_start, + Some(rustfs_common::heal_channel::HealChannelPriority::Normal), + ); + + match rustfs_common::heal_channel::send_heal_request(heal_request).await { + Ok(_) => { + // Success - send empty response for now let _ = tx_clone .send(HealResp { - resp_bytes: b, + resp_bytes: vec![], ..Default::default() }) .await; } Err(e) => { + // Error - send error response let _ = tx_clone .send(HealResp { - _api_err: Some(e), + _api_err: Some(StorageError::other(e)), ..Default::default() }) .await; diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index f8cbd0ed..eb4ecf43 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -31,7 +31,7 @@ use router::{AdminOperation, S3Router}; use rpc::register_rpc_route; use s3s::route::S3Route; -const ADMIN_PREFIX: &str = "/rustfs/admin"; +const ADMIN_PREFIX: &str = "/minio/admin"; pub fn make_admin_route(console_enabled: bool) -> std::io::Result { let mut r: S3Router = S3Router::new(console_enabled); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index e1f481e9..b7cc0a32 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -29,7 +29,10 @@ use chrono::Datelike; use clap::Parser; use license::init_license; use rustfs_ahm::scanner::data_scanner::ScannerConfig; -use rustfs_ahm::{Scanner, create_ahm_services_cancel_token, shutdown_ahm_services}; +use rustfs_ahm::{ + Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager_with_channel, + shutdown_ahm_services, +}; use rustfs_common::globals::set_global_addr; use rustfs_config::DEFAULT_DELIMITER; use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys; @@ -185,6 +188,11 @@ async fn run(opt: config::Opt) -> Result<()> { // init_data_scanner().await; // init_auto_heal().await; let _ = create_ahm_services_cancel_token(); + + // Initialize heal manager with channel processor + let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone())); + init_heal_manager_with_channel(heal_storage, None).await?; + let scanner = Scanner::new(Some(ScannerConfig::default()), None); scanner.start().await?; print_server_info();