From 4e2c4d8dba1a34ca50eb35a4182fb97c9c94b602 Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 8 Jul 2025 11:15:06 +0800 Subject: [PATCH] fix: improve data scanner random sleep calculation - Fix random number generation API usage - Adjust sleep calculation to follow MinIO pattern - Ensure proper random range for scanner cycles Signed-off-by: junxiang Mu <1948535941@qq.com> --- crates/ecstore/src/heal/data_scanner.rs | 111 +++++++++++++++++++----- rustfs/src/main.rs | 16 +++- 2 files changed, 99 insertions(+), 28 deletions(-) diff --git a/crates/ecstore/src/heal/data_scanner.rs b/crates/ecstore/src/heal/data_scanner.rs index b8764fbf..1b4721af 100644 --- a/crates/ecstore/src/heal/data_scanner.rs +++ b/crates/ecstore/src/heal/data_scanner.rs @@ -20,17 +20,18 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::{ - Arc, + Arc, OnceLock, atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; use time::{self, OffsetDateTime}; +use tokio_util::sync::CancellationToken; use super::{ data_scanner_metric::{ScannerMetric, ScannerMetrics, globalScannerMetrics}, - data_usage::{DATA_USAGE_BLOOM_NAME_PATH, store_data_usage_in_backend}, + data_usage::{DATA_USAGE_BLOOM_NAME_PATH, DataUsageInfo, store_data_usage_in_backend}, data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash}, heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN, HealScanMode}, }; @@ -127,6 +128,8 @@ lazy_static! { pub static ref globalHealConfig: Arc> = Arc::new(RwLock::new(Config::default())); } +static GLOBAL_SCANNER_CANCEL_TOKEN: OnceLock = OnceLock::new(); + struct DynamicSleeper { factor: f64, max_sleep: Duration, @@ -195,36 +198,66 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn /// - Minimum sleep duration to avoid excessive CPU usage /// - Proper error handling and logging /// +/// # Returns +/// A CancellationToken that can be used to gracefully shutdown the scanner +/// /// # Architecture /// 1. Initialize with random seed for sleep intervals /// 2. Run scanner cycles in a loop /// 3. Use randomized sleep between cycles to avoid thundering herd /// 4. Ensure minimum sleep duration to prevent CPU thrashing -pub async fn init_data_scanner() { +pub async fn init_data_scanner() -> CancellationToken { info!("Initializing data scanner background task"); + let cancel_token = CancellationToken::new(); + GLOBAL_SCANNER_CANCEL_TOKEN + .set(cancel_token.clone()) + .expect("Scanner already initialized"); + + let cancel_clone = cancel_token.clone(); tokio::spawn(async move { + info!("Data scanner background task started"); + loop { - // Run the data scanner - run_data_scanner().await; + tokio::select! { + _ = cancel_clone.cancelled() => { + info!("Data scanner received shutdown signal, exiting gracefully"); + break; + } + _ = run_data_scanner_cycle() => { + // Calculate randomized sleep duration + let random_factor = { + let mut rng = rand::rng(); + rng.random_range(1.0..10.0) + }; + let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64; + let sleep_duration_secs = random_factor * base_cycle_duration; - // Calculate randomized sleep duration - // Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration - let random_factor = { - let mut rng = rand::rng(); - rng.random_range(1.0..10.0) - }; - let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64; - let sleep_duration_secs = random_factor * base_cycle_duration; + let sleep_duration = Duration::from_secs_f64(sleep_duration_secs); - let sleep_duration = Duration::from_secs_f64(sleep_duration_secs); + debug!( + duration_secs = sleep_duration.as_secs(), + "Data scanner sleeping before next cycle" + ); - info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle"); - - // Sleep with the calculated duration - sleep(sleep_duration).await; + // Interruptible sleep + tokio::select! { + _ = cancel_clone.cancelled() => { + info!("Data scanner received shutdown signal during sleep, exiting"); + break; + } + _ = sleep(sleep_duration) => { + // Continue to next cycle + } + } + } + } } + + info!("Data scanner background task stopped gracefully"); }); + + cancel_token } /// Run a single data scanner cycle @@ -239,8 +272,8 @@ pub async fn init_data_scanner() { /// - Gracefully handles missing object layer /// - Continues operation even if individual steps fail /// - Logs errors appropriately without terminating the scanner -async fn run_data_scanner() { - info!("Starting data scanner cycle"); +async fn run_data_scanner_cycle() { + debug!("Starting data scanner cycle"); // Get the object layer, return early if not available let Some(store) = new_object_layer_fn() else { @@ -248,6 +281,14 @@ async fn run_data_scanner() { return; }; + // Check for cancellation before starting expensive operations + if let Some(token) = GLOBAL_SCANNER_CANCEL_TOKEN.get() { + if token.is_cancelled() { + debug!("Scanner cancelled before starting cycle"); + return; + } + } + // Load current cycle information from persistent storage let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH) .await @@ -293,7 +334,7 @@ async fn run_data_scanner() { } // Set up data usage storage channel - let (tx, rx) = mpsc::channel(100); + let (tx, rx) = mpsc::channel::(100); tokio::spawn(async move { let _ = store_data_usage_in_backend(rx).await; }); @@ -308,8 +349,8 @@ async fn run_data_scanner() { "Starting namespace scanner" ); - // Run the namespace scanner - match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await { + // Run the namespace scanner with cancellation support + match execute_namespace_scan(&store, tx, cycle_info.current, scan_mode).await { Ok(_) => { info!(cycle = cycle_info.current, "Namespace scanner completed successfully"); @@ -349,6 +390,28 @@ async fn run_data_scanner() { stop_fn(&scan_result); } +/// Execute namespace scan with cancellation support +async fn execute_namespace_scan( + store: &Arc, + tx: Sender, + cycle: u64, + scan_mode: HealScanMode, +) -> Result<()> { + let cancel_token = GLOBAL_SCANNER_CANCEL_TOKEN + .get() + .ok_or_else(|| Error::other("Scanner not initialized"))?; + + tokio::select! { + result = store.ns_scanner(tx, cycle as usize, scan_mode) => { + result.map_err(|e| Error::other(format!("Namespace scan failed: {e}"))) + } + _ = cancel_token.cancelled() => { + info!("Namespace scan cancelled"); + Err(Error::other("Scan cancelled")) + } + } +} + #[derive(Debug, Serialize, Deserialize)] struct BackgroundHealInfo { bitrot_start_time: SystemTime, @@ -404,7 +467,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot return HEAL_DEEP_SCAN; } - if bitrot_start_time.duration_since(SystemTime::now()).unwrap() > bitrot_cycle { + if SystemTime::now().duration_since(bitrot_start_time).unwrap_or_default() > bitrot_cycle { return HEAL_DEEP_SCAN; } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index ecb76259..7a6b50ce 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -429,7 +429,7 @@ async fn run(opt: config::Opt) -> Result<()> { })?; // init scanner - init_data_scanner().await; + let scanner_cancel_token = init_data_scanner().await; // init auto heal init_auto_heal().await; // init console configuration @@ -493,11 +493,11 @@ async fn run(opt: config::Opt) -> Result<()> { match wait_for_shutdown().await { #[cfg(unix)] ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => { - handle_shutdown(&state_manager, &shutdown_tx).await; + handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await; } #[cfg(not(unix))] ShutdownSignal::CtrlC => { - handle_shutdown(&state_manager, &shutdown_tx).await; + handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await; } } @@ -603,11 +603,19 @@ fn process_connection( } /// Handles the shutdown process of the server -async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) { +async fn handle_shutdown( + state_manager: &ServiceStateManager, + shutdown_tx: &tokio::sync::broadcast::Sender<()>, + scanner_cancel_token: &tokio_util::sync::CancellationToken, +) { info!("Shutdown signal received in main thread"); // update the status to stopping first state_manager.update(ServiceState::Stopping); + // Stop data scanner gracefully + info!("Stopping data scanner..."); + scanner_cancel_token.cancel(); + // Stop the notification system shutdown_event_notifier().await;