diff --git a/Cargo.lock b/Cargo.lock
index 08ae3bbf..3f00d8cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3613,6 +3613,18 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "getset"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9cf0fc11e47561d47397154977bc219f4cf809b2974facc3ccb3b89e2436f912"
+dependencies = [
+ "proc-macro-error2",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
 [[package]]
 name = "ghash"
 version = "0.6.0-rc.3"
@@ -4854,14 +4866,14 @@ dependencies = [
 
 [[package]]
 name = "local-ip-address"
-version = "0.6.5"
+version = "0.6.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
+checksum = "786c72d9739fc316a7acf9b22d9c2794ac9cb91074e9668feb04304ab7219783"
 dependencies = [
  "libc",
  "neli",
  "thiserror 2.0.17",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
@@ -5117,27 +5129,31 @@ checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
 
 [[package]]
 name = "neli"
-version = "0.6.5"
+version = "0.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "93062a0dce6da2517ea35f301dfc88184ce18d3601ec786a727a87bf535deca9"
+checksum = "87fe4204517c0dafc04a1d99ecb577d52c0ffc81e1bbe5cf322769aa8fbd1b05"
 dependencies = [
+ "bitflags 2.10.0",
  "byteorder",
+ "derive_builder 0.20.2",
+ "getset",
  "libc",
  "log",
  "neli-proc-macros",
+ "parking_lot",
 ]
 
 [[package]]
 name = "neli-proc-macros"
-version = "0.1.4"
+version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c8034b7fbb6f9455b2a96c19e6edf8dc9fc34c70449938d8ee3b4df363f61fe"
+checksum = "90e502fe5db321c6e0ae649ccda600675680125a8e8dee327744fe1910b19332"
 dependencies = [
  "either",
  "proc-macro2",
  "quote",
  "serde",
- "syn 1.0.109",
+ "syn 2.0.111",
 ]
 
 [[package]]
@@ -6160,6 +6176,28 @@ dependencies = [
  "toml_edit 0.23.7",
 ]
 
+[[package]]
+name = "proc-macro-error-attr2"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96de42df36bb9bba5542fe9f1a054b8cc87e172759a1868aa05c1f3acc89dfc5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+]
+
+[[package]]
+name = "proc-macro-error2"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11ec05c52be0a07b08061f7dd003e7d7092e0472bc731b4af7bb1ef876109802"
+dependencies = [
+ "proc-macro-error-attr2",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.103"
diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs
index 1e1d6482..08908298 100644
--- a/crates/ahm/src/scanner/data_scanner.rs
+++ b/crates/ahm/src/scanner/data_scanner.rs
@@ -783,36 +783,59 @@ impl Scanner {
         Ok(processed_count)
     }
 
+    // TODO: optimize this function
     async fn collect_bucket_records_from_store(
         &self,
         ecstore: &Arc<ECStore>,
         bucket_name: &str,
     ) -> Result<Vec<LocalObjectRecord>> {
         let mut records = Vec::new();
+        let mut continuation_token: Option<String> = None;
+        const MAX_KEYS: i32 = 1000;
 
-        let list_result = ecstore
-            .clone()
-            .list_objects_v2(bucket_name, "", None, None, 1000, false, None, false)
-            .await
-            .map_err(Error::from)?;
+        loop {
+            let list_result = ecstore
+                .clone()
+                .list_objects_v2(bucket_name, "", continuation_token.clone(), None, MAX_KEYS, false, None, false)
+                .await
+                .map_err(Error::from)?;
 
-        for object in list_result.objects {
-            let usage = local_scan::LocalObjectUsage {
-                bucket: bucket_name.to_string(),
-                object: object.name.clone(),
-                last_modified_ns: object.mod_time.map(|ts| ts.unix_timestamp_nanos()),
-                versions_count: 1,
-                delete_markers_count: 0,
-                total_size: object.size.max(0) as u64,
-                has_live_object: true,
-            };
-            records.push(LocalObjectRecord {
-                usage,
-                object_info: Some(object),
-                file_info: None,
-            });
+            // Process objects from this page
+            for object in list_result.objects {
+                let usage = local_scan::LocalObjectUsage {
+                    bucket: bucket_name.to_string(),
+                    object: object.name.clone(),
+                    last_modified_ns: object.mod_time.map(|ts| ts.unix_timestamp_nanos()),
+                    versions_count: 1,
+                    delete_markers_count: 0,
+                    total_size: object.size.max(0) as u64,
+                    has_live_object: true,
+                };
+                records.push(LocalObjectRecord {
+                    usage,
+                    object_info: Some(object),
+                    file_info: None,
+                });
+            }
+
+            // Check if there are more pages to fetch
+            if !list_result.is_truncated {
+                break;
+            }
+
+            // Get continuation token for next page
+            continuation_token = list_result.next_continuation_token;
+            if continuation_token.is_none() {
+                warn!(
+                    "List objects response is truncated but no continuation token provided for bucket {}",
+                    bucket_name
+                );
+                break;
+            }
         }
 
+        debug!("Collected {} objects from bucket {} via paginated listing", records.len(), bucket_name);
+
         Ok(records)
     }
 
@@ -839,15 +862,27 @@ impl Scanner {
         let local_stats = self.node_scanner.get_stats_summary().await;
         self.stats_aggregator.set_local_stats(local_stats).await;
 
-        // Start background legacy scan loop for backward compatibility
-        let scanner = self.clone_for_background();
-        tokio::spawn(async move {
-            if let Err(e) = scanner.legacy_scan_loop().await {
-                error!("Legacy scanner loop failed: {}", e);
-            }
-        });
+        // Background tasks overview:
+        // The following three tasks can be started in parallel with no ordering dependencies. Each task
+        // serves a specific purpose:
+        //
+        // 1. scan_loop: Primary scan loop that periodically calls scan_cycle() at configured intervals
+        //    to drive healing and lifecycle workflows. This is the core of the new optimized scanning
+        //    mechanism, responsible for continuous health checks and data management. Each scan_cycle()
+        //    also updates local stats in the aggregator and collects data usage statistics.
+        //
+        // 2. Initial data usage collection: Performs an immediate data usage collection after startup
+        //    to ensure admin APIs have fresh data available right away. Subsequent collections will be
+        //    handled periodically by scan_loop.
+        //
+        // 3. Initial scan cycle: Executes an immediate scan cycle to avoid waiting for a full interval
+        //    before starting healing checks. Subsequent cycles will be triggered by scan_loop at
+        //    configured intervals.
 
-        // Launch the primary scan loop to drive healing and lifecycle workflows
+        // Task 1: Launch the primary scan loop to drive healing and lifecycle workflows
+        // Reason: Core scanning mechanism that periodically triggers scan_cycle() at configured intervals
+        //         to perform health checks and data management. Each scan_cycle() also updates local stats
+        //         in the aggregator and collects data usage statistics, replacing the legacy_scan_loop.
         let scanner = self.clone_for_background();
         tokio::spawn(async move {
             if let Err(e) = scanner.scan_loop().await {
@@ -855,7 +890,10 @@ impl Scanner {
             }
         });
 
-        // Trigger an immediate data usage collection so that admin APIs have fresh data after startup.
+        // Task 2: Trigger an immediate data usage collection so that admin APIs have fresh data after startup
+        // Reason: Provide immediate data availability to admin APIs without waiting for the first scheduled
+        //         collection. Subsequent collections will be handled by scan_loop.
+        // Dependency: None - can run independently
         let scanner = self.clone_for_background();
         tokio::spawn(async move {
             let enable_stats = {
@@ -870,7 +908,11 @@ impl Scanner {
             }
         });
 
-        // Kick off an initial scan cycle so we do not wait one full interval to run healing checks
+        // Task 3: Kick off an initial scan cycle so we do not wait one full interval to run healing checks
+        // Reason: Start healing and lifecycle checks immediately rather than waiting for the first interval
+        //         in scan_loop. Subsequent cycles will be triggered by scan_loop at configured intervals.
+        // Dependency: None - can run independently, though it will also trigger data usage collection
+        //             if enabled (which may overlap with Task 2, but that's safe)
         let scanner = self.clone_for_background();
         tokio::spawn(async move {
             if let Err(e) = scanner.scan_cycle().await {
@@ -1138,6 +1180,11 @@ impl Scanner {
             state.current_scan_duration = Some(scan_duration);
         }
 
+        // Update local stats in aggregator so other nodes can get the latest stats during aggregation
+        // This replaces the functionality previously provided by legacy_scan_loop
+        let local_stats = self.node_scanner.get_stats_summary().await;
+        self.stats_aggregator.set_local_stats(local_stats).await;
+
         // Complete global metrics collection for this cycle
         stop_fn();
 
@@ -1360,27 +1407,73 @@ impl Scanner {
         ecstore: &Arc<ECStore>,
         bucket: &str,
     ) -> Option<BucketUsageInfo> {
-        match ecstore
-            .clone()
-            .list_objects_v2(bucket, "", None, None, 100, false, None, false)
-            .await
-        {
-            Ok(result) => {
-                let object_count = result.objects.len() as u64;
-                let bucket_size = result.objects.iter().map(|obj| obj.size as u64).sum();
-                let usage = BucketUsageInfo {
-                    size: bucket_size,
-                    objects_count: object_count,
-                    versions_count: object_count,
-                    ..Default::default()
+        let mut continuation: Option<String> = None;
+        let mut objects_count: u64 = 0;
+        let mut versions_count: u64 = 0;
+        let mut total_size: u64 = 0;
+        let mut delete_markers: u64 = 0;
+
+        loop {
+            let result = match ecstore
+                .clone()
+                .list_objects_v2(bucket, "", continuation.clone(), None, 1000, false, None, false)
+                .await
+            {
+                Ok(result) => result,
+                Err(e) => {
+                    warn!("Failed to list objects during realtime data usage fallback for bucket {}: {}", bucket, e);
+                    return None;
+                }
+            };
+
+            for object in result.objects.iter() {
+                if object.is_dir {
+                    continue;
+                }
+
+                if object.delete_marker {
+                    delete_markers = delete_markers.saturating_add(1);
+                    continue;
+                }
+
+                let object_size = object.size.max(0) as u64;
+                objects_count = objects_count.saturating_add(1);
+                total_size = total_size.saturating_add(object_size);
+
+                let detected_versions = if object.num_versions > 0 {
+                    object.num_versions as u64
+                } else {
+                    1
                 };
-                Some(usage)
+                versions_count = versions_count.saturating_add(detected_versions);
             }
-            Err(e) => {
-                warn!("Failed to list objects during realtime data usage fallback for bucket {}: {}", bucket, e);
-                None
+
+            if !result.is_truncated {
+                break;
+            }
+
+            continuation = result.next_continuation_token.clone();
+            if continuation.is_none() {
+                warn!(
+                    "Bucket {} listing marked truncated but no continuation token returned; stopping early",
+                    bucket
+                );
+                break;
             }
         }
+
+        if versions_count == 0 {
+            versions_count = objects_count;
+        }
+
+        let usage = BucketUsageInfo {
+            size: total_size,
+            objects_count,
+            versions_count,
+            delete_markers_count: delete_markers,
+            ..Default::default()
+        };
+        Some(usage)
     }
 
     /// Verify object integrity and trigger healing if necessary
@@ -1736,7 +1829,7 @@ impl Scanner {
                     debug!("Part {} not found on disk {}", part_idx, disk.path().display());
                 }
                 _ => {
-                    debug!("Part {} check result: {} on disk {}", part_idx, result, disk.path().display());
+                    warn!("Part {} check result: {} on disk {}", part_idx, result, disk.path().display());
                 }
             }
         }
@@ -1787,12 +1880,6 @@ impl Scanner {
             )));
         }
 
-        if missing_parts_found > 0 {
-            return Err(Error::Other(format!(
-                "Object has missing data parts across disks: {bucket}/{object} (missing parts: {missing_parts_found})"
-            )));
-        }
-
         // Special case: if this is a single-part object and we have missing parts on multiple disks,
         // it might indicate actual data loss rather than normal EC distribution
         if object_info.parts.len() == 1 && missing_parts_found > (total_disks_checked / 2) {
@@ -1811,6 +1898,12 @@ impl Scanner {
             }
         }
 
+        if missing_parts_found > 0 {
+            return Err(Error::Other(format!(
+                "Object has missing data parts across disks: {bucket}/{object} (missing parts: {missing_parts_found})"
+            )));
+        }
+
         debug!("EC data parts integrity verified for {}/{}", bucket, object);
         Ok(())
     }
@@ -2869,50 +2962,6 @@ impl Scanner {
         Ok(())
     }
 
-    /// Legacy scan loop for backward compatibility (runs in background)
-    async fn legacy_scan_loop(&self) -> Result<()> {
-        info!("Starting legacy scan loop for backward compatibility");
-
-        loop {
-            if let Some(token) = get_ahm_services_cancel_token() {
-                if token.is_cancelled() {
-                    info!("Cancellation requested, exiting legacy scan loop");
-                    break;
-                }
-            }
-
-            let (enable_data_usage_stats, scan_interval) = {
-                let config = self.config.read().await;
-                (config.enable_data_usage_stats, config.scan_interval)
-            };
-
-            if enable_data_usage_stats {
-                if let Err(e) = self.collect_and_persist_data_usage().await {
-                    warn!("Background data usage collection failed: {}", e);
-                }
-            }
-
-            // Update local stats in aggregator after latest scan
-            let local_stats = self.node_scanner.get_stats_summary().await;
-            self.stats_aggregator.set_local_stats(local_stats).await;
-
-            match get_ahm_services_cancel_token() {
-                Some(token) => {
-                    tokio::select! {
-                        _ = tokio::time::sleep(scan_interval) => {}
-                        _ = token.cancelled() => {
-                            info!("Cancellation requested, exiting legacy scan loop");
-                            break;
-                        }
-                    }
-                }
-                None => tokio::time::sleep(scan_interval).await,
-            }
-        }
-
-        Ok(())
-    }
-
     /// Update legacy metrics from aggregated statistics
     async fn update_legacy_metrics_from_aggregated(&self, aggregated: &super::stats_aggregator::AggregatedStats) {
         // Update metrics collector with aggregated data
diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs
index 4bcef12a..6976f81f 100644
--- a/crates/ahm/src/scanner/lifecycle.rs
+++ b/crates/ahm/src/scanner/lifecycle.rs
@@ -500,7 +500,13 @@ impl ScannerItem {
             return false;
         };
 
-        let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| TimeDuration::seconds(0));
+        let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| {
+            warn!(
+                "invalid replication_pending_grace {:?}; falling back to a grace of 0 seconds",
+                self.replication_pending_grace
+            );
+            TimeDuration::seconds(0)
+        });
         if grace.is_zero() {
             return true;
         }
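
Review note: `collect_bucket_records_from_store` and the realtime data usage fallback now share the same continuation-token pagination shape. The sketch below distills that loop into a minimal, self-contained form. `ListPage` and `list_page` are hypothetical stand-ins for the `list_objects_v2` result and call (not APIs from this codebase); only the control flow, including the defensive break when a truncated response carries no continuation token, mirrors the patch.

```rust
/// One page of a paginated listing (hypothetical stand-in for the v2 list result).
struct ListPage {
    keys: Vec<String>,
    is_truncated: bool,
    next_continuation_token: Option<String>,
}

/// Hypothetical page source: yields `total` keys in pages of `page_size`,
/// using the next start index (as a string) as the continuation token.
fn list_page(total: usize, page_size: usize, token: Option<String>) -> ListPage {
    let start: usize = token.and_then(|t| t.parse().ok()).unwrap_or(0);
    let end = (start + page_size).min(total);
    ListPage {
        keys: (start..end).map(|i| format!("object-{i:04}")).collect(),
        is_truncated: end < total,
        next_continuation_token: if end < total { Some(end.to_string()) } else { None },
    }
}

fn collect_all_keys(total: usize, page_size: usize) -> Vec<String> {
    let mut keys = Vec::new();
    let mut continuation_token: Option<String> = None;

    loop {
        let page = list_page(total, page_size, continuation_token.clone());
        keys.extend(page.keys);

        // Stop once the source reports the listing is complete.
        if !page.is_truncated {
            break;
        }

        // Defensive break, mirroring the patch: a truncated response without a
        // continuation token would otherwise re-fetch the first page forever.
        continuation_token = page.next_continuation_token;
        if continuation_token.is_none() {
            eprintln!("truncated response without continuation token; stopping early");
            break;
        }
    }
    keys
}

fn main() {
    let keys = collect_all_keys(2500, 1000);
    assert_eq!(keys.len(), 2500); // three pages: 1000 + 1000 + 500
    println!("collected {} keys", keys.len());
}
```

Without the defensive break, a listing backend that reports `is_truncated` but returns no token would loop on the first page indefinitely; that is the failure mode both `warn!` branches in the patch guard against.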
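On the startup side, the new "Background tasks overview" comment is the key design note: the three spawned tasks are order-independent and all exit on a shared cancel token. Below is a minimal sketch of that shape, assuming a tokio runtime and `tokio_util::sync::CancellationToken` (which `get_ahm_services_cancel_token()` appears to hand out); the stub functions are illustrative, not the crate's real APIs.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Illustrative stand-in for one unit of scan work (not the crate's scan_cycle()).
async fn scan_cycle_stub(label: &str) {
    println!("{label}: running one scan cycle");
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();

    // Task 1: periodic scan loop, ticking until the token is cancelled.
    let token = cancel.clone();
    let scan_loop = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(50));
        loop {
            tokio::select! {
                _ = interval.tick() => scan_cycle_stub("scan_loop").await,
                _ = token.cancelled() => break,
            }
        }
    });

    // Task 2: one-shot initial data usage collection.
    let usage = tokio::spawn(scan_cycle_stub("initial_usage_collection"));

    // Task 3: one-shot initial scan cycle; overlap with Task 2 is harmless.
    let first_cycle = tokio::spawn(scan_cycle_stub("initial_scan_cycle"));

    tokio::time::sleep(Duration::from_millis(200)).await;
    cancel.cancel();
    let _ = tokio::join!(scan_loop, usage, first_cycle);
}
```

The one-shot tasks intentionally overlap with the periodic loop; as the patch comment notes, a duplicate data usage collection is safe, so no ordering or synchronization between the spawns is required.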