Fix Admin Heal API and Add Pagination Support for Large Buckets (#933)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: houseme <housemecn@gmail.com>
weisd
2025-12-03 18:10:46 +08:00
committed by GitHub
parent e355d3db80
commit a8b7b28fd0
14 changed files with 494 additions and 174 deletions

View File

@@ -90,7 +90,12 @@ impl HealChannelProcessor {
/// Process start request
async fn process_start_request(&self, request: HealChannelRequest) -> Result<()> {
info!("Processing heal start request: {} for bucket: {}", request.id, request.bucket);
info!(
"Processing heal start request: {} for bucket: {}/{}",
request.id,
request.bucket,
request.object_prefix.as_deref().unwrap_or("")
);
// Convert channel request to heal request
let heal_request = self.convert_to_heal_request(request.clone())?;
@@ -324,6 +329,14 @@ mod tests {
async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> crate::Result<Vec<String>> {
Ok(vec![])
}
async fn list_objects_for_heal_page(
&self,
_bucket: &str,
_prefix: &str,
_continuation_token: Option<&str>,
) -> crate::Result<(Vec<String>, Option<String>, bool)> {
Ok((vec![], None, false))
}
async fn get_disk_for_resume(&self, _set_disk_id: &str) -> crate::Result<rustfs_ecstore::disk::DiskStore> {
Err(crate::Error::other("Not implemented in mock"))
}

View File

@@ -256,84 +256,114 @@ impl ErasureSetHealer {
}
};
// 2. get objects to heal
let objects = self.storage.list_objects_for_heal(bucket, "").await?;
// 2. process objects with pagination to avoid loading all objects into memory
let mut continuation_token: Option<String> = None;
let mut global_obj_idx = 0usize;
// 3. continue from checkpoint
for (obj_idx, object) in objects.iter().enumerate().skip(*current_object_index) {
// check if already processed
if checkpoint_manager.get_checkpoint().await.processed_objects.contains(object) {
continue;
}
// update current object
resume_manager
.set_current_item(Some(bucket.to_string()), Some(object.clone()))
loop {
// Get one page of objects
let (objects, next_token, is_truncated) = self
.storage
.list_objects_for_heal_page(bucket, "", continuation_token.as_deref())
.await?;
// Check if object still exists before attempting heal
let object_exists = match self.storage.object_exists(bucket, object).await {
Ok(exists) => exists,
Err(e) => {
warn!("Failed to check existence of {}/{}: {}, marking as failed", bucket, object, e);
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
*current_object_index = obj_idx + 1;
// Process objects in this page
for object in objects {
// Skip objects before the checkpoint
if global_obj_idx < *current_object_index {
global_obj_idx += 1;
continue;
}
};
if !object_exists {
info!(
target: "rustfs:ahm:heal_bucket_with_resume" ,"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
bucket, object
);
checkpoint_manager.add_processed_object(object.clone()).await?;
*successful_objects += 1; // Treat as successful - object is gone as intended
*current_object_index = obj_idx + 1;
continue;
}
// heal object
let heal_opts = HealOpts {
scan_mode: HealScanMode::Normal,
remove: true,
recreate: true, // Keep recreate enabled for legitimate heal scenarios
..Default::default()
};
match self.storage.heal_object(bucket, object, None, &heal_opts).await {
Ok((_result, None)) => {
*successful_objects += 1;
checkpoint_manager.add_processed_object(object.clone()).await?;
info!("Successfully healed object {}/{}", bucket, object);
// check if already processed
if checkpoint_manager.get_checkpoint().await.processed_objects.contains(&object) {
global_obj_idx += 1;
continue;
}
Ok((_, Some(err))) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Failed to heal object {}/{}: {}", bucket, object, err);
}
Err(err) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Error healing object {}/{}: {}", bucket, object, err);
}
}
*processed_objects += 1;
*current_object_index = obj_idx + 1;
// check cancel status
if self.cancel_token.is_cancelled() {
info!("Heal task cancelled during object processing");
return Err(Error::TaskCancelled);
}
// save checkpoint periodically
if obj_idx % 100 == 0 {
checkpoint_manager
.update_position(bucket_index, *current_object_index)
// update current object
resume_manager
.set_current_item(Some(bucket.to_string()), Some(object.clone()))
.await?;
// Check if object still exists before attempting heal
let object_exists = match self.storage.object_exists(bucket, &object).await {
Ok(exists) => exists,
Err(e) => {
warn!("Failed to check existence of {}/{}: {}, marking as failed", bucket, object, e);
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
global_obj_idx += 1;
*current_object_index = global_obj_idx;
continue;
}
};
if !object_exists {
info!(
target: "rustfs:ahm:heal_bucket_with_resume" ,"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
bucket, object
);
checkpoint_manager.add_processed_object(object.clone()).await?;
*successful_objects += 1; // Treat as successful - object is gone as intended
global_obj_idx += 1;
*current_object_index = global_obj_idx;
continue;
}
// heal object
let heal_opts = HealOpts {
scan_mode: HealScanMode::Normal,
remove: true,
recreate: true, // Keep recreate enabled for legitimate heal scenarios
..Default::default()
};
match self.storage.heal_object(bucket, &object, None, &heal_opts).await {
Ok((_result, None)) => {
*successful_objects += 1;
checkpoint_manager.add_processed_object(object.clone()).await?;
info!("Successfully healed object {}/{}", bucket, object);
}
Ok((_, Some(err))) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Failed to heal object {}/{}: {}", bucket, object, err);
}
Err(err) => {
*failed_objects += 1;
checkpoint_manager.add_failed_object(object.clone()).await?;
warn!("Error healing object {}/{}: {}", bucket, object, err);
}
}
*processed_objects += 1;
global_obj_idx += 1;
*current_object_index = global_obj_idx;
// check cancel status
if self.cancel_token.is_cancelled() {
info!("Heal task cancelled during object processing");
return Err(Error::TaskCancelled);
}
// save checkpoint periodically
if global_obj_idx % 100 == 0 {
checkpoint_manager
.update_position(bucket_index, *current_object_index)
.await?;
}
}
// Check if there are more pages
if !is_truncated {
break;
}
continuation_token = next_token;
if continuation_token.is_none() {
warn!("List is truncated but no continuation token provided for {}", bucket);
break;
}
}
@@ -399,16 +429,12 @@ impl ErasureSetHealer {
}
};
// 2. get objects to heal
let objects = storage.list_objects_for_heal(bucket, "").await?;
// 2. process objects with pagination to avoid loading all objects into memory
let mut continuation_token: Option<String> = None;
let mut total_scanned = 0u64;
let mut total_success = 0u64;
let mut total_failed = 0u64;
// 3. update progress
{
let mut p = progress.write().await;
p.objects_scanned += objects.len() as u64;
}
// 4. heal objects concurrently
let heal_opts = HealOpts {
scan_mode: HealScanMode::Normal,
remove: true, // remove corrupted data
@@ -416,27 +442,65 @@ impl ErasureSetHealer {
..Default::default()
};
let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await;
loop {
// Get one page of objects
let (objects, next_token, is_truncated) = storage
.list_objects_for_heal_page(bucket, "", continuation_token.as_deref())
.await?;
// 5. count results
let (success_count, failure_count) = object_results
.into_iter()
.fold((0, 0), |(success, failure), result| match result {
Ok(_) => (success + 1, failure),
Err(_) => (success, failure + 1),
});
let page_count = objects.len() as u64;
total_scanned += page_count;
// 6. update progress
// 3. update progress
{
let mut p = progress.write().await;
p.objects_scanned = total_scanned;
}
// 4. heal objects concurrently for this page
let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await;
// 5. count results for this page
let (success_count, failure_count) =
object_results
.into_iter()
.fold((0, 0), |(success, failure), result| match result {
Ok(_) => (success + 1, failure),
Err(_) => (success, failure + 1),
});
total_success += success_count;
total_failed += failure_count;
// 6. update progress
{
let mut p = progress.write().await;
p.objects_healed = total_success;
p.objects_failed = total_failed;
p.set_current_object(Some(format!("processing bucket: {bucket} (page)")));
}
// Check if there are more pages
if !is_truncated {
break;
}
continuation_token = next_token;
if continuation_token.is_none() {
warn!("List is truncated but no continuation token provided for {}", bucket);
break;
}
}
// 7. final progress update
{
let mut p = progress.write().await;
p.objects_healed += success_count;
p.objects_failed += failure_count;
p.set_current_object(Some(format!("completed bucket: {bucket}")));
}
info!(
"Completed heal for bucket {}: {} success, {} failures",
bucket, success_count, failure_count
"Completed heal for bucket {}: {} success, {} failures (total scanned: {})",
bucket, total_success, total_failed, total_scanned
);
Ok(())

View File

@@ -270,7 +270,7 @@ impl HealManager {
// start scheduler
self.start_scheduler().await?;
// start auto disk scanner
// start auto disk scanner to heal unformatted disks
self.start_auto_disk_scanner().await?;
info!("HealManager started successfully");
@@ -453,13 +453,18 @@ impl HealManager {
let cancel_token = self.cancel_token.clone();
let storage = self.storage.clone();
info!(
"start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}",
config.read().await.heal_interval
);
tokio::spawn(async move {
let mut interval = interval(config.read().await.heal_interval);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Auto disk scanner received shutdown signal");
info!("start_auto_disk_scanner: Auto disk scanner received shutdown signal");
break;
}
_ = interval.tick() => {
@@ -478,6 +483,7 @@ impl HealManager {
}
if endpoints.is_empty() {
info!("start_auto_disk_scanner: No endpoints need healing");
continue;
}
@@ -485,7 +491,7 @@ impl HealManager {
let buckets = match storage.list_buckets().await {
Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<String>>(),
Err(e) => {
error!("Failed to get bucket list for auto healing: {}", e);
error!("start_auto_disk_scanner: Failed to get bucket list for auto healing: {}", e);
continue;
}
};
@@ -495,7 +501,7 @@ impl HealManager {
let Some(set_disk_id) =
crate::heal::utils::format_set_disk_id_from_i32(ep.pool_idx, ep.set_idx)
else {
warn!("Skipping endpoint {} without valid pool/set index", ep);
warn!("start_auto_disk_scanner: Skipping endpoint {} without valid pool/set index", ep);
continue;
};
// skip if already queued or healing
@@ -521,6 +527,7 @@ impl HealManager {
}
if skip {
info!("start_auto_disk_scanner: Skipping auto erasure set heal for endpoint: {} (set_disk_id: {}) because it is already queued or healing", ep, set_disk_id);
continue;
}
@@ -535,7 +542,7 @@ impl HealManager {
);
let mut queue = heal_queue.lock().await;
queue.push(req);
info!("Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id);
info!("start_auto_disk_scanner: Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id);
}
}
}

View File

@@ -107,9 +107,21 @@ pub trait HealStorageAPI: Send + Sync {
/// Heal format using ecstore
async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
/// List objects for healing
/// List objects for healing (returns all objects, may use significant memory for large buckets)
///
/// WARNING: This method loads all objects into memory at once. For buckets with many objects,
/// consider using `list_objects_for_heal_page` instead to process objects in pages.
async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>>;
/// List objects for healing with pagination (returns one page and continuation token)
/// Returns (objects, next_continuation_token, is_truncated)
async fn list_objects_for_heal_page(
&self,
bucket: &str,
prefix: &str,
continuation_token: Option<&str>,
) -> Result<(Vec<String>, Option<String>, bool)>;
/// Get disk for resume functionality
async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore>;
}
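As a quick illustration of the new pagination contract, here is a minimal caller sketch. It assumes any `HealStorageAPI` implementation plus the crate's `Result` alias from the trait above; `for_each_heal_candidate` and `handle_object` are hypothetical names used only for this example. It mirrors the loop in `heal_bucket_with_resume`, minus the checkpoint bookkeeping.
async fn for_each_heal_candidate<S, F>(storage: &S, bucket: &str, mut handle_object: F) -> Result<u64>
where
    S: HealStorageAPI + ?Sized,
    F: FnMut(&str),
{
    let mut seen = 0u64;
    let mut token: Option<String> = None;
    loop {
        // Each page carries (objects, next_continuation_token, is_truncated).
        let (page, next_token, is_truncated) = storage
            .list_objects_for_heal_page(bucket, "", token.as_deref())
            .await?;
        for name in &page {
            handle_object(name);
            seen += 1;
        }
        if !is_truncated {
            break;
        }
        match next_token {
            Some(t) => token = Some(t),
            // A truncated page without a token would loop forever, so bail out.
            None => break,
        }
    }
    Ok(seen)
}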
@@ -493,24 +505,67 @@ impl HealStorageAPI for ECStoreHealStorage {
async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>> {
debug!("Listing objects for heal: {}/{}", bucket, prefix);
warn!(
"list_objects_for_heal loads all objects into memory. For large buckets, consider using list_objects_for_heal_page instead."
);
// Use list_objects_v2 to get objects
match self
.ecstore
.clone()
.list_objects_v2(bucket, prefix, None, None, 1000, false, None, false)
.await
{
Ok(list_info) => {
let objects: Vec<String> = list_info.objects.into_iter().map(|obj| obj.name).collect();
info!("Found {} objects for heal in {}/{}", objects.len(), bucket, prefix);
Ok(objects)
let mut all_objects = Vec::new();
let mut continuation_token: Option<String> = None;
loop {
let (page_objects, next_token, is_truncated) = self
.list_objects_for_heal_page(bucket, prefix, continuation_token.as_deref())
.await?;
all_objects.extend(page_objects);
if !is_truncated {
break;
}
Err(e) => {
error!("Failed to list objects for heal: {}/{} - {}", bucket, prefix, e);
Err(Error::other(e))
continuation_token = next_token;
if continuation_token.is_none() {
warn!("List is truncated but no continuation token provided for {}/{}", bucket, prefix);
break;
}
}
info!("Found {} objects for heal in {}/{}", all_objects.len(), bucket, prefix);
Ok(all_objects)
}
async fn list_objects_for_heal_page(
&self,
bucket: &str,
prefix: &str,
continuation_token: Option<&str>,
) -> Result<(Vec<String>, Option<String>, bool)> {
debug!("Listing objects for heal (page): {}/{}", bucket, prefix);
const MAX_KEYS: i32 = 1000;
let continuation_token_opt = continuation_token.map(|s| s.to_string());
// Use list_objects_v2 to get objects with pagination
let list_info = match self
.ecstore
.clone()
.list_objects_v2(bucket, prefix, continuation_token_opt, None, MAX_KEYS, false, None, false)
.await
{
Ok(info) => info,
Err(e) => {
error!("Failed to list objects for heal: {}/{} - {}", bucket, prefix, e);
return Err(Error::other(e));
}
};
// Collect objects from this page
let page_objects: Vec<String> = list_info.objects.into_iter().map(|obj| obj.name).collect();
let page_count = page_objects.len();
debug!("Listed {} objects (page) for heal in {}/{}", page_count, bucket, prefix);
Ok((page_objects, list_info.next_continuation_token, list_info.is_truncated))
}
async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore> {

View File

@@ -600,6 +600,7 @@ impl Scanner {
// Initialize and start the node scanner
self.node_scanner.initialize_stats().await?;
// update object count and size for each bucket
self.node_scanner.start().await?;
// Set local stats in aggregator
@@ -614,21 +615,6 @@ impl Scanner {
}
});
// Trigger an immediate data usage collection so that admin APIs have fresh data after startup.
let scanner = self.clone_for_background();
tokio::spawn(async move {
let enable_stats = {
let cfg = scanner.config.read().await;
cfg.enable_data_usage_stats
};
if enable_stats {
if let Err(e) = scanner.collect_and_persist_data_usage().await {
warn!("Initial data usage collection failed: {}", e);
}
}
});
Ok(())
}

View File

@@ -711,6 +711,7 @@ impl NodeScanner {
// start scanning loop
let scanner_clone = self.clone_for_background();
tokio::spawn(async move {
// update object count and size for each bucket
if let Err(e) = scanner_clone.scan_loop_with_resume(None).await {
error!("scanning loop failed: {}", e);
}

View File

@@ -244,6 +244,14 @@ fn test_heal_task_status_atomic_update() {
async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> rustfs_ahm::Result<Vec<String>> {
Ok(vec![])
}
async fn list_objects_for_heal_page(
&self,
_bucket: &str,
_prefix: &str,
_continuation_token: Option<&str>,
) -> rustfs_ahm::Result<(Vec<String>, Option<String>, bool)> {
Ok((vec![], None, false))
}
async fn get_disk_for_resume(&self, _set_disk_id: &str) -> rustfs_ahm::Result<rustfs_ecstore::disk::DiskStore> {
Err(rustfs_ahm::Error::other("Not implemented in mock"))
}

View File

@@ -85,12 +85,90 @@ impl Display for DriveState {
}
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum HealScanMode {
Unknown,
Unknown = 0,
#[default]
Normal,
Deep,
Normal = 1,
Deep = 2,
}
impl Serialize for HealScanMode {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_u8(*self as u8)
}
}
impl<'de> Deserialize<'de> for HealScanMode {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct HealScanModeVisitor;
impl<'de> serde::de::Visitor<'de> for HealScanModeVisitor {
type Value = HealScanMode;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("an integer between 0 and 2")
}
fn visit_u8<E>(self, value: u8) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match value {
0 => Ok(HealScanMode::Unknown),
1 => Ok(HealScanMode::Normal),
2 => Ok(HealScanMode::Deep),
_ => Err(E::custom(format!("invalid HealScanMode value: {}", value))),
}
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if value > u8::MAX as u64 {
return Err(E::custom(format!("HealScanMode value too large: {}", value)));
}
self.visit_u8(value as u8)
}
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if value < 0 || value > u8::MAX as i64 {
return Err(E::custom(format!("invalid HealScanMode value: {}", value)));
}
self.visit_u8(value as u8)
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
// Try parsing as number string first (for URL-encoded values)
if let Ok(num) = value.parse::<u8>() {
return self.visit_u8(num);
}
// Try parsing as named string
match value {
"Unknown" | "unknown" => Ok(HealScanMode::Unknown),
"Normal" | "normal" => Ok(HealScanMode::Normal),
"Deep" | "deep" => Ok(HealScanMode::Deep),
_ => Err(E::custom(format!("invalid HealScanMode string: {}", value))),
}
}
}
deserializer.deserialize_any(HealScanModeVisitor)
}
}
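A minimal round-trip sketch of the serde behaviour above (illustrative only; it assumes `serde_json` is available and `heal_scan_mode_roundtrip` is a hypothetical helper):
fn heal_scan_mode_roundtrip() -> serde_json::Result<()> {
    // Serializes as the numeric discriminant...
    assert_eq!(serde_json::to_string(&HealScanMode::Deep)?, "2");
    // ...and deserializes from either a number or a name ("Deep"/"deep").
    assert_eq!(serde_json::from_str::<HealScanMode>("2")?, HealScanMode::Deep);
    assert_eq!(serde_json::from_str::<HealScanMode>("\"deep\"")?, HealScanMode::Deep);
    assert_eq!(serde_json::from_str::<HealScanMode>("\"Normal\"")?, HealScanMode::Normal);
    Ok(())
}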
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize)]
@@ -106,7 +184,9 @@ pub struct HealOpts {
pub update_parity: bool,
#[serde(rename = "nolock")]
pub no_lock: bool,
#[serde(rename = "pool", default)]
pub pool: Option<usize>,
#[serde(rename = "set", default)]
pub set: Option<usize>,
}
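For example, a heal request can now opt into a specific pool/set while leaving everything else at its default (a sketch only; the remaining `HealOpts` fields are assumed to come from `Default`, and `targeted_heal_opts` is a hypothetical helper):
fn targeted_heal_opts() -> HealOpts {
    HealOpts {
        scan_mode: HealScanMode::Deep,
        remove: true,
        // New optional targeting fields; #[serde(default)] lets requests omit them.
        pool: Some(0),
        set: Some(1),
        ..Default::default()
    }
}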

View File

@@ -967,9 +967,7 @@ impl LocalDisk {
sum: &[u8],
shard_size: usize,
) -> Result<()> {
let file = super::fs::open_file(part_path, O_CREATE | O_WRONLY)
.await
.map_err(to_file_error)?;
let file = super::fs::open_file(part_path, O_RDONLY).await.map_err(to_file_error)?;
let meta = file.metadata().await.map_err(to_file_error)?;
let file_size = meta.len() as usize;
@@ -1465,6 +1463,7 @@ impl DiskAPI for LocalDisk {
resp.results[i] = conv_part_err_to_int(&err);
if resp.results[i] == CHECK_PART_UNKNOWN {
if let Some(err) = err {
error!("verify_file: failed to bitrot verify file: {:?}, error: {:?}", &part_path, &err);
if err == DiskError::FileAccessDenied {
continue;
}
@@ -1551,7 +1550,7 @@ impl DiskAPI for LocalDisk {
.join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string()))
.join(format!("part.{}", part.number));
match lstat(file_path).await {
match lstat(&file_path).await {
Ok(st) => {
if st.is_dir() {
resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
@@ -1577,6 +1576,8 @@ impl DiskAPI for LocalDisk {
}
}
resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
} else {
error!("check_parts: failed to stat file: {:?}, error: {:?}", &file_path, &e);
}
continue;
}

View File

@@ -681,7 +681,10 @@ pub fn conv_part_err_to_int(err: &Option<Error>) -> usize {
Some(DiskError::VolumeNotFound) => CHECK_PART_VOLUME_NOT_FOUND,
Some(DiskError::DiskNotFound) => CHECK_PART_DISK_NOT_FOUND,
None => CHECK_PART_SUCCESS,
_ => CHECK_PART_UNKNOWN,
_ => {
tracing::warn!("conv_part_err_to_int: unknown error: {err:?}");
CHECK_PART_UNKNOWN
}
}
}

View File

@@ -25,7 +25,7 @@ use crate::client::{object_api_utils::get_raw_etag, transition_api::ReaderImpl};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
use crate::disk::{
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
conv_part_err_to_int, has_part_err,
};
use crate::erasure_coding;
@@ -6431,6 +6431,10 @@ fn join_errs(errs: &[Option<DiskError>]) -> String {
errs.join(", ")
}
/// disks_with_all_partsv2 is a corrected version based on the Go implementation.
/// It sets partsMetadata and onlineDisks when xl.meta is nonexistent, corrupted, or outdated.
/// It also checks the status of each part (corrupted, missing, ok) on each drive.
/// Returns (availableDisks, dataErrsByDisk, dataErrsByPart).
async fn disks_with_all_parts(
online_disks: &[Option<DiskStore>],
parts_metadata: &mut [FileInfo],
@@ -6440,39 +6444,66 @@ async fn disks_with_all_parts(
object: &str,
scan_mode: HealScanMode,
) -> disk::error::Result<(Vec<Option<DiskStore>>, HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
info!(
"disks_with_all_parts: starting with online_disks.len()={}, scan_mode={:?}",
let object_name = latest_meta.name.clone();
debug!(
"disks_with_all_partsv2: starting with object_name={}, online_disks.len()={}, scan_mode={:?}",
object_name,
online_disks.len(),
scan_mode
);
let mut available_disks = vec![None; online_disks.len()];
// Initialize dataErrsByDisk and dataErrsByPart with CHECK_PART_SUCCESS for every disk and part
let mut data_errs_by_disk: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..online_disks.len() {
data_errs_by_disk.insert(i, vec![1; latest_meta.parts.len()]);
data_errs_by_disk.insert(i, vec![CHECK_PART_SUCCESS; latest_meta.parts.len()]);
}
let mut data_errs_by_part: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..latest_meta.parts.len() {
data_errs_by_part.insert(i, vec![1; online_disks.len()]);
data_errs_by_part.insert(i, vec![CHECK_PART_SUCCESS; online_disks.len()]);
}
// Check for inconsistent erasure distribution
let mut inconsistent = 0;
parts_metadata.iter().enumerate().for_each(|(index, meta)| {
if meta.is_valid() && !meta.deleted && meta.erasure.distribution.len() != online_disks.len()
|| (!meta.erasure.distribution.is_empty() && meta.erasure.distribution[index] != meta.erasure.index)
{
warn!("file info inconsistent, meta: {:?}", meta);
inconsistent += 1;
for (index, meta) in parts_metadata.iter().enumerate() {
if !meta.is_valid() {
// Since in the majority of cases erasure.Index matches erasure.Distribution, we can
// consider the offline disks as consistent.
continue;
}
});
if !meta.deleted {
if meta.erasure.distribution.len() != online_disks.len() {
// Erasure distribution has fewer
// items than the number of online disks.
inconsistent += 1;
continue;
}
if !meta.erasure.distribution.is_empty()
&& index < meta.erasure.distribution.len()
&& meta.erasure.distribution[index] != meta.erasure.index
{
// Indexes do not match the distribution order
inconsistent += 1;
}
}
}
let erasure_distribution_reliable = inconsistent <= parts_metadata.len() / 2;
// Initialize metaErrs
let mut meta_errs = Vec::with_capacity(errs.len());
for _ in 0..errs.len() {
meta_errs.push(None);
}
// Process meta errors
for (index, disk) in online_disks.iter().enumerate() {
if let Some(err) = &errs[index] {
meta_errs[index] = Some(err.clone());
continue;
}
let disk = if let Some(disk) = disk {
disk
} else {
@@ -6480,48 +6511,59 @@ async fn disks_with_all_parts(
continue;
};
if let Some(err) = &errs[index] {
meta_errs[index] = Some(err.clone());
continue;
}
if !disk.is_online().await {
meta_errs[index] = Some(DiskError::DiskNotFound);
continue;
}
let meta = &parts_metadata[index];
if !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir) {
warn!("mod_time is not Eq, file corrupt, index: {index}");
// Check if metadata is corrupted (equivalent to filterByETag=false in Go)
let corrupted = !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir);
if corrupted {
meta_errs[index] = Some(DiskError::FileCorrupt);
parts_metadata[index] = FileInfo::default();
continue;
}
if erasure_distribution_reliable {
if !meta.is_valid() {
warn!("file info is not valid, file corrupt, index: {index}");
parts_metadata[index] = FileInfo::default();
meta_errs[index] = Some(DiskError::FileCorrupt);
continue;
}
#[allow(clippy::collapsible_if)]
if !meta.deleted && meta.erasure.distribution.len() != online_disks.len() {
warn!("file info distribution len not Eq online_disks len, file corrupt, index: {index}");
// Erasure distribution is not the same as onlineDisks
// attempt a fix if possible, assuming other entries
// might have the right erasure distribution.
parts_metadata[index] = FileInfo::default();
meta_errs[index] = Some(DiskError::FileCorrupt);
continue;
}
}
}
// info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
meta_errs.iter().enumerate().for_each(|(index, err)| {
// Copy meta errors to part errors
for (index, err) in meta_errs.iter().enumerate() {
if err.is_some() {
let part_err = conv_part_err_to_int(err);
for p in 0..latest_meta.parts.len() {
data_errs_by_part.entry(p).or_insert(vec![0; meta_errs.len()])[index] = part_err;
if let Some(vec) = data_errs_by_part.get_mut(&p) {
if index < vec.len() {
info!(
"data_errs_by_part: copy meta errors to part errors: object_name={}, index: {index}, part: {p}, part_err: {part_err}",
object_name
);
vec[index] = part_err;
}
}
}
}
});
}
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// Check data for each disk
for (index, disk) in online_disks.iter().enumerate() {
if meta_errs[index].is_some() {
continue;
@@ -6530,7 +6572,6 @@ async fn disks_with_all_parts(
let disk = if let Some(disk) = disk {
disk
} else {
meta_errs[index] = Some(DiskError::DiskNotFound);
continue;
};
@@ -6558,16 +6599,21 @@ async fn disks_with_all_parts(
if let Some(vec) = data_errs_by_part.get_mut(&0) {
if index < vec.len() {
vec[index] = conv_part_err_to_int(&verify_err.map(|e| e.into()));
info!("bitrot check result: {}", vec[index]);
info!(
"data_errs_by_part:bitrot check result: object_name={}, index: {index}, result: {}",
object_name, vec[index]
);
}
}
}
continue;
}
// Verify file or check parts
let mut verify_resp = CheckPartsResp::default();
let mut verify_err = None;
meta.data_dir = latest_meta.data_dir;
if scan_mode == HealScanMode::Deep {
// disk has a valid xl.meta but may not have all the
// parts. This is considered an outdated disk, since
@@ -6577,6 +6623,7 @@ async fn disks_with_all_parts(
verify_resp = v;
}
Err(err) => {
warn!("verify_file failed: {err:?}, object_name={}, index: {index}", object_name);
verify_err = Some(err);
}
}
@@ -6586,38 +6633,85 @@ async fn disks_with_all_parts(
verify_resp = v;
}
Err(err) => {
warn!("check_parts failed: {err:?}, object_name={}, index: {index}", object_name);
verify_err = Some(err);
}
}
}
// Update dataErrsByPart for all parts
for p in 0..latest_meta.parts.len() {
if let Some(vec) = data_errs_by_part.get_mut(&p) {
if index < vec.len() {
if verify_err.is_some() {
info!("verify_err");
info!(
"data_errs_by_part: verify_err: object_name={}, index: {index}, part: {p}, verify_err: {verify_err:?}",
object_name
);
vec[index] = conv_part_err_to_int(&verify_err.clone());
} else {
info!("verify_resp, verify_resp.results {}", verify_resp.results[p]);
vec[index] = verify_resp.results[p];
// Fix: verify_resp.results length is based on meta.parts, not latest_meta.parts
// We need to check bounds to avoid panic
if p < verify_resp.results.len() {
info!(
"data_errs_by_part: update data_errs_by_part: object_name={}, index: {}, part: {}, verify_resp.results: {:?}",
object_name, index, p, verify_resp.results[p]
);
vec[index] = verify_resp.results[p];
} else {
debug!(
"data_errs_by_part: verify_resp.results length mismatch: expected at least {}, got {}, object_name={}, index: {index}, part: {p}",
p + 1,
verify_resp.results.len(),
object_name
);
vec[index] = CHECK_PART_SUCCESS;
}
}
}
}
}
}
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// Build dataErrsByDisk from dataErrsByPart
for (part, disks) in data_errs_by_part.iter() {
for (idx, disk) in disks.iter().enumerate() {
if let Some(vec) = data_errs_by_disk.get_mut(&idx) {
vec[*part] = *disk;
for (disk_idx, disk_err) in disks.iter().enumerate() {
if let Some(vec) = data_errs_by_disk.get_mut(&disk_idx) {
if *part < vec.len() {
vec[*part] = *disk_err;
info!(
"data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {disk_err}",
object_name,
);
}
}
}
}
// info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
// Calculate available_disks based on meta_errs and data_errs_by_disk
for (i, disk) in online_disks.iter().enumerate() {
if meta_errs[i].is_none() && disk.is_some() && !has_part_err(&data_errs_by_disk[&i]) {
available_disks[i] = Some(disk.clone().unwrap());
if let Some(disk_errs) = data_errs_by_disk.get(&i) {
if meta_errs[i].is_none() && disk.is_some() && !has_part_err(disk_errs) {
available_disks[i] = Some(disk.clone().unwrap());
} else {
warn!(
"disks_with_all_partsv2: disk is not available, object_name={}, index: {}, meta_errs={:?}, disk_errs={:?}, disk_is_some={:?}",
object_name,
i,
meta_errs[i],
disk_errs,
disk.is_some(),
);
parts_metadata[i] = FileInfo::default();
}
} else {
warn!(
"disks_with_all_partsv2: data_errs_by_disk missing entry for object_name={},index {}, meta_errs={:?}, disk_is_some={:?}",
object_name,
i,
meta_errs[i],
disk.is_some(),
);
parts_metadata[i] = FileInfo::default();
}
}

View File

@@ -21,6 +21,7 @@ use serde_json::{Value, json};
use std::collections::HashMap;
use time::OffsetDateTime;
use time::macros::offset;
use tracing::warn;
const ACCESS_KEY_MIN_LEN: usize = 3;
const ACCESS_KEY_MAX_LEN: usize = 20;
@@ -239,6 +240,8 @@ pub fn create_new_credentials_with_metadata(
}
};
warn!("create_new_credentials_with_metadata expiration {expiration:?}, access_key: {ak}");
let token = utils::generate_jwt(&claims, token_secret)?;
Ok(Credentials {

View File

@@ -137,6 +137,11 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route>
// Some APIs are only available in EC mode
// if is_dist_erasure().await || is_erasure().await {
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/heal/{bucket}").as_str(),
AdminOperation(&handlers::HealHandler {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/heal/{bucket}/{prefix}").as_str(),

View File

@@ -451,7 +451,7 @@ impl Node for NodeService {
}));
}
};
match disk.verify_file(&request.volume, &request.path, &file_info).await {
match disk.check_parts(&request.volume, &request.path, &file_info).await {
Ok(check_parts_resp) => {
let check_parts_resp = match serde_json::to_string(&check_parts_resp) {
Ok(check_parts_resp) => check_parts_resp,