From c43166c4c659607fd35039b3ffe44d753b39b9f1 Mon Sep 17 00:00:00 2001 From: guojidan <63799833+guojidan@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:14:47 +0800 Subject: [PATCH] =?UTF-8?q?Enhance=20heal=20and=20lifecycle=20integration?= =?UTF-8?q?=20tests,=20improve=20replication=20han=E2=80=A6=20(#944)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: junxiang Mu <1948535941@qq.com> Co-authored-by: houseme Co-authored-by: weisd --- crates/ahm/src/heal/manager.rs | 9 +- crates/ahm/src/scanner/data_scanner.rs | 1071 ++++++++++++----- crates/ahm/src/scanner/lifecycle.rs | 308 ++++- crates/ahm/src/scanner/local_scan/mod.rs | 4 + crates/ahm/src/scanner/metrics.rs | 52 + crates/ahm/tests/heal_integration_test.rs | 624 +++++++++- .../ahm/tests/lifecycle_integration_test.rs | 305 ++++- .../src/bucket/object_lock/objectlock.rs | 6 +- 8 files changed, 2059 insertions(+), 320 deletions(-) diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index c2717ef2..d86b2de8 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -31,7 +31,7 @@ use tokio::{ time::interval, }; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; /// Priority queue wrapper for heal requests /// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items @@ -418,7 +418,12 @@ impl HealManager { /// Get statistics pub async fn get_statistics(&self) -> HealStatistics { - self.statistics.read().await.clone() + let stats = self.statistics.read().await.clone(); + debug!( + "HealManager stats snapshot: total_tasks={}, successful_tasks={}, failed_tasks={}, running_tasks={}", + stats.total_tasks, stats.successful_tasks, stats.failed_tasks, stats.running_tasks + ); + stats } /// Get active task count diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 900d40ce..1e1d6482 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -14,30 +14,31 @@ // IO throttling component is integrated into NodeScanner use crate::{ - Error, HealRequest, Result, get_ahm_services_cancel_token, + Error, HealRequest, Result, get_ahm_services_cancel_token, get_heal_manager, heal::HealManager, scanner::{ BucketMetrics, DecentralizedStatsAggregator, DecentralizedStatsAggregatorConfig, DiskMetrics, MetricsCollector, NodeScanner, NodeScannerConfig, ScannerMetrics, - lifecycle::ScannerItem, + lifecycle::{ReplicationMetricsHandle, ScannerItem}, local_scan::{self, LocalObjectRecord, LocalScanOutcome}, }, }; -use rustfs_common::data_usage::{DataUsageInfo, SizeSummary}; +use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo, SizeSummary}; use rustfs_common::metrics::{Metric, Metrics, global_metrics}; use rustfs_ecstore::{ self as ecstore, StorageAPI, - bucket::versioning::VersioningApi, bucket::versioning_sys::BucketVersioningSys, + bucket::{object_lock::objectlock_sys::BucketObjectLockSys, versioning::VersioningApi}, data_usage::{aggregate_local_snapshots, store_data_usage_in_backend}, disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions}, set_disk::SetDisks, store_api::ObjectInfo, }; -use rustfs_filemeta::{MetacacheReader, VersionType}; +use rustfs_filemeta::{FileInfo, MetacacheReader, VersionType, file_info_from_raw}; use s3s::dto::{BucketVersioningStatus, VersioningConfiguration}; use std::{ - collections::HashMap, + borrow::Cow, + 
collections::{HashMap, HashSet}, sync::Arc, time::{Duration, SystemTime}, }; @@ -74,6 +75,10 @@ pub struct ScannerConfig { pub scan_mode: ScanMode, /// Whether to enable data usage statistics collection pub enable_data_usage_stats: bool, + /// Maximum number of buckets refreshed per realtime data usage fallback cycle + pub realtime_usage_bucket_limit: usize, + /// Grace period before pending replication is considered lagging + pub replication_pending_grace: Duration, } impl Default for ScannerConfig { @@ -86,6 +91,8 @@ impl Default for ScannerConfig { enable_metrics: true, scan_mode: ScanMode::Normal, enable_data_usage_stats: true, + realtime_usage_bucket_limit: 32, + replication_pending_grace: Duration::from_secs(300), } } } @@ -111,6 +118,81 @@ pub struct ScannerState { pub scanning_disks: Vec, } +#[derive(Debug, Default)] +struct RealtimeUsageCache { + bucket_cursor: usize, + bucket_order: Vec, + buckets_usage: HashMap, +} + +impl RealtimeUsageCache { + fn align_with_bucket_list(&mut self, bucket_names: &[String]) { + let valid: HashSet<_> = bucket_names.iter().cloned().collect(); + self.buckets_usage.retain(|bucket, _| valid.contains(bucket)); + self.bucket_order = bucket_names.to_vec(); + if self.bucket_order.is_empty() { + self.bucket_cursor = 0; + } else { + self.bucket_cursor %= self.bucket_order.len(); + } + } + + fn record_snapshot(&mut self, snapshot: &DataUsageInfo) { + if snapshot.buckets_usage.is_empty() { + self.bucket_order.clear(); + self.bucket_cursor = 0; + self.buckets_usage.clear(); + return; + } + + let mut ordered: Vec = snapshot.buckets_usage.keys().cloned().collect(); + ordered.sort(); + self.bucket_order = ordered; + self.bucket_cursor = 0; + self.buckets_usage = snapshot.buckets_usage.clone(); + } + + fn next_batch(&mut self, batch_size: usize) -> Vec { + if self.bucket_order.is_empty() || batch_size == 0 { + return Vec::new(); + } + + let limit = batch_size.min(self.bucket_order.len()); + let mut selected = Vec::with_capacity(limit); + for _ in 0..limit { + let bucket = self.bucket_order[self.bucket_cursor].clone(); + self.bucket_cursor = (self.bucket_cursor + 1) % self.bucket_order.len(); + selected.push(bucket); + } + + selected + } + + fn upsert_bucket_usage(&mut self, bucket: &str, usage: BucketUsageInfo) { + self.buckets_usage.insert(bucket.to_string(), usage); + } + + fn build_usage_info(&self, bucket_names: &[String]) -> DataUsageInfo { + let mut info = DataUsageInfo { + buckets_count: bucket_names.len() as u64, + last_update: Some(SystemTime::now()), + ..Default::default() + }; + + for bucket in bucket_names { + if let Some(usage) = self.buckets_usage.get(bucket) { + info.objects_total_count += usage.objects_count; + info.objects_total_size += usage.size; + info.versions_total_count += usage.versions_count; + info.buckets_usage.insert(bucket.clone(), usage.clone()); + info.bucket_sizes.insert(bucket.clone(), usage.size); + } + } + + info + } +} + /// AHM Scanner - Automatic Health Management Scanner (Optimized Version) /// /// This is the new optimized scanner that uses the decentralized node-based architecture @@ -137,6 +219,8 @@ pub struct Scanner { data_usage_stats: Arc>>, /// Last data usage statistics collection time last_data_usage_collection: Arc>>, + /// Cached realtime data usage fallback state + realtime_usage_cache: Arc>, /// Heal manager for auto-heal integration heal_manager: Option>, @@ -192,6 +276,7 @@ impl Scanner { disk_metrics: Arc::new(Mutex::new(HashMap::new())), data_usage_stats: Arc::new(Mutex::new(HashMap::new())), 
last_data_usage_collection: Arc::new(RwLock::new(None)), + realtime_usage_cache: Arc::new(Mutex::new(RealtimeUsageCache::default())), heal_manager, node_scanner, stats_aggregator, @@ -217,11 +302,39 @@ impl Scanner { config.enable_data_usage_stats = enable; } + /// Configure realtime data usage fallback bucket limit. + pub async fn set_realtime_usage_bucket_limit(&self, limit: usize) { + let mut config = self.config.write().await; + config.realtime_usage_bucket_limit = limit.max(1); + } + /// Set the heal manager after construction pub fn set_heal_manager(&mut self, heal_manager: Arc) { self.heal_manager = Some(heal_manager); } + fn acquire_heal_manager(&self) -> Option> { + if let Some(local) = &self.heal_manager { + return Some(local.clone()); + } + get_heal_manager().map(Arc::clone) + } + + async fn ensure_heal_manager_ready(&self) -> bool { + if !self.config.read().await.enable_healing { + return true; + } + + if self.acquire_heal_manager().is_some() { + true + } else { + warn!( + "Scanner healing is enabled but no HealManager is available; heal submissions will be skipped until it becomes ready" + ); + false + } + } + /// Initialize scanner with ECStore disks (for testing and runtime) pub async fn initialize_with_ecstore(&self) { if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() { @@ -247,6 +360,144 @@ impl Scanner { } } + /// Run volume consistency checks to detect missing buckets and trigger heals + pub async fn run_volume_consistency_check(&self) -> Result<()> { + if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() { + for pool in &ecstore.pools { + for set_disks in &pool.disk_set { + let (disks, _) = set_disks.get_online_disks_with_healing(false).await; + if disks.is_empty() { + continue; + } + + self.check_and_heal_missing_volumes(&disks, set_disks.set_index, set_disks.pool_index) + .await?; + } + } + } else { + warn!("run_volume_consistency_check: ECStore not available"); + } + + Ok(()) + } + + async fn handle_buckets_from_local_records( + &self, + ecstore: &Arc, + bucket_objects_map: &HashMap>, + enable_healing: bool, + enable_deep_scan: bool, + run_lifecycle: bool, + ) -> Result { + debug!("Listing buckets for lifecycle/replication evaluation"); + let buckets = ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) + .await + .map_err(Error::from)?; + + let mut total_objects_scanned = 0u64; + let replication_pending_grace = { + let cfg = self.config.read().await; + cfg.replication_pending_grace + }; + + for bucket_info in buckets { + let bucket_name = &bucket_info.name; + + if bucket_name.starts_with('.') { + debug!("Skipping system bucket: {}", bucket_name); + continue; + } + + let lifecycle_config = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name) + .await + .ok() + .map(|(c, _)| Arc::new(c)); + let versioning_config = Arc::new(VersioningConfiguration { + status: if bucket_info.versioning { + Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)) + } else { + None + }, + ..Default::default() + }); + + let records_cow: Cow<[LocalObjectRecord]> = if let Some(existing) = bucket_objects_map.get(bucket_name) { + if existing.is_empty() { + debug!("Bucket {} had empty local snapshot entries; refreshing via ECStore listing", bucket_name); + let fetched = self.collect_bucket_records_from_store(ecstore, bucket_name).await?; + if fetched.is_empty() { + debug!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name); + continue; + } + Cow::Owned(fetched) + } else { + 
Cow::Borrowed(existing.as_slice()) + } + } else { + debug!( + "No local snapshot entries found for bucket {}; falling back to ECStore listing", + bucket_name + ); + let fetched = self.collect_bucket_records_from_store(ecstore, bucket_name).await?; + if fetched.is_empty() { + debug!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name); + continue; + } + Cow::Owned(fetched) + }; + + let records = records_cow.as_ref(); + if records.is_empty() { + continue; + } + + self.reset_bucket_replication_counters(bucket_name).await; + + let live_objects = records.iter().filter(|record| record.usage.has_live_object).count() as u64; + total_objects_scanned = total_objects_scanned.saturating_add(live_objects); + debug!("Counted {} objects in bucket {} using local snapshots", live_objects, bucket_name); + + if run_lifecycle { + if lifecycle_config.is_some() { + debug!("Processing lifecycle actions for bucket: {}", bucket_name); + } else { + debug!("Processing replication checks for bucket without lifecycle config: {}", bucket_name); + } + let object_lock_config = BucketObjectLockSys::get(bucket_name).await; + let replication_metrics = + Some(ReplicationMetricsHandle::new(Arc::clone(&self.metrics), Arc::clone(&self.bucket_metrics))); + let mut scanner_item = ScannerItem::new( + bucket_name.to_string(), + lifecycle_config.clone(), + Some(versioning_config.clone()), + object_lock_config, + replication_pending_grace, + replication_metrics, + ); + + if let Err(e) = self + .process_bucket_objects_for_lifecycle(ecstore, bucket_name, &mut scanner_item, records) + .await + { + warn!("Failed to process lifecycle/replication actions for bucket {}: {}", bucket_name, e); + } + } + + if enable_deep_scan && enable_healing { + debug!("Deep scan enabled, verifying object integrity in bucket {}", bucket_name); + if let Err(e) = self + .deep_scan_bucket_objects_with_records(ecstore, bucket_name, records) + .await + { + warn!("Deep scan failed for bucket {}: {}", bucket_name, e); + } + } + } + + Ok(total_objects_scanned) + } + /// Perform basic test scan for testing environments async fn perform_basic_test_scan(&self) -> Result<()> { debug!("Starting basic test scan using ECStore directly"); @@ -269,92 +520,28 @@ impl Scanner { }; let bucket_objects_map = &scan_outcome.bucket_objects; - // List all buckets - debug!("Listing buckets"); - match ecstore - .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) - .await - { - Ok(buckets) => { - debug!("Found {} buckets", buckets.len()); - for bucket_info in buckets { - let bucket_name = &bucket_info.name; - - // Skip system buckets - if bucket_name.starts_with('.') { - debug!("Skipping system bucket: {}", bucket_name); - continue; - } - - // Get bucket lifecycle configuration - let lifecycle_config = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name) - .await - .ok() - .map(|(c, _)| Arc::new(c)); - - // Get bucket versioning configuration - let versioning_config = Arc::new(VersioningConfiguration { - status: if bucket_info.versioning { - Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)) - } else { - None - }, - ..Default::default() - }); - - let records = match bucket_objects_map.get(bucket_name) { - Some(records) => records, - None => { - debug!( - "No local snapshot entries found for bucket {}; skipping lifecycle/integrity", - bucket_name - ); - continue; - } - }; - - let live_objects = records.iter().filter(|record| record.usage.has_live_object).count() as u64; - total_objects_scanned = 
total_objects_scanned.saturating_add(live_objects); - debug!("Counted {} objects in bucket {} using local snapshots", live_objects, bucket_name); - - // Process objects for lifecycle actions - if let Some(lifecycle_config) = &lifecycle_config { - debug!("Processing lifecycle actions for bucket: {}", bucket_name); - let mut scanner_item = ScannerItem::new( - bucket_name.to_string(), - Some(lifecycle_config.clone()), - Some(versioning_config.clone()), - ); - - match self - .process_bucket_objects_for_lifecycle(bucket_name, &mut scanner_item, records) - .await - { - Ok(processed_count) => { - debug!("Processed {} objects for lifecycle in bucket {}", processed_count, bucket_name); - } - Err(e) => { - warn!("Failed to process lifecycle actions for bucket {}: {}", bucket_name, e); - } - } - } - - // If deep scan is enabled, verify each object's integrity - if enable_deep_scan && enable_healing { - debug!("Deep scan enabled, verifying object integrity in bucket {}", bucket_name); - if let Err(e) = self - .deep_scan_bucket_objects_with_records(&ecstore, bucket_name, records) - .await - { - warn!("Deep scan failed for bucket {}: {}", bucket_name, e); - } - } - } + let lifecycle_result = self + .handle_buckets_from_local_records(&ecstore, bucket_objects_map, enable_healing, enable_deep_scan, false) + .await; + match lifecycle_result { + Ok(total) => { + total_objects_scanned = total; self.update_data_usage_statistics(&scan_outcome, &ecstore).await; } Err(e) => { - error!("Failed to list buckets: {}", e); + error!("Failed to process buckets during basic scan: {}", e); + } + } + + for pool in &ecstore.pools { + for set_disks in &pool.disk_set { + if let Err(err) = self.scan_set_disks(set_disks.clone()).await { + warn!( + "Set disk scan failed for pool {} set {}: {}", + set_disks.pool_index, set_disks.set_index, err + ); + } } } @@ -497,7 +684,10 @@ impl Scanner { } let object_name = &record.usage.object; - if let Err(err) = self.verify_object_integrity(bucket_name, object_name).await { + if let Err(err) = self + .verify_object_integrity(bucket_name, object_name, record.file_info.as_ref()) + .await + { warn!( "Object integrity verification failed for {}/{} during deep scan: {}", bucket_name, object_name, err @@ -530,7 +720,8 @@ impl Scanner { if let Some(object_name) = entry.file_name().to_str() { if !object_name.starts_with('.') { debug!("Deep scanning object: {}/{}", bucket_name, object_name); - if let Err(e) = self.verify_object_integrity(bucket_name, object_name).await { + if let Err(e) = self.verify_object_integrity(bucket_name, object_name, None).await + { warn!( "Object integrity verification failed for {}/{}: {}", bucket_name, object_name, e @@ -559,6 +750,7 @@ impl Scanner { /// Process bucket objects for lifecycle actions async fn process_bucket_objects_for_lifecycle( &self, + ecstore: &Arc, bucket_name: &str, scanner_item: &mut ScannerItem, records: &[LocalObjectRecord], @@ -571,7 +763,14 @@ impl Scanner { continue; } - let object_info = Self::convert_record_to_object_info(record); + let mut object_info = Self::convert_record_to_object_info(record); + if object_info.replication_status.is_empty() && object_info.replication_status_internal.is_none() { + let object_name = object_info.name.clone(); + let object_opts = rustfs_ecstore::store_api::ObjectOptions::default(); + if let Ok(fresh_info) = ecstore.get_object_info(bucket_name, &object_name, &object_opts).await { + object_info = fresh_info; + } + } let mut size_summary = SizeSummary::default(); let (deleted, _size) = 
scanner_item.apply_actions(&object_info, &mut size_summary).await; if deleted { @@ -584,6 +783,39 @@ impl Scanner { Ok(processed_count) } + async fn collect_bucket_records_from_store( + &self, + ecstore: &Arc, + bucket_name: &str, + ) -> Result> { + let mut records = Vec::new(); + + let list_result = ecstore + .clone() + .list_objects_v2(bucket_name, "", None, None, 1000, false, None, false) + .await + .map_err(Error::from)?; + + for object in list_result.objects { + let usage = local_scan::LocalObjectUsage { + bucket: bucket_name.to_string(), + object: object.name.clone(), + last_modified_ns: object.mod_time.map(|ts| ts.unix_timestamp_nanos()), + versions_count: 1, + delete_markers_count: 0, + total_size: object.size.max(0) as u64, + has_live_object: true, + }; + records.push(LocalObjectRecord { + usage, + object_info: Some(object), + file_info: None, + }); + } + + Ok(records) + } + /// Start the optimized scanner pub async fn start(&self) -> Result<()> { let mut state = self.state.write().await; @@ -615,6 +847,37 @@ impl Scanner { } }); + // Launch the primary scan loop to drive healing and lifecycle workflows + let scanner = self.clone_for_background(); + tokio::spawn(async move { + if let Err(e) = scanner.scan_loop().await { + error!("Scanner loop failed: {}", e); + } + }); + + // Trigger an immediate data usage collection so that admin APIs have fresh data after startup. + let scanner = self.clone_for_background(); + tokio::spawn(async move { + let enable_stats = { + let cfg = scanner.config.read().await; + cfg.enable_data_usage_stats + }; + + if enable_stats { + if let Err(e) = scanner.collect_and_persist_data_usage().await { + warn!("Initial data usage collection failed: {}", e); + } + } + }); + + // Kick off an initial scan cycle so we do not wait one full interval to run healing checks + let scanner = self.clone_for_background(); + tokio::spawn(async move { + if let Err(e) = scanner.scan_cycle().await { + warn!("Initial scan cycle failed: {}", e); + } + }); + Ok(()) } @@ -766,6 +1029,8 @@ impl Scanner { self.metrics.get_metrics().current_cycle + 1 ); + let _ = self.ensure_heal_manager_ready().await; + // Update state { let mut state = self.state.write().await; @@ -852,6 +1117,10 @@ impl Scanner { } } + if let Err(e) = self.run_lifecycle_evaluation().await { + warn!("Lifecycle evaluation step failed: {}", e); + } + // Phase 2: Minimal EC verification for critical objects only // Note: The main scanning is now handled by NodeScanner in the background if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() { @@ -923,6 +1192,9 @@ impl Scanner { data_usage.last_update = Some(SystemTime::now()); } + // Keep realtime fallback cache in sync with the fresh consolidated snapshot. 
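+ // The same cache backs the realtime usage fallback in build_data_usage_from_ecstore, so refreshing it here keeps both paths consistent.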
+ self.update_realtime_cache_from_snapshot(&data_usage).await; + // Publish to node stats manager self.node_scanner.update_data_usage(data_usage.clone()).await; @@ -959,80 +1231,161 @@ impl Scanner { Ok(()) } + async fn update_realtime_cache_from_snapshot(&self, snapshot: &DataUsageInfo) { + let mut cache = self.realtime_usage_cache.lock().await; + cache.record_snapshot(snapshot); + } + + async fn run_lifecycle_evaluation(&self) -> Result<()> { + let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() else { + warn!("Cannot run lifecycle evaluation without ECStore"); + return Ok(()); + }; + + let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await { + Ok(outcome) => outcome, + Err(err) => { + warn!("Lifecycle evaluation skipped: failed to collect local usage snapshots: {}", err); + return Ok(()); + } + }; + + if scan_outcome.bucket_objects.is_empty() { + debug!("No bucket records available for lifecycle evaluation"); + return Ok(()); + } + + if let Err(e) = self + .handle_buckets_from_local_records(&ecstore, &scan_outcome.bucket_objects, false, false, true) + .await + { + warn!("Lifecycle evaluation encountered errors: {}", e); + } + + Ok(()) + } + + /// Record that a heal task has been queued and update related metrics. + async fn record_heal_submission(&self, bucket: Option<&str>) { + self.metrics.increment_heal_tasks_queued(1); + + if let Some(bucket_name) = bucket { + let mut bucket_metrics_guard = self.bucket_metrics.lock().await; + let entry = bucket_metrics_guard + .entry(bucket_name.to_string()) + .or_insert_with(|| BucketMetrics { + bucket: bucket_name.to_string(), + ..Default::default() + }); + entry.heal_tasks_queued = entry.heal_tasks_queued.saturating_add(1); + } + } + + async fn reset_bucket_replication_counters(&self, bucket: &str) { + let mut bucket_metrics_guard = self.bucket_metrics.lock().await; + let entry = bucket_metrics_guard + .entry(bucket.to_string()) + .or_insert_with(|| BucketMetrics { + bucket: bucket.to_string(), + ..Default::default() + }); + entry.replication_pending = 0; + entry.replication_failed = 0; + entry.replication_lagging = 0; + entry.replication_tasks_queued = 0; + } + /// Build data usage statistics directly from ECStore async fn build_data_usage_from_ecstore(&self, ecstore: &Arc) -> Result { - let mut data_usage = DataUsageInfo::default(); - - // Get bucket list - match ecstore + let bucket_list = match ecstore .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) .await { - Ok(buckets) => { - data_usage.buckets_count = buckets.len() as u64; - data_usage.last_update = Some(SystemTime::now()); - - let mut total_objects = 0u64; - let mut total_size = 0u64; - - for bucket_info in buckets { - if bucket_info.name.starts_with('.') { - continue; // Skip system buckets - } - - // Try to get actual object count for this bucket - let (object_count, bucket_size) = match ecstore - .clone() - .list_objects_v2( - &bucket_info.name, - "", // prefix - None, // continuation_token - None, // delimiter - 100, // max_keys - small limit for performance - false, // fetch_owner - None, // start_after - false, // incl_deleted - ) - .await - { - Ok(result) => { - let count = result.objects.len() as u64; - let size = result.objects.iter().map(|obj| obj.size as u64).sum(); - (count, size) - } - Err(_) => (0, 0), - }; - - total_objects += object_count; - total_size += bucket_size; - - let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { - size: bucket_size, - objects_count: object_count, - versions_count: 
object_count, // Simplified - delete_markers_count: 0, - ..Default::default() - }; - - data_usage.buckets_usage.insert(bucket_info.name.clone(), bucket_usage); - data_usage.bucket_sizes.insert(bucket_info.name, bucket_size); - } - - data_usage.objects_total_count = total_objects; - data_usage.objects_total_size = total_size; - data_usage.versions_total_count = total_objects; - } + Ok(list) => list, Err(e) => { warn!("Failed to list buckets for data usage collection: {}", e); + return Ok(DataUsageInfo::default()); + } + }; + + let bucket_names: Vec = bucket_list + .into_iter() + .map(|info| info.name) + .filter(|name| !name.starts_with('.')) + .collect(); + + if bucket_names.is_empty() { + return Ok(DataUsageInfo::default()); + } + + let bucket_limit = { + let cfg = self.config.read().await; + cfg.realtime_usage_bucket_limit + }; + let bucket_limit = bucket_limit.max(1); + + let refresh_targets = { + let mut cache = self.realtime_usage_cache.lock().await; + cache.align_with_bucket_list(&bucket_names); + cache.next_batch(bucket_limit) + }; + + let mut refreshed_usage = Vec::with_capacity(refresh_targets.len()); + for bucket in &refresh_targets { + if let Some(usage) = self.collect_bucket_usage_from_ecstore(ecstore, bucket).await { + refreshed_usage.push((bucket.clone(), usage)); } } + let data_usage = { + let mut cache = self.realtime_usage_cache.lock().await; + for (bucket, usage) in refreshed_usage { + cache.upsert_bucket_usage(&bucket, usage); + } + cache.build_usage_info(&bucket_names) + }; + + info!( + "Realtime data usage fallback refreshed {} of {} buckets ({} cached entries)", + refresh_targets.len(), + bucket_names.len(), + data_usage.buckets_usage.len() + ); + Ok(data_usage) } + async fn collect_bucket_usage_from_ecstore( + &self, + ecstore: &Arc, + bucket: &str, + ) -> Option { + match ecstore + .clone() + .list_objects_v2(bucket, "", None, None, 100, false, None, false) + .await + { + Ok(result) => { + let object_count = result.objects.len() as u64; + let bucket_size = result.objects.iter().map(|obj| obj.size as u64).sum(); + let usage = BucketUsageInfo { + size: bucket_size, + objects_count: object_count, + versions_count: object_count, + ..Default::default() + }; + Some(usage) + } + Err(e) => { + warn!("Failed to list objects during realtime data usage fallback for bucket {}: {}", bucket, e); + None + } + } + } + /// Verify object integrity and trigger healing if necessary #[allow(dead_code)] - async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<()> { + async fn verify_object_integrity(&self, bucket: &str, object: &str, file_info_hint: Option<&FileInfo>) -> Result<()> { debug!("Starting verify_object_integrity for {}/{}", bucket, object); let config = self.config.read().await; @@ -1045,9 +1398,10 @@ impl Scanner { // First check whether the object still logically exists. // If it's already deleted (e.g., non-versioned bucket), do not trigger heal. 
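+ // Cache the ObjectInfo fetched below so check_data_parts_integrity can reuse it instead of re-reading object metadata.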
let object_opts = ecstore::store_api::ObjectOptions::default(); + let mut object_info_cache: Option = None; match ecstore.get_object_info(bucket, object, &object_opts).await { - Ok(_) => { - // Object exists logically, continue with verification below + Ok(info) => { + object_info_cache = Some(info); } Err(e) => { if matches!(e, ecstore::error::StorageError::ObjectNotFound(_, _)) { @@ -1070,7 +1424,10 @@ impl Scanner { Ok(_) => { debug!("Standard verification passed for {}/{}", bucket, object); // Standard verification passed, now check for missing data parts - match self.check_data_parts_integrity(bucket, object).await { + match self + .check_data_parts_integrity(bucket, object, object_info_cache.as_ref(), file_info_hint) + .await + { Ok(_) => { // Object is completely healthy debug!("Data parts integrity check passed for {}/{}", bucket, object); @@ -1144,7 +1501,10 @@ impl Scanner { // Object is accessible, but let's still check data parts integrity // to catch real issues like missing data files - match self.check_data_parts_integrity(bucket, object).await { + match self + .check_data_parts_integrity(bucket, object, object_info_cache.as_ref(), file_info_hint) + .await + { Ok(_) => { debug!("Object {}/{} accessible and data parts intact - treating as healthy", bucket, object); self.metrics.increment_healthy_objects(); @@ -1175,13 +1535,15 @@ impl Scanner { if integrity_failed { self.metrics.increment_corrupted_objects(); - if let Some(heal_manager) = &self.heal_manager { + if let Some(heal_manager) = self.acquire_heal_manager() { debug!("Submitting heal request for {}/{}", bucket, object); let heal_request = HealRequest::object(bucket.to_string(), object.to_string(), None); if let Err(e) = heal_manager.submit_heal_request(heal_request).await { error!("Failed to submit heal task for {}/{}: {}", bucket, object, e); + self.metrics.increment_heal_tasks_failed(1); } else { debug!("Successfully submitted heal request for {}/{}", bucket, object); + self.record_heal_submission(Some(bucket)).await; } } else { debug!("No heal manager available for {}/{}", bucket, object); @@ -1197,92 +1559,73 @@ impl Scanner { /// Check data parts integrity by verifying all parts exist on disks #[allow(dead_code)] - async fn check_data_parts_integrity(&self, bucket: &str, object: &str) -> Result<()> { + async fn check_data_parts_integrity( + &self, + bucket: &str, + object: &str, + object_info_hint: Option<&rustfs_ecstore::store_api::ObjectInfo>, + file_info_hint: Option<&FileInfo>, + ) -> Result<()> { debug!("Checking data parts integrity for {}/{}", bucket, object); if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() { - // Get object info - let object_info = match ecstore.get_object_info(bucket, object, &Default::default()).await { - Ok(info) => info, - Err(e) => { - return Err(Error::Other(format!("Failed to get object info: {e}"))); - } + // Get object info from cache or load it + let object_info_cow = if let Some(info) = object_info_hint { + Cow::Borrowed(info) + } else { + Cow::Owned( + ecstore + .get_object_info(bucket, object, &Default::default()) + .await + .map_err(|e| Error::Other(format!("Failed to get object info: {e}")))?, + ) + }; + // Get file info from hint or read from disks + let file_info_cow = if let Some(info) = file_info_hint { + Cow::Borrowed(info) + } else { + Cow::Owned(self.load_file_info_from_disks(&ecstore, bucket, object).await?) 
}; debug!( "Object info for {}/{}: data_blocks={}, parity_blocks={}, parts={}", bucket, object, - object_info.data_blocks, - object_info.parity_blocks, - object_info.parts.len() + object_info_cow.data_blocks, + object_info_cow.parity_blocks, + object_info_cow.parts.len() ); - // Create FileInfo from ObjectInfo - let file_info = rustfs_filemeta::FileInfo { - volume: bucket.to_string(), - name: object.to_string(), - version_id: object_info.version_id, - is_latest: object_info.is_latest, - deleted: object_info.delete_marker, - size: object_info.size, - mod_time: object_info.mod_time, - parts: object_info - .parts - .iter() - .map(|p| rustfs_filemeta::ObjectPartInfo { - etag: p.etag.clone(), - number: 0, // Will be set by erasure info - size: p.size, - actual_size: p.actual_size, - mod_time: p.mod_time, - index: p.index.clone(), - checksums: p.checksums.clone(), - error: None, - }) - .collect(), - erasure: rustfs_filemeta::ErasureInfo { - algorithm: "ReedSolomon".to_string(), - data_blocks: object_info.data_blocks, - parity_blocks: object_info.parity_blocks, - block_size: 0, // Default value - index: 1, // Default index - distribution: (1..=object_info.data_blocks + object_info.parity_blocks).collect(), - checksums: vec![], - }, - ..Default::default() - }; - debug!( "Object {}/{}: data_blocks={}, parity_blocks={}, parts={}", bucket, object, - object_info.data_blocks, - object_info.parity_blocks, - object_info.parts.len() + object_info_cow.data_blocks, + object_info_cow.parity_blocks, + object_info_cow.parts.len() ); // Check if this is an EC object or regular object // In the test environment, objects might have data_blocks=0 and parity_blocks=0 // but still be stored in EC mode. We need to be more lenient. - let is_ec_object = object_info.data_blocks > 0 && object_info.parity_blocks > 0; + let is_ec_object = object_info_cow.data_blocks > 0 && object_info_cow.parity_blocks > 0; if is_ec_object { debug!( "Treating {}/{} as EC object with data_blocks={}, parity_blocks={}", - bucket, object, object_info.data_blocks, object_info.parity_blocks + bucket, object, object_info_cow.data_blocks, object_info_cow.parity_blocks ); // For EC objects, use EC-aware integrity checking - self.check_ec_object_integrity(&ecstore, bucket, object, &object_info, &file_info) + self.check_ec_object_integrity(&ecstore, bucket, object, &object_info_cow, &file_info_cow) .await } else { debug!( "Treating {}/{} as regular object stored in EC system (data_blocks={}, parity_blocks={})", - bucket, object, object_info.data_blocks, object_info.parity_blocks + bucket, object, object_info_cow.data_blocks, object_info_cow.parity_blocks ); // For regular objects in EC storage, we should be more lenient // In EC storage, missing parts on some disks is normal - self.check_ec_stored_object_integrity(&ecstore, bucket, object, &file_info) + self.check_ec_stored_object_integrity(&ecstore, bucket, object, &file_info_cow) .await } } else { @@ -1290,6 +1633,39 @@ impl Scanner { } } + async fn load_file_info_from_disks( + &self, + ecstore: &rustfs_ecstore::store::ECStore, + bucket: &str, + object: &str, + ) -> Result { + for (pool_idx, pool_disks) in &ecstore.disk_map { + for (disk_idx, disk_opt) in pool_disks.iter().enumerate() { + if let Some(disk) = disk_opt { + match disk.read_xl(bucket, object, false).await { + Ok(raw) => match file_info_from_raw(raw, bucket, object, false).await { + Ok(info) => return Ok(info), + Err(err) => { + debug!( + "Failed to decode xl.meta for {}/{} on pool {} disk {}: {}", + bucket, object, pool_idx, disk_idx, 
err + ); + } + }, + Err(err) => { + debug!( + "Failed to read xl.meta for {}/{} on pool {} disk {}: {}", + bucket, object, pool_idx, disk_idx, err + ); + } + } + } + } + } + + Err(Error::Other(format!("Unable to load file info for {}/{} from any disk", bucket, object))) + } + /// Check integrity for EC (erasure coded) objects #[allow(dead_code)] async fn check_ec_object_integrity( @@ -1411,6 +1787,12 @@ impl Scanner { ))); } + if missing_parts_found > 0 { + return Err(Error::Other(format!( + "Object has missing data parts across disks: {bucket}/{object} (missing parts: {missing_parts_found})" + ))); + } + // Special case: if this is a single-part object and we have missing parts on multiple disks, // it might indicate actual data loss rather than normal EC distribution if object_info.parts.len() == 1 && missing_parts_found > (total_disks_checked / 2) { @@ -1548,6 +1930,7 @@ impl Scanner { // Get online disks from this EC set let (disks, _) = set_disks.get_online_disks_with_healing(false).await; + let disks_for_analysis = disks.clone(); // Check volume consistency across disks and heal missing buckets if !disks.is_empty() { @@ -1616,6 +1999,11 @@ impl Scanner { set_index, pool_index, successful_scans, failed_scans ); + // Analyze object distribution across disks to detect missing metadata/objects + if let Err(err) = self.analyze_object_distribution(&all_disk_objects, &disks_for_analysis).await { + error!("Failed to analyze object distribution for pool {} set {}: {}", pool_index, set_index, err); + } + Ok(all_disk_objects) } @@ -1623,7 +2011,6 @@ impl Scanner { #[allow(dead_code)] async fn scan_disk(&self, disk: &DiskStore) -> Result>> { let disk_path = disk.path().to_string_lossy().to_string(); - // Start global metrics collection for disk scan let stop_fn = Metrics::time(Metric::ScanBucketDrive); @@ -1656,40 +2043,12 @@ impl Scanner { // check disk status, if offline, submit erasure set heal task if !metrics.is_online { - let enable_healing = self.config.read().await.enable_healing; - if enable_healing { - if let Some(heal_manager) = &self.heal_manager { - // Get bucket list for erasure set healing - let buckets = match rustfs_ecstore::new_object_layer_fn() { - Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await { - Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), - Err(e) => { - error!("Failed to get bucket list for disk healing: {}", e); - return Err(Error::Storage(e)); - } - }, - None => { - error!("No ECStore available for getting bucket list"); - return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available"))); - } - }; - - let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx); - let req = HealRequest::new( - crate::heal::task::HealType::ErasureSet { buckets, set_disk_id }, - crate::heal::task::HealOptions::default(), - crate::heal::task::HealPriority::High, - ); - match heal_manager.submit_heal_request(req).await { - Ok(task_id) => { - warn!("disk offline, submit erasure set heal task: {} {}", task_id, disk_path); - } - Err(e) => { - error!("disk offline, submit erasure set heal task failed: {} {}", disk_path, e); - } - } - } - } + self.submit_erasure_set_heal_for_disk( + disk, + crate::heal::task::HealPriority::High, + "disk offline detected during scan", + ) + .await; } // Additional disk info for debugging @@ -1712,48 +2071,19 @@ impl Scanner { Err(e) => { error!("Failed to list volumes on disk {}: {}", disk_path, e); - // disk access failed, submit 
erasure set heal task - let enable_healing = self.config.read().await.enable_healing; - if enable_healing { - if let Some(heal_manager) = &self.heal_manager { - // Get bucket list for erasure set healing - let buckets = match rustfs_ecstore::new_object_layer_fn() { - Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await { - Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), - Err(e) => { - error!("Failed to get bucket list for disk healing: {}", e); - return Err(Error::Storage(e)); - } - }, - None => { - error!("No ECStore available for getting bucket list"); - return Err(Error::Storage(ecstore::error::StorageError::other("No ECStore available"))); - } - }; - - let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx); - let req = HealRequest::new( - crate::heal::task::HealType::ErasureSet { buckets, set_disk_id }, - crate::heal::task::HealOptions::default(), - crate::heal::task::HealPriority::Urgent, - ); - match heal_manager.submit_heal_request(req).await { - Ok(task_id) => { - warn!("disk access failed, submit erasure set heal task: {} {}", task_id, disk_path); - } - Err(heal_err) => { - error!("disk access failed, submit erasure set heal task failed: {} {}", disk_path, heal_err); - } - } - } - } - + self.submit_erasure_set_heal_for_disk( + disk, + crate::heal::task::HealPriority::Urgent, + "disk access failed while listing volumes", + ) + .await; return Err(Error::Storage(e.into())); } }; // Scan each volume and collect object metadata let mut disk_objects = HashMap::new(); + let mut submitted_volume_access_heal = false; for volume in volumes { // check cancel token if let Some(cancel_token) = get_ahm_services_cancel_token() { @@ -1769,6 +2099,15 @@ impl Scanner { } Err(e) => { error!("Failed to scan volume {} on disk {}: {}", volume.name, disk_path, e); + if !submitted_volume_access_heal { + self.submit_erasure_set_heal_for_disk( + disk, + crate::heal::task::HealPriority::High, + &format!("volume scan failed for {}", volume.name), + ) + .await; + submitted_volume_access_heal = true; + } continue; } } @@ -1796,6 +2135,52 @@ impl Scanner { Ok(disk_objects) } + async fn submit_erasure_set_heal_for_disk(&self, disk: &DiskStore, priority: crate::heal::task::HealPriority, reason: &str) { + if !self.config.read().await.enable_healing { + return; + } + + let Some(heal_manager) = self.acquire_heal_manager() else { + warn!( + "{} -> HealManager unavailable, skipping erasure set heal for disk {}", + reason, + disk.to_string() + ); + return; + }; + + let buckets = match rustfs_ecstore::new_object_layer_fn() { + Some(ecstore) => match ecstore.list_bucket(&ecstore::store_api::BucketOptions::default()).await { + Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::>(), + Err(e) => { + error!("Failed to list buckets for erasure set heal ({}): {}", reason, e); + return; + } + }, + None => { + error!("No ECStore available while building erasure set heal request ({})", reason); + return; + } + }; + + let set_disk_id = format!("pool_{}_set_{}", disk.endpoint().pool_idx, disk.endpoint().set_idx); + let req = HealRequest::new( + crate::heal::task::HealType::ErasureSet { buckets, set_disk_id }, + crate::heal::task::HealOptions::default(), + priority, + ); + match heal_manager.submit_heal_request(req).await { + Ok(task_id) => { + warn!("{} -> submitted erasure set heal task {} for disk {}", reason, task_id, disk.to_string()); + self.record_heal_submission(None).await; + } + Err(e) => { + error!("{} -> 
failed to submit erasure set heal for disk {}: {}", reason, disk.to_string(), e); + self.metrics.increment_heal_tasks_failed(1); + } + } + } + /// Scan a single volume (bucket) and collect object information /// /// This method collects all objects from a disk for a specific bucket. @@ -1892,7 +2277,7 @@ impl Scanner { // object metadata damaged, submit metadata heal task let enable_healing = self.config.read().await.enable_healing; if enable_healing { - if let Some(heal_manager) = &self.heal_manager { + if let Some(heal_manager) = self.acquire_heal_manager() { let req = HealRequest::metadata(bucket.to_string(), entry.name.clone()); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { @@ -1900,12 +2285,14 @@ impl Scanner { "object metadata damaged, submit heal task: {} {} / {}", task_id, bucket, entry.name ); + self.record_heal_submission(Some(bucket)).await; } Err(e) => { error!( "object metadata damaged, submit heal task failed: {} / {} {}", bucket, entry.name, e ); + self.metrics.increment_heal_tasks_failed(1); } } } @@ -1916,13 +2303,24 @@ impl Scanner { if let Disk::Local(_local_disk) = &**disk { let vcfg = BucketVersioningSys::get(bucket).await.ok(); - let mut scanner_item = ScannerItem { - bucket: bucket.to_string(), - object_name: entry.name.clone(), - lifecycle: Some(lifecycle_config.clone()), - versioning: versioning_config.clone(), + let object_lock_config = BucketObjectLockSys::get(bucket).await; + let replication_metrics = Some(ReplicationMetricsHandle::new( + Arc::clone(&self.metrics), + Arc::clone(&self.bucket_metrics), + )); + let replication_pending_grace = { + let cfg = self.config.read().await; + cfg.replication_pending_grace }; - //ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone()); + let mut scanner_item = ScannerItem::new( + bucket.to_string(), + Some(lifecycle_config.clone()), + versioning_config.clone(), + object_lock_config, + replication_pending_grace, + replication_metrics, + ); + scanner_item.object_name = entry.name.clone(); let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) { Ok(fivs) => fivs, Err(_err) => { @@ -2001,7 +2399,7 @@ impl Scanner { // object metadata parse failed, submit metadata heal task let enable_healing = self.config.read().await.enable_healing; if enable_healing { - if let Some(heal_manager) = &self.heal_manager { + if let Some(heal_manager) = self.acquire_heal_manager() { let req = HealRequest::metadata(bucket.to_string(), entry.name.clone()); match heal_manager.submit_heal_request(req).await { Ok(task_id) => { @@ -2009,12 +2407,14 @@ impl Scanner { "object metadata parse failed, submit heal task: {} {} / {}", task_id, bucket, entry.name ); + self.record_heal_submission(Some(bucket)).await; } Err(e) => { error!( "object metadata parse failed, submit heal task failed: {} / {} {}", bucket, entry.name, e ); + self.metrics.increment_heal_tasks_failed(1); } } } @@ -2185,7 +2585,7 @@ impl Scanner { // submit heal task let enable_healing = self.config.read().await.enable_healing; if enable_healing { - if let Some(heal_manager) = &self.heal_manager { + if let Some(heal_manager) = self.acquire_heal_manager() { use crate::heal::{HealPriority, HealRequest}; let req = HealRequest::new( crate::heal::HealType::Object { @@ -2202,9 +2602,11 @@ impl Scanner { "object missing, submit heal task: {} {} / {} (missing disks: {:?})", task_id, bucket, object_name, missing_disks ); + self.record_heal_submission(Some(bucket)).await; } Err(e) => { error!("object missing, submit heal 
task failed: {} / {} {}", bucket, object_name, e); + self.metrics.increment_heal_tasks_failed(1); } } } @@ -2214,7 +2616,16 @@ impl Scanner { // Step 3: Deep scan EC verification let config = self.config.read().await; if config.scan_mode == ScanMode::Deep { - if let Err(e) = self.verify_object_integrity(bucket, object_name).await { + let file_info_hint = all_disk_objects.iter().find_map(|disk_map| { + disk_map + .get(bucket) + .and_then(|object_map| object_map.get(object_name)) + .and_then(|meta| meta.into_fileinfo(bucket, object_name, "", false, false).ok()) + }); + if let Err(e) = self + .verify_object_integrity(bucket, object_name, file_info_hint.as_ref()) + .await + { objects_with_ec_issues += 1; warn!("Object integrity verification failed for object {}/{}: {}", bucket, object_name, e); } @@ -2240,7 +2651,6 @@ impl Scanner { } /// Background scan loop with graceful shutdown - #[allow(dead_code)] async fn scan_loop(self) -> Result<()> { let config = self.config.read().await; let mut interval = tokio::time::interval(config.scan_interval); @@ -2413,7 +2823,7 @@ impl Scanner { // Step 3: Submit heal tasks for missing buckets let enable_healing = self.config.read().await.enable_healing; if enable_healing { - if let Some(heal_manager) = &self.heal_manager { + if let Some(heal_manager) = self.acquire_heal_manager() { for bucket in missing_buckets { let req = crate::heal::HealRequest::bucket(bucket.clone()); match heal_manager.submit_heal_request(req).await { @@ -2422,9 +2832,11 @@ impl Scanner { "Submitted bucket heal task {} for missing bucket '{}' on disk {}", task_id, bucket, disk_idx ); + self.record_heal_submission(Some(bucket)).await; } Err(e) => { error!("Failed to submit bucket heal task for '{}' on disk {}: {}", bucket, disk_idx, e); + self.metrics.increment_heal_tasks_failed(1); } } } @@ -2554,6 +2966,7 @@ impl Scanner { disk_metrics: Arc::clone(&self.disk_metrics), data_usage_stats: Arc::clone(&self.data_usage_stats), last_data_usage_collection: Arc::clone(&self.last_data_usage_collection), + realtime_usage_cache: Arc::clone(&self.realtime_usage_cache), heal_manager: self.heal_manager.clone(), node_scanner: Arc::clone(&self.node_scanner), stats_aggregator: Arc::clone(&self.stats_aggregator), @@ -3570,4 +3983,80 @@ mod tests { // Clean up let _ = std::fs::remove_dir_all(std::path::Path::new(TEST_DIR_HEALTHY)); } + + #[test] + fn realtime_usage_cache_batches_wrap_in_order() { + let mut cache = RealtimeUsageCache::default(); + let buckets = vec!["alpha".to_string(), "beta".to_string(), "gamma".to_string()]; + cache.align_with_bucket_list(&buckets); + + let first = cache.next_batch(2); + assert_eq!(first, vec!["alpha".to_string(), "beta".to_string()]); + + let second = cache.next_batch(2); + assert_eq!(second, vec!["gamma".to_string(), "alpha".to_string()]); + } + + #[test] + fn realtime_usage_cache_builds_usage_info() { + let mut cache = RealtimeUsageCache::default(); + let buckets = vec!["bucket-a".to_string(), "bucket-b".to_string()]; + cache.align_with_bucket_list(&buckets); + + cache.upsert_bucket_usage( + "bucket-a", + BucketUsageInfo { + size: 128, + objects_count: 4, + versions_count: 4, + ..Default::default() + }, + ); + cache.upsert_bucket_usage( + "bucket-b", + BucketUsageInfo { + size: 64, + objects_count: 1, + versions_count: 1, + ..Default::default() + }, + ); + + let usage = cache.build_usage_info(&buckets); + assert_eq!(usage.buckets_count, 2); + assert_eq!(usage.objects_total_count, 5); + assert_eq!(usage.objects_total_size, 192); + 
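+ // 4 + 1 objects and 128 + 64 bytes aggregated across the two cached buckets.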
assert!(usage.buckets_usage.contains_key("bucket-a")); + assert!(usage.buckets_usage.contains_key("bucket-b")); + } + + #[test] + fn realtime_usage_cache_records_snapshots() { + let mut cache = RealtimeUsageCache::default(); + let mut snapshot = DataUsageInfo::default(); + snapshot.buckets_usage.insert( + "primary".to_string(), + BucketUsageInfo { + size: 256, + objects_count: 10, + versions_count: 10, + ..Default::default() + }, + ); + snapshot.buckets_usage.insert( + "secondary".to_string(), + BucketUsageInfo { + size: 512, + objects_count: 3, + versions_count: 3, + ..Default::default() + }, + ); + + cache.record_snapshot(&snapshot); + assert_eq!(cache.bucket_order.len(), 2); + assert!(cache.bucket_order.contains(&"primary".to_string())); + assert!(cache.bucket_order.contains(&"secondary".to_string())); + assert_eq!(cache.bucket_cursor, 0); + } } diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs index 9d410b26..79bf330b 100644 --- a/crates/ahm/src/scanner/lifecycle.rs +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::Result; +use crate::{ + Result, + scanner::metrics::{BucketMetrics, MetricsCollector}, +}; use rustfs_common::data_usage::SizeSummary; use rustfs_common::metrics::IlmAction; +use rustfs_ecstore::bucket::replication::{GLOBAL_REPLICATION_POOL, ReplicationConfig, get_heal_replicate_object_info}; use rustfs_ecstore::bucket::{ lifecycle::{ bucket_lifecycle_audit::LcEventSrc, @@ -22,20 +26,27 @@ use rustfs_ecstore::bucket::{ lifecycle, lifecycle::Lifecycle, }, - metadata_sys::get_object_lock_config, + metadata_sys::{get_bucket_targets_config, get_object_lock_config, get_replication_config}, object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion}, versioning::VersioningApi, versioning_sys::BucketVersioningSys, }; use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete}; -use rustfs_filemeta::FileInfo; +use rustfs_filemeta::{FileInfo, ReplicationStatusType, replication_statuses_map}; +use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, HeaderExt, VERSION_PURGE_STATUS_KEY}; +use s3s::dto::DefaultRetention; use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration}; -use std::sync::{ - Arc, - atomic::{AtomicU64, Ordering}, +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration as StdDuration, }; -use time::OffsetDateTime; -use tracing::info; +use time::{Duration as TimeDuration, OffsetDateTime}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100); static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB @@ -46,19 +57,89 @@ pub struct ScannerItem { pub object_name: String, pub lifecycle: Option>, pub versioning: Option>, + pub object_lock_config: Option, + pub replication_pending_grace: StdDuration, + pub replication_metrics: Option, +} + +#[derive(Clone)] +pub struct ReplicationMetricsHandle { + inner: Arc, +} + +struct ReplicationMetricsInner { + metrics: Arc, + bucket_metrics: Arc>>, +} + +impl ReplicationMetricsHandle { + pub fn new(metrics: Arc, bucket_metrics: Arc>>) -> Self { + Self { + inner: Arc::new(ReplicationMetricsInner { metrics, bucket_metrics }), + } + } + + pub async fn record_status(&self, bucket: &str, status: ReplicationStatusType, lagging: bool) { 
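+ // Update the global scanner counters first, then the per-bucket entry guarded by the async mutex.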
+ match status { + ReplicationStatusType::Pending => self.inner.metrics.increment_replication_pending_objects(1), + ReplicationStatusType::Failed => self.inner.metrics.increment_replication_failed_objects(1), + _ => {} + } + if lagging { + self.inner.metrics.increment_replication_lagging_objects(1); + } + + let mut guard = self.inner.bucket_metrics.lock().await; + let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics { + bucket: bucket.to_string(), + ..Default::default() + }); + + match status { + ReplicationStatusType::Pending => { + entry.replication_pending = entry.replication_pending.saturating_add(1); + } + ReplicationStatusType::Failed => { + entry.replication_failed = entry.replication_failed.saturating_add(1); + } + _ => {} + } + + if lagging { + entry.replication_lagging = entry.replication_lagging.saturating_add(1); + } + } + + pub async fn record_task_submission(&self, bucket: &str) { + self.inner.metrics.increment_replication_tasks_queued(1); + let mut guard = self.inner.bucket_metrics.lock().await; + let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics { + bucket: bucket.to_string(), + ..Default::default() + }); + entry.replication_tasks_queued = entry.replication_tasks_queued.saturating_add(1); + } } impl ScannerItem { + const INTERNAL_REPLICATION_STATUS_KEY: &'static str = "x-rustfs-internal-replication-status"; + pub fn new( bucket: String, lifecycle: Option>, versioning: Option>, + object_lock_config: Option, + replication_pending_grace: StdDuration, + replication_metrics: Option, ) -> Self { Self { bucket, object_name: "".to_string(), lifecycle, versioning, + object_lock_config, + replication_pending_grace, + replication_metrics, } } @@ -164,6 +245,23 @@ impl ScannerItem { } pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) { + let object_locked = self.is_object_lock_protected(oi); + + if let Err(err) = self.heal_replication(oi).await { + warn!( + "heal_replication failed for {}/{} (version {:?}): {}", + oi.bucket, oi.name, oi.version_id, err + ); + } + + if object_locked { + info!( + "apply_actions: Skipping lifecycle for {}/{} because object lock retention or legal hold is active", + oi.bucket, oi.name + ); + return (false, oi.size); + } + let (action, _size) = self.apply_lifecycle(oi).await; info!( @@ -174,16 +272,6 @@ impl ScannerItem { oi.user_defined.clone() ); - // Create a mutable clone if you need to modify fields - /*let mut oi = oi.clone(); - oi.replication_status = ReplicationStatusType::from( - oi.user_defined - .get("x-amz-bucket-replication-status") - .unwrap_or(&"PENDING".to_string()), - ); - info!("apply status is: {:?}", oi.replication_status); - self.heal_replication(&oi, _size_s).await;*/ - if action.delete_all() { return (true, 0); } @@ -266,4 +354,188 @@ impl ScannerItem { (lc_evt.action, new_size) } + + fn is_object_lock_protected(&self, oi: &ObjectInfo) -> bool { + enforce_retention_for_deletion(oi) + } + + async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> { + let enriched = Self::hydrate_replication_metadata(oi); + let pending_lagging = self.is_pending_lagging(&enriched); + + if let Some(handle) = &self.replication_metrics { + handle + .record_status(&self.bucket, enriched.replication_status.clone(), pending_lagging) + .await; + } + + debug!( + "heal_replication: evaluating {}/{} with status {:?} and internal {:?}", + enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal + ); + + if 
!self.needs_replication_heal(&enriched, pending_lagging) { + return Ok(()); + } + + let replication_cfg = match get_replication_config(&self.bucket).await { + Ok((cfg, _)) => Some(cfg), + Err(err) => { + debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err); + None + } + }; + + if replication_cfg.is_none() { + return Ok(()); + } + + let bucket_targets = match get_bucket_targets_config(&self.bucket).await { + Ok(targets) => Some(targets), + Err(err) => { + debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err); + None + } + }; + + let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets); + if replication_cfg.is_empty() { + return Ok(()); + } + + let replicate_info = get_heal_replicate_object_info(&enriched, &replication_cfg).await; + let should_replicate = replicate_info.dsc.replicate_any() + || matches!( + enriched.replication_status, + ReplicationStatusType::Failed | ReplicationStatusType::Pending + ); + if !should_replicate { + debug!("heal_replication: no actionable targets for {}/{}", enriched.bucket, enriched.name); + return Ok(()); + } + + if let Some(pool) = GLOBAL_REPLICATION_POOL.get() { + pool.queue_replica_task(replicate_info).await; + if let Some(handle) = &self.replication_metrics { + handle.record_task_submission(&self.bucket).await; + } + debug!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name); + } else { + warn!( + "heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}", + enriched.bucket, enriched.name + ); + } + + Ok(()) + } + + fn needs_replication_heal(&self, oi: &ObjectInfo, pending_lagging: bool) -> bool { + if matches!(oi.replication_status, ReplicationStatusType::Failed) { + return true; + } + + if pending_lagging && matches!(oi.replication_status, ReplicationStatusType::Pending) { + return true; + } + + if let Some(raw) = oi.replication_status_internal.as_ref() { + let statuses = replication_statuses_map(raw); + if statuses + .values() + .any(|status| matches!(status, ReplicationStatusType::Failed)) + { + return true; + } + + if pending_lagging + && statuses + .values() + .any(|status| matches!(status, ReplicationStatusType::Pending)) + { + return true; + } + } + + false + } + + fn hydrate_replication_metadata(oi: &ObjectInfo) -> ObjectInfo { + let mut enriched = oi.clone(); + + if enriched.replication_status.is_empty() { + if let Some(status) = enriched.user_defined.lookup(AMZ_BUCKET_REPLICATION_STATUS) { + enriched.replication_status = ReplicationStatusType::from(status); + } + } + + if enriched.replication_status_internal.is_none() { + if let Some(raw) = enriched.user_defined.lookup(Self::INTERNAL_REPLICATION_STATUS_KEY) { + if !raw.is_empty() { + enriched.replication_status_internal = Some(raw.to_string()); + } + } + } + + if enriched.version_purge_status_internal.is_none() { + if let Some(raw) = enriched.user_defined.lookup(VERSION_PURGE_STATUS_KEY) { + if !raw.is_empty() { + enriched.version_purge_status_internal = Some(raw.to_string()); + } + } + } + + enriched + } + + fn is_pending_lagging(&self, oi: &ObjectInfo) -> bool { + if !matches!(oi.replication_status, ReplicationStatusType::Pending) { + return false; + } + + let Some(mod_time) = oi.mod_time else { + return false; + }; + + let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| TimeDuration::seconds(0)); + if grace.is_zero() { + return true; + } + + let elapsed = OffsetDateTime::now_utc() - mod_time; + 
elapsed >= grace + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn replication_metrics_handle_tracks_counts() { + let metrics = Arc::new(MetricsCollector::new()); + let bucket_metrics = Arc::new(Mutex::new(HashMap::new())); + let handle = ReplicationMetricsHandle::new(metrics.clone(), bucket_metrics.clone()); + + handle + .record_status("test-bucket", ReplicationStatusType::Pending, true) + .await; + handle + .record_status("test-bucket", ReplicationStatusType::Failed, false) + .await; + handle.record_task_submission("test-bucket").await; + + let snapshot = metrics.get_metrics(); + assert_eq!(snapshot.replication_pending_objects, 1); + assert_eq!(snapshot.replication_failed_objects, 1); + assert_eq!(snapshot.replication_lagging_objects, 1); + assert_eq!(snapshot.replication_tasks_queued, 1); + + let guard = bucket_metrics.lock().await; + let bucket_entry = guard.get("test-bucket").expect("bucket metrics exists"); + assert_eq!(bucket_entry.replication_pending, 1); + assert_eq!(bucket_entry.replication_failed, 1); + assert_eq!(bucket_entry.replication_lagging, 1); + assert_eq!(bucket_entry.replication_tasks_queued, 1); + } } diff --git a/crates/ahm/src/scanner/local_scan/mod.rs b/crates/ahm/src/scanner/local_scan/mod.rs index 7e31d711..f8fb77f8 100644 --- a/crates/ahm/src/scanner/local_scan/mod.rs +++ b/crates/ahm/src/scanner/local_scan/mod.rs @@ -62,6 +62,7 @@ struct DiskScanResult { pub struct LocalObjectRecord { pub usage: LocalObjectUsage, pub object_info: Option, + pub file_info: Option, } #[derive(Debug, Default)] @@ -256,6 +257,7 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In .push(LocalObjectRecord { usage: usage.clone(), object_info: None, + file_info: None, }); } @@ -319,6 +321,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res let versioned = fi.version_id.is_some(); ObjectInfo::from_file_info(fi, bucket, object, versioned) }); + let file_info = latest_file_info.clone(); Ok(Some(LocalObjectRecord { usage: LocalObjectUsage { @@ -331,6 +334,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res has_live_object, }, object_info, + file_info, })) } diff --git a/crates/ahm/src/scanner/metrics.rs b/crates/ahm/src/scanner/metrics.rs index e5ee6ede..619506ad 100644 --- a/crates/ahm/src/scanner/metrics.rs +++ b/crates/ahm/src/scanner/metrics.rs @@ -45,6 +45,14 @@ pub struct ScannerMetrics { pub healthy_objects: u64, /// Total corrupted objects found pub corrupted_objects: u64, + /// Replication heal tasks queued + pub replication_tasks_queued: u64, + /// Objects observed with pending replication + pub replication_pending_objects: u64, + /// Objects observed with failed replication + pub replication_failed_objects: u64, + /// Objects with replication pending longer than grace period + pub replication_lagging_objects: u64, /// Last scan activity time pub last_activity: Option, /// Current scan cycle @@ -86,6 +94,14 @@ pub struct BucketMetrics { pub heal_tasks_completed: u64, /// Heal tasks failed for this bucket pub heal_tasks_failed: u64, + /// Objects observed with pending replication status + pub replication_pending: u64, + /// Objects observed with failed replication status + pub replication_failed: u64, + /// Objects exceeding replication grace period + pub replication_lagging: u64, + /// Replication heal tasks queued for this bucket + pub replication_tasks_queued: u64, } /// Disk-specific metrics @@ -127,6 +143,10 @@ pub struct MetricsCollector { 
total_cycles: AtomicU64, healthy_objects: AtomicU64, corrupted_objects: AtomicU64, + replication_tasks_queued: AtomicU64, + replication_pending_objects: AtomicU64, + replication_failed_objects: AtomicU64, + replication_lagging_objects: AtomicU64, } impl MetricsCollector { @@ -146,6 +166,10 @@ impl MetricsCollector { total_cycles: AtomicU64::new(0), healthy_objects: AtomicU64::new(0), corrupted_objects: AtomicU64::new(0), + replication_tasks_queued: AtomicU64::new(0), + replication_pending_objects: AtomicU64::new(0), + replication_failed_objects: AtomicU64::new(0), + replication_lagging_objects: AtomicU64::new(0), } } @@ -194,6 +218,26 @@ impl MetricsCollector { self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed); } + /// Increment replication tasks queued + pub fn increment_replication_tasks_queued(&self, count: u64) { + self.replication_tasks_queued.fetch_add(count, Ordering::Relaxed); + } + + /// Increment replication pending objects + pub fn increment_replication_pending_objects(&self, count: u64) { + self.replication_pending_objects.fetch_add(count, Ordering::Relaxed); + } + + /// Increment replication failed objects + pub fn increment_replication_failed_objects(&self, count: u64) { + self.replication_failed_objects.fetch_add(count, Ordering::Relaxed); + } + + /// Increment replication lagging objects + pub fn increment_replication_lagging_objects(&self, count: u64) { + self.replication_lagging_objects.fetch_add(count, Ordering::Relaxed); + } + /// Set current cycle pub fn set_current_cycle(&self, cycle: u64) { self.current_cycle.store(cycle, Ordering::Relaxed); @@ -228,6 +272,10 @@ impl MetricsCollector { heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed), healthy_objects: self.healthy_objects.load(Ordering::Relaxed), corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed), + replication_tasks_queued: self.replication_tasks_queued.load(Ordering::Relaxed), + replication_pending_objects: self.replication_pending_objects.load(Ordering::Relaxed), + replication_failed_objects: self.replication_failed_objects.load(Ordering::Relaxed), + replication_lagging_objects: self.replication_lagging_objects.load(Ordering::Relaxed), last_activity: Some(SystemTime::now()), current_cycle: self.current_cycle.load(Ordering::Relaxed), total_cycles: self.total_cycles.load(Ordering::Relaxed), @@ -255,6 +303,10 @@ impl MetricsCollector { self.total_cycles.store(0, Ordering::Relaxed); self.healthy_objects.store(0, Ordering::Relaxed); self.corrupted_objects.store(0, Ordering::Relaxed); + self.replication_tasks_queued.store(0, Ordering::Relaxed); + self.replication_pending_objects.store(0, Ordering::Relaxed); + self.replication_failed_objects.store(0, Ordering::Relaxed); + self.replication_lagging_objects.store(0, Ordering::Relaxed); info!("Scanner metrics reset"); } diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index 85ce694e..35f8185a 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -12,31 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use rustfs_ahm::heal::{ - manager::{HealConfig, HealManager}, - storage::{ECStoreHealStorage, HealStorageAPI}, - task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType}, +use async_trait::async_trait; +use rustfs_ahm::{ + heal::{ + manager::{HealConfig, HealManager}, + storage::{ECStoreHealStorage, HealStorageAPI}, + task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType}, + }, + scanner::{ScanMode, Scanner}, }; use rustfs_common::heal_channel::{HealOpts, HealScanMode}; +use rustfs_ecstore::bucket::metadata_sys::{self, set_bucket_metadata}; +use rustfs_ecstore::bucket::replication::{ + DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority, +}; +use rustfs_ecstore::bucket::target::{BucketTarget, BucketTargetType, BucketTargets}; +use rustfs_ecstore::bucket::utils::serialize; +use rustfs_ecstore::error::Error as EcstoreError; use rustfs_ecstore::{ disk::endpoint::Endpoint, endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, store::ECStore, store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, }; +use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType}; +use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER}; +use s3s::dto::{ + BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration, + ReplicationRule, ReplicationRuleStatus, VersioningConfiguration, +}; use serial_test::serial; use std::{ + os::unix::fs::PermissionsExt, path::PathBuf, sync::{Arc, Once, OnceLock}, time::Duration, }; +use time::OffsetDateTime; use tokio::fs; +use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::info; use walkdir::WalkDir; static GLOBAL_ENV: OnceLock<(Vec, Arc, Arc)> = OnceLock::new(); static INIT: Once = Once::new(); +const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-replication-heal-target"; fn init_tracing() { INIT.call_once(|| { @@ -145,6 +166,225 @@ async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); } +fn delete_first_part_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf { + for disk_path in disk_paths { + let obj_dir = disk_path.join(bucket).join(object); + if !obj_dir.exists() { + continue; + } + + if let Some(part_path) = WalkDir::new(&obj_dir) + .min_depth(2) + .max_depth(2) + .into_iter() + .filter_map(Result::ok) + .find(|entry| { + entry.file_type().is_file() + && entry + .file_name() + .to_str() + .map(|name| name.starts_with("part.")) + .unwrap_or(false) + }) + .map(|entry| entry.into_path()) + { + std::fs::remove_file(&part_path).expect("Failed to delete part file"); + return part_path; + } + } + + panic!("Failed to locate part file for {}/{}", bucket, object); +} + +fn delete_xl_meta_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf { + for disk_path in disk_paths { + let xl_meta_path = disk_path.join(bucket).join(object).join("xl.meta"); + if xl_meta_path.exists() { + std::fs::remove_file(&xl_meta_path).expect("Failed to delete xl.meta file"); + return xl_meta_path; + } + } + + panic!("Failed to locate xl.meta for {}/{}", bucket, object); +} + +struct FormatPathGuard { + original: PathBuf, + backup: PathBuf, +} + +impl FormatPathGuard { + fn new(original: PathBuf) -> std::io::Result { + let backup = original.with_extension("bak"); + if backup.exists() { + std::fs::remove_file(&backup)?; + } 
+        std::fs::rename(&original, &backup)?;
+        Ok(Self { original, backup })
+    }
+}
+
+impl Drop for FormatPathGuard {
+    fn drop(&mut self) {
+        if self.backup.exists() {
+            let _ = std::fs::rename(&self.backup, &self.original);
+        }
+    }
+}
+
+struct PermissionGuard {
+    path: PathBuf,
+    original_mode: u32,
+}
+
+impl PermissionGuard {
+    fn new(path: PathBuf, new_mode: u32) -> std::io::Result<Self> {
+        let metadata = std::fs::metadata(&path)?;
+        let original_mode = metadata.permissions().mode();
+        std::fs::set_permissions(&path, std::fs::Permissions::from_mode(new_mode))?;
+        Ok(Self { path, original_mode })
+    }
+}
+
+impl Drop for PermissionGuard {
+    fn drop(&mut self) {
+        if self.path.exists() {
+            let _ = std::fs::set_permissions(&self.path, std::fs::Permissions::from_mode(self.original_mode));
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct RecordingReplicationPool {
+    replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
+    delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
+}
+
+impl RecordingReplicationPool {
+    async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
+        let mut guard = self.replica_tasks.lock().await;
+        guard.drain(..).collect()
+    }
+
+    async fn clear(&self) {
+        self.replica_tasks.lock().await.clear();
+        self.delete_tasks.lock().await.clear();
+    }
+}
+
+#[async_trait]
+impl ReplicationPoolTrait for RecordingReplicationPool {
+    async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
+        self.replica_tasks.lock().await.push(ri);
+    }
+
+    async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
+        self.delete_tasks.lock().await.push(ri);
+    }
+
+    async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
+
+    async fn init_resync(
+        self: Arc<Self>,
+        _cancellation_token: CancellationToken,
+        _buckets: Vec,
+    ) -> Result<(), EcstoreError> {
+        Ok(())
+    }
+}
+
+async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
+    static TEST_POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
+
+    if let Some(pool) = TEST_POOL.get() {
+        pool.clear().await;
+        return pool.clone();
+    }
+
+    let pool = Arc::new(RecordingReplicationPool::default());
+    let dyn_pool: Arc = pool.clone();
+    let global_pool = GLOBAL_REPLICATION_POOL
+        .get_or_init(|| {
+            let pool_clone = dyn_pool.clone();
+            async move { pool_clone }
+        })
+        .await
+        .clone();
+
+    assert!(
+        Arc::ptr_eq(&dyn_pool, &global_pool),
+        "GLOBAL_REPLICATION_POOL was already initialized before the test replication pool"
+    );
+
+    let _ = TEST_POOL.set(pool.clone());
+    pool.clear().await;
+    pool
+}
+
+async fn configure_bucket_replication(bucket: &str, target_arn: &str) {
+    let meta = metadata_sys::get(bucket)
+        .await
+        .expect("bucket metadata should exist for replication configuration");
+    let mut metadata = (*meta).clone();
+
+    let replication_rule = ReplicationRule {
+        delete_marker_replication: None,
+        delete_replication: None,
+        destination: Destination {
+            access_control_translation: None,
+            account: None,
+            bucket: target_arn.to_string(),
+            encryption_configuration: None,
+            metrics: None,
+            replication_time: None,
+            storage_class: None,
+        },
+        existing_object_replication: Some(ExistingObjectReplication {
+            status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
+        }),
+        filter: None,
+        id: Some("heal-replication-rule".to_string()),
+        prefix: Some(String::new()),
+        priority: Some(1),
+        source_selection_criteria: None,
+        status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
+    };
+
+    let replication_cfg = ReplicationConfiguration {
+        role: target_arn.to_string(),
+        rules: vec![replication_rule],
+    };
+
+    let bucket_targets = BucketTargets {
+        
targets: vec![BucketTarget { + source_bucket: bucket.to_string(), + endpoint: "replication.invalid".to_string(), + target_bucket: "replication-target".to_string(), + arn: target_arn.to_string(), + target_type: BucketTargetType::ReplicationService, + ..Default::default() + }], + }; + + metadata.replication_config = Some(replication_cfg.clone()); + metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config"); + metadata.replication_config_updated_at = OffsetDateTime::now_utc(); + metadata.bucket_target_config = Some(bucket_targets.clone()); + metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets"); + metadata.bucket_targets_config_updated_at = OffsetDateTime::now_utc(); + let versioning_cfg = VersioningConfiguration { + status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)), + ..Default::default() + }; + metadata.versioning_config = Some(versioning_cfg.clone()); + metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config"); + metadata.versioning_config_updated_at = OffsetDateTime::now_utc(); + + set_bucket_metadata(bucket.to_string(), metadata) + .await + .expect("failed to update bucket metadata for replication"); +} + mod serial_tests { use super::*; @@ -430,4 +670,380 @@ mod serial_tests { info!("Direct heal storage API test passed"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_submits_heal_task_when_part_missing() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + + let bucket_name = format!("scanner-heal-bucket-{}", uuid::Uuid::new_v4().simple()); + let object_name = "scanner-heal-object.txt"; + create_test_bucket(&ecstore, &bucket_name).await; + upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner auto-heal data").await; + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 4, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Deep).await; + + scanner + .scan_cycle() + .await + .expect("Initial scan should succeed before simulating failures"); + let baseline_stats = heal_manager.get_statistics().await; + + let deleted_part_path = delete_first_part_file(&disk_paths, &bucket_name, object_name); + assert!(!deleted_part_path.exists(), "Deleted part file should not exist before healing"); + + scanner + .scan_cycle() + .await + .expect("Scan after part deletion should finish and enqueue heal task"); + tokio::time::sleep(Duration::from_millis(500)).await; + + let updated_stats = heal_manager.get_statistics().await; + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should submit heal tasks when data parts go missing" + ); + + // Allow heal manager to restore the missing part + tokio::time::sleep(Duration::from_secs(2)).await; + assert!( + deleted_part_path.exists(), + "Missing part should be restored after heal: {:?}", + deleted_part_path + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_submits_metadata_heal_when_xl_meta_missing() { + let (disk_paths, ecstore, heal_storage) = 
setup_test_env().await; + + let bucket_name = format!("scanner-meta-bucket-{}", uuid::Uuid::new_v4().simple()); + let object_name = "scanner-meta-object.txt"; + create_test_bucket(&ecstore, &bucket_name).await; + upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner metadata heal data").await; + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 4, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Deep).await; + + scanner + .scan_cycle() + .await + .expect("Initial scan should succeed before metadata deletion"); + let baseline_stats = heal_manager.get_statistics().await; + + let deleted_meta_path = delete_xl_meta_file(&disk_paths, &bucket_name, object_name); + assert!(!deleted_meta_path.exists(), "Deleted xl.meta should not exist before healing"); + + scanner + .scan_cycle() + .await + .expect("Scan after metadata deletion should finish and enqueue heal task"); + tokio::time::sleep(Duration::from_millis(800)).await; + + let updated_stats = heal_manager.get_statistics().await; + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should submit metadata heal tasks when xl.meta is missing" + ); + + tokio::time::sleep(Duration::from_secs(2)).await; + assert!( + deleted_meta_path.exists(), + "xl.meta should be restored after heal: {:?}", + deleted_meta_path + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_triggers_replication_heal_when_status_failed() { + let (_disk_paths, ecstore, heal_storage) = setup_test_env().await; + + let bucket_name = format!("scanner-replication-bucket-{}", uuid::Uuid::new_v4().simple()); + let object_name = "scanner-replication-heal-object"; + create_test_bucket(&ecstore, &bucket_name).await; + configure_bucket_replication(&bucket_name, TEST_REPLICATION_TARGET_ARN).await; + + let replication_pool = ensure_test_replication_pool().await; + replication_pool.clear().await; + + let mut opts = ObjectOptions::default(); + opts.user_defined.insert( + AMZ_BUCKET_REPLICATION_STATUS.to_string(), + ReplicationStatusType::Failed.as_str().to_string(), + ); + let replication_status_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER); + opts.user_defined.insert( + replication_status_key.clone(), + format!("{}={};", TEST_REPLICATION_TARGET_ARN, ReplicationStatusType::Failed.as_str()), + ); + let mut reader = PutObjReader::from_vec(b"replication heal data".to_vec()); + ecstore + .put_object(&bucket_name, object_name, &mut reader, &opts) + .await + .expect("Failed to upload replication test object"); + + let object_info = ecstore + .get_object_info(&bucket_name, object_name, &ObjectOptions::default()) + .await + .expect("Failed to read object info for replication test"); + assert_eq!( + object_info + .user_defined + .get(AMZ_BUCKET_REPLICATION_STATUS) + .map(|s| s.as_str()), + Some(ReplicationStatusType::Failed.as_str()), + "Uploaded object should contain replication status metadata" + ); + assert!( + object_info + .user_defined + .get(&replication_status_key) + .map(|s| s.contains(ReplicationStatusType::Failed.as_str())) + .unwrap_or(false), + "Uploaded object should preserve internal replication status 
metadata" + ); + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 4, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Deep).await; + + scanner + .scan_cycle() + .await + .expect("Scan cycle should succeed and evaluate replication state"); + + let replica_tasks = replication_pool.take_replica_tasks().await; + assert!( + replica_tasks + .iter() + .any(|info| info.bucket == bucket_name && info.name == object_name), + "Scanner should enqueue replication heal task when replication status is FAILED (recorded tasks: {:?})", + replica_tasks + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_submits_erasure_set_heal_when_disk_offline() { + let (disk_paths, _ecstore, heal_storage) = setup_test_env().await; + + let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); + assert!(format_path.exists(), "format.json should exist before simulating offline disk"); + let _format_guard = FormatPathGuard::new(format_path.clone()).expect("failed to move format.json"); + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 2, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Normal).await; + + let baseline_stats = heal_manager.get_statistics().await; + scanner + .scan_cycle() + .await + .expect("Scan cycle should complete even when a disk is offline"); + tokio::time::sleep(Duration::from_millis(200)).await; + let updated_stats = heal_manager.get_statistics().await; + + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should enqueue erasure set heal when disk is offline (before {}, after {})", + baseline_stats.total_tasks, + updated_stats.total_tasks + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_submits_erasure_set_heal_when_listing_volumes_fails() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + + let bucket_name = format!("scanner-list-volumes-{}", uuid::Uuid::new_v4().simple()); + let object_name = "scanner-list-volumes-object"; + create_test_bucket(&ecstore, &bucket_name).await; + upload_test_object(&ecstore, &bucket_name, object_name, b"disk list volumes failure").await; + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 2, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Deep).await; + + scanner + .scan_cycle() + .await + .expect("Initial scan should succeed before simulating disk permission issues"); + let 
baseline_stats = heal_manager.get_statistics().await; + + let disk_root = disk_paths[0].clone(); + assert!(disk_root.exists(), "Disk root should exist so we can simulate permission failures"); + + { + let _root_perm_guard = + PermissionGuard::new(disk_root.clone(), 0o000).expect("Failed to change disk root permissions"); + + let scan_result = scanner.scan_cycle().await; + assert!( + scan_result.is_ok(), + "Scan cycle should continue even if disk volumes cannot be listed: {:?}", + scan_result + ); + tokio::time::sleep(Duration::from_millis(200)).await; + let updated_stats = heal_manager.get_statistics().await; + + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should enqueue erasure set heal when listing volumes fails (before {}, after {})", + baseline_stats.total_tasks, + updated_stats.total_tasks + ); + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_submits_erasure_set_heal_when_disk_access_fails() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + + let bucket_name = format!("scanner-access-error-{}", uuid::Uuid::new_v4().simple()); + let object_name = "scanner-access-error-object.txt"; + create_test_bucket(&ecstore, &bucket_name).await; + upload_test_object(&ecstore, &bucket_name, object_name, b"disk access failure").await; + + let bucket_path = disk_paths[0].join(&bucket_name); + assert!(bucket_path.exists(), "Bucket path should exist on disk for access test"); + let _perm_guard = PermissionGuard::new(bucket_path.clone(), 0o000).expect("Failed to change permissions"); + + let heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 2, + ..Default::default() + }; + let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg))); + heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(heal_manager.clone())); + scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Deep).await; + + let baseline_stats = heal_manager.get_statistics().await; + let scan_result = scanner.scan_cycle().await; + assert!( + scan_result.is_ok(), + "Scan cycle should complete even if a disk volume has access errors: {:?}", + scan_result + ); + tokio::time::sleep(Duration::from_millis(200)).await; + let updated_stats = heal_manager.get_statistics().await; + + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should enqueue erasure set heal when disk access fails (before {}, after {})", + baseline_stats.total_tasks, + updated_stats.total_tasks + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_scanner_detects_missing_bucket_directory_and_queues_bucket_heal() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + + let bucket_name = format!("scanner-missing-bucket-{}", uuid::Uuid::new_v4().simple()); + create_test_bucket(&ecstore, &bucket_name).await; + upload_test_object(&ecstore, &bucket_name, "seed-object", b"bucket heal data").await; + + let scanner_heal_cfg = HealConfig { + enable_auto_heal: true, + heal_interval: Duration::from_millis(20), + max_concurrent_heals: 4, + ..Default::default() + }; + let scanner_heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(scanner_heal_cfg))); + scanner_heal_manager.start().await.unwrap(); + + let scanner = Scanner::new(None, Some(scanner_heal_manager.clone())); + 
scanner.initialize_with_ecstore().await; + scanner.set_config_enable_healing(true).await; + scanner.set_config_scan_mode(ScanMode::Normal).await; + + scanner + .scan_cycle() + .await + .expect("Initial scan should succeed before deleting bucket directory"); + let baseline_stats = scanner_heal_manager.get_statistics().await; + + let missing_dir = disk_paths[0].join(&bucket_name); + assert!(missing_dir.exists()); + std::fs::remove_dir_all(&missing_dir).expect("Failed to remove bucket directory for heal simulation"); + assert!(!missing_dir.exists(), "Bucket directory should be removed on disk to trigger heal"); + + scanner + .run_volume_consistency_check() + .await + .expect("Volume consistency check should run after bucket removal"); + tokio::time::sleep(Duration::from_millis(800)).await; + + let updated_stats = scanner_heal_manager.get_statistics().await; + assert!( + updated_stats.total_tasks > baseline_stats.total_tasks, + "Scanner should submit bucket heal tasks when a bucket directory is missing" + ); + + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(missing_dir.exists(), "Bucket directory should be restored after heal"); + } } diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index f491531c..7407743c 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -12,10 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig}; use rustfs_ecstore::{ - bucket::metadata::BUCKET_LIFECYCLE_CONFIG, - bucket::metadata_sys, + bucket::{ + metadata::BUCKET_LIFECYCLE_CONFIG, + metadata_sys, + replication::{ + DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority, + }, + target::{BucketTarget, BucketTargetType, BucketTargets}, + utils::serialize, + }, disk::endpoint::Endpoint, endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, global::GLOBAL_TierConfigMgr, @@ -23,18 +31,27 @@ use rustfs_ecstore::{ store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, tier::tier_config::{TierConfig, TierMinIO, TierType}, }; +use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType}; +use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER}; +use s3s::dto::{ + BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration, + ReplicationRule, ReplicationRuleStatus, VersioningConfiguration, +}; use serial_test::serial; use std::{ path::PathBuf, sync::{Arc, Once, OnceLock}, time::Duration, }; +use time::{Duration as TimeDuration, OffsetDateTime}; use tokio::fs; +use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::info; static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); static INIT: Once = Once::new(); +const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-lifecycle-replication-test"; fn init_tracing() { INIT.call_once(|| { @@ -159,6 +176,167 @@ async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); } +#[derive(Debug, Default)] +struct RecordingReplicationPool { + replica_tasks: Mutex>, + delete_tasks: Mutex>, +} + +impl RecordingReplicationPool { + async fn take_replica_tasks(&self) -> Vec { + let mut 
guard = self.replica_tasks.lock().await;
+        guard.drain(..).collect()
+    }
+}
+
+#[async_trait]
+impl ReplicationPoolTrait for RecordingReplicationPool {
+    async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
+        self.replica_tasks.lock().await.push(ri);
+    }
+
+    async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
+        self.delete_tasks.lock().await.push(ri);
+    }
+
+    async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
+
+    async fn init_resync(
+        self: Arc<Self>,
+        _cancellation_token: CancellationToken,
+        _buckets: Vec,
+    ) -> Result<(), rustfs_ecstore::error::Error> {
+        Ok(())
+    }
+}
+
+async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
+    static POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
+    if let Some(existing) = POOL.get() {
+        existing.replica_tasks.lock().await.clear();
+        existing.delete_tasks.lock().await.clear();
+        return existing.clone();
+    }
+
+    let pool = Arc::new(RecordingReplicationPool::default());
+    let dyn_pool: Arc = pool.clone();
+    GLOBAL_REPLICATION_POOL
+        .get_or_init(|| {
+            let pool_clone = dyn_pool.clone();
+            async move { pool_clone }
+        })
+        .await;
+    let _ = POOL.set(pool.clone());
+    pool
+}
+
+async fn configure_bucket_replication(bucket: &str) {
+    let meta = metadata_sys::get(bucket)
+        .await
+        .expect("bucket metadata should exist for replication configuration");
+    let mut metadata = (*meta).clone();
+
+    let replication_rule = ReplicationRule {
+        delete_marker_replication: None,
+        delete_replication: None,
+        destination: Destination {
+            access_control_translation: None,
+            account: None,
+            bucket: TEST_REPLICATION_TARGET_ARN.to_string(),
+            encryption_configuration: None,
+            metrics: None,
+            replication_time: None,
+            storage_class: None,
+        },
+        existing_object_replication: Some(ExistingObjectReplication {
+            status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
+        }),
+        filter: None,
+        id: Some("lifecycle-replication-rule".to_string()),
+        prefix: Some(String::new()),
+        priority: Some(1),
+        source_selection_criteria: None,
+        status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
+    };
+
+    let replication_cfg = ReplicationConfiguration {
+        role: TEST_REPLICATION_TARGET_ARN.to_string(),
+        rules: vec![replication_rule],
+    };
+
+    let bucket_targets = BucketTargets {
+        targets: vec![BucketTarget {
+            source_bucket: bucket.to_string(),
+            endpoint: "replication.invalid".to_string(),
+            target_bucket: "replication-target".to_string(),
+            arn: TEST_REPLICATION_TARGET_ARN.to_string(),
+            target_type: BucketTargetType::ReplicationService,
+            ..Default::default()
+        }],
+    };
+
+    metadata.replication_config = Some(replication_cfg.clone());
+    metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
+    metadata.bucket_target_config = Some(bucket_targets.clone());
+    metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
+
+    let versioning_cfg = VersioningConfiguration {
+        status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
+        ..Default::default()
+    };
+    metadata.versioning_config = Some(versioning_cfg.clone());
+    metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
+
+    metadata_sys::set_bucket_metadata(bucket.to_string(), metadata)
+        .await
+        .expect("failed to persist bucket metadata with replication config");
+}
+
+async fn upload_object_with_replication_status(
+    ecstore: &Arc<ECStore>,
+    bucket: &str,
+    
object: &str, + status: ReplicationStatusType, +) { + let mut reader = PutObjReader::from_vec(b"replication-state".to_vec()); + let mut opts = ObjectOptions::default(); + opts.user_defined + .insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), status.as_str().to_string()); + let internal_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER); + opts.user_defined + .insert(internal_key, format!("{}={};", TEST_REPLICATION_TARGET_ARN, status.as_str())); + + (**ecstore) + .put_object(bucket, object, &mut reader, &opts) + .await + .expect("failed to upload replication test object"); +} + +async fn upload_object_with_retention(ecstore: &Arc, bucket: &str, object: &str, data: &[u8], retain_for: Duration) { + use s3s::header::{X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE}; + use time::format_description::well_known::Rfc3339; + + let mut reader = PutObjReader::from_vec(data.to_vec()); + let mut opts = ObjectOptions::default(); + let retain_duration = TimeDuration::try_from(retain_for).unwrap_or_else(|_| TimeDuration::seconds(0)); + let retain_until = OffsetDateTime::now_utc() + retain_duration; + let retain_until_str = retain_until.format(&Rfc3339).expect("format retain date"); + let lock_mode_key = X_AMZ_OBJECT_LOCK_MODE.as_str().to_string(); + let lock_mode_lower = lock_mode_key.to_lowercase(); + opts.user_defined.insert(lock_mode_lower, "GOVERNANCE".to_string()); + opts.user_defined.insert(lock_mode_key, "GOVERNANCE".to_string()); + + let retain_key = X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string(); + let retain_key_lower = retain_key.to_lowercase(); + opts.user_defined.insert(retain_key_lower, retain_until_str.clone()); + opts.user_defined.insert(retain_key, retain_until_str); + + (**ecstore) + .put_object(bucket, object, &mut reader, &opts) + .await + .expect("Failed to upload retained object"); +} + /// Test helper: Set bucket lifecycle configuration async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box> { // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing @@ -694,4 +872,127 @@ mod serial_tests { println!("Lifecycle transition basic test completed"); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_lifecycle_respects_object_lock_retention() { + let (_disk_paths, ecstore) = setup_test_env().await; + + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("test-lc-lock-retention-{}", &suffix[..8]); + let object_name = "test/locked-object.txt"; + let test_data = b"retained payload"; + + create_test_lock_bucket(&ecstore, bucket_name.as_str()).await; + upload_object_with_retention(&ecstore, bucket_name.as_str(), object_name, test_data, Duration::from_secs(3600)).await; + + assert!( + object_exists(&ecstore, bucket_name.as_str(), object_name).await, + "Object should exist before lifecycle processing" + ); + + set_bucket_lifecycle(bucket_name.as_str()) + .await + .expect("Failed to set lifecycle configuration"); + + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + let scanner = Scanner::new(Some(scanner_config), None); + scanner.start().await.expect("Failed to start scanner"); + + for _ in 0..3 { + scanner.scan_cycle().await.expect("scan cycle should succeed"); + tokio::time::sleep(Duration::from_millis(200)).await; + } + + assert!( + object_exists(&ecstore, bucket_name.as_str(), object_name).await, + 
"Object with active retention should not be deleted by lifecycle" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_lifecycle_triggers_replication_heal_for_lagging_and_failed_objects() { + let (_disk_paths, ecstore) = setup_test_env().await; + + let suffix = uuid::Uuid::new_v4().simple().to_string(); + let bucket_name = format!("lc-replication-{}", &suffix[..8]); + create_test_bucket(&ecstore, bucket_name.as_str()).await; + configure_bucket_replication(bucket_name.as_str()).await; + let replication_pool = ensure_test_replication_pool().await; + + upload_object_with_replication_status( + &ecstore, + bucket_name.as_str(), + "test/lagging-pending", + ReplicationStatusType::Pending, + ) + .await; + upload_object_with_replication_status( + &ecstore, + bucket_name.as_str(), + "test/failed-object", + ReplicationStatusType::Failed, + ) + .await; + + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 2, + replication_pending_grace: Duration::from_secs(0), + ..Default::default() + }; + let scanner = Scanner::new(Some(scanner_config), None); + + scanner.scan_cycle().await.expect("scan cycle should complete"); + tokio::time::sleep(Duration::from_millis(200)).await; + + let replica_tasks = replication_pool.take_replica_tasks().await; + assert!( + replica_tasks.iter().any(|t| t.name == "test/lagging-pending"), + "Pending object should be enqueued for replication heal: {:?}", + replica_tasks + ); + assert!( + replica_tasks.iter().any(|t| t.name == "test/failed-object"), + "Failed object should be enqueued for replication heal: {:?}", + replica_tasks + ); + + let metrics = scanner.get_metrics().await; + assert_eq!( + metrics.replication_tasks_queued, + replica_tasks.len() as u64, + "Replication tasks queued metric should match recorded tasks" + ); + assert!( + metrics.replication_pending_objects >= 1, + "Pending replication metric should be incremented" + ); + assert!(metrics.replication_failed_objects >= 1, "Failed replication metric should be incremented"); + assert!( + metrics.replication_lagging_objects >= 1, + "Lagging replication metric should track pending object beyond grace" + ); + + let bucket_metrics = metrics + .bucket_metrics + .get(&bucket_name) + .expect("bucket metrics should contain replication counters"); + assert!( + bucket_metrics.replication_pending >= 1 && bucket_metrics.replication_failed >= 1, + "Bucket-level replication metrics should reflect observed statuses" + ); + assert_eq!( + bucket_metrics.replication_tasks_queued, + replica_tasks.len() as u64, + "Bucket-level queued counter should match enqueued tasks" + ); + } } diff --git a/crates/ecstore/src/bucket/object_lock/objectlock.rs b/crates/ecstore/src/bucket/object_lock/objectlock.rs index 4309739b..c5b763cc 100644 --- a/crates/ecstore/src/bucket/object_lock/objectlock.rs +++ b/crates/ecstore/src/bucket/object_lock/objectlock.rs @@ -37,7 +37,7 @@ pub fn get_object_retention_meta(meta: HashMap) -> ObjectLockRet let mut mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str().to_lowercase().as_str()); if mode_str.is_none() { - mode_str = Some(&meta[X_AMZ_OBJECT_LOCK_MODE.as_str()]); + mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str()); } let mode = if let Some(mode_str) = mode_str { parse_ret_mode(mode_str.as_str()) @@ -50,7 +50,7 @@ pub fn get_object_retention_meta(meta: HashMap) -> ObjectLockRet let mut till_str = 
meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_lowercase().as_str());
     if till_str.is_none() {
-        till_str = Some(&meta[X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str()]);
+        till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str());
     }
     if let Some(till_str) = till_str {
         let t = OffsetDateTime::parse(till_str, &format_description::well_known::Iso8601::DEFAULT);
@@ -67,7 +67,7 @@ pub fn get_object_retention_meta(meta: HashMap) -> ObjectLockRet
 pub fn get_object_legalhold_meta(meta: HashMap<String, String>) -> ObjectLockLegalHold {
     let mut hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str().to_lowercase().as_str());
     if hold_str.is_none() {
-        hold_str = Some(&meta[X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()]);
+        hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str());
     }
     if let Some(hold_str) = hold_str {
         return ObjectLockLegalHold {