mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
feat(scanner): Add dynamic throttling presets (#2095)
Co-authored-by: loverustfs <hello@rustfs.com> Co-authored-by: GatewayJ <835269233@qq.com> Co-authored-by: houseme <housemecn@gmail.com> Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com> Co-authored-by: 安正超 <anzhengchao@gmail.com> Co-authored-by: weisd <im@weisd.in>
This commit is contained in:
@@ -12,17 +12,102 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
/// Environment variable name that specifies the data scanner start delay in seconds.
|
||||
/// - Purpose: Define the delay between data scanner operations.
|
||||
/// If set, this overrides the cycle interval derived from `RUSTFS_SCANNER_SPEED`.
|
||||
/// - Unit: seconds (u64).
|
||||
/// - Valid values: any positive integer.
|
||||
/// - Semantics: This delay controls how frequently the data scanner checks for and processes data; shorter delays lead to more responsive scanning but may increase system load.
|
||||
/// - Example: `export RUSTFS_DATA_SCANNER_START_DELAY_SECS=10`
|
||||
/// - Note: Choose an appropriate delay that balances scanning responsiveness with overall system performance.
|
||||
pub const ENV_DATA_SCANNER_START_DELAY_SECS: &str = "RUSTFS_DATA_SCANNER_START_DELAY_SECS";
|
||||
|
||||
/// Default data scanner start delay in seconds if not specified in the environment variable.
|
||||
/// - Value: 10 seconds.
|
||||
/// - Rationale: This default interval provides a reasonable balance between scanning responsiveness and system load for most deployments.
|
||||
/// - Adjustments: Users may modify this value via the `RUSTFS_DATA_SCANNER_START_DELAY_SECS` environment variable based on their specific scanning requirements and system performance.
|
||||
pub const DEFAULT_DATA_SCANNER_START_DELAY_SECS: u64 = 60;
|
||||
/// Environment variable that selects the scanner speed preset.
|
||||
/// Valid values: `fastest`, `fast`, `default`, `slow`, `slowest`.
|
||||
/// Controls the sleep factor, maximum sleep duration, and cycle interval.
|
||||
/// - Example: `export RUSTFS_SCANNER_SPEED=slow`
|
||||
pub const ENV_SCANNER_SPEED: &str = "RUSTFS_SCANNER_SPEED";
|
||||
|
||||
/// Default scanner speed preset.
|
||||
pub const DEFAULT_SCANNER_SPEED: &str = "default";
|
||||
|
||||
/// Environment variable that controls whether the scanner sleeps between operations.
|
||||
/// When `true` (default), the scanner throttles itself. When `false`, it runs at full speed.
|
||||
/// - Example: `export RUSTFS_SCANNER_IDLE_MODE=false`
|
||||
pub const ENV_SCANNER_IDLE_MODE: &str = "RUSTFS_SCANNER_IDLE_MODE";
|
||||
|
||||
/// Default scanner idle mode.
|
||||
pub const DEFAULT_SCANNER_IDLE_MODE: bool = true;
|
||||
|
||||
/// Scanner speed preset controlling throttling behavior.
|
||||
///
|
||||
/// Each preset defines three parameters:
|
||||
/// - **sleep_factor**: Multiplier applied to elapsed work time to compute inter-object sleep.
|
||||
/// - **max_sleep**: Upper bound on any single throttle sleep.
|
||||
/// - **cycle_interval**: Base delay between scan cycles.
|
||||
///
|
||||
/// | Preset | Factor | Max Sleep | Cycle Interval |
|
||||
/// |-----------|--------|-----------|----------------|
|
||||
/// | `fastest` | 0 | 0 | 1 second |
|
||||
/// | `fast` | 1x | 100ms | 1 minute |
|
||||
/// | `default` | 2x | 1 second | 1 minute |
|
||||
/// | `slow` | 10x | 15 seconds| 1 minute |
|
||||
/// | `slowest` | 100x | 15 seconds| 30 minutes |
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum ScannerSpeed {
|
||||
Fastest,
|
||||
Fast,
|
||||
#[default]
|
||||
Default,
|
||||
Slow,
|
||||
Slowest,
|
||||
}
|
||||
|
||||
impl ScannerSpeed {
|
||||
pub fn sleep_factor(self) -> f64 {
|
||||
match self {
|
||||
Self::Fastest => 0.0,
|
||||
Self::Fast => 1.0,
|
||||
Self::Default => 2.0,
|
||||
Self::Slow => 10.0,
|
||||
Self::Slowest => 100.0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn max_sleep(self) -> Duration {
|
||||
match self {
|
||||
Self::Fastest => Duration::ZERO,
|
||||
Self::Fast => Duration::from_millis(100),
|
||||
Self::Default => Duration::from_secs(1),
|
||||
Self::Slow | Self::Slowest => Duration::from_secs(15),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cycle_interval(self) -> Duration {
|
||||
match self {
|
||||
Self::Fastest => Duration::from_secs(1),
|
||||
Self::Fast | Self::Default | Self::Slow => Duration::from_secs(60),
|
||||
Self::Slowest => Duration::from_secs(30 * 60),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_env_str(s: &str) -> Self {
|
||||
match s.trim().to_ascii_lowercase().as_str() {
|
||||
"fastest" => Self::Fastest,
|
||||
"fast" => Self::Fast,
|
||||
"slow" => Self::Slow,
|
||||
"slowest" => Self::Slowest,
|
||||
_ => Self::Default,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ScannerSpeed {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Fastest => write!(f, "fastest"),
|
||||
Self::Fast => write!(f, "fast"),
|
||||
Self::Default => write!(f, "default"),
|
||||
Self::Slow => write!(f, "slow"),
|
||||
Self::Slowest => write!(f, "slowest"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ use tokio::{
|
||||
time::interval,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Priority queue wrapper for heal requests
|
||||
/// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items
|
||||
@@ -509,7 +509,7 @@ impl HealManager {
|
||||
}
|
||||
|
||||
if endpoints.is_empty() {
|
||||
info!("start_auto_disk_scanner: No endpoints need healing");
|
||||
debug!("start_auto_disk_scanner: No endpoints need healing");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -55,3 +55,4 @@ s3s = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
serial_test = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v4", "serde"] }
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
|
||||
@@ -26,7 +26,9 @@ pub mod last_minute;
|
||||
pub mod scanner;
|
||||
pub mod scanner_folder;
|
||||
pub mod scanner_io;
|
||||
pub mod sleeper;
|
||||
|
||||
pub use data_usage_define::*;
|
||||
pub use error::ScannerError;
|
||||
pub use scanner::init_data_scanner;
|
||||
pub use sleeper::{DynamicSleeper, SCANNER_IDLE_MODE, SCANNER_SLEEPER};
|
||||
|
||||
@@ -17,11 +17,12 @@ use std::sync::Arc;
|
||||
use crate::data_usage_define::{BACKGROUND_HEAL_INFO_PATH, DATA_USAGE_BLOOM_NAME_PATH, DATA_USAGE_OBJ_NAME_PATH};
|
||||
use crate::scanner_folder::data_usage_update_dir_cycles;
|
||||
use crate::scanner_io::ScannerIO;
|
||||
use crate::sleeper::SCANNER_SLEEPER;
|
||||
use crate::{DataUsageInfo, ScannerError};
|
||||
use chrono::{DateTime, Utc};
|
||||
use rustfs_common::heal_channel::HealScanMode;
|
||||
use rustfs_common::metrics::{CurrentCycle, Metric, Metrics, emit_scan_cycle_complete, global_metrics};
|
||||
use rustfs_config::{DEFAULT_DATA_SCANNER_START_DELAY_SECS, ENV_DATA_SCANNER_START_DELAY_SECS};
|
||||
use rustfs_config::{DEFAULT_SCANNER_SPEED, ENV_DATA_SCANNER_START_DELAY_SECS, ENV_SCANNER_SPEED, ScannerSpeed};
|
||||
use rustfs_ecstore::StorageAPI as _;
|
||||
use rustfs_ecstore::config::com::{read_config, save_config};
|
||||
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
|
||||
@@ -30,16 +31,37 @@ use rustfs_ecstore::global::is_erasure_sd;
|
||||
use rustfs_ecstore::store::ECStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
fn data_scanner_start_delay() -> Duration {
|
||||
let secs = rustfs_utils::get_env_u64(ENV_DATA_SCANNER_START_DELAY_SECS, DEFAULT_DATA_SCANNER_START_DELAY_SECS);
|
||||
Duration::from_secs(secs)
|
||||
const LOCK_RETRY_MAX: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Returns the base cycle interval. If `RUSTFS_DATA_SCANNER_START_DELAY_SECS`
|
||||
/// is set, it takes precedence; otherwise the value is derived from the
|
||||
/// `RUSTFS_SCANNER_SPEED` preset.
|
||||
fn cycle_interval() -> Duration {
|
||||
if let Some(secs) = rustfs_utils::get_env_opt_u64(ENV_DATA_SCANNER_START_DELAY_SECS) {
|
||||
return Duration::from_secs(secs);
|
||||
}
|
||||
let speed_str = rustfs_utils::get_env_str(ENV_SCANNER_SPEED, DEFAULT_SCANNER_SPEED);
|
||||
ScannerSpeed::from_env_str(&speed_str).cycle_interval()
|
||||
}
|
||||
|
||||
/// Compute a randomized inter-cycle sleep.
|
||||
// Delay is scan interval +- 10%, with a floor of 1 second.
|
||||
fn randomized_cycle_delay() -> Duration {
|
||||
let interval = cycle_interval().max(Duration::from_secs(1));
|
||||
// Uniform in [-0.1, 0.1), keeping actual delay within 10% of interval.
|
||||
let jitter_factor = (rand::random::<f64>() * 0.2) - 0.1;
|
||||
let delay = interval.mul_f64(1.0 + jitter_factor);
|
||||
delay.max(Duration::from_secs(1))
|
||||
}
|
||||
|
||||
pub async fn init_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) {
|
||||
// Force init global sleeper so config is read once at startup.
|
||||
let _ = &*SCANNER_SLEEPER;
|
||||
|
||||
let ctx_clone = ctx.clone();
|
||||
let storeapi_clone = storeapi.clone();
|
||||
tokio::spawn(async move {
|
||||
@@ -54,7 +76,8 @@ pub async fn init_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) {
|
||||
if let Err(e) = run_data_scanner(ctx_clone.clone(), storeapi_clone.clone()).await {
|
||||
error!("Failed to run data scanner: {e}");
|
||||
}
|
||||
tokio::time::sleep(data_scanner_start_delay()).await;
|
||||
// Sleep if couldn't acquire lock or scan failed
|
||||
tokio::time::sleep(randomized_cycle_delay().min(LOCK_RETRY_MAX)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -130,8 +153,10 @@ fn get_lock_acquire_timeout() -> Duration {
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn run_scan_cycle(ctx: &CancellationToken, storeapi: &Arc<ECStore>, cycle_info: &mut CurrentCycle) {
|
||||
async fn run_data_scanner_cycle(ctx: &CancellationToken, storeapi: &Arc<ECStore>, cycle_info: &mut CurrentCycle) {
|
||||
info!("Start run data scanner cycle");
|
||||
cycle_info.current = cycle_info.next;
|
||||
let now = Instant::now();
|
||||
cycle_info.started = Utc::now();
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
@@ -169,34 +194,35 @@ async fn run_scan_cycle(ctx: &CancellationToken, storeapi: &Arc<ECStore>, cycle_
|
||||
.nsscanner(ctx.clone(), sender, cycle_info.current, scan_mode)
|
||||
.await
|
||||
{
|
||||
error!("Failed to scan namespace: {e}");
|
||||
error!(duration = ?now.elapsed(), "Fail run data scanner cycle: {e}");
|
||||
emit_scan_cycle_complete(false, cycle_start.elapsed());
|
||||
return;
|
||||
}
|
||||
done_cycle();
|
||||
emit_scan_cycle_complete(true, cycle_start.elapsed());
|
||||
|
||||
cycle_info.next += 1;
|
||||
cycle_info.current = 0;
|
||||
cycle_info.cycle_completed.push(Utc::now());
|
||||
|
||||
info!(duration = ?now.elapsed(), cycles_total=cycle_info.cycle_completed.len(), "Success run data scanner cycle");
|
||||
|
||||
if cycle_info.cycle_completed.len() >= data_usage_update_dir_cycles() as usize {
|
||||
cycle_info.cycle_completed = cycle_info.cycle_completed.split_off(data_usage_update_dir_cycles() as usize);
|
||||
}
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let cycle_info_buf = cycle_info.marshal().unwrap_or_default();
|
||||
|
||||
let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
|
||||
buf.extend_from_slice(&cycle_info.next.to_le_bytes());
|
||||
buf.extend_from_slice(&cycle_info_buf);
|
||||
|
||||
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
|
||||
error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
|
||||
} else {
|
||||
done_cycle();
|
||||
emit_scan_cycle_complete(true, cycle_start.elapsed());
|
||||
info!("Namespace scanned successfully");
|
||||
|
||||
cycle_info.next += 1;
|
||||
cycle_info.current = 0;
|
||||
cycle_info.cycle_completed.push(Utc::now());
|
||||
|
||||
if cycle_info.cycle_completed.len() >= data_usage_update_dir_cycles() as usize {
|
||||
cycle_info.cycle_completed = cycle_info.cycle_completed.split_off(data_usage_update_dir_cycles() as usize);
|
||||
}
|
||||
|
||||
global_metrics().set_cycle(Some(cycle_info.clone())).await;
|
||||
|
||||
let cycle_info_buf = cycle_info.marshal().unwrap_or_default();
|
||||
|
||||
let mut buf = Vec::with_capacity(cycle_info_buf.len() + 8);
|
||||
buf.extend_from_slice(&cycle_info.next.to_le_bytes());
|
||||
buf.extend_from_slice(&cycle_info_buf);
|
||||
|
||||
if let Err(e) = save_config(storeapi.clone(), &DATA_USAGE_BLOOM_NAME_PATH, buf).await {
|
||||
error!("Failed to save data usage bloom name to {}: {}", &*DATA_USAGE_BLOOM_NAME_PATH, e);
|
||||
} else {
|
||||
info!("Data usage bloom name saved successfully");
|
||||
}
|
||||
info!("Data usage bloom name saved successfully");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,16 +258,17 @@ pub async fn run_data_scanner(ctx: CancellationToken, storeapi: Arc<ECStore>) ->
|
||||
}
|
||||
}
|
||||
|
||||
let mut ticker = tokio::time::interval(data_scanner_start_delay());
|
||||
loop {
|
||||
if ctx.is_cancelled() {
|
||||
break;
|
||||
}
|
||||
|
||||
// Randomized inter-cycle delay
|
||||
tokio::select! {
|
||||
_ = ctx.cancelled() => {
|
||||
break;
|
||||
}
|
||||
_ = ticker.tick() => {
|
||||
run_scan_cycle(&ctx, &storeapi, &mut cycle_info).await;
|
||||
ticker.reset();
|
||||
}
|
||||
_ = ctx.cancelled() => break,
|
||||
_ = tokio::time::sleep(randomized_cycle_delay()) => {
|
||||
run_data_scanner_cycle(&ctx, &storeapi, &mut cycle_info).await;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use crate::ReplTargetSizeSummary;
|
||||
use crate::data_usage_define::{DataUsageCache, DataUsageEntry, DataUsageHash, DataUsageHashMap, SizeSummary, hash_path};
|
||||
use crate::error::ScannerError;
|
||||
use crate::scanner_io::ScannerIODisk as _;
|
||||
use crate::sleeper::DynamicSleeper;
|
||||
use rustfs_common::heal_channel::{HEAL_DELETE_DANGLING, HealChannelRequest, HealOpts, HealScanMode, send_heal_request};
|
||||
use rustfs_common::metrics::{IlmAction, Metric, Metrics, UpdateCurrentPathFn, current_path_updater};
|
||||
use rustfs_ecstore::StorageAPI;
|
||||
@@ -50,10 +51,9 @@ use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
// Constants from Go code
|
||||
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1);
|
||||
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16;
|
||||
const DATA_SCANNER_COMPACT_LEAST_OBJECT: usize = 500;
|
||||
const YIELD_EVERY_N_OBJECTS: u64 = 128;
|
||||
const DATA_SCANNER_COMPACT_AT_CHILDREN: usize = 10000;
|
||||
const DATA_SCANNER_COMPACT_AT_FOLDERS: usize = DATA_SCANNER_COMPACT_AT_CHILDREN / 4;
|
||||
const DATA_SCANNER_FORCE_COMPACT_AT_FOLDERS: usize = 250_000;
|
||||
@@ -397,7 +397,7 @@ pub struct FolderScanner {
|
||||
failed_object_ttl_secs: u64,
|
||||
failed_objects_max: usize,
|
||||
|
||||
we_sleep: Box<dyn Fn() -> bool + Send + Sync>,
|
||||
sleeper: DynamicSleeper,
|
||||
// should_heal: Arc<dyn Fn() -> bool + Send + Sync>,
|
||||
disks: Vec<Arc<Disk>>,
|
||||
disks_quorum: usize,
|
||||
@@ -561,8 +561,6 @@ impl FolderScanner {
|
||||
// Store initial compaction state.
|
||||
let was_compacted = into.compacted;
|
||||
|
||||
let wait_time = None;
|
||||
|
||||
loop {
|
||||
if ctx.is_cancelled() {
|
||||
return Err(ScannerError::Other("Operation cancelled".to_string()));
|
||||
@@ -599,13 +597,12 @@ impl FolderScanner {
|
||||
None
|
||||
};
|
||||
|
||||
if (self.we_sleep)() {
|
||||
tokio::time::sleep(DATA_SCANNER_SLEEP_PER_FOLDER).await;
|
||||
}
|
||||
self.sleeper.sleep_folder().await;
|
||||
|
||||
let mut existing_folders: Vec<CachedFolder> = Vec::new();
|
||||
let mut new_folders: Vec<CachedFolder> = Vec::new();
|
||||
let mut found_objects = false;
|
||||
let mut object_count: u64 = 0;
|
||||
|
||||
let dir_path = path_join_buf(&[&self.root, &folder.name]);
|
||||
|
||||
@@ -679,11 +676,7 @@ impl FolderScanner {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut wait = wait_time;
|
||||
|
||||
if (self.we_sleep)() {
|
||||
wait = Some(SystemTime::now());
|
||||
}
|
||||
let timer = self.sleeper.timer();
|
||||
|
||||
let heal_enabled = this_hash.mod_alt(
|
||||
self.old_cache.info.next_cycle as u32 / folder.object_heal_prob_div,
|
||||
@@ -724,15 +717,9 @@ impl FolderScanner {
|
||||
|
||||
// Only log non-skip errors to avoid noise
|
||||
warn!("scan_folder: failed to get size for item {}: {}", item.path, e);
|
||||
|
||||
// Apply sleep if configured
|
||||
if let Some(t) = wait
|
||||
&& let Ok(elapsed) = t.elapsed()
|
||||
{
|
||||
tokio::time::sleep(elapsed).await;
|
||||
}
|
||||
}
|
||||
|
||||
timer.sleep().await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -743,14 +730,14 @@ impl FolderScanner {
|
||||
|
||||
abandoned_children.remove(&path_join_buf(&[&item.bucket, &item.object_path()]));
|
||||
|
||||
// TODO: check err
|
||||
into.add_sizes(&sz);
|
||||
into.objects += 1;
|
||||
object_count += 1;
|
||||
|
||||
if let Some(t) = wait
|
||||
&& let Ok(elapsed) = t.elapsed()
|
||||
{
|
||||
tokio::time::sleep(elapsed).await;
|
||||
timer.sleep().await;
|
||||
|
||||
if object_count.is_multiple_of(YIELD_EVERY_N_OBJECTS) {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -830,6 +817,7 @@ impl FolderScanner {
|
||||
// Use Box::pin for recursive async call
|
||||
let fut = Box::pin(self.scan_folder(ctx.clone(), folder_item.clone(), &mut dst));
|
||||
fut.await.map_err(|e| ScannerError::Other(e.to_string()))?;
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
if !into.compacted {
|
||||
let h = DataUsageHash(folder_item.name.clone());
|
||||
@@ -878,6 +866,7 @@ impl FolderScanner {
|
||||
// Use Box::pin for recursive async call
|
||||
let fut = Box::pin(self.scan_folder(ctx.clone(), folder_item.clone(), &mut dst));
|
||||
fut.await.map_err(|e| ScannerError::Other(e.to_string()))?;
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
if !into.compacted {
|
||||
let h = DataUsageHash(folder_item.name.clone());
|
||||
@@ -1088,6 +1077,7 @@ impl FolderScanner {
|
||||
// Use Box::pin for recursive async call
|
||||
let fut = Box::pin(self.scan_folder(ctx.clone(), folder_item.clone(), &mut dst));
|
||||
fut.await.map_err(|e| ScannerError::Other(e.to_string()))?;
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
if !into.compacted {
|
||||
let h = DataUsageHash(folder_item.name.clone());
|
||||
@@ -1167,7 +1157,7 @@ impl FolderScanner {
|
||||
/// Scan a data folder
|
||||
/// This function scans the basepath+cache.info.name and returns an updated cache.
|
||||
/// The returned cache will always be valid, but may not be updated from the existing.
|
||||
/// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
|
||||
/// Throttling between operations is controlled by the provided [`DynamicSleeper`].
|
||||
/// If the supplied context is canceled the function will return at the first chance.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn scan_data_folder(
|
||||
@@ -1177,7 +1167,7 @@ pub async fn scan_data_folder(
|
||||
cache: DataUsageCache,
|
||||
updates: Option<mpsc::Sender<DataUsageEntry>>,
|
||||
scan_mode: HealScanMode,
|
||||
we_sleep: Box<dyn Fn() -> bool + Send + Sync>,
|
||||
sleeper: DynamicSleeper,
|
||||
) -> Result<DataUsageCache, ScannerError> {
|
||||
use crate::data_usage_define::DATA_USAGE_ROOT;
|
||||
|
||||
@@ -1224,7 +1214,7 @@ pub async fn scan_data_folder(
|
||||
scan_mode,
|
||||
failed_object_ttl_secs: failed_object_ttl,
|
||||
failed_objects_max,
|
||||
we_sleep,
|
||||
sleeper,
|
||||
disks,
|
||||
disks_quorum,
|
||||
updates,
|
||||
@@ -1269,6 +1259,8 @@ pub async fn scan_data_folder(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::SCANNER_SLEEPER;
|
||||
|
||||
use super::*;
|
||||
use rustfs_ecstore::disk::{DiskOption, endpoint::Endpoint, new_disk};
|
||||
use serial_test::serial;
|
||||
@@ -1304,7 +1296,7 @@ mod tests {
|
||||
scan_mode: HealScanMode::Normal,
|
||||
failed_object_ttl_secs: u64::MAX,
|
||||
failed_objects_max: usize::MAX,
|
||||
we_sleep: Box::new(|| false),
|
||||
sleeper: SCANNER_SLEEPER.clone(),
|
||||
disks: Vec::new(),
|
||||
disks_quorum: 0,
|
||||
updates: None,
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use crate::scanner_folder::{ScannerItem, scan_data_folder};
|
||||
use crate::sleeper::SCANNER_SLEEPER;
|
||||
use crate::{
|
||||
DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT, DataUsageCache, DataUsageCacheInfo, DataUsageEntry, DataUsageEntryInfo,
|
||||
DataUsageInfo, SizeSummary, TierStats,
|
||||
@@ -636,10 +637,7 @@ impl ScannerIODisk for Disk {
|
||||
|
||||
let disks = disks_result.into_iter().flatten().collect::<Vec<Arc<Disk>>>();
|
||||
|
||||
// Create we_sleep function (always return false for now, can be enhanced later)
|
||||
let we_sleep: Box<dyn Fn() -> bool + Send + Sync> = Box::new(|| false);
|
||||
|
||||
let result = scan_data_folder(ctx, disks, local_disk, cache, updates, scan_mode, we_sleep).await;
|
||||
let result = scan_data_folder(ctx, disks, local_disk, cache, updates, scan_mode, SCANNER_SLEEPER.clone()).await;
|
||||
|
||||
match result {
|
||||
Ok(mut data_usage_info) => {
|
||||
|
||||
193
crates/scanner/src/sleeper.rs
Normal file
193
crates/scanner/src/sleeper.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, LazyLock, RwLock};
|
||||
use std::time::Instant;
|
||||
|
||||
use rustfs_config::{DEFAULT_SCANNER_IDLE_MODE, DEFAULT_SCANNER_SPEED, ENV_SCANNER_IDLE_MODE, ENV_SCANNER_SPEED, ScannerSpeed};
|
||||
use tokio::time::Duration;
|
||||
|
||||
const MIN_SLEEP: Duration = Duration::from_millis(1);
|
||||
|
||||
/// When `true` (default), the scanner throttles itself between operations.
|
||||
/// When `false`, all sleeps are skipped and the scanner runs at full speed.
|
||||
pub static SCANNER_IDLE_MODE: AtomicBool = AtomicBool::new(DEFAULT_SCANNER_IDLE_MODE);
|
||||
|
||||
/// Global scanner sleeper initialized from the `RUSTFS_SCANNER_SPEED` and
|
||||
/// `RUSTFS_SCANNER_IDLE_MODE` environment variables.
|
||||
pub static SCANNER_SLEEPER: LazyLock<DynamicSleeper> = LazyLock::new(|| {
|
||||
let speed_str = rustfs_utils::get_env_str(ENV_SCANNER_SPEED, DEFAULT_SCANNER_SPEED);
|
||||
let speed = ScannerSpeed::from_env_str(&speed_str);
|
||||
|
||||
let idle_mode = rustfs_utils::get_env_bool(ENV_SCANNER_IDLE_MODE, DEFAULT_SCANNER_IDLE_MODE);
|
||||
SCANNER_IDLE_MODE.store(idle_mode, Ordering::Relaxed);
|
||||
|
||||
DynamicSleeper::new(speed)
|
||||
});
|
||||
|
||||
/// Proportional-backoff sleeper for the data scanner.
|
||||
///
|
||||
/// For operations timed via [`SleepTimer`], sleeps are computed as
|
||||
/// `elapsed_work_time * factor`, clamped to `[MIN_SLEEP, max_sleep]`.
|
||||
/// For folder-level gaps, [`sleep_folder`] uses `MIN_SLEEP * factor`,
|
||||
/// clamped only by `max_sleep`. All sleeps are gated on [`SCANNER_IDLE_MODE`];
|
||||
#[derive(Clone)]
|
||||
pub struct DynamicSleeper {
|
||||
inner: Arc<SleeperParams>,
|
||||
}
|
||||
|
||||
struct SleeperParams {
|
||||
factor: RwLock<f64>,
|
||||
max_sleep: RwLock<Duration>,
|
||||
}
|
||||
|
||||
impl DynamicSleeper {
|
||||
pub fn new(speed: ScannerSpeed) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(SleeperParams {
|
||||
factor: RwLock::new(speed.sleep_factor()),
|
||||
max_sleep: RwLock::new(speed.max_sleep()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn should_sleep() -> bool {
|
||||
SCANNER_IDLE_MODE.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn read_params(&self) -> (f64, Duration) {
|
||||
let factor = *self.inner.factor.read().unwrap_or_else(|e| e.into_inner());
|
||||
let max_sleep = *self.inner.max_sleep.read().unwrap_or_else(|e| e.into_inner());
|
||||
(factor, max_sleep)
|
||||
}
|
||||
|
||||
pub async fn sleep_folder(&self) {
|
||||
if !Self::should_sleep() {
|
||||
return;
|
||||
}
|
||||
let (factor, max_sleep) = self.read_params();
|
||||
if factor == 0.0 || max_sleep.is_zero() {
|
||||
return;
|
||||
}
|
||||
let sleep_dur = Duration::from_secs_f64(MIN_SLEEP.as_secs_f64() * factor).min(max_sleep);
|
||||
if !sleep_dur.is_zero() {
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Begin timing an operation. Call [`sleep()`](SleepTimer::sleep) on
|
||||
/// the returned timer after the work is done; it will sleep for
|
||||
/// dynamic duration.
|
||||
pub fn timer(&self) -> SleepTimer {
|
||||
SleepTimer {
|
||||
start: Instant::now(),
|
||||
sleeper: self.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Swap in parameters from a new speed preset (for runtime reconfiguration).
|
||||
pub fn update(&self, speed: ScannerSpeed) {
|
||||
let mut f = self.inner.factor.write().unwrap_or_else(|e| e.into_inner());
|
||||
*f = speed.sleep_factor();
|
||||
let mut m = self.inner.max_sleep.write().unwrap_or_else(|e| e.into_inner());
|
||||
*m = speed.max_sleep();
|
||||
}
|
||||
}
|
||||
|
||||
/// A timer returned by [`DynamicSleeper::timer`]. Records the instant it
|
||||
/// was created so that [`sleep`](Self::sleep) can compute a proportional
|
||||
/// backoff from the elapsed work time.
|
||||
pub struct SleepTimer {
|
||||
start: Instant,
|
||||
sleeper: DynamicSleeper,
|
||||
}
|
||||
|
||||
impl SleepTimer {
|
||||
/// Sleep proportional to the elapsed time since this timer was created.
|
||||
pub async fn sleep(self) {
|
||||
if !DynamicSleeper::should_sleep() {
|
||||
return;
|
||||
}
|
||||
let (factor, max_sleep) = self.sleeper.read_params();
|
||||
if factor == 0.0 || max_sleep.is_zero() {
|
||||
return;
|
||||
}
|
||||
let elapsed = self.start.elapsed();
|
||||
let sleep_dur = Duration::from_secs_f64(elapsed.as_secs_f64() * factor)
|
||||
.max(MIN_SLEEP)
|
||||
.min(max_sleep);
|
||||
if !sleep_dur.is_zero() {
|
||||
tokio::time::sleep(sleep_dur).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_scanner_speed_presets() {
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Fastest);
|
||||
let (factor, max_sleep) = s.read_params();
|
||||
assert_eq!(factor, 0.0);
|
||||
assert_eq!(max_sleep, Duration::ZERO);
|
||||
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Default);
|
||||
let (factor, max_sleep) = s.read_params();
|
||||
assert_eq!(factor, 2.0);
|
||||
assert_eq!(max_sleep, Duration::from_secs(1));
|
||||
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Slowest);
|
||||
let (factor, max_sleep) = s.read_params();
|
||||
assert_eq!(factor, 100.0);
|
||||
assert_eq!(max_sleep, Duration::from_secs(15));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_changes_params() {
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Default);
|
||||
s.update(ScannerSpeed::Slow);
|
||||
let (factor, max_sleep) = s.read_params();
|
||||
assert_eq!(factor, 10.0);
|
||||
assert_eq!(max_sleep, Duration::from_secs(15));
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_fastest_never_sleeps() {
|
||||
let prev_mode = SCANNER_IDLE_MODE.load(Ordering::Relaxed);
|
||||
SCANNER_IDLE_MODE.store(true, Ordering::Relaxed);
|
||||
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Fastest);
|
||||
let start = tokio::time::Instant::now();
|
||||
s.sleep_folder().await;
|
||||
assert_eq!(start.elapsed(), Duration::ZERO);
|
||||
|
||||
SCANNER_IDLE_MODE.store(prev_mode, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_idle_mode_off_skips_sleep() {
|
||||
let prev_mode = SCANNER_IDLE_MODE.load(Ordering::Relaxed);
|
||||
SCANNER_IDLE_MODE.store(false, Ordering::Relaxed);
|
||||
|
||||
let s = DynamicSleeper::new(ScannerSpeed::Slowest);
|
||||
let start = tokio::time::Instant::now();
|
||||
s.sleep_folder().await;
|
||||
assert_eq!(start.elapsed(), Duration::ZERO);
|
||||
|
||||
SCANNER_IDLE_MODE.store(prev_mode, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user