mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Merge pull request #96 from guojidan/scanner
fix: improve data scanner random sleep calculation
This commit is contained in:
@@ -20,17 +20,18 @@ use std::{
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
sync::{
|
||||
Arc,
|
||||
Arc, OnceLock,
|
||||
atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
|
||||
},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use time::{self, OffsetDateTime};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::{
|
||||
data_scanner_metric::{ScannerMetric, ScannerMetrics, globalScannerMetrics},
|
||||
data_usage::{DATA_USAGE_BLOOM_NAME_PATH, store_data_usage_in_backend},
|
||||
data_usage::{DATA_USAGE_BLOOM_NAME_PATH, DataUsageInfo, store_data_usage_in_backend},
|
||||
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
|
||||
heal_commands::{HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN, HealScanMode},
|
||||
};
|
||||
@@ -127,6 +128,8 @@ lazy_static! {
|
||||
pub static ref globalHealConfig: Arc<RwLock<Config>> = Arc::new(RwLock::new(Config::default()));
|
||||
}
|
||||
|
||||
static GLOBAL_SCANNER_CANCEL_TOKEN: OnceLock<CancellationToken> = OnceLock::new();
|
||||
|
||||
struct DynamicSleeper {
|
||||
factor: f64,
|
||||
max_sleep: Duration,
|
||||
@@ -195,36 +198,66 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
|
||||
/// - Minimum sleep duration to avoid excessive CPU usage
|
||||
/// - Proper error handling and logging
|
||||
///
|
||||
/// # Returns
|
||||
/// A CancellationToken that can be used to gracefully shutdown the scanner
|
||||
///
|
||||
/// # Architecture
|
||||
/// 1. Initialize with random seed for sleep intervals
|
||||
/// 2. Run scanner cycles in a loop
|
||||
/// 3. Use randomized sleep between cycles to avoid thundering herd
|
||||
/// 4. Ensure minimum sleep duration to prevent CPU thrashing
|
||||
pub async fn init_data_scanner() {
|
||||
pub async fn init_data_scanner() -> CancellationToken {
|
||||
info!("Initializing data scanner background task");
|
||||
|
||||
let cancel_token = CancellationToken::new();
|
||||
GLOBAL_SCANNER_CANCEL_TOKEN
|
||||
.set(cancel_token.clone())
|
||||
.expect("Scanner already initialized");
|
||||
|
||||
let cancel_clone = cancel_token.clone();
|
||||
tokio::spawn(async move {
|
||||
info!("Data scanner background task started");
|
||||
|
||||
loop {
|
||||
// Run the data scanner
|
||||
run_data_scanner().await;
|
||||
tokio::select! {
|
||||
_ = cancel_clone.cancelled() => {
|
||||
info!("Data scanner received shutdown signal, exiting gracefully");
|
||||
break;
|
||||
}
|
||||
_ = run_data_scanner_cycle() => {
|
||||
// Calculate randomized sleep duration
|
||||
let random_factor = {
|
||||
let mut rng = rand::rng();
|
||||
rng.random_range(1.0..10.0)
|
||||
};
|
||||
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
|
||||
let sleep_duration_secs = random_factor * base_cycle_duration;
|
||||
|
||||
// Calculate randomized sleep duration
|
||||
// Use random factor (0.0 to 1.0) multiplied by the scanner cycle duration
|
||||
let random_factor = {
|
||||
let mut rng = rand::rng();
|
||||
rng.random_range(1.0..10.0)
|
||||
};
|
||||
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
|
||||
let sleep_duration_secs = random_factor * base_cycle_duration;
|
||||
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
|
||||
|
||||
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
|
||||
debug!(
|
||||
duration_secs = sleep_duration.as_secs(),
|
||||
"Data scanner sleeping before next cycle"
|
||||
);
|
||||
|
||||
info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle");
|
||||
|
||||
// Sleep with the calculated duration
|
||||
sleep(sleep_duration).await;
|
||||
// Interruptible sleep
|
||||
tokio::select! {
|
||||
_ = cancel_clone.cancelled() => {
|
||||
info!("Data scanner received shutdown signal during sleep, exiting");
|
||||
break;
|
||||
}
|
||||
_ = sleep(sleep_duration) => {
|
||||
// Continue to next cycle
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Data scanner background task stopped gracefully");
|
||||
});
|
||||
|
||||
cancel_token
|
||||
}
|
||||
|
||||
/// Run a single data scanner cycle
|
||||
@@ -239,8 +272,8 @@ pub async fn init_data_scanner() {
|
||||
/// - Gracefully handles missing object layer
|
||||
/// - Continues operation even if individual steps fail
|
||||
/// - Logs errors appropriately without terminating the scanner
|
||||
async fn run_data_scanner() {
|
||||
info!("Starting data scanner cycle");
|
||||
async fn run_data_scanner_cycle() {
|
||||
debug!("Starting data scanner cycle");
|
||||
|
||||
// Get the object layer, return early if not available
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
@@ -248,6 +281,14 @@ async fn run_data_scanner() {
|
||||
return;
|
||||
};
|
||||
|
||||
// Check for cancellation before starting expensive operations
|
||||
if let Some(token) = GLOBAL_SCANNER_CANCEL_TOKEN.get() {
|
||||
if token.is_cancelled() {
|
||||
debug!("Scanner cancelled before starting cycle");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Load current cycle information from persistent storage
|
||||
let buf = read_config(store.clone(), &DATA_USAGE_BLOOM_NAME_PATH)
|
||||
.await
|
||||
@@ -293,7 +334,7 @@ async fn run_data_scanner() {
|
||||
}
|
||||
|
||||
// Set up data usage storage channel
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let (tx, rx) = mpsc::channel::<DataUsageInfo>(100);
|
||||
tokio::spawn(async move {
|
||||
let _ = store_data_usage_in_backend(rx).await;
|
||||
});
|
||||
@@ -308,8 +349,8 @@ async fn run_data_scanner() {
|
||||
"Starting namespace scanner"
|
||||
);
|
||||
|
||||
// Run the namespace scanner
|
||||
match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
|
||||
// Run the namespace scanner with cancellation support
|
||||
match execute_namespace_scan(&store, tx, cycle_info.current, scan_mode).await {
|
||||
Ok(_) => {
|
||||
info!(cycle = cycle_info.current, "Namespace scanner completed successfully");
|
||||
|
||||
@@ -349,6 +390,28 @@ async fn run_data_scanner() {
|
||||
stop_fn(&scan_result);
|
||||
}
|
||||
|
||||
/// Execute namespace scan with cancellation support
///
/// Awaits `store.ns_scanner(tx, cycle as usize, scan_mode)` and the
/// cancellation future of `GLOBAL_SCANNER_CANCEL_TOKEN` together in a single
/// `tokio::select!`; whichever branch completes first produces the result.
///
/// # Errors
/// - `"Scanner not initialized"` when the global cancel token has not been set.
/// - `"Namespace scan failed: {e}"` wrapping any error returned by `ns_scanner`.
/// - `"Scan cancelled"` when the token is cancelled before the scan completes.
|
||||
async fn execute_namespace_scan(
|
||||
store: &Arc<ECStore>,
|
||||
tx: Sender<DataUsageInfo>,
|
||||
cycle: u64,
|
||||
scan_mode: HealScanMode,
|
||||
) -> Result<()> {
|
||||
// Read the global token; a missing token is reported as an error through `?`
// rather than by panicking.
let cancel_token = GLOBAL_SCANNER_CANCEL_TOKEN
|
||||
.get()
|
||||
.ok_or_else(|| Error::other("Scanner not initialized"))?;
|
||||
|
||||
// Wait on the scan and the cancellation signal at the same time.
tokio::select! {
|
||||
// `cycle as usize` is a plain cast. NOTE(review): it would truncate on
// targets where `usize` is narrower than 64 bits — confirm this is acceptable.
result = store.ns_scanner(tx, cycle as usize, scan_mode) => {
|
||||
result.map_err(|e| Error::other(format!("Namespace scan failed: {e}")))
|
||||
}
|
||||
// NOTE(review): presumably the unfinished `ns_scanner` future is dropped when
// this branch wins (see the `tokio::select!` docs) — confirm the scanner
// tolerates being abandoned mid-scan.
_ = cancel_token.cancelled() => {
|
||||
info!("Namespace scan cancelled");
|
||||
|
||||
Err(Error::other("Scan cancelled"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct BackgroundHealInfo {
|
||||
bitrot_start_time: SystemTime,
|
||||
@@ -404,7 +467,7 @@ async fn get_cycle_scan_mode(current_cycle: u64, bitrot_start_cycle: u64, bitrot
|
||||
return HEAL_DEEP_SCAN;
|
||||
}
|
||||
|
||||
if bitrot_start_time.duration_since(SystemTime::now()).unwrap() > bitrot_cycle {
|
||||
if SystemTime::now().duration_since(bitrot_start_time).unwrap_or_default() > bitrot_cycle {
|
||||
return HEAL_DEEP_SCAN;
|
||||
}
|
||||
|
||||
|
||||
@@ -429,7 +429,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
})?;
|
||||
|
||||
// init scanner
|
||||
init_data_scanner().await;
|
||||
let scanner_cancel_token = init_data_scanner().await;
|
||||
// init auto heal
|
||||
init_auto_heal().await;
|
||||
// init console configuration
|
||||
@@ -493,11 +493,11 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
match wait_for_shutdown().await {
|
||||
#[cfg(unix)]
|
||||
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
|
||||
handle_shutdown(&state_manager, &shutdown_tx).await;
|
||||
handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await;
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
ShutdownSignal::CtrlC => {
|
||||
handle_shutdown(&state_manager, &shutdown_tx).await;
|
||||
handle_shutdown(&state_manager, &shutdown_tx, &scanner_cancel_token).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -603,11 +603,19 @@ fn process_connection(
|
||||
}
|
||||
|
||||
/// Handles the shutdown process of the server
|
||||
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
|
||||
async fn handle_shutdown(
|
||||
state_manager: &ServiceStateManager,
|
||||
shutdown_tx: &tokio::sync::broadcast::Sender<()>,
|
||||
scanner_cancel_token: &tokio_util::sync::CancellationToken,
|
||||
) {
|
||||
info!("Shutdown signal received in main thread");
|
||||
// update the status to stopping first
|
||||
state_manager.update(ServiceState::Stopping);
|
||||
|
||||
// Stop data scanner gracefully
|
||||
info!("Stopping data scanner...");
|
||||
scanner_cancel_token.cancel();
|
||||
|
||||
// Stop the notification system
|
||||
shutdown_event_notifier().await;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user