decrease scanner frequency

Signed-off-by: junxiang Mu <1948535941@qq.com>
junxiang Mu authored on 2025-05-29 03:17:47 +00:00, committed by houseme
parent eec086c1ec, commit 2f487be832
4 changed files with 49 additions and 78 deletions

View File

@@ -144,16 +144,16 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
}
/// Initialize and start the data scanner in the background
///
/// This function starts a background task that continuously runs the data scanner
/// with randomized intervals between cycles to avoid resource contention.
///
/// # Features
/// - Graceful shutdown support via cancellation token
/// - Randomized sleep intervals to prevent synchronized scanning across nodes
/// - Minimum sleep duration to avoid excessive CPU usage
/// - Proper error handling and logging
///
/// # Architecture
/// 1. Initialize with random seed for sleep intervals
/// 2. Run scanner cycles in a loop
@@ -161,33 +161,25 @@ fn new_dynamic_sleeper(factor: f64, max_wait: Duration, is_scanner: bool) -> Dyn
/// 4. Ensure minimum sleep duration to prevent CPU thrashing
pub async fn init_data_scanner() {
info!("Initializing data scanner background task");
tokio::spawn(async move {
loop {
// Run the data scanner
run_data_scanner().await;
// Calculate randomized sleep duration
// Use random factor (1.0 to 10.0) multiplied by the scanner cycle duration
let random_factor: f64 = {
let mut rng = rand::thread_rng();
rng.gen_range(0.0..1.0)
rng.gen_range(1.0..10.0)
};
let base_cycle_duration = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
let sleep_duration_secs = random_factor * base_cycle_duration;
// Ensure minimum sleep duration of 1 second to avoid high CPU usage
let sleep_duration = if sleep_duration_secs < 1.0 {
Duration::from_secs(1)
} else {
Duration::from_secs_f64(sleep_duration_secs)
};
info!(
duration_secs = sleep_duration.as_secs(),
"Data scanner sleeping before next cycle"
);
let sleep_duration = Duration::from_secs_f64(sleep_duration_secs);
info!(duration_secs = sleep_duration.as_secs(), "Data scanner sleeping before next cycle");
// Sleep with the calculated duration
sleep(sleep_duration).await;
}
@@ -195,20 +187,20 @@ pub async fn init_data_scanner() {
}
/// Run a single data scanner cycle
///
/// This function performs one complete scan cycle, including:
/// - Loading and updating cycle information
/// - Determining scan mode based on healing configuration
/// - Running the namespace scanner
/// - Saving cycle completion state
///
/// # Error Handling
/// - Gracefully handles missing object layer
/// - Continues operation even if individual steps fail
/// - Logs errors appropriately without terminating the scanner
async fn run_data_scanner() {
info!("Starting data scanner cycle");
// Get the object layer, return early if not available
let Some(store) = new_object_layer_fn() else {
error!("Object layer not initialized, skipping scanner cycle");
@@ -235,7 +227,7 @@ async fn run_data_scanner() {
// Start metrics collection for this cycle
let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle);
// Update cycle information
cycle_info.current = cycle_info.next;
cycle_info.started = Utc::now();
@@ -245,12 +237,8 @@ async fn run_data_scanner() {
// Read background healing information and determine scan mode
let bg_heal_info = read_background_heal_info(store.clone()).await;
let scan_mode = get_cycle_scan_mode(
cycle_info.current,
bg_heal_info.bitrot_start_cycle,
bg_heal_info.bitrot_start_time,
)
.await;
let scan_mode =
get_cycle_scan_mode(cycle_info.current, bg_heal_info.bitrot_start_cycle, bg_heal_info.bitrot_start_time).await;
// Update healing info if scan mode changed
if bg_heal_info.current_scan_mode != scan_mode {
@@ -280,19 +268,15 @@ async fn run_data_scanner() {
);
// Run the namespace scanner
match store
.clone()
.ns_scanner(tx, cycle_info.current as usize, scan_mode)
.await
{
match store.clone().ns_scanner(tx, cycle_info.current as usize, scan_mode).await {
Ok(_) => {
info!(cycle = cycle_info.current, "Namespace scanner completed successfully");
// Update cycle completion information
cycle_info.next += 1;
cycle_info.current = 0;
cycle_info.cycle_completed.push(Utc::now());
// Maintain cycle completion history (keep only recent cycles)
if cycle_info.cycle_completed.len() > DATA_USAGE_UPDATE_DIR_CYCLES as usize {
let _ = cycle_info.cycle_completed.remove(0);
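
The net effect of this file's change is that the pause between scan cycles now ranges from 1x to 10x the configured cycle length instead of 0x to 1x with a one-second floor; with a uniform draw, the average pause grows from about half a cycle to about 5.5 cycles, so the scanner runs roughly an order of magnitude less often. Below is a minimal sketch of the new sleep computation, assuming SCANNER_CYCLE is an AtomicU64 holding seconds (the `load(...) as f64` in the diff suggests this, but the real type and default are not shown here; `next_sleep` and the 60-second default are illustrative):

```rust
use rand::Rng;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Illustrative stand-in for the project's SCANNER_CYCLE; 60s is an assumed default.
static SCANNER_CYCLE: AtomicU64 = AtomicU64::new(60);

fn next_sleep() -> Duration {
    // After this commit the factor is drawn from 1.0..10.0, so every pause is
    // at least one full cycle and at most ten cycles long.
    let factor: f64 = rand::thread_rng().gen_range(1.0..10.0);
    let base_secs = SCANNER_CYCLE.load(Ordering::SeqCst) as f64;
    Duration::from_secs_f64(factor * base_secs)
}
```

Because the factor can no longer drop below 1.0, the removed one-second clamp is redundant: the shortest possible sleep is already a full cycle.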

View File

@@ -4,12 +4,12 @@ use lazy_static::lazy_static;
use madmin::metrics::ScannerMetrics as M_ScannerMetrics;
use std::{
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, SystemTime},
pin::Pin,
};
use tokio::sync::{Mutex, RwLock};
use tracing::debug;
@@ -86,7 +86,7 @@ impl ScannerMetric {
Self::Last => "last",
}
}
/// Convert from index back to enum (safe version)
pub fn from_index(index: usize) -> Option<Self> {
if index >= Self::Last as usize {
@@ -154,7 +154,7 @@ impl LockedLastMinuteLatency {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let elem = AccElem {
n: 1,
total: duration.as_secs(),
@@ -206,10 +206,8 @@ pub struct ScannerMetrics {
impl ScannerMetrics {
pub fn new() -> Self {
let operations = (0..ScannerMetric::Last as usize)
.map(|_| AtomicU64::new(0))
.collect();
let operations = (0..ScannerMetric::Last as usize).map(|_| AtomicU64::new(0)).collect();
let latency = (0..ScannerMetric::LastRealtime as usize)
.map(|_| LockedLastMinuteLatency::new())
.collect();
@@ -227,10 +225,10 @@ impl ScannerMetrics {
let start_time = SystemTime::now();
move |_custom: &HashMap<String, String>| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task for this)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
@@ -241,11 +239,7 @@ impl ScannerMetrics {
// Log trace metrics
if metric as u8 > ScannerMetric::StartTrace as u8 {
debug!(
metric = metric.as_str(),
duration_ms = duration.as_millis(),
"Scanner trace metric"
);
debug!(metric = metric.as_str(), duration_ms = duration.as_millis(), "Scanner trace metric");
}
}
}
@@ -255,10 +249,10 @@ impl ScannerMetrics {
let start_time = SystemTime::now();
move |size: u64| {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics with size (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
@@ -274,10 +268,10 @@ impl ScannerMetrics {
let start_time = SystemTime::now();
move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
@@ -294,10 +288,10 @@ impl ScannerMetrics {
Box::new(move |count: usize| {
Box::new(move || {
let duration = SystemTime::now().duration_since(start_time).unwrap_or_default();
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(count as u64, Ordering::Relaxed);
// Update latency for realtime metrics (spawn async task)
if (metric as usize) < ScannerMetric::LastRealtime as usize {
let metric_index = metric as usize;
@@ -313,7 +307,7 @@ impl ScannerMetrics {
pub async fn inc_time(metric: ScannerMetric, duration: Duration) {
// Update operation count
globalScannerMetrics.operations[metric as usize].fetch_add(1, Ordering::Relaxed);
// Update latency for realtime metrics
if (metric as usize) < ScannerMetric::LastRealtime as usize {
globalScannerMetrics.latency[metric as usize].add(duration).await;
@@ -350,12 +344,12 @@ impl ScannerMetrics {
pub async fn get_current_paths(&self) -> Vec<String> {
let mut result = Vec::new();
let paths = self.current_paths.read().await;
for (disk, tracker) in paths.iter() {
let path = tracker.get_path().await;
result.push(format!("{}/{}", disk, path));
}
result
}
@@ -367,17 +361,17 @@ impl ScannerMetrics {
/// Generate metrics report
pub async fn report(&self) -> M_ScannerMetrics {
let mut metrics = M_ScannerMetrics::default();
// Set cycle information
if let Some(cycle) = self.get_cycle().await {
metrics.current_cycle = cycle.current;
metrics.cycles_completed_at = cycle.cycle_completed;
metrics.current_started = cycle.started;
}
metrics.collected_at = Utc::now();
metrics.active_paths = self.get_current_paths().await;
// Lifetime operations
for i in 0..ScannerMetric::Last as usize {
let count = self.operations[i].load(Ordering::Relaxed);
@@ -387,7 +381,7 @@ impl ScannerMetrics {
}
}
}
// Last minute statistics for realtime metrics
for i in 0..ScannerMetric::LastRealtime as usize {
let last_min = self.latency[i].total().await;
@@ -398,7 +392,7 @@ impl ScannerMetrics {
}
}
}
metrics
}
}
@@ -408,13 +402,10 @@ pub type UpdateCurrentPathFn = Arc<dyn Fn(&str) -> Pin<Box<dyn std::future::Futu
pub type CloseDiskFn = Arc<dyn Fn() -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
/// Create a current path updater for tracking scan progress
pub fn current_path_updater(
disk: &str,
initial: &str
) -> (UpdateCurrentPathFn, CloseDiskFn) {
pub fn current_path_updater(disk: &str, initial: &str) -> (UpdateCurrentPathFn, CloseDiskFn) {
let tracker = Arc::new(CurrentPathTracker::new(initial.to_string()));
let disk_name = disk.to_string();
// Store the tracker in global metrics
let tracker_clone = Arc::clone(&tracker);
let disk_clone = disk_name.clone();
@@ -442,11 +433,7 @@ pub fn current_path_updater(
Arc::new(move || -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
let disk_name = disk_name.clone();
Box::pin(async move {
globalScannerMetrics
.current_paths
.write()
.await
.remove(&disk_name);
globalScannerMetrics.current_paths.write().await.remove(&disk_name);
})
})
};
@@ -458,4 +445,4 @@ impl Default for ScannerMetrics {
fn default() -> Self {
Self::new()
}
}
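
Most of the hunks in this metrics file are formatting only. The pattern worth calling out is that `ScannerMetrics::log` and the related helpers in this impl capture a start time and hand back a closure that, when invoked, bumps the per-metric counter and records the elapsed duration; this is how the scanner brackets a whole cycle via `let stop_fn = ScannerMetrics::log(ScannerMetric::ScanCycle)` in the first file. A self-contained sketch of that start/stop-closure idea, with purely illustrative names (`log_span`, `"scan_cycle"`) rather than the crate's real API:

```rust
use std::time::SystemTime;

// Start a measured span; the returned closure reports the elapsed time when
// the caller decides the span is over, mirroring the stop_fn pattern above.
fn log_span(label: &'static str) -> impl FnOnce() {
    let start = SystemTime::now();
    move || {
        let elapsed = SystemTime::now().duration_since(start).unwrap_or_default();
        println!("{label} took {} ms", elapsed.as_millis());
    }
}

fn main() {
    let stop = log_span("scan_cycle");
    // ... work being measured ...
    stop();
}
```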

View File

@@ -3002,7 +3002,7 @@ impl SetDisks {
let self_clone = Arc::clone(&self);
let bucket_rx_clone = bucket_rx.clone();
let buckets_results_tx_clone = buckets_results_tx.clone();
futures.push(tokio::spawn(async move {
futures.push(async move {
loop {
match bucket_rx_clone.write().await.try_recv() {
Err(_) => return,
@@ -3083,12 +3083,11 @@ impl SetDisks {
}
info!("continue scanner");
}
}));
});
}
info!("ns_scanner start");
let _ = join_all(futures).await;
drop(buckets_results_tx);
let _ = task.await;
info!("ns_scanner completed");
Ok(())
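
The ns_scanner change in this file swaps `futures.push(tokio::spawn(async move { ... }))` for `futures.push(async move { ... })`: the per-bucket workers are no longer separate spawned tasks that can run in parallel across the runtime's worker threads, but plain futures that `join_all` polls cooperatively on the current task, which should reduce the scanner's parallel CPU use and fits the commit's aim of dialing the scanner back. The `let _ = task.await` after `join_all` is untouched, so only the per-bucket loops move off spawned tasks. An illustrative contrast of the two patterns (names and data here are made up, not the project's):

```rust
use futures::future::join_all;

// Spawned: each worker is its own tokio task and may run in parallel
// across the runtime's worker threads.
async fn spawned(items: Vec<u32>) -> Vec<u32> {
    let handles: Vec<_> = items
        .into_iter()
        .map(|n| tokio::spawn(async move { n * 2 }))
        .collect();
    join_all(handles).await.into_iter().map(|r| r.unwrap()).collect()
}

// Cooperative: the same futures are pushed unspawned and polled together
// on the current task only, interleaving but never running in parallel.
async fn cooperative(items: Vec<u32>) -> Vec<u32> {
    let futs: Vec<_> = items.into_iter().map(|n| async move { n * 2 }).collect();
    join_all(futs).await
}
```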

View File

@@ -827,7 +827,8 @@ impl ECStore {
}
}
});
if let Err(err) = set.clone()
if let Err(err) = set
.clone()
.ns_scanner(&all_buckets_clone, want_cycle as u32, tx, heal_scan_mode)
.await
{