feat: implement heal channel mechanism for admin-ahm communication

- Add global unbounded channel in common crate for heal requests
- Implement channel processor in ahm to handle heal commands
- Add Start/Query/Cancel commands support via channel
- Integrate heal manager initialization in main.rs
- Replace direct MRF calls with channel-based heal requests in ecstore
- Support advanced heal options including pool_index and set_index
- Enable admin handlers to send heal requests via channel
This commit is contained in:
junxiang Mu
2025-07-15 17:40:51 +08:00
parent 3409cd8dff
commit 8e766b90cd
17 changed files with 679 additions and 98 deletions

1
Cargo.lock generated
View File

@@ -7967,6 +7967,7 @@ version = "0.0.5"
dependencies = [
"tokio",
"tonic",
"uuid",
]
[[package]]

View File

@@ -0,0 +1,232 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::error::Result;
use crate::heal::{
manager::HealManager,
task::{HealOptions, HealPriority, HealRequest, HealType},
};
use rustfs_common::heal_channel::{
HealChannelCommand, HealChannelPriority, HealChannelReceiver, HealChannelRequest, HealChannelResponse,
};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{error, info};
/// Heal channel processor
pub struct HealChannelProcessor {
/// Heal manager
heal_manager: Arc<HealManager>,
/// Response sender
response_sender: mpsc::UnboundedSender<HealChannelResponse>,
/// Response receiver
response_receiver: mpsc::UnboundedReceiver<HealChannelResponse>,
}
impl HealChannelProcessor {
/// Create new HealChannelProcessor
pub fn new(heal_manager: Arc<HealManager>) -> Self {
let (response_tx, response_rx) = mpsc::unbounded_channel();
Self {
heal_manager,
response_sender: response_tx,
response_receiver: response_rx,
}
}
/// Start processing heal channel requests
pub async fn start(&mut self, mut receiver: HealChannelReceiver) -> Result<()> {
info!("Starting heal channel processor");
loop {
tokio::select! {
command = receiver.recv() => {
match command {
Some(command) => {
if let Err(e) = self.process_command(command).await {
error!("Failed to process heal command: {}", e);
}
}
None => {
info!("Heal channel receiver closed, stopping processor");
break;
}
}
}
response = self.response_receiver.recv() => {
if let Some(response) = response {
// Handle response if needed
info!("Received heal response for request: {}", response.request_id);
}
}
}
}
info!("Heal channel processor stopped");
Ok(())
}
/// Process heal command
async fn process_command(&self, command: HealChannelCommand) -> Result<()> {
match command {
HealChannelCommand::Start(request) => self.process_start_request(request).await,
HealChannelCommand::Query { heal_path, client_token } => self.process_query_request(heal_path, client_token).await,
HealChannelCommand::Cancel { heal_path } => self.process_cancel_request(heal_path).await,
}
}
/// Process start request
async fn process_start_request(&self, request: HealChannelRequest) -> Result<()> {
info!("Processing heal start request: {} for bucket: {}", request.id, request.bucket);
// Convert channel request to heal request
let heal_request = self.convert_to_heal_request(request.clone())?;
// Submit to heal manager
match self.heal_manager.submit_heal_request(heal_request).await {
Ok(task_id) => {
info!("Successfully submitted heal request: {} as task: {}", request.id, task_id);
// Send success response
let response = HealChannelResponse {
request_id: request.id,
success: true,
data: Some(format!("Task ID: {}", task_id).into_bytes()),
error: None,
};
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send heal response: {}", e);
}
}
Err(e) => {
error!("Failed to submit heal request: {} - {}", request.id, e);
// Send error response
let response = HealChannelResponse {
request_id: request.id,
success: false,
data: None,
error: Some(e.to_string()),
};
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send heal error response: {}", e);
}
}
}
Ok(())
}
/// Process query request
async fn process_query_request(&self, heal_path: String, client_token: String) -> Result<()> {
info!("Processing heal query request for path: {}", heal_path);
// TODO: Implement query logic based on heal_path and client_token
// For now, return a placeholder response
let response = HealChannelResponse {
request_id: client_token,
success: true,
data: Some(format!("Query result for path: {}", heal_path).into_bytes()),
error: None,
};
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send query response: {}", e);
}
Ok(())
}
/// Process cancel request
async fn process_cancel_request(&self, heal_path: String) -> Result<()> {
info!("Processing heal cancel request for path: {}", heal_path);
// TODO: Implement cancel logic based on heal_path
// For now, return a placeholder response
let response = HealChannelResponse {
request_id: heal_path.clone(),
success: true,
data: Some(format!("Cancel request for path: {}", heal_path).into_bytes()),
error: None,
};
if let Err(e) = self.response_sender.send(response) {
error!("Failed to send cancel response: {}", e);
}
Ok(())
}
/// Convert channel request to heal request
fn convert_to_heal_request(&self, request: HealChannelRequest) -> Result<HealRequest> {
let heal_type = match &request.object_prefix {
Some(prefix) if !prefix.is_empty() => HealType::Object {
bucket: request.bucket.clone(),
object: prefix.clone(),
version_id: None,
},
_ => HealType::Bucket {
bucket: request.bucket.clone(),
},
};
let priority = match request.priority {
HealChannelPriority::Low => HealPriority::Low,
HealChannelPriority::Normal => HealPriority::Normal,
HealChannelPriority::High => HealPriority::High,
HealChannelPriority::Critical => HealPriority::Urgent,
};
// Convert scan mode
let scan_mode = match request.scan_mode {
Some(rustfs_common::heal_channel::HealChannelScanMode::Normal) => {
rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN
}
Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => {
rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN
}
None => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
};
// Build HealOptions with all available fields
let mut options = HealOptions {
scan_mode,
remove_corrupted: request.remove_corrupted.unwrap_or(false),
recreate_missing: request.recreate_missing.unwrap_or(true),
update_parity: request.update_parity.unwrap_or(true),
recursive: request.recursive.unwrap_or(false),
dry_run: request.dry_run.unwrap_or(false),
timeout: request.timeout_seconds.map(|secs| std::time::Duration::from_secs(secs)),
pool_index: request.pool_index,
set_index: request.set_index,
};
// Apply force_start overrides
if request.force_start {
options.remove_corrupted = true;
options.recreate_missing = true;
options.update_parity = true;
}
Ok(HealRequest::new(heal_type, options, priority))
}
/// Get response sender for external use
pub fn get_response_sender(&self) -> mpsc::UnboundedSender<HealChannelResponse> {
self.response_sender.clone()
}
}

