diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index ad1d2126..7948f91c 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -435,18 +435,19 @@ impl Scanner { let records_cow: Cow<[LocalObjectRecord]> = if let Some(existing) = bucket_objects_map.get(bucket_name) { if existing.is_empty() { - debug!("Bucket {} had empty local snapshot entries; refreshing via ECStore listing", bucket_name); + warn!("Bucket {} had empty local snapshot entries; refreshing via ECStore listing", bucket_name); let fetched = self.collect_bucket_records_from_store(ecstore, bucket_name).await?; if fetched.is_empty() { - debug!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name); + warn!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name); continue; } Cow::Owned(fetched) } else { + warn!("Bucket {} had non-empty local snapshot entries; using them", bucket_name); Cow::Borrowed(existing.as_slice()) } } else { - debug!( + warn!( "No local snapshot entries found for bucket {}; falling back to ECStore listing", bucket_name ); @@ -547,7 +548,7 @@ impl Scanner { } }; let bucket_objects_map = &scan_outcome.bucket_objects; - + warn!("Basic test scan: bucket_objects: {:?}", bucket_objects_map); let lifecycle_result = self .handle_buckets_from_local_records(&ecstore, bucket_objects_map, enable_healing, enable_deep_scan, false) .await; @@ -680,6 +681,8 @@ impl Scanner { return info.clone(); } + warn!("convert_record_to_object_info no object info found, record: {:?}", record); + let usage = &record.usage; ObjectInfo { @@ -1322,6 +1325,8 @@ impl Scanner { return Ok(()); } + warn!("Lifecycle evaluation: bucket_objects: {:?}", scan_outcome.bucket_objects); + if let Err(e) = self .handle_buckets_from_local_records(&ecstore, &scan_outcome.bucket_objects, false, false, true) .await @@ -1593,11 +1598,14 @@ impl Scanner { } } else { // This is a real data loss scenario - trigger healing - warn!("Data parts integrity check failed for {}/{}: {}. Triggering heal.", bucket, object, e); + warn!( + "Data parts integrity check failed 1 for {}/{}: {}. Triggering heal.", + bucket, object, e + ); integrity_failed = true; } } else { - warn!("Data parts integrity check failed for {}/{}: {}. Triggering heal.", bucket, object, e); + warn!("Data parts integrity check failed 2 for {}/{}: {}. Triggering heal.", bucket, object, e); integrity_failed = true; } } diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs index e9a4b180..31520b3a 100644 --- a/crates/ahm/src/scanner/lifecycle.rs +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -366,6 +366,9 @@ impl ScannerItem { } async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> { + warn!("heal_replication: healing replication for {}/{}", oi.bucket, oi.name); + warn!("heal_replication: ObjectInfo oi: {:?}", oi); + let enriched = Self::hydrate_replication_metadata(oi); let pending_lagging = self.is_pending_lagging(&enriched); @@ -429,7 +432,7 @@ impl ScannerItem { if let Some(handle) = &self.replication_metrics { handle.record_task_submission(&self.bucket).await; } - debug!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name); + warn!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name); } else { warn!( "heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}", diff --git a/crates/ahm/src/scanner/local_scan/mod.rs b/crates/ahm/src/scanner/local_scan/mod.rs index f8fb77f8..ffed855b 100644 --- a/crates/ahm/src/scanner/local_scan/mod.rs +++ b/crates/ahm/src/scanner/local_scan/mod.rs @@ -224,9 +224,11 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In record.usage.last_modified_ns = mtime_ns; state.objects.insert(rel_path.clone(), record.usage.clone()); emitted.insert(rel_path.clone()); + warn!("compute_object_usage: record: {:?}", record.clone()); objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record); } Ok(None) => { + warn!("compute_object_usage: None, rel_path: {:?}", rel_path); state.objects.remove(&rel_path); } Err(err) => { @@ -241,25 +243,27 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In warn!("Failed to read xl.meta {:?}: {}", xl_path, err); } } + } else { + warn!("should_parse: false, rel_path: {:?}", rel_path); } } state.objects.retain(|key, _| visited.contains(key)); state.last_scan_ns = Some(now_ns); - for (key, usage) in &state.objects { - if emitted.contains(key) { - continue; - } - objects_by_bucket - .entry(usage.bucket.clone()) - .or_default() - .push(LocalObjectRecord { - usage: usage.clone(), - object_info: None, - file_info: None, - }); - } + // for (key, usage) in &state.objects { + // if emitted.contains(key) { + // continue; + // } + // objects_by_bucket + // .entry(usage.bucket.clone()) + // .or_default() + // .push(LocalObjectRecord { + // usage: usage.clone(), + // object_info: None, + // file_info: None, + // }); + // } let snapshot = build_snapshot(meta, &state.objects, now); status.snapshot_exists = true; diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index 90c8fd96..123147c5 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -34,8 +34,8 @@ use rustfs_madmin::heal_commands::HealResultItem; use rustfs_rio::Checksum; use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader}; use rustfs_utils::CompressionAlgorithm; -use rustfs_utils::http::AMZ_STORAGE_CLASS; use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER}; +use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_STORAGE_CLASS}; use rustfs_utils::path::decode_dir_object; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -745,7 +745,22 @@ impl ObjectInfo { let inlined = fi.inline_data(); // TODO:expires - // TODO:ReplicationState + + let mut replication_status_internal = None; + let mut version_purge_status_internal = None; + if let Some(replication_state) = fi.replication_state_internal.as_ref() { + replication_status_internal = replication_state.replication_status_internal.clone(); + version_purge_status_internal = replication_state.version_purge_status_internal.clone(); + } + let mut replication_status = fi.replication_status(); + if replication_status.is_empty() + && let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS) + && status == ReplicationStatusType::Replica.as_str() + { + replication_status = ReplicationStatusType::Replica; + } + + let version_purge_status = fi.version_purge_status(); let transitioned_object = TransitionedObject { name: fi.transitioned_objname.clone(), @@ -810,6 +825,10 @@ impl ObjectInfo { transitioned_object, checksum: fi.checksum.clone(), storage_class, + replication_status_internal, + version_purge_status_internal, + replication_status, + version_purge_status, ..Default::default() } } diff --git a/crates/ecstore/src/store_list_objects.rs b/crates/ecstore/src/store_list_objects.rs index 676d43cf..526c41cf 100644 --- a/crates/ecstore/src/store_list_objects.rs +++ b/crates/ecstore/src/store_list_objects.rs @@ -314,13 +314,15 @@ impl ECStore { // contextCanceled - let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos( - &list_result.entries.unwrap_or_default(), - bucket, - prefix, - delimiter.clone(), - ) - .await; + let entries = list_result.entries.unwrap_or_default(); + for entry in entries.entries() { + if entry.is_object() { + let fi = entry.to_fileinfo(bucket).unwrap(); + tracing::warn!("list_objects_generic file_info: {:?}", fi); + } + } + + let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(&entries, bucket, prefix, delimiter.clone()).await; let is_truncated = { if max_keys > 0 && get_objects.len() > max_keys as usize {