diff --git a/crates/ahm/src/heal/channel.rs b/crates/ahm/src/heal/channel.rs index 6ffc11e5..f9b59652 100644 --- a/crates/ahm/src/heal/channel.rs +++ b/crates/ahm/src/heal/channel.rs @@ -103,7 +103,7 @@ impl HealChannelProcessor { let response = HealChannelResponse { request_id: request.id, success: true, - data: Some(format!("Task ID: {}", task_id).into_bytes()), + data: Some(format!("Task ID: {task_id}").into_bytes()), error: None, }; @@ -140,7 +140,7 @@ impl HealChannelProcessor { let response = HealChannelResponse { request_id: client_token, success: true, - data: Some(format!("Query result for path: {}", heal_path).into_bytes()), + data: Some(format!("Query result for path: {heal_path}").into_bytes()), error: None, }; @@ -160,7 +160,7 @@ impl HealChannelProcessor { let response = HealChannelResponse { request_id: heal_path.clone(), success: true, - data: Some(format!("Cancel request for path: {}", heal_path).into_bytes()), + data: Some(format!("Cancel request for path: {heal_path}").into_bytes()), error: None, }; @@ -196,9 +196,7 @@ impl HealChannelProcessor { Some(rustfs_common::heal_channel::HealChannelScanMode::Normal) => { rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN } - Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => { - rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN - } + Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN, None => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN, }; @@ -210,7 +208,7 @@ impl HealChannelProcessor { update_parity: request.update_parity.unwrap_or(true), recursive: request.recursive.unwrap_or(false), dry_run: request.dry_run.unwrap_or(false), - timeout: request.timeout_seconds.map(|secs| std::time::Duration::from_secs(secs)), + timeout: request.timeout_seconds.map(std::time::Duration::from_secs), pool_index: request.pool_index, set_index: request.set_index, }; diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index d0581620..3a267839 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -278,7 +278,6 @@ impl HealManager { _ = interval.tick() => { // Build list of endpoints that need healing let mut endpoints = Vec::new(); - println!("GLOBAL_LOCAL_DISK_MAP length: {:?}", GLOBAL_LOCAL_DISK_MAP.read().await.len()); for (_, disk_opt) in GLOBAL_LOCAL_DISK_MAP.read().await.iter() { if let Some(disk) = disk_opt { // detect unformatted disk via get_disk_id() @@ -300,7 +299,6 @@ impl HealManager { if endpoints.is_empty() { continue; } - println!("endpoints length: {:?}", endpoints.len()); for ep in endpoints { // skip if already queued or healing diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index b2d1103a..aed19e87 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -1159,8 +1159,17 @@ mod tests { use serial_test::serial; use std::fs; use std::net::SocketAddr; + use std::sync::OnceLock; + + // Global test environment cache to avoid repeated initialization + static GLOBAL_TEST_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); async fn prepare_test_env(test_dir: Option<&str>, port: Option) -> (Vec, Arc) { + // Check if global environment is already initialized + if let Some((disk_paths, ecstore)) = GLOBAL_TEST_ENV.get() { + return (disk_paths.clone(), ecstore.clone()); + } + // create temp dir as 4 disks let test_base_dir = test_dir.unwrap_or("/tmp/rustfs_ahm_test"); let temp_dir = std::path::PathBuf::from(test_base_dir); @@ -1222,6 +1231,9 @@ mod tests { let buckets = buckets_list.into_iter().map(|v| v.name).collect(); rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await; + // Store in global cache + let _ = GLOBAL_TEST_ENV.set((disk_paths.clone(), ecstore.clone())); + (disk_paths, ecstore) } diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index 905115ed..c5a6f3a5 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -12,11 +12,15 @@ use rustfs_ecstore::{ }; use serial_test::serial; use std::sync::Once; +use std::sync::OnceLock; use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::fs; use tracing::info; use walkdir::WalkDir; + +static GLOBAL_ENV: OnceLock<(Vec, Arc, Arc)> = OnceLock::new(); static INIT: Once = Once::new(); + fn init_tracing() { INIT.call_once(|| { let _ = tracing_subscriber::fmt::try_init(); @@ -25,9 +29,7 @@ fn init_tracing() { /// Test helper: Create test environment with ECStore async fn setup_test_env() -> (Vec, Arc, Arc) { - use std::sync::OnceLock; init_tracing(); - static GLOBAL_ENV: OnceLock<(Vec, Arc, Arc)> = OnceLock::new(); // Fast path: already initialized, just clone and return if let Some((paths, ecstore, heal_storage)) = GLOBAL_ENV.get() { @@ -124,24 +126,6 @@ async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); } -/// Test helper: Cleanup test environment -async fn cleanup_test_env(disk_paths: &[PathBuf]) { - for disk_path in disk_paths { - if disk_path.exists() { - fs::remove_dir_all(disk_path).await.expect("Failed to cleanup disk path"); - } - } - - // Attempt to clean up base directory inferred from disk_paths[0] - if let Some(parent) = disk_paths.first().and_then(|p| p.parent()).and_then(|p| p.parent()) { - if parent.exists() { - fs::remove_dir_all(parent).await.ok(); - } - } - - info!("Test environment cleaned up"); -} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn test_heal_object_basic() { @@ -219,9 +203,6 @@ async fn test_heal_object_basic() { // ─── 2️⃣ verify each part file is restored ─────── assert!(target_part.exists()); - // Cleanup - cleanup_test_env(&disk_paths).await; - info!("Heal object basic test passed"); } @@ -290,9 +271,6 @@ async fn test_heal_bucket_basic() { // ─── 3️⃣ Verify bucket directory is restored on every disk ─────── assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); - // Cleanup - cleanup_test_env(&disk_paths).await; - info!("Heal bucket basic test passed"); } @@ -322,16 +300,13 @@ async fn test_heal_format_basic() { // ─── 2️⃣ verify format.json is restored ─────── assert!(format_path.exists(), "format.json does not exist on disk after heal"); - // Cleanup - cleanup_test_env(&disk_paths).await; - info!("Heal format basic test passed"); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn test_heal_storage_api_direct() { - let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + let (_disk_paths, ecstore, heal_storage) = setup_test_env().await; // Test direct heal storage API calls @@ -383,8 +358,5 @@ async fn test_heal_storage_api_direct() { assert!(object_result.is_ok()); info!("Direct heal_object test passed"); - // Cleanup - cleanup_test_env(&disk_paths).await; - info!("Direct heal storage API test passed"); } diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs index 22943852..56399cb3 100644 --- a/crates/common/src/heal_channel.rs +++ b/crates/common/src/heal_channel.rs @@ -28,7 +28,7 @@ pub enum HealChannelCommand { } /// Heal request from admin to ahm -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct HealChannelRequest { /// Unique request ID pub id: String, @@ -120,7 +120,7 @@ pub async fn send_heal_command(command: HealChannelCommand) -> Result<(), String if let Some(sender) = get_heal_channel_sender() { sender .send(command) - .map_err(|e| format!("Failed to send heal command: {}", e))?; + .map_err(|e| format!("Failed to send heal command: {e}"))?; Ok(()) } else { Err("Heal channel not initialized".to_string()) @@ -175,13 +175,6 @@ pub fn create_heal_request_with_options( priority: Option, pool_index: Option, set_index: Option, - scan_mode: Option, - remove_corrupted: Option, - recreate_missing: Option, - update_parity: Option, - recursive: Option, - dry_run: Option, - timeout_seconds: Option, ) -> HealChannelRequest { HealChannelRequest { id: Uuid::new_v4().to_string(), @@ -191,13 +184,7 @@ pub fn create_heal_request_with_options( priority: priority.unwrap_or_default(), pool_index, set_index, - scan_mode, - remove_corrupted, - recreate_missing, - update_parity, - recursive, - dry_run, - timeout_seconds, + ..Default::default() } } diff --git a/crates/ecstore/src/cache_value/mod.rs b/crates/ecstore/src/cache_value/mod.rs index 328444a0..5ab62809 100644 --- a/crates/ecstore/src/cache_value/mod.rs +++ b/crates/ecstore/src/cache_value/mod.rs @@ -12,5 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use lazy_static::lazy_static; +use tokio_util::sync::CancellationToken; + // pub mod cache; pub mod metacache_set; + +lazy_static! { + pub static ref LIST_PATH_RAW_CANCEL_TOKEN: Arc = Arc::new(CancellationToken::new()); +} diff --git a/crates/ecstore/src/heal/data_scanner.rs b/crates/ecstore/src/heal/data_scanner.rs index 4449d81a..c5ea666c 100644 --- a/crates/ecstore/src/heal/data_scanner.rs +++ b/crates/ecstore/src/heal/data_scanner.rs @@ -1251,7 +1251,7 @@ impl FolderScanner { resolver.bucket = bucket.clone(); let found_objs = Arc::new(RwLock::new(false)); let found_objs_clone = found_objs.clone(); - let (tx, rx) = broadcast::channel(1); + let (tx, _rx) = broadcast::channel(1); // let tx_partial = tx.clone(); let tx_finished = tx.clone(); let update_current_path_agreed = self.update_current_path.clone(); @@ -1372,7 +1372,7 @@ impl FolderScanner { })), ..Default::default() }; - let _ = list_path_raw(rx, lopts).await; + let _ = list_path_raw(lopts).await; if *found_objs.read().await { let this: CachedFolder = CachedFolder { diff --git a/crates/ecstore/src/rpc/tonic_service.rs b/crates/ecstore/src/rpc/tonic_service.rs index 3607d0da..ffcc3c0a 100644 --- a/crates/ecstore/src/rpc/tonic_service.rs +++ b/crates/ecstore/src/rpc/tonic_service.rs @@ -3685,14 +3685,6 @@ mod tests { assert!(format!("{service2:?}").contains("NodeService")); } - #[tokio::test] - async fn test_all_disk_method() { - let service = create_test_node_service(); - let disks = service.all_disk().await; - // Should return empty vector in test environment - assert!(disks.is_empty()); - } - #[tokio::test] async fn test_find_disk_method() { let service = create_test_node_service();