diff --git a/crates/ahm/src/heal/channel.rs b/crates/ahm/src/heal/channel.rs
index fc85e671..4490239f 100644
--- a/crates/ahm/src/heal/channel.rs
+++ b/crates/ahm/src/heal/channel.rs
@@ -90,7 +90,12 @@ impl HealChannelProcessor {
     /// Process start request
     async fn process_start_request(&self, request: HealChannelRequest) -> Result<()> {
-        info!("Processing heal start request: {} for bucket: {}", request.id, request.bucket);
+        info!(
+            "Processing heal start request: {} for bucket: {}/{}",
+            request.id,
+            request.bucket,
+            request.object_prefix.as_deref().unwrap_or("")
+        );
 
         // Convert channel request to heal request
         let heal_request = self.convert_to_heal_request(request.clone())?;
@@ -324,6 +329,14 @@ mod tests {
        async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> crate::Result<Vec<String>> {
            Ok(vec![])
        }
+       async fn list_objects_for_heal_page(
+           &self,
+           _bucket: &str,
+           _prefix: &str,
+           _continuation_token: Option<&str>,
+       ) -> crate::Result<(Vec<String>, Option<String>, bool)> {
+           Ok((vec![], None, false))
+       }
        async fn get_disk_for_resume(&self, _set_disk_id: &str) -> crate::Result<DiskStore> {
            Err(crate::Error::other("Not implemented in mock"))
        }
diff --git a/crates/ahm/src/heal/erasure_healer.rs b/crates/ahm/src/heal/erasure_healer.rs
index 540499e4..209a0be3 100644
--- a/crates/ahm/src/heal/erasure_healer.rs
+++ b/crates/ahm/src/heal/erasure_healer.rs
@@ -256,84 +256,114 @@ impl ErasureSetHealer {
             }
         };
 
-        // 2. get objects to heal
-        let objects = self.storage.list_objects_for_heal(bucket, "").await?;
+        // 2. process objects with pagination to avoid loading all objects into memory
+        let mut continuation_token: Option<String> = None;
+        let mut global_obj_idx = 0usize;
 
-        // 3. continue from checkpoint
-        for (obj_idx, object) in objects.iter().enumerate().skip(*current_object_index) {
-            // check if already processed
-            if checkpoint_manager.get_checkpoint().await.processed_objects.contains(object) {
-                continue;
-            }
-
-            // update current object
-            resume_manager
-                .set_current_item(Some(bucket.to_string()), Some(object.clone()))
+        loop {
+            // Get one page of objects
+            let (objects, next_token, is_truncated) = self
+                .storage
+                .list_objects_for_heal_page(bucket, "", continuation_token.as_deref())
                 .await?;
 
-            // Check if object still exists before attempting heal
-            let object_exists = match self.storage.object_exists(bucket, object).await {
-                Ok(exists) => exists,
-                Err(e) => {
-                    warn!("Failed to check existence of {}/{}: {}, marking as failed", bucket, object, e);
-                    *failed_objects += 1;
-                    checkpoint_manager.add_failed_object(object.clone()).await?;
-                    *current_object_index = obj_idx + 1;
+            // Process objects in this page
+            for object in objects {
+                // Skip objects before the checkpoint
+                if global_obj_idx < *current_object_index {
+                    global_obj_idx += 1;
                     continue;
                 }
-            };
 
-            if !object_exists {
-                info!(
-                    target: "rustfs:ahm:heal_bucket_with_resume" ,"Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
-                    bucket, object
-                );
-                checkpoint_manager.add_processed_object(object.clone()).await?;
-                *successful_objects += 1; // Treat as successful - object is gone as intended
-                *current_object_index = obj_idx + 1;
-                continue;
-            }
-
-            // heal object
-            let heal_opts = HealOpts {
-                scan_mode: HealScanMode::Normal,
-                remove: true,
-                recreate: true, // Keep recreate enabled for legitimate heal scenarios
-                ..Default::default()
-            };
-
-            match self.storage.heal_object(bucket, object, None, &heal_opts).await {
-                Ok((_result, None)) => {
-                    *successful_objects += 1;
-                    checkpoint_manager.add_processed_object(object.clone()).await?;
-                    info!("Successfully healed object {}/{}", bucket, object);
+                // check if already processed
+                if checkpoint_manager.get_checkpoint().await.processed_objects.contains(&object) {
+                    global_obj_idx += 1;
+                    continue;
                 }
-                Ok((_, Some(err))) => {
-                    *failed_objects += 1;
-                    checkpoint_manager.add_failed_object(object.clone()).await?;
-                    warn!("Failed to heal object {}/{}: {}", bucket, object, err);
-                }
-                Err(err) => {
-                    *failed_objects += 1;
-                    checkpoint_manager.add_failed_object(object.clone()).await?;
-                    warn!("Error healing object {}/{}: {}", bucket, object, err);
-                }
-            }
-            *processed_objects += 1;
-            *current_object_index = obj_idx + 1;
-
-            // check cancel status
-            if self.cancel_token.is_cancelled() {
-                info!("Heal task cancelled during object processing");
-                return Err(Error::TaskCancelled);
-            }
-
-            // save checkpoint periodically
-            if obj_idx % 100 == 0 {
-                checkpoint_manager
-                    .update_position(bucket_index, *current_object_index)
+                // update current object
+                resume_manager
+                    .set_current_item(Some(bucket.to_string()), Some(object.clone()))
                     .await?;
+
+                // Check if object still exists before attempting heal
+                let object_exists = match self.storage.object_exists(bucket, &object).await {
+                    Ok(exists) => exists,
+                    Err(e) => {
+                        warn!("Failed to check existence of {}/{}: {}, marking as failed", bucket, object, e);
+                        *failed_objects += 1;
+                        checkpoint_manager.add_failed_object(object.clone()).await?;
+                        global_obj_idx += 1;
+                        *current_object_index = global_obj_idx;
+                        continue;
+                    }
+                };
+
+                if !object_exists {
+                    info!(
+                        target: "rustfs:ahm:heal_bucket_with_resume", "Object {}/{} no longer exists, skipping heal (likely deleted intentionally)",
+                        bucket, object
+                    );
+                    checkpoint_manager.add_processed_object(object.clone()).await?;
+                    *successful_objects += 1; // Treat as successful - object is gone as intended
+                    global_obj_idx += 1;
+                    *current_object_index = global_obj_idx;
+                    continue;
+                }
+
+                // heal object
+                let heal_opts = HealOpts {
+                    scan_mode: HealScanMode::Normal,
+                    remove: true,
+                    recreate: true, // Keep recreate enabled for legitimate heal scenarios
+                    ..Default::default()
+                };
+
+                match self.storage.heal_object(bucket, &object, None, &heal_opts).await {
+                    Ok((_result, None)) => {
+                        *successful_objects += 1;
+                        checkpoint_manager.add_processed_object(object.clone()).await?;
+                        info!("Successfully healed object {}/{}", bucket, object);
+                    }
+                    Ok((_, Some(err))) => {
+                        *failed_objects += 1;
+                        checkpoint_manager.add_failed_object(object.clone()).await?;
+                        warn!("Failed to heal object {}/{}: {}", bucket, object, err);
+                    }
+                    Err(err) => {
+                        *failed_objects += 1;
+                        checkpoint_manager.add_failed_object(object.clone()).await?;
+                        warn!("Error healing object {}/{}: {}", bucket, object, err);
+                    }
+                }
+
+                *processed_objects += 1;
+                global_obj_idx += 1;
+                *current_object_index = global_obj_idx;
+
+                // check cancel status
+                if self.cancel_token.is_cancelled() {
+                    info!("Heal task cancelled during object processing");
+                    return Err(Error::TaskCancelled);
+                }
+
+                // save checkpoint periodically
+                if global_obj_idx % 100 == 0 {
+                    checkpoint_manager
+                        .update_position(bucket_index, *current_object_index)
+                        .await?;
+                }
+            }
+
+            // Check if there are more pages
+            if !is_truncated {
+                break;
+            }
+
+            continuation_token = next_token;
+            if continuation_token.is_none() {
+                warn!("List is truncated but no continuation token provided for {}", bucket);
+                break;
+            }
         }
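The "is_truncated but no continuation token" guard above is repeated at three call sites in this patch. A small shared driver could fold that guard into one place; the helper below is a hypothetical sketch, not part of the patch (the closure-based shape and the helper name are illustrative assumptions):

    // Hypothetical helper: drives list_objects_for_heal_page and centralizes
    // the "truncated but no token" defensive stop. Per-page work is handed to
    // a synchronous closure here for brevity; real callers heal asynchronously.
    async fn for_each_heal_page<F>(
        storage: &dyn HealStorageAPI,
        bucket: &str,
        prefix: &str,
        mut on_page: F,
    ) -> crate::Result<()>
    where
        F: FnMut(Vec<String>) -> crate::Result<()>,
    {
        let mut token: Option<String> = None;
        loop {
            let (objects, next_token, is_truncated) =
                storage.list_objects_for_heal_page(bucket, prefix, token.as_deref()).await?;
            on_page(objects)?;
            if !is_truncated {
                return Ok(());
            }
            token = next_token;
            if token.is_none() {
                // A truncated listing should always carry a token; stop rather than loop forever.
                return Ok(());
            }
        }
    }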
@@ -399,16 +429,12 @@ impl ErasureSetHealer {
             }
         };
 
-        // 2. get objects to heal
-        let objects = storage.list_objects_for_heal(bucket, "").await?;
+        // 2. process objects with pagination to avoid loading all objects into memory
+        let mut continuation_token: Option<String> = None;
+        let mut total_scanned = 0u64;
+        let mut total_success = 0u64;
+        let mut total_failed = 0u64;
 
-        // 3. update progress
-        {
-            let mut p = progress.write().await;
-            p.objects_scanned += objects.len() as u64;
-        }
-
-        // 4. heal objects concurrently
         let heal_opts = HealOpts {
             scan_mode: HealScanMode::Normal,
             remove: true, // remove corrupted data
@@ -416,27 +442,65 @@ impl ErasureSetHealer {
             ..Default::default()
         };
 
-        let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await;
+        loop {
+            // Get one page of objects
+            let (objects, next_token, is_truncated) = storage
+                .list_objects_for_heal_page(bucket, "", continuation_token.as_deref())
+                .await?;
 
-        // 5. count results
-        let (success_count, failure_count) = object_results
-            .into_iter()
-            .fold((0, 0), |(success, failure), result| match result {
-                Ok(_) => (success + 1, failure),
-                Err(_) => (success, failure + 1),
-            });
+            let page_count = objects.len() as u64;
+            total_scanned += page_count;
 
-        // 6. update progress
+            // 3. update progress
+            {
+                let mut p = progress.write().await;
+                p.objects_scanned = total_scanned;
+            }
+
+            // 4. heal objects concurrently for this page
+            let object_results = Self::heal_objects_concurrently(storage, bucket, &objects, &heal_opts, progress).await;
+
+            // 5. count results for this page
+            let (success_count, failure_count) =
+                object_results
+                    .into_iter()
+                    .fold((0, 0), |(success, failure), result| match result {
+                        Ok(_) => (success + 1, failure),
+                        Err(_) => (success, failure + 1),
+                    });
+
+            total_success += success_count;
+            total_failed += failure_count;
+
+            // 6. update progress
+            {
+                let mut p = progress.write().await;
+                p.objects_healed = total_success;
+                p.objects_failed = total_failed;
+                p.set_current_object(Some(format!("processing bucket: {bucket} (page)")));
+            }
+
+            // Check if there are more pages
+            if !is_truncated {
+                break;
+            }
+
+            continuation_token = next_token;
+            if continuation_token.is_none() {
+                warn!("List is truncated but no continuation token provided for {}", bucket);
+                break;
+            }
+        }
+
+        // 7. final progress update
         {
             let mut p = progress.write().await;
-            p.objects_healed += success_count;
-            p.objects_failed += failure_count;
             p.set_current_object(Some(format!("completed bucket: {bucket}")));
         }
 
         info!(
-            "Completed heal for bucket {}: {} success, {} failures",
-            bucket, success_count, failure_count
+            "Completed heal for bucket {}: {} success, {} failures (total scanned: {})",
+            bucket, total_success, total_failed, total_scanned
        );
 
        Ok(())
diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs
index 93a68396..95556386 100644
--- a/crates/ahm/src/heal/manager.rs
+++ b/crates/ahm/src/heal/manager.rs
@@ -270,7 +270,7 @@ impl HealManager {
         // start scheduler
         self.start_scheduler().await?;
 
-        // start auto disk scanner
+        // start auto disk scanner to heal unformatted disks
         self.start_auto_disk_scanner().await?;
 
         info!("HealManager started successfully");
@@ -453,13 +453,18 @@ impl HealManager {
         let cancel_token = self.cancel_token.clone();
         let storage = self.storage.clone();
 
+        info!(
+            "start_auto_disk_scanner: Starting auto disk scanner with interval: {:?}",
+            config.read().await.heal_interval
+        );
+
         tokio::spawn(async move {
             let mut interval = interval(config.read().await.heal_interval);
 
             loop {
                 tokio::select! {
                    _ = cancel_token.cancelled() => {
-                        info!("Auto disk scanner received shutdown signal");
+                        info!("start_auto_disk_scanner: Auto disk scanner received shutdown signal");
                        break;
                    }
                    _ = interval.tick() => {
@@ -478,6 +483,7 @@ impl HealManager {
                        }
 
                        if endpoints.is_empty() {
+                            info!("start_auto_disk_scanner: No endpoints need healing");
                            continue;
                        }
 
@@ -485,7 +491,7 @@ impl HealManager {
                        let buckets = match storage.list_buckets().await {
                            Ok(buckets) => buckets.iter().map(|b| b.name.clone()).collect::<Vec<_>>(),
                            Err(e) => {
-                                error!("Failed to get bucket list for auto healing: {}", e);
+                                error!("start_auto_disk_scanner: Failed to get bucket list for auto healing: {}", e);
                                continue;
                            }
                        };
@@ -495,7 +501,7 @@ impl HealManager {
                            let Some(set_disk_id) =
                                crate::heal::utils::format_set_disk_id_from_i32(ep.pool_idx, ep.set_idx)
                            else {
-                                warn!("Skipping endpoint {} without valid pool/set index", ep);
+                                warn!("start_auto_disk_scanner: Skipping endpoint {} without valid pool/set index", ep);
                                continue;
                            };
                            // skip if already queued or healing
@@ -521,6 +527,7 @@ impl HealManager {
                            }
 
                            if skip {
+                                info!("start_auto_disk_scanner: Skipping auto erasure set heal for endpoint: {} (set_disk_id: {}) because it is already queued or healing", ep, set_disk_id);
                                continue;
                            }
 
@@ -535,7 +542,7 @@ impl HealManager {
                            );
                            let mut queue = heal_queue.lock().await;
                            queue.push(req);
-                            info!("Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id);
+                            info!("start_auto_disk_scanner: Enqueued auto erasure set heal for endpoint: {} (set_disk_id: {})", ep, set_disk_id);
                        }
                    }
                }
diff --git a/crates/ahm/src/heal/storage.rs b/crates/ahm/src/heal/storage.rs
index 40e13659..369c475c 100644
--- a/crates/ahm/src/heal/storage.rs
+++ b/crates/ahm/src/heal/storage.rs
@@ -107,9 +107,21 @@ pub trait HealStorageAPI: Send + Sync {
     /// Heal format using ecstore
     async fn heal_format(&self, dry_run: bool) -> Result<(HealResultItem, Option<Error>)>;
 
-    /// List objects for healing
+    /// List objects for healing (returns all objects, may use significant memory for large buckets)
+    ///
+    /// WARNING: This method loads all objects into memory at once. For buckets with many objects,
+    /// consider using `list_objects_for_heal_page` instead to process objects in pages.
     async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>>;
 
+    /// List objects for healing with pagination (returns one page and a continuation token).
+    /// Returns `(objects, next_continuation_token, is_truncated)`.
+    async fn list_objects_for_heal_page(
+        &self,
+        bucket: &str,
+        prefix: &str,
+        continuation_token: Option<&str>,
+    ) -> Result<(Vec<String>, Option<String>, bool)>;
+
     /// Get disk for resume functionality
     async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore>;
 }
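Both mocks added in this patch return a single empty page, so the continuation-token path is never exercised by tests. A paging function along these lines could back a multi-page mock; this is a sketch only, with the page size and the index-as-token encoding being arbitrary illustrative choices:

    // Sketch: serve a fixed object list in pages of PAGE entries, using the
    // start index (rendered as a string) as the continuation token.
    fn page(objects: &[String], token: Option<&str>) -> (Vec<String>, Option<String>, bool) {
        const PAGE: usize = 2;
        let start = token.and_then(|t| t.parse::<usize>().ok()).unwrap_or(0);
        let end = (start + PAGE).min(objects.len());
        let is_truncated = end < objects.len();
        // Only a truncated page carries a token for the next call.
        let next_token = is_truncated.then(|| end.to_string());
        (objects[start..end].to_vec(), next_token, is_truncated)
    }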
@@ -493,24 +505,67 @@ impl HealStorageAPI for ECStoreHealStorage {
     async fn list_objects_for_heal(&self, bucket: &str, prefix: &str) -> Result<Vec<String>> {
         debug!("Listing objects for heal: {}/{}", bucket, prefix);
+        warn!(
+            "list_objects_for_heal loads all objects into memory. For large buckets, consider using list_objects_for_heal_page instead."
+        );
 
-        // Use list_objects_v2 to get objects
-        match self
-            .ecstore
-            .clone()
-            .list_objects_v2(bucket, prefix, None, None, 1000, false, None, false)
-            .await
-        {
-            Ok(list_info) => {
-                let objects: Vec<String> = list_info.objects.into_iter().map(|obj| obj.name).collect();
-                info!("Found {} objects for heal in {}/{}", objects.len(), bucket, prefix);
-                Ok(objects)
+        let mut all_objects = Vec::new();
+        let mut continuation_token: Option<String> = None;
+
+        loop {
+            let (page_objects, next_token, is_truncated) = self
+                .list_objects_for_heal_page(bucket, prefix, continuation_token.as_deref())
+                .await?;
+
+            all_objects.extend(page_objects);
+
+            if !is_truncated {
+                break;
             }
-            Err(e) => {
-                error!("Failed to list objects for heal: {}/{} - {}", bucket, prefix, e);
-                Err(Error::other(e))
+
+            continuation_token = next_token;
+            if continuation_token.is_none() {
+                warn!("List is truncated but no continuation token provided for {}/{}", bucket, prefix);
+                break;
             }
         }
+
+        info!("Found {} objects for heal in {}/{}", all_objects.len(), bucket, prefix);
+        Ok(all_objects)
+    }
+
+    async fn list_objects_for_heal_page(
+        &self,
+        bucket: &str,
+        prefix: &str,
+        continuation_token: Option<&str>,
+    ) -> Result<(Vec<String>, Option<String>, bool)> {
+        debug!("Listing objects for heal (page): {}/{}", bucket, prefix);
+
+        const MAX_KEYS: i32 = 1000;
+        let continuation_token_opt = continuation_token.map(|s| s.to_string());
+
+        // Use list_objects_v2 to get objects with pagination
+        let list_info = match self
+            .ecstore
+            .clone()
+            .list_objects_v2(bucket, prefix, continuation_token_opt, None, MAX_KEYS, false, None, false)
+            .await
+        {
+            Ok(info) => info,
+            Err(e) => {
+                error!("Failed to list objects for heal: {}/{} - {}", bucket, prefix, e);
+                return Err(Error::other(e));
+            }
+        };
+
+        // Collect objects from this page
+        let page_objects: Vec<String> = list_info.objects.into_iter().map(|obj| obj.name).collect();
+        let page_count = page_objects.len();
+
+        debug!("Listed {} objects (page) for heal in {}/{}", page_count, bucket, prefix);
+
+        Ok((page_objects, list_info.next_continuation_token, list_info.is_truncated))
    }
 
    async fn get_disk_for_resume(&self, set_disk_id: &str) -> Result<DiskStore> {
diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs
index e895af0a..900d40ce 100644
--- a/crates/ahm/src/scanner/data_scanner.rs
+++ b/crates/ahm/src/scanner/data_scanner.rs
@@ -600,6 +600,7 @@ impl Scanner {
 
         // Initialize and start the node scanner
         self.node_scanner.initialize_stats().await?;
+        // update object count and size for each bucket
         self.node_scanner.start().await?;
 
         // Set local stats in aggregator
@@ -614,21 +615,6 @@ impl Scanner {
             }
         });
 
-        // Trigger an immediate data usage collection so that admin APIs have fresh data after startup.
-        let scanner = self.clone_for_background();
-        tokio::spawn(async move {
-            let enable_stats = {
-                let cfg = scanner.config.read().await;
-                cfg.enable_data_usage_stats
-            };
-
-            if enable_stats {
-                if let Err(e) = scanner.collect_and_persist_data_usage().await {
-                    warn!("Initial data usage collection failed: {}", e);
-                }
-            }
-        });
-
         Ok(())
     }
diff --git a/crates/ahm/src/scanner/node_scanner.rs b/crates/ahm/src/scanner/node_scanner.rs
index a6a01f95..5f3d4d43 100644
--- a/crates/ahm/src/scanner/node_scanner.rs
+++ b/crates/ahm/src/scanner/node_scanner.rs
@@ -711,6 +711,7 @@ impl NodeScanner {
         // start scanning loop
         let scanner_clone = self.clone_for_background();
         tokio::spawn(async move {
+            // update object count and size for each bucket
             if let Err(e) = scanner_clone.scan_loop_with_resume(None).await {
                 error!("scanning loop failed: {}", e);
             }
diff --git a/crates/ahm/tests/heal_bug_fixes_test.rs b/crates/ahm/tests/heal_bug_fixes_test.rs
index 814d62c2..a05ffb1a 100644
--- a/crates/ahm/tests/heal_bug_fixes_test.rs
+++ b/crates/ahm/tests/heal_bug_fixes_test.rs
@@ -244,6 +244,14 @@ fn test_heal_task_status_atomic_update() {
    async fn list_objects_for_heal(&self, _bucket: &str, _prefix: &str) -> rustfs_ahm::Result<Vec<String>> {
        Ok(vec![])
    }
+   async fn list_objects_for_heal_page(
+       &self,
+       _bucket: &str,
+       _prefix: &str,
+       _continuation_token: Option<&str>,
+   ) -> rustfs_ahm::Result<(Vec<String>, Option<String>, bool)> {
+       Ok((vec![], None, false))
+   }
    async fn get_disk_for_resume(&self, _set_disk_id: &str) -> rustfs_ahm::Result<DiskStore> {
        Err(rustfs_ahm::Error::other("Not implemented in mock"))
    }
diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs
index 449afbd8..31b906d3 100644
--- a/crates/common/src/heal_channel.rs
+++ b/crates/common/src/heal_channel.rs
@@ -85,12 +85,90 @@ impl Display for DriveState {
     }
 }
 
-#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+#[repr(u8)]
 pub enum HealScanMode {
-    Unknown,
+    Unknown = 0,
     #[default]
-    Normal,
-    Deep,
+    Normal = 1,
+    Deep = 2,
+}
+
+impl Serialize for HealScanMode {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_u8(*self as u8)
+    }
+}
+
+impl<'de> Deserialize<'de> for HealScanMode {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct HealScanModeVisitor;
+
+        impl<'de> serde::de::Visitor<'de> for HealScanModeVisitor {
+            type Value = HealScanMode;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                formatter.write_str("an integer between 0 and 2")
+            }
+
+            fn visit_u8<E>(self, value: u8) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                match value {
+                    0 => Ok(HealScanMode::Unknown),
+                    1 => Ok(HealScanMode::Normal),
+                    2 => Ok(HealScanMode::Deep),
+                    _ => Err(E::custom(format!("invalid HealScanMode value: {}", value))),
+                }
+            }
+
+            fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                if value > u8::MAX as u64 {
+                    return Err(E::custom(format!("HealScanMode value too large: {}", value)));
+                }
+                self.visit_u8(value as u8)
+            }
+
+            fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                if value < 0 || value > u8::MAX as i64 {
+                    return Err(E::custom(format!("invalid HealScanMode value: {}", value)));
+                }
+                self.visit_u8(value as u8)
+            }
+
+            fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                // Try parsing as a numeric string first (for URL-encoded values)
+                if let Ok(num) = value.parse::<u8>() {
+                    return self.visit_u8(num);
+                }
+                // Fall back to the named variants
+                match value {
+                    "Unknown" | "unknown" => Ok(HealScanMode::Unknown),
+                    "Normal" | "normal" => Ok(HealScanMode::Normal),
+                    "Deep" | "deep" => Ok(HealScanMode::Deep),
+                    _ => Err(E::custom(format!("invalid HealScanMode string: {}", value))),
+                }
+            }
+        }
+
+        deserializer.deserialize_any(HealScanModeVisitor)
+    }
 }
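For reference, the wire behavior this custom serde aims for, assuming serde_json and that the type is reachable as rustfs_common::heal_channel::HealScanMode (both assumptions; this sketch is not a test from the patch). Note that deserialize_any requires a self-describing format such as JSON:

    use rustfs_common::heal_channel::HealScanMode; // import path assumed

    fn round_trip() -> serde_json::Result<()> {
        // Serializes as a bare integer now, not a variant name.
        assert_eq!(serde_json::to_string(&HealScanMode::Deep)?, "2");
        // Integers, numeric strings (URL-encoded values), and names all decode.
        assert_eq!(serde_json::from_str::<HealScanMode>("2")?, HealScanMode::Deep);
        assert_eq!(serde_json::from_str::<HealScanMode>("\"2\"")?, HealScanMode::Deep);
        assert_eq!(serde_json::from_str::<HealScanMode>("\"deep\"")?, HealScanMode::Deep);
        Ok(())
    }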
 
 #[derive(Clone, Copy, Debug, Default, Serialize, Deserialize)]
@@ -106,7 +184,9 @@ pub struct HealOpts {
     pub update_parity: bool,
     #[serde(rename = "nolock")]
     pub no_lock: bool,
+    #[serde(rename = "pool", default)]
     pub pool: Option<usize>,
+    #[serde(rename = "set", default)]
     pub set: Option<usize>,
 }
diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs
index 7f13cd47..ea35e79c 100644
--- a/crates/ecstore/src/disk/local.rs
+++ b/crates/ecstore/src/disk/local.rs
@@ -967,9 +967,7 @@ impl LocalDisk {
         sum: &[u8],
         shard_size: usize,
     ) -> Result<()> {
-        let file = super::fs::open_file(part_path, O_CREATE | O_WRONLY)
-            .await
-            .map_err(to_file_error)?;
+        let file = super::fs::open_file(part_path, O_RDONLY).await.map_err(to_file_error)?;
 
         let meta = file.metadata().await.map_err(to_file_error)?;
         let file_size = meta.len() as usize;
@@ -1465,6 +1463,7 @@ impl DiskAPI for LocalDisk {
                 resp.results[i] = conv_part_err_to_int(&err);
                 if resp.results[i] == CHECK_PART_UNKNOWN {
                     if let Some(err) = err {
+                        error!("verify_file: failed to bitrot verify file: {:?}, error: {:?}", &part_path, &err);
                         if err == DiskError::FileAccessDenied {
                             continue;
                         }
@@ -1551,7 +1550,7 @@ impl DiskAPI for LocalDisk {
                 .join(fi.data_dir.map_or("".to_string(), |dir| dir.to_string()))
                 .join(format!("part.{}", part.number));
 
-            match lstat(file_path).await {
+            match lstat(&file_path).await {
                 Ok(st) => {
                     if st.is_dir() {
                         resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
@@ -1577,6 +1576,8 @@ impl DiskAPI for LocalDisk {
                         }
                     }
                     resp.results[i] = CHECK_PART_FILE_NOT_FOUND;
+                } else {
+                    error!("check_parts: failed to stat file: {:?}, error: {:?}", &file_path, &e);
                 }
                 continue;
             }
diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs
index 3716f5eb..7d301843 100644
--- a/crates/ecstore/src/disk/mod.rs
+++ b/crates/ecstore/src/disk/mod.rs
@@ -681,7 +681,10 @@ pub fn conv_part_err_to_int(err: &Option<DiskError>) -> usize {
         Some(DiskError::VolumeNotFound) => CHECK_PART_VOLUME_NOT_FOUND,
         Some(DiskError::DiskNotFound) => CHECK_PART_DISK_NOT_FOUND,
         None => CHECK_PART_SUCCESS,
-        _ => CHECK_PART_UNKNOWN,
+        _ => {
+            tracing::warn!("conv_part_err_to_int: unknown error: {err:?}");
+            CHECK_PART_UNKNOWN
+        }
     }
 }
diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs
index bf3a9c35..696c8c71 100644
--- a/crates/ecstore/src/set_disk.rs
+++ b/crates/ecstore/src/set_disk.rs
@@ -25,7 +25,7 @@ use crate::client::{object_api_utils::get_raw_etag, transition_api::ReaderImpl};
 use crate::disk::STORAGE_FORMAT_FILE;
 use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
 use crate::disk::{
-    self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
+    self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS, CHECK_PART_UNKNOWN,
     conv_part_err_to_int, has_part_err,
 };
 use crate::erasure_coding;
@@ -6431,6 +6431,10 @@ fn join_errs(errs: &[Option<DiskError>]) -> String {
     errs.join(", ")
 }
 
+/// disks_with_all_parts is aligned with the Go implementation (disksWithAllPartsV2).
+/// It resets partsMetadata and onlineDisks entries whose xl.meta is nonexistent, corrupted, or outdated.
+/// It also checks the status of each part (corrupted, missing, ok) on each drive.
+/// Returns (availableDisks, dataErrsByDisk, dataErrsByPart).
 async fn disks_with_all_parts(
     online_disks: &[Option<DiskStore>],
     parts_metadata: &mut [FileInfo],
@@ -6440,39 +6444,66 @@ async fn disks_with_all_parts(
     object: &str,
     scan_mode: HealScanMode,
 ) -> disk::error::Result<(Vec<Option<DiskStore>>, HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
-    info!(
-        "disks_with_all_parts: starting with online_disks.len()={}, scan_mode={:?}",
+    let object_name = latest_meta.name.clone();
+    debug!(
+        "disks_with_all_parts: starting with object_name={}, online_disks.len()={}, scan_mode={:?}",
+        object_name,
         online_disks.len(),
         scan_mode
     );
+
     let mut available_disks = vec![None; online_disks.len()];
+
+    // Initialize dataErrsByDisk and dataErrsByPart with CHECK_PART_SUCCESS
     let mut data_errs_by_disk: HashMap<usize, Vec<usize>> = HashMap::new();
     for i in 0..online_disks.len() {
-        data_errs_by_disk.insert(i, vec![1; latest_meta.parts.len()]);
+        data_errs_by_disk.insert(i, vec![CHECK_PART_SUCCESS; latest_meta.parts.len()]);
     }
     let mut data_errs_by_part: HashMap<usize, Vec<usize>> = HashMap::new();
     for i in 0..latest_meta.parts.len() {
-        data_errs_by_part.insert(i, vec![1; online_disks.len()]);
+        data_errs_by_part.insert(i, vec![CHECK_PART_SUCCESS; online_disks.len()]);
     }
 
+    // Check for inconsistent erasure distribution
     let mut inconsistent = 0;
-    parts_metadata.iter().enumerate().for_each(|(index, meta)| {
-        if meta.is_valid() && !meta.deleted && meta.erasure.distribution.len() != online_disks.len()
-            || (!meta.erasure.distribution.is_empty() && meta.erasure.distribution[index] != meta.erasure.index)
-        {
-            warn!("file info inconsistent, meta: {:?}", meta);
-            inconsistent += 1;
+    for (index, meta) in parts_metadata.iter().enumerate() {
+        if !meta.is_valid() {
+            // Since for the majority of cases erasure.Index matches erasure.Distribution,
+            // we can consider the offline disks as consistent.
+            continue;
         }
-    });
+        if !meta.deleted {
+            if meta.erasure.distribution.len() != online_disks.len() {
+                // The erasure distribution has fewer
+                // items than the number of online disks.
+                inconsistent += 1;
+                continue;
+            }
+            if !meta.erasure.distribution.is_empty()
+                && index < meta.erasure.distribution.len()
+                && meta.erasure.distribution[index] != meta.erasure.index
+            {
+                // Index does not match the distribution order
+                inconsistent += 1;
+            }
+        }
+    }
 
     let erasure_distribution_reliable = inconsistent <= parts_metadata.len() / 2;
 
+    // Initialize metaErrs
     let mut meta_errs = Vec::with_capacity(errs.len());
     for _ in 0..errs.len() {
         meta_errs.push(None);
     }
 
+    // Process meta errors
     for (index, disk) in online_disks.iter().enumerate() {
+        if let Some(err) = &errs[index] {
+            meta_errs[index] = Some(err.clone());
+            continue;
+        }
+
         let disk = if let Some(disk) = disk {
             disk
         } else {
@@ -6480,48 +6511,59 @@ async fn disks_with_all_parts(
             continue;
         };
 
-        if let Some(err) = &errs[index] {
-            meta_errs[index] = Some(err.clone());
-            continue;
-        }
         if !disk.is_online().await {
             meta_errs[index] = Some(DiskError::DiskNotFound);
             continue;
         }
+
         let meta = &parts_metadata[index];
-        if !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir) {
-            warn!("mod_time is not Eq, file corrupt, index: {index}");
+        // Check if the metadata is corrupted (equivalent to filterByETag=false in Go)
+        let corrupted = !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir);
+
+        if corrupted {
             meta_errs[index] = Some(DiskError::FileCorrupt);
             parts_metadata[index] = FileInfo::default();
             continue;
         }
+
         if erasure_distribution_reliable {
             if !meta.is_valid() {
-                warn!("file info is not valid, file corrupt, index: {index}");
                 parts_metadata[index] = FileInfo::default();
                 meta_errs[index] = Some(DiskError::FileCorrupt);
                 continue;
             }
+
+            #[allow(clippy::collapsible_if)]
             if !meta.deleted && meta.erasure.distribution.len() != online_disks.len() {
-                warn!("file info distribution len not Eq online_disks len, file corrupt, index: {index}");
+                // The erasure distribution is not the same as onlineDisks;
+                // attempt a fix if possible, assuming other entries
+                // might have the right erasure distribution.
                parts_metadata[index] = FileInfo::default();
                meta_errs[index] = Some(DiskError::FileCorrupt);
                continue;
            }
        }
    }
-    // info!("meta_errs: {:?}, errs: {:?}", meta_errs, errs);
-    meta_errs.iter().enumerate().for_each(|(index, err)| {
+
+    // Copy meta errors to part errors
+    for (index, err) in meta_errs.iter().enumerate() {
         if err.is_some() {
             let part_err = conv_part_err_to_int(err);
             for p in 0..latest_meta.parts.len() {
-                data_errs_by_part.entry(p).or_insert(vec![0; meta_errs.len()])[index] = part_err;
+                if let Some(vec) = data_errs_by_part.get_mut(&p) {
+                    if index < vec.len() {
+                        info!(
+                            "data_errs_by_part: copy meta errors to part errors: object_name={}, index: {index}, part: {p}, part_err: {part_err}",
+                            object_name
+                        );
+                        vec[index] = part_err;
+                    }
+                }
             }
         }
-    });
+    }
 
-    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // Check data for each disk
     for (index, disk) in online_disks.iter().enumerate() {
         if meta_errs[index].is_some() {
             continue;
         }
 
@@ -6530,7 +6572,6 @@ async fn disks_with_all_parts(
         let disk = if let Some(disk) = disk {
             disk
         } else {
-            meta_errs[index] = Some(DiskError::DiskNotFound);
             continue;
         };
 
@@ -6558,16 +6599,21 @@ async fn disks_with_all_parts(
             if let Some(vec) = data_errs_by_part.get_mut(&0) {
                 if index < vec.len() {
                     vec[index] = conv_part_err_to_int(&verify_err.map(|e| e.into()));
-                    info!("bitrot check result: {}", vec[index]);
+                    info!(
+                        "data_errs_by_part: bitrot check result: object_name={}, index: {index}, result: {}",
+                        object_name, vec[index]
+                    );
                 }
             }
             continue;
         }
 
+        // Verify file or check parts
         let mut verify_resp = CheckPartsResp::default();
         let mut verify_err = None;
         meta.data_dir = latest_meta.data_dir;
+
         if scan_mode == HealScanMode::Deep {
             // disk has a valid xl.meta but may not have all the
            // parts. This is considered an outdated disk, since
@@ -6577,6 +6623,7 @@ async fn disks_with_all_parts(
                    verify_resp = v;
                }
                Err(err) => {
+                    warn!("verify_file failed: {err:?}, object_name={}, index: {index}", object_name);
                    verify_err = Some(err);
                }
            }
@@ -6586,38 +6633,85 @@ async fn disks_with_all_parts(
                    verify_resp = v;
                }
                Err(err) => {
+                    warn!("check_parts failed: {err:?}, object_name={}, index: {index}", object_name);
                    verify_err = Some(err);
                }
            }
        }
 
+        // Update dataErrsByPart for all parts
        for p in 0..latest_meta.parts.len() {
            if let Some(vec) = data_errs_by_part.get_mut(&p) {
                if index < vec.len() {
                    if verify_err.is_some() {
-                        info!("verify_err");
+                        info!(
+                            "data_errs_by_part: verify_err: object_name={}, index: {index}, part: {p}, verify_err: {verify_err:?}",
+                            object_name
+                        );
                        vec[index] = conv_part_err_to_int(&verify_err.clone());
                    } else {
-                        info!("verify_resp, verify_resp.results {}", verify_resp.results[p]);
-                        vec[index] = verify_resp.results[p];
+                        // Fix: verify_resp.results is sized from meta.parts, not latest_meta.parts,
+                        // so check bounds to avoid a panic.
+                        if p < verify_resp.results.len() {
+                            info!(
+                                "data_errs_by_part: update data_errs_by_part: object_name={}, index: {}, part: {}, verify_resp.results: {:?}",
+                                object_name, index, p, verify_resp.results[p]
+                            );
+                            vec[index] = verify_resp.results[p];
+                        } else {
+                            debug!(
+                                "data_errs_by_part: verify_resp.results length mismatch: expected at least {}, got {}, object_name={}, index: {index}, part: {p}",
+                                p + 1,
+                                verify_resp.results.len(),
+                                object_name
+                            );
+                            vec[index] = CHECK_PART_SUCCESS;
+                        }
                    }
                }
            }
        }
    }
 
-    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // Build dataErrsByDisk from dataErrsByPart
    for (part, disks) in data_errs_by_part.iter() {
-        for (idx, disk) in disks.iter().enumerate() {
-            if let Some(vec) = data_errs_by_disk.get_mut(&idx) {
-                vec[*part] = *disk;
+        for (disk_idx, disk_err) in disks.iter().enumerate() {
+            if let Some(vec) = data_errs_by_disk.get_mut(&disk_idx) {
+                if *part < vec.len() {
+                    vec[*part] = *disk_err;
+                    info!(
+                        "data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {disk_err}",
+                        object_name,
+                    );
+                }
            }
        }
    }
 
-    // info!("data_errs_by_part: {:?}, data_errs_by_disk: {:?}", data_errs_by_part, data_errs_by_disk);
+    // Calculate available_disks based on meta_errs and data_errs_by_disk
    for (i, disk) in online_disks.iter().enumerate() {
-        if meta_errs[i].is_none() && disk.is_some() && !has_part_err(&data_errs_by_disk[&i]) {
-            available_disks[i] = Some(disk.clone().unwrap());
+        if let Some(disk_errs) = data_errs_by_disk.get(&i) {
+            if meta_errs[i].is_none() && disk.is_some() && !has_part_err(disk_errs) {
+                available_disks[i] = Some(disk.clone().unwrap());
+            } else {
+                warn!(
+                    "disks_with_all_parts: disk is not available, object_name={}, index: {}, meta_errs={:?}, disk_errs={:?}, disk_is_some={:?}",
+                    object_name,
+                    i,
+                    meta_errs[i],
+                    disk_errs,
+                    disk.is_some(),
+                );
+                parts_metadata[i] = FileInfo::default();
+            }
        } else {
+            warn!(
+                "disks_with_all_parts: data_errs_by_disk missing entry for object_name={}, index: {}, meta_errs={:?}, disk_is_some={:?}",
+                object_name,
+                i,
+                meta_errs[i],
+                disk.is_some(),
+            );
            parts_metadata[i] = FileInfo::default();
        }
    }
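The two maps built above are transposes of each other: data_errs_by_disk[disk][part] mirrors data_errs_by_part[part][disk]. A quick check of that invariant, written as a hypothetical test helper (not part of the patch):

    // Sketch: assert the transpose relationship between the two error maps
    // returned by disks_with_all_parts.
    fn assert_transposed(
        by_disk: &std::collections::HashMap<usize, Vec<usize>>,
        by_part: &std::collections::HashMap<usize, Vec<usize>>,
    ) {
        for (part, disks) in by_part {
            for (disk_idx, err) in disks.iter().enumerate() {
                // Every per-part entry must appear at the transposed position.
                assert_eq!(by_disk[&disk_idx][*part], *err);
            }
        }
    }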
diff --git a/crates/policy/src/auth/credentials.rs b/crates/policy/src/auth/credentials.rs
index 0bc2618e..4cbe8707 100644
--- a/crates/policy/src/auth/credentials.rs
+++ b/crates/policy/src/auth/credentials.rs
@@ -21,6 +21,7 @@ use serde_json::{Value, json};
 use std::collections::HashMap;
 use time::OffsetDateTime;
 use time::macros::offset;
+use tracing::warn;
 
 const ACCESS_KEY_MIN_LEN: usize = 3;
 const ACCESS_KEY_MAX_LEN: usize = 20;
@@ -239,6 +240,8 @@ pub fn create_new_credentials_with_metadata(
         }
     };
 
+    warn!("create_new_credentials_with_metadata expiration {expiration:?}, access_key: {ak}");
+
     let token = utils::generate_jwt(&claims, token_secret)?;
 
     Ok(Credentials {
diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs
index f6ce6b04..4e9fcfb9 100644
--- a/rustfs/src/admin/mod.rs
+++ b/rustfs/src/admin/mod.rs
@@ -137,6 +137,11 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result
     // Some APIs are only available in EC mode
     // if is_dist_erasure().await || is_erasure().await {
 
+    r.insert(
+        Method::POST,
+        format!("{}{}", ADMIN_PREFIX, "/v3/heal/{bucket}").as_str(),
+        AdminOperation(&handlers::HealHandler {}),
+    )?;
     r.insert(
         Method::POST,
         format!("{}{}", ADMIN_PREFIX, "/v3/heal/{bucket}/{prefix}").as_str(),
diff --git a/rustfs/src/storage/tonic_service.rs b/rustfs/src/storage/tonic_service.rs
index 7a9a9b7d..bcee4903 100644
--- a/rustfs/src/storage/tonic_service.rs
+++ b/rustfs/src/storage/tonic_service.rs
@@ -451,7 +451,7 @@ impl Node for NodeService {
                 }));
             }
         };
-        match disk.verify_file(&request.volume, &request.path, &file_info).await {
+        match disk.check_parts(&request.volume, &request.path, &file_info).await {
             Ok(check_parts_resp) => {
                 let check_parts_resp = match serde_json::to_string(&check_parts_resp) {
                     Ok(check_parts_resp) => check_parts_resp,