diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs
index 14ed2877..19d6f0f4 100644
--- a/crates/common/src/heal_channel.rs
+++ b/crates/common/src/heal_channel.rs
@@ -50,7 +50,7 @@ impl Display for HealItemType {
     }
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum DriveState {
     Ok,
     Offline,
@@ -59,7 +59,7 @@ pub enum DriveState {
     PermissionDenied,
     Faulty,
     RootMount,
-    Unknown,
+    Unknown(String),
     Unformatted, // only returned by disk
 }
 
@@ -73,7 +73,7 @@ impl DriveState {
             DriveState::PermissionDenied => "permission-denied",
             DriveState::Faulty => "faulty",
             DriveState::RootMount => "root-mount",
-            DriveState::Unknown => "unknown",
+            DriveState::Unknown(reason) => reason,
             DriveState::Unformatted => "unformatted",
         }
     }
 }
 
 impl Display for DriveState {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}", self.to_str())
diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs
index 5fb513d9..aa813cae 100644
--- a/crates/ecstore/src/disk/local.rs
+++ b/crates/ecstore/src/disk/local.rs
@@ -1570,6 +1570,8 @@ impl DiskAPI for LocalDisk {
                 .as_str(),
             )?;
 
+            info!("check_parts: part_path: {:?}", &part_path);
+
             match lstat(&part_path).await {
                 Ok(st) => {
                     if st.is_dir() {
@@ -1584,6 +1586,8 @@ impl DiskAPI for LocalDisk {
                     resp.results[i] = CHECK_PART_SUCCESS;
                 }
                 Err(err) => {
+                    info!("check_parts: failed to stat file: {:?}, error: {:?}", &part_path, &err);
+
                     let e: DiskError = to_file_error(err).into();
 
                     if e == DiskError::FileNotFound {
diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs
index f0a75636..3c742dcd 100644
--- a/crates/ecstore/src/disk/mod.rs
+++ b/crates/ecstore/src/disk/mod.rs
@@ -711,6 +711,10 @@ pub fn has_part_err(part_errs: &[usize]) -> bool {
     part_errs.iter().any(|err| *err != CHECK_PART_SUCCESS)
 }
 
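+/// Returns how many entries in `part_errs` are not `CHECK_PART_SUCCESS`,
+/// e.g. `count_part_not_success(&[CHECK_PART_SUCCESS, CHECK_PART_FILE_CORRUPT]) == 1`.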
+pub fn count_part_not_success(part_errs: &[usize]) -> usize {
+    part_errs.iter().filter(|err| **err != CHECK_PART_SUCCESS).count()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs
index b705d9f3..859b3cbd 100644
--- a/crates/ecstore/src/set_disk.rs
+++ b/crates/ecstore/src/set_disk.rs
@@ -22,12 +22,12 @@ use crate::bucket::replication::check_replicate_delete;
 use crate::bucket::versioning::VersioningApi;
 use crate::bucket::versioning_sys::BucketVersioningSys;
 use crate::client::{object_api_utils::get_raw_etag, transition_api::ReaderImpl};
-use crate::disk::STORAGE_FORMAT_FILE;
 use crate::disk::error_reduce::{OBJECT_OP_IGNORED_ERRS, reduce_read_quorum_errs, reduce_write_quorum_errs};
 use crate::disk::{
     self, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
     CHECK_PART_UNKNOWN, conv_part_err_to_int, has_part_err,
 };
+use crate::disk::{STORAGE_FORMAT_FILE, count_part_not_success};
 use crate::erasure_coding;
 use crate::erasure_coding::bitrot_verify;
 use crate::error::{Error, Result, is_err_version_not_found};
@@ -2148,6 +2148,18 @@ impl SetDisks {
         shuffled_disks
     }
 
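+    /// Reorders per-disk part-check results into erasure distribution order:
+    /// `distribution[i]` is the 1-based target position of source index `i`,
+    /// so e.g. `distribution = [2, 1]` swaps the two entries.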
+    fn shuffle_check_parts(parts_errs: &[usize], distribution: &[usize]) -> Vec<usize> {
+        if distribution.is_empty() {
+            return parts_errs.to_vec();
+        }
+        let mut shuffled_parts_errs = vec![0; parts_errs.len()];
+        for (i, v) in parts_errs.iter().enumerate() {
+            let idx = distribution[i];
+            shuffled_parts_errs[idx - 1] = *v;
+        }
+        shuffled_parts_errs
+    }
+
     #[tracing::instrument(level = "debug", skip(self))]
     async fn get_object_fileinfo(
         &self,
@@ -2603,66 +2615,73 @@ impl SetDisks {
         opts: &HealOpts,
     ) -> disk::error::Result<(HealResultItem, Option<DiskError>)> {
         info!(?opts, "Starting heal_object");
+
+        let disks = self.get_disks_internal().await;
+
         let mut result = HealResultItem {
             heal_item_type: HealItemType::Object.to_string(),
             bucket: bucket.to_string(),
             object: object.to_string(),
             version_id: version_id.to_string(),
-            disk_count: self.disks.read().await.len(),
+            disk_count: disks.len(),
             ..Default::default()
         };
 
         let _write_lock_guard = if !opts.no_lock {
             info!("Acquiring write lock for object: {}, owner: {}", object, self.locker_owner);
-            // Check if lock is already held
-            let key = rustfs_lock::fast_lock::types::ObjectKey::new(bucket, object);
-            let mut reuse_existing_lock = false;
-            if let Some(lock_info) = self.fast_lock_manager.get_lock_info(&key) {
-                if lock_info.owner.as_ref() == self.locker_owner.as_str()
-                    && matches!(lock_info.mode, rustfs_lock::fast_lock::types::LockMode::Exclusive)
-                {
-                    reuse_existing_lock = true;
-                    debug!("Reusing existing exclusive lock for object {} held by {}", object, self.locker_owner);
-                } else {
-                    warn!("Lock already exists for object {}: {:?}", object, lock_info);
-                }
-            } else {
-                info!("No existing lock found for object {}", object);
-            }
+            let fast_lock_guard = self.new_ns_lock(bucket, object).await?;
 
-            if reuse_existing_lock {
-                None
-            } else {
-                let mut lock_result = None;
-                for i in 0..3 {
-                    let start_time = Instant::now();
-                    match self
-                        .fast_lock_manager
-                        .acquire_write_lock(bucket, object, self.locker_owner.as_str())
-                        .await
-                    {
-                        Ok(res) => {
-                            let elapsed = start_time.elapsed();
-                            info!(duration = ?elapsed, attempt = i + 1, "Write lock acquired");
-                            lock_result = Some(res);
-                            break;
-                        }
-                        Err(e) => {
-                            let elapsed = start_time.elapsed();
-                            info!(error = ?e, attempt = i + 1, duration = ?elapsed, "Lock acquisition failed, retrying");
-                            if i < 2 {
-                                tokio::time::sleep(Duration::from_millis(50 * (i as u64 + 1))).await;
-                            } else {
-                                let message = self.format_lock_error(bucket, object, "write", &e);
-                                error!("Failed to acquire write lock after retries: {}", message);
-                                return Err(DiskError::other(message));
-                            }
-                        }
-                    }
-                }
-                lock_result
-            }
+            Some(fast_lock_guard)
         } else {
             info!("Skipping lock acquisition (no_lock=true)");
             None
Errs: {:?}", - retry_count + 1, - errs - ); - tokio::time::sleep(Duration::from_millis(50 * (retry_count as u64 + 1))).await; - retry_count += 1; - } - }; info!( parts_count = parts_metadata.len(), bucket = bucket, @@ -2734,25 +2723,31 @@ impl SetDisks { )); } - info!(parts_count = parts_metadata.len(), "Initiating quorum check"); + info!(parts_count = parts_metadata.len(), "heal_object Initiating quorum check"); match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count) { Ok((read_quorum, _)) => { result.parity_blocks = result.disk_count - read_quorum as usize; result.data_blocks = read_quorum as usize; - let ((online_disks, mod_time, etag), disk_len) = { + let ((mut online_disks, quorum_mod_time, quorum_etag), disk_len) = { let disks = self.disks.read().await; let disk_len = disks.len(); (Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize), disk_len) }; - match Self::pick_valid_fileinfo(&parts_metadata, mod_time, etag, read_quorum as usize) { + info!(?parts_metadata, ?errs, ?read_quorum, ?disk_len, "heal_object List disks metadata"); + + info!(?online_disks, ?quorum_mod_time, ?quorum_etag, "heal_object List online disks"); + + let filter_by_etag = quorum_etag.is_some(); + match Self::pick_valid_fileinfo(&parts_metadata, quorum_mod_time, quorum_etag.clone(), read_quorum as usize) { Ok(latest_meta) => { - let (available_disks, data_errs_by_disk, data_errs_by_part) = disks_with_all_parts( - &online_disks, + let (data_errs_by_disk, data_errs_by_part) = disks_with_all_parts( + &mut online_disks, &mut parts_metadata, &errs, &latest_meta, + filter_by_etag, bucket, object, opts.scan_mode, @@ -2760,9 +2755,9 @@ impl SetDisks { .await?; info!( - "disks_with_all_parts results: available_disks count={}, total_disks={}", - available_disks.iter().filter(|d| d.is_some()).count(), - available_disks.len() + "disks_with_all_parts heal_object results: available_disks count={}, total_disks={}", + online_disks.iter().filter(|d| d.is_some()).count(), + online_disks.len() ); let erasure = if !latest_meta.deleted && !latest_meta.is_remote() { @@ -2781,12 +2776,16 @@ impl SetDisks { // Loop to find number of disks with valid data, per-drive // data state and a list of outdated disks on which data needs // to be healed. 
-                        let mut outdate_disks = vec![None; disk_len];
+                        let mut out_dated_disks = vec![None; disk_len];
                         let mut disks_to_heal_count = 0;
+                        let mut meta_to_heal_count = 0;
 
-                        info!("Checking {} disks for healing needs (bucket={}, object={})", disk_len, bucket, object);
-                        for index in 0..available_disks.len() {
-                            let (yes, reason) = should_heal_object_on_disk(
+                        info!(
+                            "heal_object Checking {} disks for healing needs (bucket={}, object={})",
+                            disk_len, bucket, object
+                        );
+                        for index in 0..online_disks.len() {
+                            let (yes, is_meta, reason) = should_heal_object_on_disk(
                                 &errs[index],
                                 &data_errs_by_disk[&index],
                                 &parts_metadata[index],
@@ -2794,14 +2793,17 @@ impl SetDisks {
                             );
 
                             info!(
-                                "Disk {} heal check: should_heal={}, reason={:?}, err={:?}, endpoint={}",
+                                "heal_object Disk {} heal check: should_heal={}, reason={:?}, err={:?}, endpoint={}",
                                 index, yes, reason, errs[index], self.set_endpoints[index]
                             );
 
                             if yes {
-                                outdate_disks[index] = disks[index].clone();
+                                out_dated_disks[index] = disks[index].clone();
                                 disks_to_heal_count += 1;
-                                info!("Disk {} marked for healing (endpoint={})", index, self.set_endpoints[index]);
+                                if is_meta {
+                                    meta_to_heal_count += 1;
+                                }
+                                info!("heal_object Disk {} marked for healing (endpoint={})", index, self.set_endpoints[index]);
                             }
 
                             let drive_state = match reason {
@@ -2812,7 +2814,8 @@ impl SetDisks {
                                     | DiskError::VolumeNotFound
                                     | DiskError::PartMissingOrCorrupt
                                     | DiskError::OutdatedXLMeta => DriveState::Missing.to_string(),
-                                    _ => DriveState::Corrupt.to_string(),
+                                    DiskError::FileCorrupt => DriveState::Corrupt.to_string(),
+                                    _ => DriveState::Unknown(err.to_string()).to_string(),
                                 },
                                 None => DriveState::Ok.to_string(),
                             };
@@ -2834,24 +2837,6 @@ impl SetDisks {
                             disks_to_heal_count, disk_len, bucket, object
                         );
 
-                        if DiskError::is_all_not_found(&errs) {
-                            warn!(
-                                "heal_object failed, all obj part not found, bucket: {}, obj: {}, version_id: {}",
-                                bucket, object, version_id
-                            );
-                            let err = if !version_id.is_empty() {
-                                DiskError::FileVersionNotFound
-                            } else {
-                                DiskError::FileNotFound
-                            };
-
-                            return Ok((
-                                self.default_heal_result(FileInfo::default(), &errs, bucket, object, version_id)
-                                    .await,
-                                Some(err),
-                            ));
-                        }
-
                         if disks_to_heal_count == 0 {
                             info!("No disks to heal, returning early");
                             return Ok((result, None));
@@ -2867,7 +2852,21 @@ impl SetDisks {
                             disks_to_heal_count, opts.dry_run
                         );
 
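+                        // Healing is only impossible when more copies of some part are
+                        // bad than parity can rebuild; an etag quorum lets the heal
+                        // proceed even when many disks are flagged.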
-                        if !latest_meta.deleted && disks_to_heal_count > latest_meta.erasure.parity_blocks {
+                        let mut cannot_heal = !latest_meta.deleted && disks_to_heal_count > latest_meta.erasure.parity_blocks;
+                        if cannot_heal && quorum_etag.is_some() {
+                            cannot_heal = false;
+                        }
+
+                        if !latest_meta.deleted && !latest_meta.is_remote() {
+                            for (_, part_errs) in data_errs_by_part.iter() {
+                                if count_part_not_success(part_errs) > latest_meta.erasure.parity_blocks {
+                                    cannot_heal = true;
+                                    break;
+                                }
+                            }
+                        }
+
+                        if cannot_heal {
                             let total_disks = parts_metadata.len();
                             let healthy_count = total_disks.saturating_sub(disks_to_heal_count);
                             let required_data = total_disks.saturating_sub(latest_meta.erasure.parity_blocks);
@@ -2926,10 +2925,10 @@ impl SetDisks {
                             };
                         }
 
-                        if !latest_meta.deleted && latest_meta.erasure.distribution.len() != available_disks.len() {
+                        if !latest_meta.deleted && latest_meta.erasure.distribution.len() != online_disks.len() {
                             let err_str = format!(
                                 "unexpected file distribution ({:?}) from available disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
-                                latest_meta.erasure.distribution, available_disks, bucket, object, version_id
+                                latest_meta.erasure.distribution, online_disks, bucket, object, version_id
                             );
                             warn!(err_str);
                             let err = DiskError::other(err_str);
@@ -2939,11 +2938,11 @@ impl SetDisks {
                             ));
                         }
 
-                        let latest_disks = Self::shuffle_disks(&available_disks, &latest_meta.erasure.distribution);
-                        if !latest_meta.deleted && latest_meta.erasure.distribution.len() != outdate_disks.len() {
+                        let latest_disks = Self::shuffle_disks(&online_disks, &latest_meta.erasure.distribution);
+                        if !latest_meta.deleted && latest_meta.erasure.distribution.len() != out_dated_disks.len() {
                             let err_str = format!(
                                 "unexpected file distribution ({:?}) from outdated disks ({:?}), looks like backend disks have been manually modified refusing to heal {}/{}({})",
-                                latest_meta.erasure.distribution, outdate_disks, bucket, object, version_id
+                                latest_meta.erasure.distribution, out_dated_disks, bucket, object, version_id
                             );
                             warn!(err_str);
                             let err = DiskError::other(err_str);
@@ -2970,7 +2969,7 @@ impl SetDisks {
                             ));
                         }
 
-                        let out_dated_disks = Self::shuffle_disks(&outdate_disks, &latest_meta.erasure.distribution);
+                        out_dated_disks = Self::shuffle_disks(&out_dated_disks, &latest_meta.erasure.distribution);
                         let mut parts_metadata = Self::shuffle_parts_metadata(&parts_metadata, &latest_meta.erasure.distribution);
                         let mut copy_parts_metadata = vec![None; parts_metadata.len()];
                         for (index, disk) in latest_disks.iter().enumerate() {
@@ -3007,21 +3006,28 @@ impl SetDisks {
                             latest_meta.is_remote()
                         );
                         if !latest_meta.deleted && !latest_meta.is_remote() {
-                            let erasure_info = latest_meta.erasure;
-                            for part in latest_meta.parts.iter() {
+                            let erasure_info = latest_meta.erasure.clone();
+                            for (part_index, part) in latest_meta.parts.iter().enumerate() {
                                 let till_offset = erasure.shard_file_offset(0, part.size, part.size);
                                 let checksum_algo = erasure_info.get_checksum_info(part.number).algorithm;
                                 let mut readers = Vec::with_capacity(latest_disks.len());
                                 let mut writers = Vec::with_capacity(out_dated_disks.len());
                                 // let mut errors = Vec::with_capacity(out_dated_disks.len());
+                                let mut prefer = vec![false; latest_disks.len()];
 
                                 for (index, disk) in latest_disks.iter().enumerate() {
+                                    let this_part_errs =
+                                        Self::shuffle_check_parts(&data_errs_by_part[&part_index], &erasure_info.distribution);
+                                    if this_part_errs[index] != CHECK_PART_SUCCESS {
+                                        continue;
+                                    }
+
                                     if let (Some(disk), Some(metadata)) = (disk, &copy_parts_metadata[index]) {
                                         match create_bitrot_reader(
                                             metadata.data.as_deref(),
                                             Some(disk),
                                             bucket,
-                                            &format!("{}/{}/part.{}", object, src_data_dir, part.number),
+                                            &path_join_buf(&[object, &src_data_dir, &format!("part.{}", part.number)]),
                                             0,
                                             till_offset,
                                             erasure.shard_size(),
@@ -3064,14 +3070,18 @@ impl SetDisks {
                                     latest_disks.len(),
                                     out_dated_disks.len()
                                 );
-                                for (index, disk) in latest_disks.iter().enumerate() {
-                                    if let Some(outdated_disk) = &out_dated_disks[index] {
+                                for (index, disk_op) in out_dated_disks.iter().enumerate() {
+                                    if let Some(outdated_disk) = disk_op {
                                         info!(disk_index = index, "Creating writer for outdated disk");
                                         let writer = match create_bitrot_writer(
                                             is_inline_buffer,
                                             Some(outdated_disk),
                                             RUSTFS_META_TMP_BUCKET,
-                                            &format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
+                                            &path_join_buf(&[
+                                                &tmp_id.to_string(),
+                                                &dst_data_dir.to_string(),
+                                                &format!("part.{}", part.number),
+                                            ]),
                                             erasure.shard_file_size(part.size as i64),
                                             erasure.shard_size(),
                                             HashAlgorithm::HighwayHash256,
@@ -3094,56 +3104,6 @@ impl SetDisks {
                                         info!(disk_index = index, "Skipping writer (disk not outdated)");
                                         writers.push(None);
                                     }
-
-                                    // if let Some(disk) = disk {
-                                    //     // let filewriter = {
-                                    //     //     if is_inline_buffer {
-                                    //     //         Box::new(Cursor::new(Vec::new()))
-                                    //     //     } else {
-                                    //     //         let disk = disk.clone();
-                                    //     //         let part_path = format!("{}/{}/part.{}", object, src_data_dir, part.number);
-                                    //     //         disk.create_file("", RUSTFS_META_TMP_BUCKET, &part_path, 0).await?
-                                    //     //     }
-                                    //     // };
-
-                                    //     if is_inline_buffer {
-                                    //         let writer = BitrotWriter::new(
-                                    //             Writer::from_cursor(Cursor::new(Vec::new())),
-                                    //             erasure.shard_size(),
-                                    //             HashAlgorithm::HighwayHash256,
-                                    //         );
-                                    //         writers.push(Some(writer));
-                                    //     } else {
-                                    //         let f = disk
-                                    //             .create_file(
-                                    //                 "",
-                                    //                 RUSTFS_META_TMP_BUCKET,
-                                    //                 &format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number),
-                                    //                 0,
-                                    //             )
-                                    //             .await?;
-                                    //         let writer = BitrotWriter::new(
-                                    //             Writer::from_tokio_writer(f),
-                                    //             erasure.shard_size(),
-                                    //             HashAlgorithm::HighwayHash256,
-                                    //         );
-                                    //         writers.push(Some(writer));
-                                    //     }
-
-                                    //     // let writer = new_bitrot_filewriter(
-                                    //     //     disk.clone(),
-                                    //     //     RUSTFS_META_TMP_BUCKET,
-                                    //     //     format!("{}/{}/part.{}", tmp_id, dst_data_dir, part.number).as_str(),
-                                    //     //     is_inline_buffer,
-                                    //     //     DEFAULT_BITROT_ALGO,
-                                    //     //     erasure.shard_size(erasure.block_size),
-                                    //     // )
-                                    //     // .await?;
-
-                                    //     // writers.push(Some(writer));
-                                    // } else {
-                                    //     writers.push(None);
-                                    // }
                                 }
 
                                 // Heal each part. erasure.Heal() will write the healed
                                 // part to .rustfs/tmp/uuid/ which needs to be renamed
@@ -3151,13 +3111,13 @@ impl SetDisks {
                                 erasure.heal(&mut writers, readers, part.size, &prefer).await?;
                                 // close_bitrot_writers(&mut writers).await?;
 
-                                for (index, disk) in out_dated_disks.iter().enumerate() {
-                                    if disk.is_none() {
+                                for (index, disk_op) in out_dated_disks.iter_mut().enumerate() {
+                                    if disk_op.is_none() {
                                         continue;
                                     }
 
                                     if writers[index].is_none() {
-                                        outdate_disks[index] = None;
+                                        *disk_op = None;
                                         disks_to_heal_count -= 1;
                                         continue;
                                     }
@@ -3226,45 +3186,49 @@ impl SetDisks {
                            "Rename failed, attempting fallback"
                         );
 
-                        // Preserve temp files for safety
-                        info!(temp_uuid = %tmp_id, "Rename failed, preserving temporary files for safety");
-
-                        let healthy_index = latest_disks.iter().position(|d| d.is_some()).unwrap_or(0);
-
-                        if let Some(healthy_disk) = &latest_disks[healthy_index] {
-                            let xlmeta_path = format!("{object}/xl.meta");
-
-                            match healthy_disk.read_all(bucket, &xlmeta_path).await {
-                                Ok(xlmeta_bytes) => {
-                                    if let Err(e) = disk.write_all(bucket, &xlmeta_path, xlmeta_bytes).await {
-                                        info!("fallback xl.meta overwrite failed: {}", e.to_string());
-
-                                        return Ok((
-                                            result,
-                                            Some(DiskError::other(format!("fallback xl.meta overwrite failed: {e}"))),
-                                        ));
-                                    } else {
-                                        info!("fallback xl.meta overwrite succeeded for disk {}", disk.to_string());
-                                    }
-                                }
-
-                                Err(e) => {
-                                    info!("read healthy xl.meta failed: {}", e.to_string());
-
-                                    return Ok((
-                                        result,
-                                        Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))),
-                                    ));
-                                }
-                            }
-                        } else {
-                            info!("no healthy disk found for xl.meta fallback overwrite");
-
-                            return Ok((
-                                result,
-                                Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")),
-                            ));
-                        }
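+                        // Clean up this attempt's temporary files instead of
+                        // overwriting xl.meta from another disk.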
{e}"))), + // )); + // } else { + // info!("fallback xl.meta overwrite succeeded for disk {}", disk.to_string()); + // } + // } - return Ok(( - result, - Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))), - )); - } - } - } else { - info!("no healthy disk found for xl.meta fallback overwrite"); + // Err(e) => { + // info!("read healthy xl.meta failed: {}", e.to_string()); - return Ok(( - result, - Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")), - )); - } + // return Ok(( + // result, + // Some(DiskError::other(format!("read healthy xl.meta failed: {e}"))), + // )); + // } + // } + // } else { + // info!("no healthy disk found for xl.meta fallback overwrite"); + + // return Ok(( + // result, + // Some(DiskError::other("no healthy disk found for xl.meta fallback overwrite")), + // )); + // } } else { info!( "Successfully renamed healed data for disk {} (endpoint={}), removing temp files from volume={}, path={}", @@ -3542,73 +3506,99 @@ impl SetDisks { data_errs_by_part: &HashMap>, opts: ObjectOptions, ) -> disk::error::Result { - if let Ok(m) = is_object_dang_ling(meta_arr, errs, data_errs_by_part) { - let mut tags = HashMap::new(); - tags.insert("set", self.set_index.to_string()); - tags.insert("pool", self.pool_index.to_string()); - tags.insert("merrs", join_errs(errs)); - tags.insert("derrs", format!("{data_errs_by_part:?}")); - if m.is_valid() { - tags.insert("sz", m.size.to_string()); - tags.insert( - "mt", - m.mod_time - .as_ref() - .map_or(String::new(), |mod_time| mod_time.unix_timestamp().to_string()), - ); - tags.insert("d:p", format!("{}:{}", m.erasure.data_blocks, m.erasure.parity_blocks)); - } else { - tags.insert("invalid", "1".to_string()); - tags.insert( - "d:p", - format!("{}:{}", self.set_drive_count - self.default_parity_count, self.default_parity_count), - ); + let (m, can_heal) = is_object_dang_ling(meta_arr, errs, data_errs_by_part); + + if !can_heal { + return Err(DiskError::ErasureReadQuorum); + } + + let mut tags: HashMap = HashMap::new(); + tags.insert("set".to_string(), self.set_index.to_string()); + tags.insert("pool".to_string(), self.pool_index.to_string()); + tags.insert("merrs".to_string(), join_errs(errs)); + tags.insert("derrs".to_string(), format!("{data_errs_by_part:?}")); + if m.is_valid() { + tags.insert("sz".to_string(), m.size.to_string()); + tags.insert( + "mt".to_string(), + m.mod_time + .as_ref() + .map_or(String::new(), |mod_time| mod_time.unix_timestamp().to_string()), + ); + tags.insert("d:p".to_string(), format!("{}:{}", m.erasure.data_blocks, m.erasure.parity_blocks)); + } else { + tags.insert("invalid".to_string(), "1".to_string()); + tags.insert( + "d:p".to_string(), + format!("{}:{}", self.set_drive_count - self.default_parity_count, self.default_parity_count), + ); + } + let mut offline = 0; + for (i, err) in errs.iter().enumerate() { + let mut found = false; + if let Some(err) = err + && err == &DiskError::DiskNotFound + { + found = true; } - let mut offline = 0; - for (i, err) in errs.iter().enumerate() { - let mut found = false; - if let Some(err) = err - && err == &DiskError::DiskNotFound + for p in data_errs_by_part { + if let Some(v) = p.1.get(i) + && *v == CHECK_PART_DISK_NOT_FOUND { found = true; - } - for p in data_errs_by_part { - if let Some(v) = p.1.get(i) - && *v == CHECK_PART_DISK_NOT_FOUND - { - found = true; - break; - } - } - - if found { - offline += 1; + break; } } - if offline > 0 { - tags.insert("offline", offline.to_string()); + if found { + offline += 1; } - - // TODO: 
+        let (m, can_heal) = is_object_dang_ling(meta_arr, errs, data_errs_by_part);
+
+        if !can_heal {
+            return Err(DiskError::ErasureReadQuorum);
+        }
+
+        let mut tags: HashMap<String, String> = HashMap::new();
+        tags.insert("set".to_string(), self.set_index.to_string());
+        tags.insert("pool".to_string(), self.pool_index.to_string());
+        tags.insert("merrs".to_string(), join_errs(errs));
+        tags.insert("derrs".to_string(), format!("{data_errs_by_part:?}"));
+        if m.is_valid() {
+            tags.insert("sz".to_string(), m.size.to_string());
+            tags.insert(
+                "mt".to_string(),
+                m.mod_time
+                    .as_ref()
+                    .map_or(String::new(), |mod_time| mod_time.unix_timestamp().to_string()),
+            );
+            tags.insert("d:p".to_string(), format!("{}:{}", m.erasure.data_blocks, m.erasure.parity_blocks));
+        } else {
+            tags.insert("invalid".to_string(), "1".to_string());
+            tags.insert(
+                "d:p".to_string(),
+                format!("{}:{}", self.set_drive_count - self.default_parity_count, self.default_parity_count),
+            );
+        }
+        let mut offline = 0;
+        for (i, err) in errs.iter().enumerate() {
+            let mut found = false;
+            if let Some(err) = err
+                && err == &DiskError::DiskNotFound
+            {
+                found = true;
+            }
+            for p in data_errs_by_part {
+                if let Some(v) = p.1.get(i)
+                    && *v == CHECK_PART_DISK_NOT_FOUND
+                {
+                    found = true;
+                    break;
+                }
+            }
+
+            if found {
+                offline += 1;
+            }
+        }
+
+        if offline > 0 {
+            tags.insert("offline".to_string(), offline.to_string());
+        }
+
+        let mut fi = FileInfo::default();
+        if let Some(ref version_id) = opts.version_id {
+            fi.version_id = Uuid::parse_str(version_id).ok();
+        }
+
+        fi.set_tier_free_version_id(&Uuid::new_v4().to_string());
+
+        let disks = self.get_disks_internal().await;
+
+        let mut futures = Vec::with_capacity(disks.len());
+        for disk_op in disks.iter() {
+            let bucket = bucket.to_string();
+            let object = object.to_string();
+            let fi = fi.clone();
+            futures.push(async move {
+                if let Some(disk) = disk_op {
+                    disk.delete_version(&bucket, &object, fi, false, DeleteOptions::default())
+                        .await
+                } else {
+                    Err(DiskError::DiskNotFound)
+                }
+            });
+        }
+
+        let results = join_all(futures).await;
+        for (index, result) in results.into_iter().enumerate() {
+            let key = format!("ddisk-{index}");
+            match result {
+                Ok(_) => {
+                    tags.insert(key, "".to_string());
+                }
+                Err(e) => {
+                    tags.insert(key, e.to_string());
+                }
+            }
+        }
+
+        // TODO: audit
+
+        Ok(m)
     }
 
     async fn delete_prefix(&self, bucket: &str, prefix: &str) -> disk::error::Result<()> {
@@ -6330,8 +6320,7 @@ fn is_object_dang_ling(
     meta_arr: &[FileInfo],
     errs: &[Option<DiskError>],
     data_errs_by_part: &HashMap<usize, Vec<usize>>,
-) -> disk::error::Result<FileInfo> {
-    let mut valid_meta = FileInfo::default();
+) -> (FileInfo, bool) {
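+    // Returns the best valid FileInfo found (if any) plus a flag indicating the
+    // object is dangling, i.e. more metadata copies or part files are lost than
+    // parity can reconstruct, so the remnants may be safely purged.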
     let (not_found_meta_errs, non_actionable_meta_errs) = dang_ling_meta_errs_count(errs);
 
     let (mut not_found_parts_errs, mut non_actionable_parts_errs) = (0, 0);
@@ -6343,42 +6332,42 @@ fn is_object_dang_ling(
         }
     });
 
-    meta_arr.iter().for_each(|fi| {
+    let mut valid_meta = FileInfo::default();
+
+    for fi in meta_arr.iter() {
         if fi.is_valid() {
             valid_meta = fi.clone();
+            break;
         }
-    });
+    }
 
     if !valid_meta.is_valid() {
         let data_blocks = meta_arr.len().div_ceil(2);
         if not_found_parts_errs > data_blocks {
-            return Ok(valid_meta);
+            return (valid_meta, true);
         }
-        return Err(DiskError::other("not ok"));
+        return (valid_meta, false);
     }
 
     if non_actionable_meta_errs > 0 || non_actionable_parts_errs > 0 {
-        return Err(DiskError::other("not ok"));
+        return (valid_meta, false);
    }
 
     if valid_meta.deleted {
         let data_blocks = errs.len().div_ceil(2);
-        if not_found_meta_errs > data_blocks {
-            return Ok(valid_meta);
-        }
-        return Err(DiskError::other("not ok"));
+        return (valid_meta, not_found_meta_errs > data_blocks);
     }
 
     if not_found_meta_errs > 0 && not_found_meta_errs > valid_meta.erasure.parity_blocks {
-        return Ok(valid_meta);
+        return (valid_meta, true);
     }
 
     if !valid_meta.is_remote() && not_found_parts_errs > 0 && not_found_parts_errs > valid_meta.erasure.parity_blocks {
-        return Ok(valid_meta);
+        return (valid_meta, true);
     }
 
-    Err(DiskError::other("not ok"))
+    (valid_meta, false)
 }
 
 fn dang_ling_meta_errs_count(cerrs: &[Option<DiskError>]) -> (usize, usize) {
@@ -6452,15 +6441,17 @@ fn join_errs(errs: &[Option<DiskError>]) -> String {
-/// It sets partsMetadata and onlineDisks when xl.meta is inexistant/corrupted or outdated.
-/// It also checks if the status of each part (corrupted, missing, ok) in each drive.
-/// Returns (availableDisks, dataErrsByDisk, dataErrsByPart).
+/// It sets partsMetadata and onlineDisks when xl.meta is missing, corrupted, or outdated.
+/// It also checks the status of each part (corrupted, missing, ok) on each drive.
+/// Returns (dataErrsByDisk, dataErrsByPart).
+#[allow(clippy::too_many_arguments)]
 async fn disks_with_all_parts(
-    online_disks: &[Option<DiskStore>],
+    online_disks: &mut [Option<DiskStore>],
     parts_metadata: &mut [FileInfo],
     errs: &[Option<DiskError>],
     latest_meta: &FileInfo,
+    filter_by_etag: bool,
     bucket: &str,
     object: &str,
     scan_mode: HealScanMode,
-) -> disk::error::Result<(Vec<Option<DiskStore>>, HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
+) -> disk::error::Result<(HashMap<usize, Vec<usize>>, HashMap<usize, Vec<usize>>)> {
     let object_name = latest_meta.name.clone();
     debug!(
         "disks_with_all_partsv2: starting with object_name={}, online_disks.len()={}, scan_mode={:?}",
@@ -6469,16 +6460,14 @@ async fn disks_with_all_parts(
         scan_mode
     );
 
-    let mut available_disks = vec![None; online_disks.len()];
-
     // Initialize dataErrsByDisk and dataErrsByPart with 0 (CHECK_PART_UNKNOWN) to match Go
     let mut data_errs_by_disk: HashMap<usize, Vec<usize>> = HashMap::new();
     for i in 0..online_disks.len() {
-        data_errs_by_disk.insert(i, vec![CHECK_PART_SUCCESS; latest_meta.parts.len()]);
+        data_errs_by_disk.insert(i, vec![CHECK_PART_UNKNOWN; latest_meta.parts.len()]);
     }
     let mut data_errs_by_part: HashMap<usize, Vec<usize>> = HashMap::new();
     for i in 0..latest_meta.parts.len() {
-        data_errs_by_part.insert(i, vec![CHECK_PART_SUCCESS; online_disks.len()]);
+        data_errs_by_part.insert(i, vec![CHECK_PART_UNKNOWN; online_disks.len()]);
     }
 
     // Check for inconsistent erasure distribution
@@ -6514,28 +6503,27 @@ async fn disks_with_all_parts(
         meta_errs.push(None);
     }
 
+    let online_disks_len = online_disks.len();
+
     // Process meta errors
-    for (index, disk) in online_disks.iter().enumerate() {
+    for (index, disk_op) in online_disks.iter_mut().enumerate() {
         if let Some(err) = &errs[index] {
             meta_errs[index] = Some(err.clone());
             continue;
         }
 
-        let disk = if let Some(disk) = disk {
-            disk
-        } else {
-            meta_errs[index] = Some(DiskError::DiskNotFound);
-            continue;
-        };
-
-        if !disk.is_online().await {
+        if disk_op.is_none() {
             meta_errs[index] = Some(DiskError::DiskNotFound);
             continue;
         }
 
         let meta = &parts_metadata[index];
-        // Check if metadata is corrupted (equivalent to filterByETag=false in Go)
-        let corrupted = !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir);
+
+        let corrupted = if filter_by_etag {
+            latest_meta.get_etag() != meta.get_etag()
+        } else {
+            !meta.mod_time.eq(&latest_meta.mod_time) || !meta.data_dir.eq(&latest_meta.data_dir)
+        };
 
         if corrupted {
             warn!(
@@ -6544,6 +6532,8 @@ async fn disks_with_all_parts(
             );
             meta_errs[index] = Some(DiskError::FileCorrupt);
             parts_metadata[index] = FileInfo::default();
+            *disk_op = None;
+
             continue;
         }
@@ -6555,11 +6545,11 @@ async fn disks_with_all_parts(
             );
             parts_metadata[index] = FileInfo::default();
             meta_errs[index] = Some(DiskError::FileCorrupt);
+            *disk_op = None;
             continue;
         }
 
-        #[allow(clippy::collapsible_if)]
-        if !meta.deleted && meta.erasure.distribution.len() != online_disks.len() {
+        if !meta.deleted && meta.erasure.distribution.len() != online_disks_len {
             // Erasure distribution is not the same as onlineDisks
             // attempt a fix if possible, assuming other entries
             // might have the right erasure distribution.
@@ -6569,6 +6559,7 @@ async fn disks_with_all_parts(
             );
             parts_metadata[index] = FileInfo::default();
             meta_errs[index] = Some(DiskError::FileCorrupt);
+            *disk_op = None;
             continue;
         }
     }
@@ -6659,6 +6650,7 @@ async fn disks_with_all_parts(
                 } else {
                     match disk.check_parts(bucket, object, meta).await {
                         Ok(v) => {
+                            info!("check_parts: verify_resp: {:?}", v);
                             verify_resp = v;
                         }
                         Err(err) => {
@@ -6704,48 +6696,20 @@ async fn disks_with_all_parts(
     // Build dataErrsByDisk from dataErrsByPart
     for (part, disks) in data_errs_by_part.iter() {
         for (disk_idx, disk_err) in disks.iter().enumerate() {
-            if let Some(vec) = data_errs_by_disk.get_mut(&disk_idx)
-                && *part < vec.len()
+            if let Some(parts) = data_errs_by_disk.get_mut(&disk_idx)
+                && *part < parts.len()
             {
-                vec[*part] = *disk_err;
+                parts[*part] = *disk_err;
                 info!(
-                    "data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {disk_err}",
-                    object_name,
+                    "data_errs_by_disk: update data_errs_by_disk: object_name={}, part: {part}, disk_idx: {disk_idx}, disk_err: {}",
+                    object_name, parts[*part]
                 );
             }
         }
     }
 
-    // Calculate available_disks based on meta_errs and data_errs_by_disk
-    for (i, disk) in online_disks.iter().enumerate() {
-        if let Some(disk_errs) = data_errs_by_disk.get(&i) {
-            if meta_errs[i].is_none() && disk.is_some() && !has_part_err(disk_errs) {
-                available_disks[i] = Some(disk.clone().unwrap());
-            } else {
-                warn!(
-                    "disks_with_all_partsv2: disk is not available, object_name={}, index: {}, meta_errs={:?}, disk_errs={:?}, disk_is_some={:?}",
-                    object_name,
-                    i,
-                    meta_errs[i],
-                    disk_errs,
-                    disk.is_some(),
-                );
-                parts_metadata[i] = FileInfo::default();
-            }
-        } else {
-            warn!(
-                "disks_with_all_partsv2: data_errs_by_disk missing entry for object_name={},index {}, meta_errs={:?}, disk_is_some={:?}",
-                object_name,
-                i,
-                meta_errs[i],
-                disk.is_some(),
-            );
-            parts_metadata[i] = FileInfo::default();
-        }
-    }
-
-    Ok((available_disks, data_errs_by_disk, data_errs_by_part))
+    Ok((data_errs_by_disk, data_errs_by_part))
 }
 
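+/// Decides whether this disk's copy of the object needs healing.
+/// The tuple is (should_heal, is_metadata_heal, reason): `is_metadata_heal`
+/// is true when xl.meta itself is missing, corrupt, or outdated, and false
+/// when only individual part files are missing or corrupt.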
 pub fn should_heal_object_on_disk(
     err: &Option<DiskError>,
     parts_errs: &[usize],
     meta: &FileInfo,
     latest_meta: &FileInfo,
-) -> (bool, Option<DiskError>) {
+) -> (bool, bool, Option<DiskError>) {
     if let Some(err) = err
         && (err == &DiskError::FileNotFound || err == &DiskError::FileVersionNotFound || err == &DiskError::FileCorrupt)
     {
-        return (true, Some(err.clone()));
+        return (true, true, Some(err.clone()));
     }
 
-    if latest_meta.volume != meta.volume
-        || latest_meta.name != meta.name
-        || latest_meta.version_id != meta.version_id
-        || latest_meta.deleted != meta.deleted
-    {
-        info!("latest_meta not Eq meta, latest_meta: {:?}, meta: {:?}", latest_meta, meta);
-        return (true, Some(DiskError::OutdatedXLMeta));
+    if err.is_some() {
+        return (false, false, err.clone());
     }
+
+    if !meta.equals(latest_meta) {
+        return (true, true, Some(DiskError::OutdatedXLMeta));
+    }
+
     if !meta.deleted && !meta.is_remote() {
         let err_vec = [CHECK_PART_FILE_NOT_FOUND, CHECK_PART_FILE_CORRUPT];
         for part_err in parts_errs.iter() {
             if err_vec.contains(part_err) {
-                return (true, Some(DiskError::PartMissingOrCorrupt));
+                return (true, false, Some(DiskError::PartMissingOrCorrupt));
             }
         }
     }
 
-    (false, err.clone())
+    (false, false, None)
 }
 
 async fn get_disks_info(disks: &[Option<DiskStore>], eps: &[Endpoint]) -> Vec<DiskInfo> {
@@ -7313,15 +7277,15 @@ mod tests {
 
         // Test with file not found error
         let err = Some(DiskError::FileNotFound);
-        let (should_heal, _) = should_heal_object_on_disk(&err, &[], &meta, &latest_meta);
+        let (should_heal, _, _) = should_heal_object_on_disk(&err, &[], &meta, &latest_meta);
         assert!(should_heal);
 
         // Test with no error and no part errors
-        let (should_heal, _) = should_heal_object_on_disk(&None, &[CHECK_PART_SUCCESS], &meta, &latest_meta);
+        let (should_heal, _, _) = should_heal_object_on_disk(&None, &[CHECK_PART_SUCCESS], &meta, &latest_meta);
         assert!(!should_heal);
 
         // Test with part corruption
-        let (should_heal, _) = should_heal_object_on_disk(&None, &[CHECK_PART_FILE_CORRUPT], &meta, &latest_meta);
+        let (should_heal, _, _) = should_heal_object_on_disk(&None, &[CHECK_PART_FILE_CORRUPT], &meta, &latest_meta);
         assert!(should_heal);
     }