View File

@@ -246,9 +246,7 @@ impl HealEvent {
actual_checksum,
..
} => {
format!(
"Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}"
)
format!("Checksum mismatch: {bucket}/{object} - expected: {expected_checksum}, actual: {actual_checksum}")
}
HealEvent::BucketMetadataCorruption {
bucket, corruption_type, ..

View File

@@ -52,7 +52,7 @@ impl Default for HealConfig {
fn default() -> Self {
Self {
enable_auto_heal: true,
heal_interval: Duration::from_secs(60), // 1 minute
heal_interval: Duration::from_secs(10), // 10 seconds
max_concurrent_heals: 4,
task_timeout: Duration::from_secs(300), // 5 minutes
queue_size: 1000,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod channel;
pub mod event;
pub mod manager;
pub mod progress;

View File

@@ -94,6 +94,10 @@ pub struct HealOptions {
pub dry_run: bool,
/// Timeout
pub timeout: Option<Duration>,
/// pool index
pub pool_index: Option<usize>,
/// set index
pub set_index: Option<usize>,
}
impl Default for HealOptions {
@@ -106,6 +110,8 @@ impl Default for HealOptions {
recursive: false,
dry_run: false,
timeout: Some(Duration::from_secs(300)), // 5 minutes default timeout
pool_index: None,
set_index: None,
}
}
}
@@ -339,8 +345,8 @@ impl HealTask {
scan_mode: self.options.scan_mode,
update_parity: self.options.update_parity,
no_lock: false,
pool: None,
set: None,
pool: self.options.pool_index,
set: self.options.set_index,
};
match self.storage.heal_object(bucket, object, version_id, &heal_opts).await {
@@ -491,8 +497,8 @@ impl HealTask {
scan_mode: self.options.scan_mode,
update_parity: self.options.update_parity,
no_lock: false,
pool: None,
set: None,
pool: self.options.pool_index,
set: self.options.set_index,
};
match self.storage.heal_bucket(bucket, &heal_opts).await {
@@ -609,8 +615,8 @@ impl HealTask {
scan_mode: rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN,
update_parity: false,
no_lock: false,
pool: None,
set: None,
pool: self.options.pool_index,
set: self.options.set_index,
};
match self.storage.heal_object(bucket, object, None, &heal_opts).await {

View File

@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::OnceLock;
use std::sync::{Arc, OnceLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
pub mod error;
pub mod heal;
pub mod scanner;
pub use error::{Error, Result};
pub use heal::{HealManager, HealOptions, HealPriority, HealRequest, HealType};
pub use heal::{channel::HealChannelProcessor, HealManager, HealOptions, HealPriority, HealRequest, HealType};
pub use scanner::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageInfo, Scanner, ScannerMetrics, load_data_usage_from_backend,
store_data_usage_in_backend,
@@ -54,3 +55,61 @@ pub fn shutdown_ahm_services() {
cancel_token.cancel();
}
}
/// Global heal manager instance
static GLOBAL_HEAL_MANAGER: OnceLock<Arc<HealManager>> = OnceLock::new();
/// Global heal channel processor instance
static GLOBAL_HEAL_CHANNEL_PROCESSOR: OnceLock<Arc<tokio::sync::Mutex<HealChannelProcessor>>> = OnceLock::new();
/// Initialize and start heal manager with channel processor
pub async fn init_heal_manager_with_channel(
storage: Arc<dyn heal::storage::HealStorageAPI>,
config: Option<heal::manager::HealConfig>,
) -> Result<()> {
// Create heal manager
let heal_manager = Arc::new(HealManager::new(storage, config));
// Start heal manager
heal_manager.start().await?;
// Store global instance
GLOBAL_HEAL_MANAGER
.set(heal_manager.clone())
.map_err(|_| Error::Config("Heal manager already initialized".to_string()))?;
// Initialize heal channel
let channel_receiver = rustfs_common::heal_channel::init_heal_channel();
// Create channel processor
let channel_processor = HealChannelProcessor::new(heal_manager);
// Store channel processor instance first
GLOBAL_HEAL_CHANNEL_PROCESSOR
.set(Arc::new(tokio::sync::Mutex::new(channel_processor)))
.map_err(|_| Error::Config("Heal channel processor already initialized".to_string()))?;
// Start channel processor in background
let receiver = channel_receiver;
tokio::spawn(async move {
if let Some(processor_guard) = GLOBAL_HEAL_CHANNEL_PROCESSOR.get() {
let mut processor = processor_guard.lock().await;
if let Err(e) = processor.start(receiver).await {
error!("Heal channel processor failed: {}", e);
}
}
});
info!("Heal manager with channel processor initialized successfully");
Ok(())
}
/// Get global heal manager instance
pub fn get_heal_manager() -> Option<&'static Arc<HealManager>> {
GLOBAL_HEAL_MANAGER.get()
}
/// Get global heal channel processor instance
pub fn get_heal_channel_processor() -> Option<&'static Arc<tokio::sync::Mutex<HealChannelProcessor>>> {
GLOBAL_HEAL_CHANNEL_PROCESSOR.get()
}

View File

@@ -194,6 +194,8 @@ async fn test_heal_object_basic() {
scan_mode: HEAL_NORMAL_SCAN,
update_parity: true,
timeout: Some(Duration::from_secs(300)),
pool_index: None,
set_index: None,
},
HealPriority::Normal,
);
@@ -260,6 +262,8 @@ async fn test_heal_bucket_basic() {
scan_mode: HEAL_NORMAL_SCAN,
update_parity: false,
timeout: Some(Duration::from_secs(300)),
pool_index: None,
set_index: None,
},
HealPriority::Normal,
);

View File

@@ -30,3 +30,4 @@ workspace = true
[dependencies]
tokio.workspace = true
tonic = { workspace = true }
uuid = { workspace = true }

View File

@@ -0,0 +1,226 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::OnceLock;
use tokio::sync::mpsc;
use uuid::Uuid;
/// Heal channel command type
#[derive(Debug, Clone)]
pub enum HealChannelCommand {
/// Start a new heal task
Start(HealChannelRequest),
/// Query heal task status
Query { heal_path: String, client_token: String },
/// Cancel heal task
Cancel { heal_path: String },
}
/// Heal request from admin to ahm
#[derive(Debug, Clone)]
pub struct HealChannelRequest {
/// Unique request ID
pub id: String,
/// Bucket name
pub bucket: String,
/// Object prefix (optional)
pub object_prefix: Option<String>,
/// Force start heal
pub force_start: bool,
/// Priority
pub priority: HealChannelPriority,
/// Pool index (optional)
pub pool_index: Option<usize>,
/// Set index (optional)
pub set_index: Option<usize>,
/// Scan mode (optional)
pub scan_mode: Option<HealChannelScanMode>,
/// Whether to remove corrupted data
pub remove_corrupted: Option<bool>,
/// Whether to recreate missing data
pub recreate_missing: Option<bool>,
/// Whether to update parity
pub update_parity: Option<bool>,
/// Whether to recursively process
pub recursive: Option<bool>,
/// Whether to dry run
pub dry_run: Option<bool>,
/// Timeout in seconds (optional)
pub timeout_seconds: Option<u64>,
}
/// Heal response from ahm to admin
#[derive(Debug, Clone)]
pub struct HealChannelResponse {
/// Request ID
pub request_id: String,
/// Success status
pub success: bool,
/// Response data (if successful)
pub data: Option<Vec<u8>>,
/// Error message (if failed)
pub error: Option<String>,
}
/// Heal priority
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealChannelPriority {
/// Low priority
Low,
/// Normal priority
Normal,
/// High priority
High,
/// Critical priority
Critical,
}
impl Default for HealChannelPriority {
fn default() -> Self {
Self::Normal
}
}
/// Heal channel sender
pub type HealChannelSender = mpsc::UnboundedSender<HealChannelCommand>;
/// Heal channel receiver
pub type HealChannelReceiver = mpsc::UnboundedReceiver<HealChannelCommand>;
/// Global heal channel sender
static GLOBAL_HEAL_CHANNEL_SENDER: OnceLock<HealChannelSender> = OnceLock::new();
/// Initialize global heal channel
pub fn init_heal_channel() -> HealChannelReceiver {
let (tx, rx) = mpsc::unbounded_channel();
GLOBAL_HEAL_CHANNEL_SENDER
.set(tx)
.expect("Heal channel sender already initialized");
rx
}
/// Get global heal channel sender
pub fn get_heal_channel_sender() -> Option<&'static HealChannelSender> {
GLOBAL_HEAL_CHANNEL_SENDER.get()
}
/// Send heal command through global channel
pub async fn send_heal_command(command: HealChannelCommand) -> Result<(), String> {
if let Some(sender) = get_heal_channel_sender() {
sender
.send(command)
.map_err(|e| format!("Failed to send heal command: {}", e))?;
Ok(())
} else {
Err("Heal channel not initialized".to_string())
}
}
/// Send heal start request
pub async fn send_heal_request(request: HealChannelRequest) -> Result<(), String> {
send_heal_command(HealChannelCommand::Start(request)).await
}
/// Send heal query request
pub async fn query_heal_status(heal_path: String, client_token: String) -> Result<(), String> {
send_heal_command(HealChannelCommand::Query { heal_path, client_token }).await
}
/// Send heal cancel request
pub async fn cancel_heal_task(heal_path: String) -> Result<(), String> {
send_heal_command(HealChannelCommand::Cancel { heal_path }).await
}
/// Create a new heal request
pub fn create_heal_request(
bucket: String,
object_prefix: Option<String>,
force_start: bool,
priority: Option<HealChannelPriority>,
) -> HealChannelRequest {
HealChannelRequest {
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
force_start,
priority: priority.unwrap_or_default(),
pool_index: None,
set_index: None,
scan_mode: None,
remove_corrupted: None,
recreate_missing: None,
update_parity: None,
recursive: None,
dry_run: None,
timeout_seconds: None,
}
}
/// Create a new heal request with advanced options
pub fn create_heal_request_with_options(
bucket: String,
object_prefix: Option<String>,
force_start: bool,
priority: Option<HealChannelPriority>,
pool_index: Option<usize>,
set_index: Option<usize>,
scan_mode: Option<HealChannelScanMode>,
remove_corrupted: Option<bool>,
recreate_missing: Option<bool>,
update_parity: Option<bool>,
recursive: Option<bool>,
dry_run: Option<bool>,
timeout_seconds: Option<u64>,
) -> HealChannelRequest {
HealChannelRequest {
id: Uuid::new_v4().to_string(),
bucket,
object_prefix,
force_start,
priority: priority.unwrap_or_default(),
pool_index,
set_index,
scan_mode,
remove_corrupted,
recreate_missing,
update_parity,
recursive,
dry_run,
timeout_seconds,
}
}
/// Create a heal response
pub fn create_heal_response(
request_id: String,
success: bool,
data: Option<Vec<u8>>,
error: Option<String>,
) -> HealChannelResponse {
HealChannelResponse {
request_id,
success,
data,
error,
}
}
/// Heal scan mode
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealChannelScanMode {
/// Normal scan
Normal,
/// Deep scan
Deep,
}

View File

@@ -15,6 +15,7 @@
pub mod bucket_stats;
// pub mod error;
pub mod globals;
pub mod heal_channel;
pub mod last_minute;
// is ','

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::heal::mrf::MRFState;
use crate::{
bucket::lifecycle::bucket_lifecycle_ops::LifecycleSys,
disk::DiskStore,
@@ -53,12 +52,11 @@ pub static ref GLOBAL_Endpoints: OnceLock<EndpointServerPools> = OnceLock::new()
pub static ref GLOBAL_RootDiskThreshold: RwLock<u64> = RwLock::new(0);
pub static ref GLOBAL_BackgroundHealRoutine: Arc<HealRoutine> = HealRoutine::new();
pub static ref GLOBAL_BackgroundHealState: Arc<AllHealState> = AllHealState::new(false);
// pub static ref GLOBAL_MRFState: Arc<MRFState> = Arc::new(MRFState::new());
pub static ref GLOBAL_TierConfigMgr: Arc<RwLock<TierConfigMgr>> = TierConfigMgr::new();
pub static ref GLOBAL_LifecycleSys: Arc<LifecycleSys> = LifecycleSys::new();
pub static ref GLOBAL_EventNotifier: Arc<RwLock<EventNotifier>> = EventNotifier::new();
//pub static ref GLOBAL_RemoteTargetTransport
pub static ref GLOBAL_ALlHealState: Arc<AllHealState> = AllHealState::new(false);
pub static ref GLOBAL_MRFState: Arc<MRFState> = Arc::new(MRFState::new());
static ref globalDeploymentIDPtr: OnceLock<Uuid> = OnceLock::new();
pub static ref GLOBAL_BOOT_TIME: OnceCell<SystemTime> = OnceCell::new();
pub static ref GLOBAL_LocalNodeName: String = "127.0.0.1:9000".to_string();

View File

@@ -33,7 +33,7 @@ use super::{
heal_ops::{HealSequence, new_bg_heal_sequence},
};
use crate::error::{Error, Result};
use crate::global::{GLOBAL_MRFState, get_background_services_cancel_token};
use crate::global::get_background_services_cancel_token;
use crate::heal::error::ERR_RETRY_HEALING;
use crate::heal::heal_commands::{HEAL_ITEM_BUCKET, HealScanMode};
use crate::heal::heal_ops::{BG_HEALING_UUID, HealSource};
@@ -76,10 +76,10 @@ pub async fn init_auto_heal() {
});
}
let cancel_clone = cancel_token.clone();
spawn(async move {
GLOBAL_MRFState.heal_routine_with_cancel(cancel_clone).await;
});
// let cancel_clone = cancel_token.clone();
// spawn(async move {
// GLOBAL_MRFState.heal_routine_with_cancel(cancel_clone).await;
// });
}
async fn init_background_healing() {

View File

@@ -2033,17 +2033,23 @@ impl SetDisks {
let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?;
if errs.iter().any(|err| err.is_some()) {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: fi.volume.to_string(),
object: fi.name.to_string(),
queued: Utc::now(),
version_id: fi.version_id.map(|v| v.to_string()),
set_index: self.set_index,
pool_index: self.pool_index,
..Default::default()
})
.await;
let _ = rustfs_common::heal_channel::send_heal_request(
rustfs_common::heal_channel::create_heal_request_with_options(
fi.volume.to_string(), // bucket
Some(fi.name.to_string()), // object_prefix
false, // force_start
Some(rustfs_common::heal_channel::HealChannelPriority::Normal), // priority
Some(self.pool_index), // pool_index
Some(self.set_index), // set_index
None, // scan_mode
None, // remove_corrupted
None, // recreate_missing
None, // update_parity
None, // recursive
None, // dry_run
None, // timeout_seconds
)
).await;
}
// debug!("get_object_fileinfo pick fi {:?}", &fi);
@@ -2174,18 +2180,23 @@ impl SetDisks {
match de_err {
DiskError::FileNotFound | DiskError::FileCorrupt => {
error!("erasure.decode err 111 {:?}", &de_err);
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: bucket.to_string(),
object: object.to_string(),
queued: Utc::now(),
version_id: fi.version_id.map(|v| v.to_string()),
set_index,
pool_index,
bitrot_scan: de_err == DiskError::FileCorrupt,
..Default::default()
})
.await;
let _ = rustfs_common::heal_channel::send_heal_request(
rustfs_common::heal_channel::create_heal_request_with_options(
bucket.to_string(),
Some(object.to_string()),
false,
Some(rustfs_common::heal_channel::HealChannelPriority::Normal),
Some(pool_index),
Some(set_index),
None,
None,
None,
None,
None,
None,
None,
)
).await;
has_err = false;
}
_ => {}
@@ -4693,17 +4704,23 @@ impl StorageAPI for SetDisks {
#[tracing::instrument(skip(self))]
async fn add_partial(&self, bucket: &str, object: &str, version_id: &str) -> Result<()> {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: bucket.to_string(),
object: object.to_string(),
version_id: Some(version_id.to_string()),
queued: Utc::now(),
set_index: self.set_index,
pool_index: self.pool_index,
..Default::default()
})
.await;
let _ = rustfs_common::heal_channel::send_heal_request(
rustfs_common::heal_channel::create_heal_request_with_options(
bucket.to_string(),
Some(object.to_string()),
false,
Some(rustfs_common::heal_channel::HealChannelPriority::Normal),
Some(self.pool_index),
Some(self.set_index),
None,
None,
None,
None,
None,
None,
None,
)
).await;
Ok(())
}
@@ -5845,17 +5862,23 @@ impl StorageAPI for SetDisks {
.await?;
}
if let Some(versions) = versions {
GLOBAL_MRFState
.add_partial(PartialOperation {
bucket: bucket.to_string(),
object: object.to_string(),
queued: Utc::now(),
versions,
set_index: self.set_index,
pool_index: self.pool_index,
..Default::default()
})
.await;
let _ = rustfs_common::heal_channel::send_heal_request(
rustfs_common::heal_channel::create_heal_request_with_options(
bucket.to_string(),
Some(object.to_string()),
false,
Some(rustfs_common::heal_channel::HealChannelPriority::Normal),
Some(self.pool_index),
Some(self.set_index),
None,
None,
None,
None,
None,
None,
None,
)
).await;
}
let upload_id_path = upload_id_path.clone();

