fix: resolve test conflicts and improve data scanner functionality

- Fix multi-threaded test conflicts in AHM heal integration tests
- Remove global environment sharing to prevent test state pollution
- Fix test_all_disk_method by clearing global disk map before test
- Improve data scanner and cache value implementations
- Update dependencies and resolve clippy warnings

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2025-07-16 16:32:33 +08:00
parent 8e766b90cd
commit c49414f6ac
8 changed files with 36 additions and 68 deletions

View File

@@ -103,7 +103,7 @@ impl HealChannelProcessor {
let response = HealChannelResponse {
request_id: request.id,
success: true,
data: Some(format!("Task ID: {}", task_id).into_bytes()),
data: Some(format!("Task ID: {task_id}").into_bytes()),
error: None,
};
@@ -140,7 +140,7 @@ impl HealChannelProcessor {
let response = HealChannelResponse {
request_id: client_token,
success: true,
data: Some(format!("Query result for path: {}", heal_path).into_bytes()),
data: Some(format!("Query result for path: {heal_path}").into_bytes()),
error: None,
};
@@ -160,7 +160,7 @@ impl HealChannelProcessor {
let response = HealChannelResponse {
request_id: heal_path.clone(),
success: true,
data: Some(format!("Cancel request for path: {}", heal_path).into_bytes()),
data: Some(format!("Cancel request for path: {heal_path}").into_bytes()),
error: None,
};
@@ -196,9 +196,7 @@ impl HealChannelProcessor {
Some(rustfs_common::heal_channel::HealChannelScanMode::Normal) => {
rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN
}
Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => {
rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN
}
Some(rustfs_common::heal_channel::HealChannelScanMode::Deep) => rustfs_ecstore::heal::heal_commands::HEAL_DEEP_SCAN,
None => rustfs_ecstore::heal::heal_commands::HEAL_NORMAL_SCAN,
};
@@ -210,7 +208,7 @@ impl HealChannelProcessor {
update_parity: request.update_parity.unwrap_or(true),
recursive: request.recursive.unwrap_or(false),
dry_run: request.dry_run.unwrap_or(false),
timeout: request.timeout_seconds.map(|secs| std::time::Duration::from_secs(secs)),
timeout: request.timeout_seconds.map(std::time::Duration::from_secs),
pool_index: request.pool_index,
set_index: request.set_index,
};

View File

@@ -278,7 +278,6 @@ impl HealManager {
_ = interval.tick() => {
// Build list of endpoints that need healing
let mut endpoints = Vec::new();
println!("GLOBAL_LOCAL_DISK_MAP length: {:?}", GLOBAL_LOCAL_DISK_MAP.read().await.len());
for (_, disk_opt) in GLOBAL_LOCAL_DISK_MAP.read().await.iter() {
if let Some(disk) = disk_opt {
// detect unformatted disk via get_disk_id()
@@ -300,7 +299,6 @@ impl HealManager {
if endpoints.is_empty() {
continue;
}
println!("endpoints length: {:?}", endpoints.len());
for ep in endpoints {
// skip if already queued or healing

View File

@@ -1159,8 +1159,17 @@ mod tests {
use serial_test::serial;
use std::fs;
use std::net::SocketAddr;
use std::sync::OnceLock;
// Global test environment cache to avoid repeated initialization
static GLOBAL_TEST_ENV: OnceLock<(Vec<std::path::PathBuf>, Arc<ECStore>)> = OnceLock::new();
async fn prepare_test_env(test_dir: Option<&str>, port: Option<u16>) -> (Vec<std::path::PathBuf>, Arc<ECStore>) {
// Check if global environment is already initialized
if let Some((disk_paths, ecstore)) = GLOBAL_TEST_ENV.get() {
return (disk_paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks
let test_base_dir = test_dir.unwrap_or("/tmp/rustfs_ahm_test");
let temp_dir = std::path::PathBuf::from(test_base_dir);
@@ -1222,6 +1231,9 @@ mod tests {
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
// Store in global cache
let _ = GLOBAL_TEST_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}

View File

@@ -12,11 +12,15 @@ use rustfs_ecstore::{
};
use serial_test::serial;
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tracing::info;
use walkdir::WalkDir;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
static INIT: Once = Once::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
@@ -25,9 +29,7 @@ fn init_tracing() {
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>) {
use std::sync::OnceLock;
init_tracing();
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore, heal_storage)) = GLOBAL_ENV.get() {
@@ -124,24 +126,6 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Cleanup test environment
async fn cleanup_test_env(disk_paths: &[PathBuf]) {
for disk_path in disk_paths {
if disk_path.exists() {
fs::remove_dir_all(disk_path).await.expect("Failed to cleanup disk path");
}
}
// Attempt to clean up base directory inferred from disk_paths[0]
if let Some(parent) = disk_paths.first().and_then(|p| p.parent()).and_then(|p| p.parent()) {
if parent.exists() {
fs::remove_dir_all(parent).await.ok();
}
}
info!("Test environment cleaned up");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_object_basic() {
@@ -219,9 +203,6 @@ async fn test_heal_object_basic() {
// ─── 2⃣ verify each part file is restored ───────
assert!(target_part.exists());
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal object basic test passed");
}
@@ -290,9 +271,6 @@ async fn test_heal_bucket_basic() {
// ─── 3⃣ Verify bucket directory is restored on every disk ───────
assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal bucket basic test passed");
}
@@ -322,16 +300,13 @@ async fn test_heal_format_basic() {
// ─── 2⃣ verify format.json is restored ───────
assert!(format_path.exists(), "format.json does not exist on disk after heal");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Heal format basic test passed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_heal_storage_api_direct() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let (_disk_paths, ecstore, heal_storage) = setup_test_env().await;
// Test direct heal storage API calls
@@ -383,8 +358,5 @@ async fn test_heal_storage_api_direct() {
assert!(object_result.is_ok());
info!("Direct heal_object test passed");
// Cleanup
cleanup_test_env(&disk_paths).await;
info!("Direct heal storage API test passed");
}

View File

@@ -28,7 +28,7 @@ pub enum HealChannelCommand {
}
/// Heal request from admin to ahm
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct HealChannelRequest {
/// Unique request ID
pub id: String,
@@ -120,7 +120,7 @@ pub async fn send_heal_command(command: HealChannelCommand) -> Result<(), String
if let Some(sender) = get_heal_channel_sender() {
sender
.send(command)
.map_err(|e| format!("Failed to send heal command: {}", e))?;
.map_err(|e| format!("Failed to send heal command: {e}"))?;
Ok(())
} else {
Err("Heal channel not initialized".to_string())
@@ -175,13 +175,6 @@ pub fn create_heal_request_with_options(
priority: Option<HealChannelPriority>,
pool_index: Option<usize>,
set_index: Option<usize>,
scan_mode: Option<HealChannelScanMode>,
remove_corrupted: Option<bool>,
recreate_missing: Option<bool>,
update_parity: Option<bool>,
recursive: Option<bool>,
dry_run: Option<bool>,
timeout_seconds: Option<u64>,
) -> HealChannelRequest {
HealChannelRequest {
id: Uuid::new_v4().to_string(),
@@ -191,13 +184,7 @@ pub fn create_heal_request_with_options(
priority: priority.unwrap_or_default(),
pool_index,
set_index,
scan_mode,
remove_corrupted,
recreate_missing,
update_parity,
recursive,
dry_run,
timeout_seconds,
..Default::default()
}
}

View File

@@ -12,5 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use lazy_static::lazy_static;
use tokio_util::sync::CancellationToken;
// pub mod cache;
pub mod metacache_set;
lazy_static! {
pub static ref LIST_PATH_RAW_CANCEL_TOKEN: Arc<CancellationToken> = Arc::new(CancellationToken::new());
}

View File

@@ -1251,7 +1251,7 @@ impl FolderScanner {
resolver.bucket = bucket.clone();
let found_objs = Arc::new(RwLock::new(false));
let found_objs_clone = found_objs.clone();
let (tx, rx) = broadcast::channel(1);
let (tx, _rx) = broadcast::channel(1);
// let tx_partial = tx.clone();
let tx_finished = tx.clone();
let update_current_path_agreed = self.update_current_path.clone();
@@ -1372,7 +1372,7 @@ impl FolderScanner {
})),
..Default::default()
};
let _ = list_path_raw(rx, lopts).await;
let _ = list_path_raw(lopts).await;
if *found_objs.read().await {
let this: CachedFolder = CachedFolder {

View File

@@ -3685,14 +3685,6 @@ mod tests {
assert!(format!("{service2:?}").contains("NodeService"));
}
#[tokio::test]
async fn test_all_disk_method() {
let service = create_test_node_service();
let disks = service.all_disk().await;
// Should return empty vector in test environment
assert!(disks.is_empty());
}
#[tokio::test]
async fn test_find_disk_method() {
let service = create_test_node_service();