This commit is contained in:
weisd
2025-12-08 17:26:07 +08:00
parent ad34f1b031
commit dbdcecb9c5
3 changed files with 200 additions and 107 deletions

54
Cargo.lock generated
View File

@@ -3613,6 +3613,18 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "getset"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf0fc11e47561d47397154977bc219f4cf809b2974facc3ccb3b89e2436f912"
dependencies = [
"proc-macro-error2",
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "ghash"
version = "0.6.0-rc.3"
@@ -4854,14 +4866,14 @@ dependencies = [
[[package]]
name = "local-ip-address"
version = "0.6.5"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
checksum = "786c72d9739fc316a7acf9b22d9c2794ac9cb91074e9668feb04304ab7219783"
dependencies = [
"libc",
"neli",
"thiserror 2.0.17",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -5117,27 +5129,31 @@ checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]]
name = "neli"
version = "0.6.5"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93062a0dce6da2517ea35f301dfc88184ce18d3601ec786a727a87bf535deca9"
checksum = "87fe4204517c0dafc04a1d99ecb577d52c0ffc81e1bbe5cf322769aa8fbd1b05"
dependencies = [
"bitflags 2.10.0",
"byteorder",
"derive_builder 0.20.2",
"getset",
"libc",
"log",
"neli-proc-macros",
"parking_lot",
]
[[package]]
name = "neli-proc-macros"
version = "0.1.4"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c8034b7fbb6f9455b2a96c19e6edf8dc9fc34c70449938d8ee3b4df363f61fe"
checksum = "90e502fe5db321c6e0ae649ccda600675680125a8e8dee327744fe1910b19332"
dependencies = [
"either",
"proc-macro2",
"quote",
"serde",
"syn 1.0.109",
"syn 2.0.111",
]
[[package]]
@@ -6160,6 +6176,28 @@ dependencies = [
"toml_edit 0.23.7",
]
[[package]]
name = "proc-macro-error-attr2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
dependencies = [
"proc-macro2",
"quote",
]
[[package]]
name = "proc-macro-error2"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
dependencies = [
"proc-macro-error-attr2",
"proc-macro2",
"quote",
"syn 2.0.111",
]
[[package]]
name = "proc-macro2"
version = "1.0.103"

View File

@@ -783,36 +783,59 @@ impl Scanner {
Ok(processed_count)
}
// TODO: optimize this function
async fn collect_bucket_records_from_store(
&self,
ecstore: &Arc<rustfs_ecstore::store::ECStore>,
bucket_name: &str,
) -> Result<Vec<LocalObjectRecord>> {
let mut records = Vec::new();
let mut continuation_token: Option<String> = None;
const MAX_KEYS: i32 = 1000;
let list_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", None, None, 1000, false, None, false)
.await
.map_err(Error::from)?;
loop {
let list_result = ecstore
.clone()
.list_objects_v2(bucket_name, "", continuation_token.clone(), None, MAX_KEYS, false, None, false)
.await
.map_err(Error::from)?;
for object in list_result.objects {
let usage = local_scan::LocalObjectUsage {
bucket: bucket_name.to_string(),
object: object.name.clone(),
last_modified_ns: object.mod_time.map(|ts| ts.unix_timestamp_nanos()),
versions_count: 1,
delete_markers_count: 0,
total_size: object.size.max(0) as u64,
has_live_object: true,
};
records.push(LocalObjectRecord {
usage,
object_info: Some(object),
file_info: None,
});
// Process objects from this page
for object in list_result.objects {
let usage = local_scan::LocalObjectUsage {
bucket: bucket_name.to_string(),
object: object.name.clone(),
last_modified_ns: object.mod_time.map(|ts| ts.unix_timestamp_nanos()),
versions_count: 1,
delete_markers_count: 0,
total_size: object.size.max(0) as u64,
has_live_object: true,
};
records.push(LocalObjectRecord {
usage,
object_info: Some(object),
file_info: None,
});
}
// Check if there are more pages to fetch
if !list_result.is_truncated {
break;
}
// Get continuation token for next page
continuation_token = list_result.next_continuation_token;
if continuation_token.is_none() {
warn!(
"List objects response is truncated but no continuation token provided for bucket {}",
bucket_name
);
break;
}
}
debug!("Collected {} objects from bucket {} via paginated listing", records.len(), bucket_name);
Ok(records)
}
@@ -839,15 +862,27 @@ impl Scanner {
let local_stats = self.node_scanner.get_stats_summary().await;
self.stats_aggregator.set_local_stats(local_stats).await;
// Start background legacy scan loop for backward compatibility
let scanner = self.clone_for_background();
tokio::spawn(async move {
if let Err(e) = scanner.legacy_scan_loop().await {
error!("Legacy scanner loop failed: {}", e);
}
});
// Background tasks overview:
// The following three tasks can be started in parallel with no ordering dependencies. Each task
// serves a specific purpose:
//
// 1. scan_loop: Primary scan loop that periodically calls scan_cycle() at configured intervals
// to drive healing and lifecycle workflows. This is the core of the new optimized scanning
// mechanism, responsible for continuous health checks and data management. Each scan_cycle()
// also updates local stats in the aggregator and collects data usage statistics.
//
// 2. Initial data usage collection: Performs an immediate data usage collection after startup
// to ensure admin APIs have fresh data available right away. Subsequent collections will be
// handled periodically by scan_loop.
//
// 3. Initial scan cycle: Executes an immediate scan cycle to avoid waiting for a full interval
// before starting healing checks. Subsequent cycles will be triggered by scan_loop at
// configured intervals.
// Launch the primary scan loop to drive healing and lifecycle workflows
// Task 1: Launch the primary scan loop to drive healing and lifecycle workflows
// Reason: Core scanning mechanism that periodically triggers scan_cycle() at configured intervals
// to perform health checks and data management. Each scan_cycle() also updates local stats
// in the aggregator and collects data usage statistics, replacing the legacy_scan_loop.
let scanner = self.clone_for_background();
tokio::spawn(async move {
if let Err(e) = scanner.scan_loop().await {
@@ -855,7 +890,10 @@ impl Scanner {
}
});
// Trigger an immediate data usage collection so that admin APIs have fresh data after startup.
// Task 2: Trigger an immediate data usage collection so that admin APIs have fresh data after startup
// Reason: Provide immediate data availability to admin APIs without waiting for the first scheduled
// collection. Subsequent collections will be handled by scan_loop.
// Dependency: None - can run independently
let scanner = self.clone_for_background();
tokio::spawn(async move {
let enable_stats = {
@@ -870,7 +908,11 @@ impl Scanner {
}
});
// Kick off an initial scan cycle so we do not wait one full interval to run healing checks
// Task 3: Kick off an initial scan cycle so we do not wait one full interval to run healing checks
// Reason: Start healing and lifecycle checks immediately rather than waiting for the first interval
// in scan_loop. Subsequent cycles will be triggered by scan_loop at configured intervals.
// Dependency: None - can run independently, though it will also trigger data usage collection
// if enabled (which may overlap with Task 2, but that's safe)
let scanner = self.clone_for_background();
tokio::spawn(async move {
if let Err(e) = scanner.scan_cycle().await {
@@ -1138,6 +1180,11 @@ impl Scanner {
state.current_scan_duration = Some(scan_duration);
}
// Update local stats in aggregator so other nodes can get the latest stats during aggregation
// This replaces the functionality previously provided by legacy_scan_loop
let local_stats = self.node_scanner.get_stats_summary().await;
self.stats_aggregator.set_local_stats(local_stats).await;
// Complete global metrics collection for this cycle
stop_fn();
@@ -1360,27 +1407,73 @@ impl Scanner {
ecstore: &Arc<rustfs_ecstore::store::ECStore>,
bucket: &str,
) -> Option<BucketUsageInfo> {
match ecstore
.clone()
.list_objects_v2(bucket, "", None, None, 100, false, None, false)
.await
{
Ok(result) => {
let object_count = result.objects.len() as u64;
let bucket_size = result.objects.iter().map(|obj| obj.size as u64).sum();
let usage = BucketUsageInfo {
size: bucket_size,
objects_count: object_count,
versions_count: object_count,
..Default::default()
let mut continuation: Option<String> = None;
let mut objects_count: u64 = 0;
let mut versions_count: u64 = 0;
let mut total_size: u64 = 0;
let mut delete_markers: u64 = 0;
loop {
let result = match ecstore
.clone()
.list_objects_v2(bucket, "", continuation.clone(), None, 1000, false, None, false)
.await
{
Ok(result) => result,
Err(e) => {
warn!("Failed to list objects during realtime data usage fallback for bucket {}: {}", bucket, e);
return None;
}
};
for object in result.objects.iter() {
if object.is_dir {
continue;
}
if object.delete_marker {
delete_markers = delete_markers.saturating_add(1);
continue;
}
let object_size = object.size.max(0) as u64;
objects_count = objects_count.saturating_add(1);
total_size = total_size.saturating_add(object_size);
let detected_versions = if object.num_versions > 0 {
object.num_versions as u64
} else {
1
};
Some(usage)
versions_count = versions_count.saturating_add(detected_versions);
}
Err(e) => {
warn!("Failed to list objects during realtime data usage fallback for bucket {}: {}", bucket, e);
None
if !result.is_truncated {
break;
}
continuation = result.next_continuation_token.clone();
if continuation.is_none() {
warn!(
"Bucket {} listing marked truncated but no continuation token returned; stopping early",
bucket
);
break;
}
}
if versions_count == 0 {
versions_count = objects_count;
}
let usage = BucketUsageInfo {
size: total_size,
objects_count,
versions_count,
delete_markers_count: delete_markers,
..Default::default()
};
Some(usage)
}
/// Verify object integrity and trigger healing if necessary
@@ -1736,7 +1829,7 @@ impl Scanner {
debug!("Part {} not found on disk {}", part_idx, disk.path().display());
}
_ => {
debug!("Part {} check result: {} on disk {}", part_idx, result, disk.path().display());
warn!("Part {} check result: {} on disk {}", part_idx, result, disk.path().display());
}
}
}
@@ -1787,12 +1880,6 @@ impl Scanner {
)));
}
if missing_parts_found > 0 {
return Err(Error::Other(format!(
"Object has missing data parts across disks: {bucket}/{object} (missing parts: {missing_parts_found})"
)));
}
// Special case: if this is a single-part object and we have missing parts on multiple disks,
// it might indicate actual data loss rather than normal EC distribution
if object_info.parts.len() == 1 && missing_parts_found > (total_disks_checked / 2) {
@@ -1811,6 +1898,12 @@ impl Scanner {
}
}
if missing_parts_found > 0 {
return Err(Error::Other(format!(
"Object has missing data parts across disks: {bucket}/{object} (missing parts: {missing_parts_found})"
)));
}
debug!("EC data parts integrity verified for {}/{}", bucket, object);
Ok(())
}
@@ -2869,50 +2962,6 @@ impl Scanner {
Ok(())
}
/// Legacy scan loop for backward compatibility (runs in background)
async fn legacy_scan_loop(&self) -> Result<()> {
info!("Starting legacy scan loop for backward compatibility");
loop {
if let Some(token) = get_ahm_services_cancel_token() {
if token.is_cancelled() {
info!("Cancellation requested, exiting legacy scan loop");
break;
}
}
let (enable_data_usage_stats, scan_interval) = {
let config = self.config.read().await;
(config.enable_data_usage_stats, config.scan_interval)
};
if enable_data_usage_stats {
if let Err(e) = self.collect_and_persist_data_usage().await {
warn!("Background data usage collection failed: {}", e);
}
}
// Update local stats in aggregator after latest scan
let local_stats = self.node_scanner.get_stats_summary().await;
self.stats_aggregator.set_local_stats(local_stats).await;
match get_ahm_services_cancel_token() {
Some(token) => {
tokio::select! {
_ = tokio::time::sleep(scan_interval) => {}
_ = token.cancelled() => {
info!("Cancellation requested, exiting legacy scan loop");
break;
}
}
}
None => tokio::time::sleep(scan_interval).await,
}
}
Ok(())
}
/// Update legacy metrics from aggregated statistics
async fn update_legacy_metrics_from_aggregated(&self, aggregated: &super::stats_aggregator::AggregatedStats) {
// Update metrics collector with aggregated data

View File

@@ -500,7 +500,13 @@ impl ScannerItem {
return false;
};
let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| TimeDuration::seconds(0));
let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| {
warn!(
"replication_pending_grace is invalid, using default value: 0 seconds, grace: {:?}",
self.replication_pending_grace
);
TimeDuration::seconds(0)
});
if grace.is_zero() {
return true;
}