mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix filemeta nil versionid (#1002)
This commit is contained in:
1
.vscode/launch.json
vendored
1
.vscode/launch.json
vendored
@@ -22,6 +22,7 @@
|
||||
"env": {
|
||||
"RUST_LOG": "rustfs=debug,ecstore=info,s3s=debug,iam=debug",
|
||||
"RUSTFS_SKIP_BACKGROUND_TASK": "on",
|
||||
//"RUSTFS_OBS_LOG_DIRECTORY": "./deploy/logs",
|
||||
// "RUSTFS_POLICY_PLUGIN_URL":"http://localhost:8181/v1/data/rustfs/authz/allow",
|
||||
// "RUSTFS_POLICY_PLUGIN_AUTH_TOKEN":"your-opa-token"
|
||||
},
|
||||
|
||||
@@ -143,16 +143,16 @@ impl PriorityHealQueue {
|
||||
format!("object:{}:{}:{}", bucket, object, version_id.as_deref().unwrap_or(""))
|
||||
}
|
||||
HealType::Bucket { bucket } => {
|
||||
format!("bucket:{}", bucket)
|
||||
format!("bucket:{bucket}")
|
||||
}
|
||||
HealType::ErasureSet { set_disk_id, .. } => {
|
||||
format!("erasure_set:{}", set_disk_id)
|
||||
format!("erasure_set:{set_disk_id}")
|
||||
}
|
||||
HealType::Metadata { bucket, object } => {
|
||||
format!("metadata:{}:{}", bucket, object)
|
||||
format!("metadata:{bucket}:{object}")
|
||||
}
|
||||
HealType::MRF { meta_path } => {
|
||||
format!("mrf:{}", meta_path)
|
||||
format!("mrf:{meta_path}")
|
||||
}
|
||||
HealType::ECDecode {
|
||||
bucket,
|
||||
@@ -173,7 +173,7 @@ impl PriorityHealQueue {
|
||||
|
||||
/// Check if an erasure set heal request for a specific set_disk_id exists
|
||||
fn contains_erasure_set(&self, set_disk_id: &str) -> bool {
|
||||
let key = format!("erasure_set:{}", set_disk_id);
|
||||
let key = format!("erasure_set:{set_disk_id}");
|
||||
self.dedup_keys.contains(&key)
|
||||
}
|
||||
}
|
||||
@@ -327,7 +327,7 @@ impl HealManager {
|
||||
|
||||
if queue_len >= queue_capacity {
|
||||
return Err(Error::ConfigurationError {
|
||||
message: format!("Heal queue is full ({}/{})", queue_len, queue_capacity),
|
||||
message: format!("Heal queue is full ({queue_len}/{queue_capacity})"),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -125,7 +125,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
|
||||
0 => Ok(HealScanMode::Unknown),
|
||||
1 => Ok(HealScanMode::Normal),
|
||||
2 => Ok(HealScanMode::Deep),
|
||||
_ => Err(E::custom(format!("invalid HealScanMode value: {}", value))),
|
||||
_ => Err(E::custom(format!("invalid HealScanMode value: {value}"))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if value > u8::MAX as u64 {
|
||||
return Err(E::custom(format!("HealScanMode value too large: {}", value)));
|
||||
return Err(E::custom(format!("HealScanMode value too large: {value}")));
|
||||
}
|
||||
self.visit_u8(value as u8)
|
||||
}
|
||||
@@ -144,7 +144,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if value < 0 || value > u8::MAX as i64 {
|
||||
return Err(E::custom(format!("invalid HealScanMode value: {}", value)));
|
||||
return Err(E::custom(format!("invalid HealScanMode value: {value}")));
|
||||
}
|
||||
self.visit_u8(value as u8)
|
||||
}
|
||||
@@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for HealScanMode {
|
||||
"Unknown" | "unknown" => Ok(HealScanMode::Unknown),
|
||||
"Normal" | "normal" => Ok(HealScanMode::Normal),
|
||||
"Deep" | "deep" => Ok(HealScanMode::Deep),
|
||||
_ => Err(E::custom(format!("invalid HealScanMode string: {}", value))),
|
||||
_ => Err(E::custom(format!("invalid HealScanMode string: {value}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1985,24 +1985,20 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
// TODO: Healing
|
||||
|
||||
let search_version_id = fi.version_id.or(Some(Uuid::nil()));
|
||||
|
||||
// Check if there's an existing version with the same version_id that has a data_dir to clean up
|
||||
// Note: For non-versioned buckets, fi.version_id is None, but in xl.meta it's stored as Some(Uuid::nil())
|
||||
let has_old_data_dir = {
|
||||
if let Ok((_, ver)) = xlmeta.find_version(fi.version_id) {
|
||||
let has_data_dir = ver.get_data_dir();
|
||||
if let Some(data_dir) = has_data_dir {
|
||||
if xlmeta.shard_data_dir_count(&fi.version_id, &Some(data_dir)) == 0 {
|
||||
// TODO: Healing
|
||||
// remove inlinedata\
|
||||
Some(data_dir)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
xlmeta.find_version(search_version_id).ok().and_then(|(_, ver)| {
|
||||
// shard_count == 0 means no other version shares this data_dir
|
||||
ver.get_data_dir()
|
||||
.filter(|&data_dir| xlmeta.shard_data_dir_count(&search_version_id, &Some(data_dir)) == 0)
|
||||
})
|
||||
};
|
||||
if let Some(old_data_dir) = has_old_data_dir.as_ref() {
|
||||
let _ = xlmeta.data.remove(vec![search_version_id.unwrap_or_default(), *old_data_dir]);
|
||||
}
|
||||
|
||||
xlmeta.add_version(fi.clone())?;
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ use std::{collections::HashMap, io::Cursor};
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use tokio::io::AsyncRead;
|
||||
use tracing::error;
|
||||
use tracing::{error, warn};
|
||||
use uuid::Uuid;
|
||||
use xxhash_rust::xxh64;
|
||||
|
||||
@@ -444,8 +444,9 @@ impl FileMeta {
|
||||
|
||||
// Find version
|
||||
pub fn find_version(&self, vid: Option<Uuid>) -> Result<(usize, FileMetaVersion)> {
|
||||
let vid = vid.unwrap_or_default();
|
||||
for (i, fver) in self.versions.iter().enumerate() {
|
||||
if fver.header.version_id == vid {
|
||||
if fver.header.version_id == Some(vid) {
|
||||
let version = self.get_idx(i)?;
|
||||
return Ok((i, version));
|
||||
}
|
||||
@@ -456,9 +457,12 @@ impl FileMeta {
|
||||
|
||||
// shard_data_dir_count queries the count of data_dir under vid
|
||||
pub fn shard_data_dir_count(&self, vid: &Option<Uuid>, data_dir: &Option<Uuid>) -> usize {
|
||||
let vid = vid.unwrap_or_default();
|
||||
self.versions
|
||||
.iter()
|
||||
.filter(|v| v.header.version_type == VersionType::Object && v.header.version_id != *vid && v.header.user_data_dir())
|
||||
.filter(|v| {
|
||||
v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.user_data_dir()
|
||||
})
|
||||
.map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default())
|
||||
.filter(|v| v == data_dir)
|
||||
.count()
|
||||
@@ -890,12 +894,11 @@ impl FileMeta {
|
||||
read_data: bool,
|
||||
all_parts: bool,
|
||||
) -> Result<FileInfo> {
|
||||
let has_vid = {
|
||||
let vid = {
|
||||
if !version_id.is_empty() {
|
||||
let id = Uuid::parse_str(version_id)?;
|
||||
if !id.is_nil() { Some(id) } else { None }
|
||||
Uuid::parse_str(version_id)?
|
||||
} else {
|
||||
None
|
||||
Uuid::nil()
|
||||
}
|
||||
};
|
||||
|
||||
@@ -905,12 +908,12 @@ impl FileMeta {
|
||||
for ver in self.versions.iter() {
|
||||
let header = &ver.header;
|
||||
|
||||
if let Some(vid) = has_vid {
|
||||
if header.version_id != Some(vid) {
|
||||
is_latest = false;
|
||||
succ_mod_time = header.mod_time;
|
||||
continue;
|
||||
}
|
||||
// TODO: freeVersion
|
||||
|
||||
if !version_id.is_empty() && header.version_id != Some(vid) {
|
||||
is_latest = false;
|
||||
succ_mod_time = header.mod_time;
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut fi = ver.into_fileinfo(volume, path, all_parts)?;
|
||||
@@ -932,7 +935,7 @@ impl FileMeta {
|
||||
return Ok(fi);
|
||||
}
|
||||
|
||||
if has_vid.is_none() {
|
||||
if version_id.is_empty() {
|
||||
Err(Error::FileNotFound)
|
||||
} else {
|
||||
Err(Error::FileVersionNotFound)
|
||||
@@ -1091,13 +1094,10 @@ impl FileMeta {
|
||||
|
||||
/// Count shared data directories
|
||||
pub fn shared_data_dir_count(&self, version_id: Option<Uuid>, data_dir: Option<Uuid>) -> usize {
|
||||
let version_id = version_id.unwrap_or_default();
|
||||
|
||||
if self.data.entries().unwrap_or_default() > 0
|
||||
&& version_id.is_some()
|
||||
&& self
|
||||
.data
|
||||
.find(version_id.unwrap().to_string().as_str())
|
||||
.unwrap_or_default()
|
||||
.is_some()
|
||||
&& self.data.find(version_id.to_string().as_str()).unwrap_or_default().is_some()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@@ -1105,7 +1105,9 @@ impl FileMeta {
|
||||
self.versions
|
||||
.iter()
|
||||
.filter(|v| {
|
||||
v.header.version_type == VersionType::Object && v.header.version_id != version_id && v.header.user_data_dir()
|
||||
v.header.version_type == VersionType::Object
|
||||
&& v.header.version_id != Some(version_id)
|
||||
&& v.header.user_data_dir()
|
||||
})
|
||||
.filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok())
|
||||
.filter(|&dir| dir == data_dir)
|
||||
|
||||
Reference in New Issue
Block a user