mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
@@ -133,8 +133,14 @@ impl HealStorageAPI for ECStoreHealStorage {
|
||||
match self.ecstore.get_object_info(bucket, object, &Default::default()).await {
|
||||
Ok(info) => Ok(Some(info)),
|
||||
Err(e) => {
|
||||
error!("Failed to get object meta: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
// Map ObjectNotFound to None to align with Option return type
|
||||
if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
|
||||
debug!("Object meta not found: {}/{}", bucket, object);
|
||||
Ok(None)
|
||||
} else {
|
||||
error!("Failed to get object meta: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -154,8 +160,13 @@ impl HealStorageAPI for ECStoreHealStorage {
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to get object: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
|
||||
debug!("Object data not found: {}/{}", bucket, object);
|
||||
Ok(None)
|
||||
} else {
|
||||
error!("Failed to get object: {}/{} - {}", bucket, object, e);
|
||||
Err(Error::other(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ use ecstore::{
|
||||
set_disk::SetDisks,
|
||||
};
|
||||
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
|
||||
use rustfs_filemeta::MetacacheReader;
|
||||
use rustfs_filemeta::{MetacacheReader, VersionType};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, warn};
|
||||
@@ -432,8 +432,27 @@ impl Scanner {
|
||||
}
|
||||
|
||||
if let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() {
|
||||
// First try the standard integrity check
|
||||
// First check whether the object still logically exists.
|
||||
// If it's already deleted (e.g., non-versioned bucket), do not trigger heal.
|
||||
let object_opts = ecstore::store_api::ObjectOptions::default();
|
||||
match ecstore.get_object_info(bucket, object, &object_opts).await {
|
||||
Ok(_) => {
|
||||
// Object exists logically, continue with verification below
|
||||
}
|
||||
Err(e) => {
|
||||
if matches!(e, ecstore::error::StorageError::ObjectNotFound(_, _)) {
|
||||
debug!(
|
||||
"Object {}/{} not found logically (likely deleted), skip integrity check & heal",
|
||||
bucket, object
|
||||
);
|
||||
return Ok(());
|
||||
} else {
|
||||
debug!("get_object_info error for {}/{}: {}", bucket, object, e);
|
||||
// Fall through to existing logic which will handle accordingly
|
||||
}
|
||||
}
|
||||
}
|
||||
// First try the standard integrity check
|
||||
let mut integrity_failed = false;
|
||||
|
||||
debug!("Running standard object verification for {}/{}", bucket, object);
|
||||
@@ -1398,8 +1417,64 @@ impl Scanner {
|
||||
let empty_vec = Vec::new();
|
||||
let locations = object_locations.get(&key).unwrap_or(&empty_vec);
|
||||
|
||||
// If any disk reports this object as a latest delete marker (tombstone),
|
||||
// it's a legitimate deletion. Skip missing-object heal to avoid recreating
|
||||
// deleted objects. Optional: a metadata heal could be submitted to fan-out
|
||||
// the delete marker, but we keep it conservative here.
|
||||
let mut has_latest_delete_marker = false;
|
||||
for &disk_idx in locations {
|
||||
if let Some(bucket_map) = all_disk_objects.get(disk_idx) {
|
||||
if let Some(file_map) = bucket_map.get(bucket) {
|
||||
if let Some(fm) = file_map.get(object_name) {
|
||||
if let Some(first_ver) = fm.versions.first() {
|
||||
if first_ver.header.version_type == VersionType::Delete {
|
||||
has_latest_delete_marker = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if has_latest_delete_marker {
|
||||
debug!(
|
||||
"Object {}/{} is a delete marker on some disk(s), skipping heal for missing parts",
|
||||
bucket, object_name
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if object is missing from some disks
|
||||
if locations.len() < disks.len() {
|
||||
// Before submitting heal, confirm the object still exists logically.
|
||||
let should_heal = if let Some(store) = rustfs_ecstore::new_object_layer_fn() {
|
||||
match store.get_object_info(bucket, object_name, &Default::default()).await {
|
||||
Ok(_) => true, // exists -> propagate by heal
|
||||
Err(e) => {
|
||||
if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
|
||||
debug!(
|
||||
"Object {}/{} not found logically (deleted), skip missing-disks heal",
|
||||
bucket, object_name
|
||||
);
|
||||
false
|
||||
} else {
|
||||
debug!(
|
||||
"Object {}/{} get_object_info errored ({}), conservatively skip heal",
|
||||
bucket, object_name, e
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No store available; be conservative and skip to avoid recreating deletions
|
||||
debug!("No ECStore available to confirm existence, skip heal for {}/{}", bucket, object_name);
|
||||
false
|
||||
};
|
||||
|
||||
if !should_heal {
|
||||
continue;
|
||||
}
|
||||
objects_needing_heal += 1;
|
||||
let missing_disks: Vec<usize> = (0..disks.len()).filter(|&i| !locations.contains(&i)).collect();
|
||||
warn!("Object {}/{} missing from disks: {:?}", bucket, object_name, missing_disks);
|
||||
|
||||
Reference in New Issue
Block a user