This commit is contained in:
weisd
2025-12-10 15:36:52 +08:00
parent 3ac004510a
commit 00787cbce4
5 changed files with 65 additions and 29 deletions

View File

@@ -435,18 +435,19 @@ impl Scanner {
let records_cow: Cow<[LocalObjectRecord]> = if let Some(existing) = bucket_objects_map.get(bucket_name) {
if existing.is_empty() {
debug!("Bucket {} had empty local snapshot entries; refreshing via ECStore listing", bucket_name);
warn!("Bucket {} had empty local snapshot entries; refreshing via ECStore listing", bucket_name);
let fetched = self.collect_bucket_records_from_store(ecstore, bucket_name).await?;
if fetched.is_empty() {
debug!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name);
warn!("No objects discovered via ECStore listing for bucket {}; skipping", bucket_name);
continue;
}
Cow::Owned(fetched)
} else {
warn!("Bucket {} had non-empty local snapshot entries; using them", bucket_name);
Cow::Borrowed(existing.as_slice())
}
} else {
debug!(
warn!(
"No local snapshot entries found for bucket {}; falling back to ECStore listing",
bucket_name
);
@@ -547,7 +548,7 @@ impl Scanner {
}
};
let bucket_objects_map = &scan_outcome.bucket_objects;
warn!("Basic test scan: bucket_objects: {:?}", bucket_objects_map);
let lifecycle_result = self
.handle_buckets_from_local_records(&ecstore, bucket_objects_map, enable_healing, enable_deep_scan, false)
.await;
@@ -680,6 +681,8 @@ impl Scanner {
return info.clone();
}
warn!("convert_record_to_object_info no object info found, record: {:?}", record);
let usage = &record.usage;
ObjectInfo {
@@ -1322,6 +1325,8 @@ impl Scanner {
return Ok(());
}
warn!("Lifecycle evaluation: bucket_objects: {:?}", scan_outcome.bucket_objects);
if let Err(e) = self
.handle_buckets_from_local_records(&ecstore, &scan_outcome.bucket_objects, false, false, true)
.await
@@ -1593,11 +1598,14 @@ impl Scanner {
}
} else {
// This is a real data loss scenario - trigger healing
warn!("Data parts integrity check failed for {}/{}: {}. Triggering heal.", bucket, object, e);
warn!(
"Data parts integrity check failed 1 for {}/{}: {}. Triggering heal.",
bucket, object, e
);
integrity_failed = true;
}
} else {
warn!("Data parts integrity check failed for {}/{}: {}. Triggering heal.", bucket, object, e);
warn!("Data parts integrity check failed 2 for {}/{}: {}. Triggering heal.", bucket, object, e);
integrity_failed = true;
}
}

View File

@@ -366,6 +366,9 @@ impl ScannerItem {
}
async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> {
warn!("heal_replication: healing replication for {}/{}", oi.bucket, oi.name);
warn!("heal_replication: ObjectInfo oi: {:?}", oi);
let enriched = Self::hydrate_replication_metadata(oi);
let pending_lagging = self.is_pending_lagging(&enriched);
@@ -429,7 +432,7 @@ impl ScannerItem {
if let Some(handle) = &self.replication_metrics {
handle.record_task_submission(&self.bucket).await;
}
debug!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name);
warn!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name);
} else {
warn!(
"heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}",

View File

@@ -224,9 +224,11 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
record.usage.last_modified_ns = mtime_ns;
state.objects.insert(rel_path.clone(), record.usage.clone());
emitted.insert(rel_path.clone());
warn!("compute_object_usage: record: {:?}", record.clone());
objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record);
}
Ok(None) => {
warn!("compute_object_usage: None, rel_path: {:?}", rel_path);
state.objects.remove(&rel_path);
}
Err(err) => {
@@ -241,25 +243,27 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
warn!("Failed to read xl.meta {:?}: {}", xl_path, err);
}
}
} else {
warn!("should_parse: false, rel_path: {:?}", rel_path);
}
}
state.objects.retain(|key, _| visited.contains(key));
state.last_scan_ns = Some(now_ns);
for (key, usage) in &state.objects {
if emitted.contains(key) {
continue;
}
objects_by_bucket
.entry(usage.bucket.clone())
.or_default()
.push(LocalObjectRecord {
usage: usage.clone(),
object_info: None,
file_info: None,
});
}
// for (key, usage) in &state.objects {
// if emitted.contains(key) {
// continue;
// }
// objects_by_bucket
// .entry(usage.bucket.clone())
// .or_default()
// .push(LocalObjectRecord {
// usage: usage.clone(),
// object_info: None,
// file_info: None,
// });
// }
let snapshot = build_snapshot(meta, &state.objects, now);
status.snapshot_exists = true;

View File

@@ -34,8 +34,8 @@ use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_STORAGE_CLASS};
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -745,7 +745,22 @@ impl ObjectInfo {
let inlined = fi.inline_data();
// TODO:expires
// TODO:ReplicationState
let mut replication_status_internal = None;
let mut version_purge_status_internal = None;
if let Some(replication_state) = fi.replication_state_internal.as_ref() {
replication_status_internal = replication_state.replication_status_internal.clone();
version_purge_status_internal = replication_state.version_purge_status_internal.clone();
}
let mut replication_status = fi.replication_status();
if replication_status.is_empty()
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS)
&& status == ReplicationStatusType::Replica.as_str()
{
replication_status = ReplicationStatusType::Replica;
}
let version_purge_status = fi.version_purge_status();
let transitioned_object = TransitionedObject {
name: fi.transitioned_objname.clone(),
@@ -810,6 +825,10 @@ impl ObjectInfo {
transitioned_object,
checksum: fi.checksum.clone(),
storage_class,
replication_status_internal,
version_purge_status_internal,
replication_status,
version_purge_status,
..Default::default()
}
}

View File

@@ -314,13 +314,15 @@ impl ECStore {
// contextCanceled
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
&list_result.entries.unwrap_or_default(),
bucket,
prefix,
delimiter.clone(),
)
.await;
let entries = list_result.entries.unwrap_or_default();
for entry in entries.entries() {
if entry.is_object() {
let fi = entry.to_fileinfo(bucket).unwrap();
tracing::warn!("list_objects_generic file_info: {:?}", fi);
}
}
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(&entries, bucket, prefix, delimiter.clone()).await;
let is_truncated = {
if max_keys > 0 && get_objects.len() > max_keys as usize {