diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 08908298..ad1d2126 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -27,8 +27,14 @@ use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo, SizeSummary}; use rustfs_common::metrics::{Metric, Metrics, global_metrics}; use rustfs_ecstore::{ self as ecstore, StorageAPI, - bucket::versioning_sys::BucketVersioningSys, - bucket::{object_lock::objectlock_sys::BucketObjectLockSys, versioning::VersioningApi}, + bucket::{ + bucket_target_sys::BucketTargetSys, + metadata_sys::get_replication_config, + object_lock::objectlock_sys::BucketObjectLockSys, + replication::{ReplicationConfig, ReplicationConfigurationExt}, + versioning::VersioningApi, + versioning_sys::BucketVersioningSys, + }, data_usage::{aggregate_local_snapshots, store_data_usage_in_backend}, disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions}, set_disk::SetDisks, @@ -389,7 +395,10 @@ impl Scanner { enable_deep_scan: bool, run_lifecycle: bool, ) -> Result { - debug!("Listing buckets for lifecycle/replication evaluation"); + debug!( + "Listing buckets for lifecycle/replication evaluation, enable_healing: {}, enable_deep_scan: {}, run_lifecycle: {}", + enable_healing, enable_deep_scan, run_lifecycle + ); let buckets = ecstore .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) .await @@ -401,6 +410,8 @@ impl Scanner { cfg.replication_pending_grace }; + let bucket_target_sys = BucketTargetSys::get(); + for bucket_info in buckets { let bucket_name = &bucket_info.name; @@ -467,8 +478,25 @@ impl Scanner { let object_lock_config = BucketObjectLockSys::get(bucket_name).await; let replication_metrics = Some(ReplicationMetricsHandle::new(Arc::clone(&self.metrics), Arc::clone(&self.bucket_metrics))); + + let replication_config = { + if let Ok((cfg, _)) = get_replication_config(bucket_name).await { + if cfg.has_active_rules("", true) { + Some(ReplicationConfig::new( + Some(cfg), + bucket_target_sys.list_bucket_targets(bucket_name).await.ok(), + )) + } else { + None + } + } else { + None + } + }; + let mut scanner_item = ScannerItem::new( bucket_name.to_string(), + replication_config, lifecycle_config.clone(), Some(versioning_config.clone()), object_lock_config, @@ -477,7 +505,7 @@ impl Scanner { ); if let Err(e) = self - .process_bucket_objects_for_lifecycle(ecstore, bucket_name, &mut scanner_item, records) + .process_bucket_objects_for_lifecycle(bucket_name, &mut scanner_item, records) .await { warn!("Failed to process lifecycle/replication actions for bucket {}: {}", bucket_name, e); @@ -750,7 +778,6 @@ impl Scanner { /// Process bucket objects for lifecycle actions async fn process_bucket_objects_for_lifecycle( &self, - ecstore: &Arc, bucket_name: &str, scanner_item: &mut ScannerItem, records: &[LocalObjectRecord], @@ -763,14 +790,7 @@ impl Scanner { continue; } - let mut object_info = Self::convert_record_to_object_info(record); - if object_info.replication_status.is_empty() && object_info.replication_status_internal.is_none() { - let object_name = object_info.name.clone(); - let object_opts = rustfs_ecstore::store_api::ObjectOptions::default(); - if let Ok(fresh_info) = ecstore.get_object_info(bucket_name, &object_name, &object_opts).await { - object_info = fresh_info; - } - } + let object_info = Self::convert_record_to_object_info(record); let mut size_summary = SizeSummary::default(); let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await; if deleted { @@ -2405,8 +2425,21 @@ impl Scanner { let cfg = self.config.read().await; cfg.replication_pending_grace }; + + let replication_config = { + if let Ok((cfg, _)) = get_replication_config(bucket).await { + Some(ReplicationConfig::new( + Some(cfg), + BucketTargetSys::get().list_bucket_targets(bucket).await.ok(), + )) + } else { + None + } + }; + let mut scanner_item = ScannerItem::new( bucket.to_string(), + replication_config, Some(lifecycle_config.clone()), versioning_config.clone(), object_lock_config, @@ -2762,12 +2795,13 @@ impl Scanner { _ = interval.tick() => { // Check if scanner should still be running if !self.state.read().await.is_running { + warn!("Scanner is not running, exiting scanner loop"); break; } // check cancel signal if cancel_token.is_cancelled() { - info!("Cancellation requested, exiting scanner loop"); + warn!("Cancellation requested, exiting scanner loop"); break; } @@ -2787,13 +2821,13 @@ impl Scanner { } } _ = cancel_token.cancelled() => { - info!("Received cancellation, stopping scanner loop"); + warn!("Received cancellation, exiting scanner loop"); break; } } } - info!("Scanner loop stopped"); + warn!("Scanner loop stopped"); Ok(()) } diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs index 6976f81f..f4a716fb 100644 --- a/crates/ahm/src/scanner/lifecycle.rs +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -25,7 +25,7 @@ use rustfs_ecstore::bucket::{ lifecycle, lifecycle::Lifecycle, }, - metadata_sys::{get_bucket_targets_config, get_object_lock_config, get_replication_config}, + metadata_sys::get_object_lock_config, object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion}, versioning::VersioningApi, versioning_sys::BucketVersioningSys, @@ -58,6 +58,7 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102 pub struct ScannerItem { pub bucket: String, pub object_name: String, + pub replication: Option, pub lifecycle: Option>, pub versioning: Option>, pub object_lock_config: Option, @@ -129,6 +130,7 @@ impl ScannerItem { pub fn new( bucket: String, + replication: Option, lifecycle: Option>, versioning: Option>, object_lock_config: Option, @@ -138,6 +140,7 @@ impl ScannerItem { Self { bucket, object_name: "".to_string(), + replication, lifecycle, versioning, object_lock_config, @@ -377,32 +380,36 @@ impl ScannerItem { enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal ); - if !self.needs_replication_heal(&enriched, pending_lagging) { - return Ok(()); - } + // if !self.needs_replication_heal(&enriched, pending_lagging) { + // return Ok(()); + // } - let replication_cfg = match get_replication_config(&self.bucket).await { - Ok((cfg, _)) => Some(cfg), - Err(err) => { - debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err); - None - } - }; + // let replication_cfg = match get_replication_config(&self.bucket).await { + // Ok((cfg, _)) => Some(cfg), + // Err(err) => { + // debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err); + // None + // } + // }; - if replication_cfg.is_none() { - return Ok(()); - } + // if replication_cfg.is_none() { + // return Ok(()); + // } - let bucket_targets = match get_bucket_targets_config(&self.bucket).await { - Ok(targets) => Some(targets), - Err(err) => { - debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err); - None - } - }; + // let bucket_targets = match get_bucket_targets_config(&self.bucket).await { + // Ok(targets) => Some(targets), + // Err(err) => { + // debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err); + // None + // } + // }; - let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets); - if replication_cfg.is_empty() { + // let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets); + + let replication_cfg = self.replication.clone().unwrap_or_default(); + + if replication_cfg.config.is_none() && replication_cfg.remotes.is_none() { + debug!("heal_replication: no replication config for {}/{}", enriched.bucket, enriched.name); return Ok(()); }