View File

@@ -29,12 +29,9 @@ use rustfs_ecstore::bucket::target::BucketTarget;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::global::GLOBAL_ALlHealState;
use rustfs_ecstore::global::get_global_action_cred;
// use rustfs_ecstore::heal::data_usage::load_data_usage_from_backend;
use rustfs_ecstore::heal::data_usage::load_data_usage_from_backend;
use rustfs_ecstore::heal::heal_commands::HealOpts;
use rustfs_ecstore::heal::heal_ops::new_heal_sequence;
use rustfs_ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics};
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
@@ -689,33 +686,20 @@ impl Operation for HealHandler {
}
let heal_path = path_join(&[PathBuf::from(hip.bucket.clone()), PathBuf::from(hip.obj_prefix.clone())]);
if !hip.client_token.is_empty() && !hip.force_start && !hip.force_stop {
match GLOBAL_ALlHealState
.pop_heal_status_json(heal_path.to_str().unwrap_or_default(), &hip.client_token)
.await
{
Ok(b) => {
info!("pop_heal_status_json success");
return Ok(S3Response::new((StatusCode::OK, Body::from(b))));
}
Err(_e) => {
info!("pop_heal_status_json failed");
return Ok(S3Response::new((StatusCode::INTERNAL_SERVER_ERROR, Body::from(vec![]))));
}
}
}
let (tx, mut rx) = mpsc::channel(1);
if hip.force_stop {
if !hip.client_token.is_empty() && !hip.force_start && !hip.force_stop {
// Query heal status
let tx_clone = tx.clone();
let heal_path_str = heal_path.to_str().unwrap_or_default().to_string();
let client_token = hip.client_token.clone();
spawn(async move {
match GLOBAL_ALlHealState
.stop_heal_sequence(heal_path.to_str().unwrap_or_default())
.await
{
Ok(b) => {
match rustfs_common::heal_channel::query_heal_status(heal_path_str, client_token).await {
Ok(_) => {
// TODO: Get actual response from channel
let _ = tx_clone
.send(HealResp {
resp_bytes: b,
resp_bytes: vec![],
..Default::default()
})
.await;
@@ -723,7 +707,32 @@ impl Operation for HealHandler {
Err(e) => {
let _ = tx_clone
.send(HealResp {
_api_err: Some(e),
_api_err: Some(StorageError::other(e)),
..Default::default()
})
.await;
}
}
});
} else if hip.force_stop {
// Cancel heal task
let tx_clone = tx.clone();
let heal_path_str = heal_path.to_str().unwrap_or_default().to_string();
spawn(async move {
match rustfs_common::heal_channel::cancel_heal_task(heal_path_str).await {
Ok(_) => {
// TODO: Get actual response from channel
let _ = tx_clone
.send(HealResp {
resp_bytes: vec![],
..Default::default()
})
.await;
}
Err(e) => {
let _ = tx_clone
.send(HealResp {
_api_err: Some(StorageError::other(e)),
..Default::default()
})
.await;
@@ -731,22 +740,36 @@ impl Operation for HealHandler {
}
});
} else if hip.client_token.is_empty() {
let nh = Arc::new(new_heal_sequence(&hip.bucket, &hip.obj_prefix, "", hip.hs, hip.force_start));
// Use new heal channel mechanism
let tx_clone = tx.clone();
spawn(async move {
match GLOBAL_ALlHealState.launch_new_heal_sequence(nh).await {
Ok(b) => {
// Create heal request through channel
let heal_request = rustfs_common::heal_channel::create_heal_request(
hip.bucket.clone(),
if hip.obj_prefix.is_empty() {
None
} else {
Some(hip.obj_prefix.clone())
},
hip.force_start,
Some(rustfs_common::heal_channel::HealChannelPriority::Normal),
);
match rustfs_common::heal_channel::send_heal_request(heal_request).await {
Ok(_) => {
// Success - send empty response for now
let _ = tx_clone
.send(HealResp {
resp_bytes: b,
resp_bytes: vec![],
..Default::default()
})
.await;
}
Err(e) => {
// Error - send error response
let _ = tx_clone
.send(HealResp {
_api_err: Some(e),
_api_err: Some(StorageError::other(e)),
..Default::default()
})
.await;

View File

@@ -31,7 +31,7 @@ use router::{AdminOperation, S3Router};
use rpc::register_rpc_route;
use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/rustfs/admin";
const ADMIN_PREFIX: &str = "/minio/admin";
pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new(console_enabled);

View File

@@ -29,7 +29,10 @@ use chrono::Datelike;
use clap::Parser;
use license::init_license;
use rustfs_ahm::scanner::data_scanner::ScannerConfig;
use rustfs_ahm::{Scanner, create_ahm_services_cancel_token, shutdown_ahm_services};
use rustfs_ahm::{
Scanner, create_ahm_services_cancel_token, heal::storage::ECStoreHealStorage, init_heal_manager_with_channel,
shutdown_ahm_services,
};
use rustfs_common::globals::set_global_addr;
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
@@ -185,6 +188,11 @@ async fn run(opt: config::Opt) -> Result<()> {
// init_data_scanner().await;
// init_auto_heal().await;
let _ = create_ahm_services_cancel_token();
// Initialize heal manager with channel processor
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
init_heal_manager_with_channel(heal_storage, None).await?;
let scanner = Scanner::new(Some(ScannerConfig::default()), None);
scanner.start().await?;
print_server_info();