From 030d3c9426ec333f6b8716940e8da92a3d9773ef Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 5 Dec 2025 20:30:08 +0800 Subject: [PATCH] fix filemeta nil versionid (#1002) --- .vscode/launch.json | 1 + crates/ahm/src/heal/manager.rs | 12 ++++----- crates/common/src/heal_channel.rs | 8 +++--- crates/ecstore/src/disk/local.rs | 28 +++++++++----------- crates/filemeta/src/filemeta.rs | 44 ++++++++++++++++--------------- 5 files changed, 46 insertions(+), 47 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index e9ff57b7..f054a23a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -22,6 +22,7 @@ "env": { "RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug", "RUSTFS_SKIP_BACKGROUND_TASK": "on", + //"RUSTFS_OBS_LOG_DIRECTORY": "./deploy/logs", // "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow", // "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token" }, diff --git a/crates/ahm/src/heal/manager.rs b/crates/ahm/src/heal/manager.rs index c2717ef2..39c5f8fd 100644 --- a/crates/ahm/src/heal/manager.rs +++ b/crates/ahm/src/heal/manager.rs @@ -143,16 +143,16 @@ impl PriorityHealQueue { format!("object:{}:{}:{}", bucket, object, version_id.as_deref().unwrap_or("")) } HealType::Bucket { bucket } => { - format!("bucket:{}", bucket) + format!("bucket:{bucket}") } HealType::ErasureSet { set_disk_id, .. } => { - format!("erasure_set:{}", set_disk_id) + format!("erasure_set:{set_disk_id}") } HealType::Metadata { bucket, object } => { - format!("metadata:{}:{}", bucket, object) + format!("metadata:{bucket}:{object}") } HealType::MRF { meta_path } => { - format!("mrf:{}", meta_path) + format!("mrf:{meta_path}") } HealType::ECDecode { bucket, @@ -173,7 +173,7 @@ impl PriorityHealQueue { /// Check if an erasure set heal request for a specific set_disk_id exists fn contains_erasure_set(&self, set_disk_id: &str) -> bool { - let key = format!("erasure_set:{}", set_disk_id); + let key = format!("erasure_set:{set_disk_id}"); self.dedup_keys.contains(&key) } } @@ -327,7 +327,7 @@ impl HealManager { if queue_len >= queue_capacity { return Err(Error::ConfigurationError { - message: format!("Heal queue is full ({}/{})", queue_len, queue_capacity), + message: format!("Heal queue is full ({queue_len}/{queue_capacity})"), }); } diff --git a/crates/common/src/heal_channel.rs b/crates/common/src/heal_channel.rs index 31b906d3..67b7f46d 100644 --- a/crates/common/src/heal_channel.rs +++ b/crates/common/src/heal_channel.rs @@ -125,7 +125,7 @@ impl<'de> Deserialize<'de> for HealScanMode { 0 => Ok(HealScanMode::Unknown), 1 => Ok(HealScanMode::Normal), 2 => Ok(HealScanMode::Deep), - _ => Err(E::custom(format!("invalid HealScanMode value: {}", value))), + _ => Err(E::custom(format!("invalid HealScanMode value: {value}"))), } } @@ -134,7 +134,7 @@ impl<'de> Deserialize<'de> for HealScanMode { E: serde::de::Error, { if value > u8::MAX as u64 { - return Err(E::custom(format!("HealScanMode value too large: {}", value))); + return Err(E::custom(format!("HealScanMode value too large: {value}"))); } self.visit_u8(value as u8) } @@ -144,7 +144,7 @@ impl<'de> Deserialize<'de> for HealScanMode { E: serde::de::Error, { if value < 0 || value > u8::MAX as i64 { - return Err(E::custom(format!("invalid HealScanMode value: {}", value))); + return Err(E::custom(format!("invalid HealScanMode value: {value}"))); } self.visit_u8(value as u8) } @@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for HealScanMode { "Unknown" | "unknown" => Ok(HealScanMode::Unknown), "Normal" | "normal" => Ok(HealScanMode::Normal), "Deep" | "deep" => Ok(HealScanMode::Deep), - _ => Err(E::custom(format!("invalid HealScanMode string: {}", value))), + _ => Err(E::custom(format!("invalid HealScanMode string: {value}"))), } } } diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index ea35e79c..5ed851e6 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -1985,24 +1985,20 @@ impl DiskAPI for LocalDisk { // TODO: Healing + let search_version_id = fi.version_id.or(Some(Uuid::nil())); + + // Check if there's an existing version with the same version_id that has a data_dir to clean up + // Note: For non-versioned buckets, fi.version_id is None, but in xl.meta it's stored as Some(Uuid::nil()) let has_old_data_dir = { - if let Ok((_, ver)) = xlmeta.find_version(fi.version_id) { - let has_data_dir = ver.get_data_dir(); - if let Some(data_dir) = has_data_dir { - if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir)) == 0 { - // TODO: Healing - // remove inlinedata\ - Some(data_dir) - } else { - None - } - } else { - None - } - } else { - None - } + xlmeta.find_version(search_version_id).ok().and_then(|(_, ver)| { + // shard_count == 0 means no other version shares this data_dir + ver.get_data_dir() + .filter(|&data_dir| xlmeta.shard_data_dir_count(&search_version_id, &Some(data_dir)) == 0) + }) }; + if let Some(old_data_dir) = has_old_data_dir.as_ref() { + let _ = xlmeta.data.remove(vec![search_version_id.unwrap_or_default(), *old_data_dir]); + } xlmeta.add_version(fi.clone())?; diff --git a/crates/filemeta/src/filemeta.rs b/crates/filemeta/src/filemeta.rs index 55eb4e98..ad3b0f9e 100644 --- a/crates/filemeta/src/filemeta.rs +++ b/crates/filemeta/src/filemeta.rs @@ -34,7 +34,7 @@ use std::{collections::HashMap, io::Cursor}; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use tokio::io::AsyncRead; -use tracing::error; +use tracing::{error, warn}; use uuid::Uuid; use xxhash_rust::xxh64; @@ -444,8 +444,9 @@ impl FileMeta { // Find version pub fn find_version(&self, vid: Option) -> Result<(usize, FileMetaVersion)> { + let vid = vid.unwrap_or_default(); for (i, fver) in self.versions.iter().enumerate() { - if fver.header.version_id == vid { + if fver.header.version_id == Some(vid) { let version = self.get_idx(i)?; return Ok((i, version)); } @@ -456,9 +457,12 @@ impl FileMeta { // shard_data_dir_count queries the count of data_dir under vid pub fn shard_data_dir_count(&self, vid: &Option, data_dir: &Option) -> usize { + let vid = vid.unwrap_or_default(); self.versions .iter() - .filter(|v| v.header.version_type == VersionType::Object && v.header.version_id != *vid && v.header.user_data_dir()) + .filter(|v| { + v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.user_data_dir() + }) .map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default()) .filter(|v| v == data_dir) .count() @@ -890,12 +894,11 @@ impl FileMeta { read_data: bool, all_parts: bool, ) -> Result { - let has_vid = { + let vid = { if !version_id.is_empty() { - let id = Uuid::parse_str(version_id)?; - if !id.is_nil() { Some(id) } else { None } + Uuid::parse_str(version_id)? } else { - None + Uuid::nil() } }; @@ -905,12 +908,12 @@ impl FileMeta { for ver in self.versions.iter() { let header = &ver.header; - if let Some(vid) = has_vid { - if header.version_id != Some(vid) { - is_latest = false; - succ_mod_time = header.mod_time; - continue; - } + // TODO: freeVersion + + if !version_id.is_empty() && header.version_id != Some(vid) { + is_latest = false; + succ_mod_time = header.mod_time; + continue; } let mut fi = ver.into_fileinfo(volume, path, all_parts)?; @@ -932,7 +935,7 @@ impl FileMeta { return Ok(fi); } - if has_vid.is_none() { + if version_id.is_empty() { Err(Error::FileNotFound) } else { Err(Error::FileVersionNotFound) @@ -1091,13 +1094,10 @@ impl FileMeta { /// Count shared data directories pub fn shared_data_dir_count(&self, version_id: Option, data_dir: Option) -> usize { + let version_id = version_id.unwrap_or_default(); + if self.data.entries().unwrap_or_default() > 0 - && version_id.is_some() - && self - .data - .find(version_id.unwrap().to_string().as_str()) - .unwrap_or_default() - .is_some() + && self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some() { return 0; } @@ -1105,7 +1105,9 @@ impl FileMeta { self.versions .iter() .filter(|v| { - v.header.version_type == VersionType::Object && v.header.version_id != version_id && v.header.user_data_dir() + v.header.version_type == VersionType::Object + && v.header.version_id != Some(version_id) + && v.header.user_data_dir() }) .filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok()) .filter(|&dir| dir == data_dir)