fix: improve data scanner random sleep calculation

- Fix random number generation API usage
- Adjust sleep calculation to follow MinIO pattern
- Ensure proper random range for scanner cycles

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2025-07-08 11:15:06 +08:00
parent 9862677fcf
commit 4e2c4d8dba
2 changed files with 99 additions and 28 deletions

View File

@@ -20,17 +20,18 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
sync::{
Arc,
Arc, OnceLock,
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
},
time::{Duration, SystemTime},
};
use time::{self, OffsetDateTime};
use tokio_util::sync::CancellationToken;
use super::{
data_scanner_metric::{ScannerMetric, ScannerMetrics, globalScannerMetrics},
data_usage::{DATA_USAGE_BLOOM_NAME_PATH, store_data_usage_in_backend},
data_usage::{DATA_USAGE_BLOOM_NAME_PATH, DataUsageInfo, store_data_usage_in_backend},
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN, HealScanMode},
};
@@ -127,6 +128,8 @@ lazy_static! {
pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
}
static GLOBAL_SCANNER_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();
struct DynamicSleeper {
factor: f64,
max_sleep: Duration,
@@ -195,36 +198,66 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
/// - Minimum sleep duration to avoid excessive CPU usage
/// - Proper error handling and logging
///
/// # Returns
/// A CancellationToken that can be used to gracefully shutdown the scanner
///
/// # Architecture
/// 1. Initialize with random seed for sleep intervals
/// 2. Run scanner cycles in a loop
/// 3. Use randomized sleep between cycles to avoid thundering herd
/// 4. Ensure minimum sleep duration to prevent CPU thrashing
pub async fn init_data_scanner() {
pub async fn init_data_scanner() -> CancellationToken {
info!("Initializing data scanner background task");
let cancel_token = CancellationToken::new();
GLOBAL_SCANNER_CANCEL_TOKEN
.set(cancel_token.clone())
.expect("Scanner already initialized");
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
info!("Data scanner background task started");
loop {
// Run the data scanner
run_data_scanner().await;
tokio::select! {
_ = cancel_clone.cancelled() => {
info!("Data scanner received shutdown signal, exiting gracefully");
break;
}
_ = run_data_scanner_cycle() => {
// Calculate randomized sleep duration
let random_factor = {
let mut rng = rand::rng();
rng.random_range(1.0..10.0)
};
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
let sleep_duration_secs = random_factor * base_cycle_duration;
// Calculate randomized sleep duration
// Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration
let random_factor = {
let mut rng = rand::rng();
rng.random_range(1.0..10.0)
};
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
let sleep_duration_secs = random_factor * base_cycle_duration;
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
debug!(
duration_secs = sleep_duration.as_secs(),
"Data scanner sleeping before next cycle"
);
info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle");
// Sleep with the calculated duration
sleep(sleep_duration).await;
// Interruptible sleep
tokio::select! {
_ = cancel_clone.cancelled() => {
info!("Data scanner received shutdown signal during sleep, exiting");
break;
}
_ = sleep(sleep_duration) => {
// Continue to next cycle
}
}
}
}
}
info!("Data scanner background task stopped gracefully");
});
cancel_token
}
/// Run a single data scanner cycle
@@ -239,8 +272,8 @@ pub async fn init_data_scanner() {
/// - Gracefully handles missing object layer
/// - Continues operation even if individual steps fail
/// - Logs errors appropriately without terminating the scanner
async fn run_data_scanner() {
info!("Starting data scanner cycle");
async fn run_data_scanner_cycle() {
debug!("Starting data scanner cycle");
// Get the object layer, return early if not available
let Some(store) = new_object_layer_fn() else {
@@ -248,6 +281,14 @@ async fn run_data_scanner() {
return;
};
// Check for cancellation before starting expensive operations
if let Some(token) = GLOBAL_SCANNER_CANCEL_TOKEN.get() {
if token.is_cancelled() {
debug!("Scanner cancelled before starting cycle");
return;
}
}
// Load current cycle information from persistent storage
let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
.await
@@ -293,7 +334,7 @@ async fn run_data_scanner() {
}
// Set up data usage storage channel
let (tx, rx) = mpsc::channel(100);
let (tx, rx) = mpsc::channel::<DataUsageInfo>(100);
tokio::spawn(async move {
let _ = store_data_usage_in_backend(rx).await;
});
@@ -308,8 +349,8 @@ async fn run_data_scanner() {
"Starting namespace scanner"
);
// Run the namespace scanner
match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
// Run the namespace scanner with cancellation support
match execute_namespace_scan(&store, tx, cycle_info.current, scan_mode).await {
Ok(_) => {
info!(cycle = cycle_info.current, "Namespace scanner completed successfully");
@@ -349,6 +390,28 @@ async fn run_data_scanner() {
stop_fn(&scan_result);
}
/// Execute namespace scan with cancellation support
async fn execute_namespace_scan(
store: &Arc<ECStore>,
tx: Sender<DataUsageInfo>,
cycle: u64,
scan_mode: HealScanMode,
) -> Result<()> {
let cancel_token = GLOBAL_SCANNER_CANCEL_TOKEN
.get()
.ok_or_else(|| Error::other("Scanner not initialized"))?;
tokio::select! {
result = store.ns_scanner(tx, cycle as usize, scan_mode) => {
result.map_err(|e| Error::other(format!("Namespace scan failed: {e}")))
}
_ = cancel_token.cancelled() => {
info!("Namespace scan cancelled");
Err(Error::other("Scan cancelled"))
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct BackgroundHealInfo {
bitrot_start_time: SystemTime,
@@ -404,7 +467,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot
return HEAL_DEEP_SCAN;
}
if bitrot_start_time.duration_since(SystemTime::now()).unwrap() > bitrot_cycle {
if SystemTime::now().duration_since(bitrot_start_time).unwrap_or_default() > bitrot_cycle {
return HEAL_DEEP_SCAN;
}

View File

@@ -429,7 +429,7 @@ async fn run(opt: config::Opt) -> Result<()> {
})?;
// init scanner
init_data_scanner().await;
let scanner_cancel_token = init_data_scanner().await;
// init auto heal
init_auto_heal().await;
// init console configuration
@@ -493,11 +493,11 @@ async fn run(opt: config::Opt) -> Result<()> {
match wait_for_shutdown().await {
#[cfg(unix)]
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
handle_shutdown(&state_manager, &shutdown_tx).await;
handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await;
}
#[cfg(not(unix))]
ShutdownSignal::CtrlC => {
handle_shutdown(&state_manager, &shutdown_tx).await;
handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await;
}
}
@@ -603,11 +603,19 @@ fn process_connection(
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
async fn handle_shutdown(
state_manager: &ServiceStateManager,
shutdown_tx: &tokio::sync::broadcast::Sender<()>,
scanner_cancel_token: &tokio_util::sync::CancellationToken,
) {
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Stop data scanner gracefully
info!("Stopping data scanner...");
scanner_cancel_token.cancel();
// Stop the notification system
shutdown_event_notifier().await;