From dd0a744a3e1298e5a9614da6dd00c4a2d285c9f4 Mon Sep 17 00:00:00 2001 From: weisd Date: Tue, 31 Dec 2024 16:57:01 +0800 Subject: [PATCH] fix: #137 #186 skip delete marker objects when list object --- ecstore/src/disk/error.rs | 4 ++ ecstore/src/disk/local.rs | 43 +++++++++++++++---- ecstore/src/disk/mod.rs | 4 +- ecstore/src/file_meta.rs | 71 ++++++++++++++++++++++++++++--- ecstore/src/set_disk.rs | 1 - ecstore/src/store.rs | 21 ++++----- ecstore/src/store_err.rs | 5 +++ ecstore/src/store_list_objects.rs | 4 +- rustfs/src/storage/ecfs.rs | 2 +- rustfs/src/storage/error.rs | 1 + scripts/run.sh | 4 +- 11 files changed, 128 insertions(+), 32 deletions(-) diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index 5a61de76..a98c9d58 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -355,6 +355,10 @@ pub fn is_err_file_not_found(err: &Error) -> bool { matches!(err.downcast_ref::(), Some(DiskError::FileNotFound)) } +pub fn is_err_file_version_not_found(err: &Error) -> bool { + matches!(err.downcast_ref::(), Some(DiskError::FileVersionNotFound)) +} + pub fn is_err_volume_not_found(err: &Error) -> bool { matches!(err.downcast_ref::(), Some(DiskError::VolumeNotFound)) } diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index f314a2a5..d28b0b61 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1,5 +1,6 @@ use super::error::{ - is_err_file_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files, os_is_not_exist, os_is_permission, + is_err_file_not_found, is_err_file_version_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files, + os_is_not_exist, os_is_permission, }; use super::os::{is_root_disk, rename_all}; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; @@ -560,10 +561,22 @@ impl LocalDisk { let volume_dir = self.get_bucket_path(volume)?; let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; - let data = self - .read_all_data(volume, volume_dir.as_path(), &xlpath) - .await - .unwrap_or_default(); + let data = match self.read_all_data_with_dmtime(volume, volume_dir.as_path(), &xlpath).await { + Ok((data, _)) => data, + Err(err) => { + if is_err_file_not_found(&err) && !skip_access_checks(volume) { + if let Err(er) = access(&volume_dir).await { + if os_is_not_exist(&er) { + return Err(Error::new(DiskError::VolumeNotFound)); + } + } + + return Err(Error::new(DiskError::FileNotFound)); + } + + return Err(err); + } + }; if data.is_empty() { return Err(Error::new(DiskError::FileNotFound)); @@ -574,10 +587,22 @@ impl LocalDisk { fm.unmarshal_msg(&data)?; for fi in fis { - let data_dir = fm.delete_version(fi)?; + let data_dir = match fm.delete_version(fi) { + Ok(res) => res, + Err(err) => { + if !fi.deleted && (is_err_file_not_found(&err) || is_err_file_version_not_found(&err)) { + continue; + } - if data_dir.is_some() { - let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap()).as_str())?; + return Err(err); + } + }; + + if let Some(dir) = data_dir { + let vid = fi.version_id.unwrap_or(Uuid::nil()); + let _ = fm.data.remove(vec![vid, dir]); + + let dir_path = self.get_object_path(volume, format!("{}/{}", path, dir).as_str())?; self.move_to_trash(&dir_path, true, false).await?; } } @@ -761,7 +786,7 @@ impl LocalDisk { Ok(res) => res, Err(e) => { if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) { - warn!("scan list_dir {}, err {:?}", ¤t, &e); + info!("scan list_dir {}, err {:?}", ¤t, &e); } if opts.report_notfound && is_err_file_not_found(&e) && current == &opts.base_dir { diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index d3d5f08f..32bc2ca5 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -662,7 +662,7 @@ impl MetaCacheEntry { !self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR) } - pub fn is_latest_deletemarker(&mut self) -> bool { + pub fn is_latest_delete_marker(&mut self) -> bool { if let Some(cached) = &self.cached { if cached.versions.is_empty() { return true; @@ -678,7 +678,7 @@ impl MetaCacheEntry { match FileMeta::check_xl2_v1(&self.metadata) { Ok((meta, _, _)) => { if !meta.is_empty() { - // TODO: IsLatestDeleteMarker + return FileMeta::is_latest_delete_marker(meta); } } Err(_) => return true, diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index 8f6b03ea..e130ada4 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -14,6 +14,7 @@ use xxhash_rust::xxh64; use crate::disk::FileInfoVersions; use crate::file_meta_inline::InlineData; use crate::store_api::RawFileInfo; +use crate::store_err::StorageError; use crate::{ disk::error::DiskError, error::{Error, Result}, @@ -39,6 +40,8 @@ const _XL_FLAG_INLINE_DATA: u8 = 1 << 2; const META_DATA_READ_DEFAULT: usize = 4 << 10; const MSGP_UINT32_SIZE: usize = 5; +// type ScanHeaderVersionFn = Box Result<()>>; + #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct FileMeta { pub versions: Vec, @@ -128,9 +131,9 @@ impl FileMeta { // 解析meta if !meta.is_empty() { - let (versions_len, _, meta_ver, read_size) = Self::decode_head_ver(meta)?; + let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta)?; - let (_, meta) = meta.split_at(read_size as usize); + // let (_, meta) = meta.split_at(read_size as usize); self.meta_ver = meta_ver; @@ -164,9 +167,9 @@ impl FileMeta { Ok(i) } - // decode_head_ver 解析 meta 头,返回 (versions数量,xl_header_version, xl_meta_version, 已读数据长度) + // decode_xl_headers 解析 meta 头,返回 (versions数量,xl_header_version, xl_meta_version, 已读数据长度) #[tracing::instrument] - fn decode_head_ver(buf: &[u8]) -> Result<(usize, u8, u8, u64)> { + fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> { let mut cur = Cursor::new(buf); let header_ver: u8 = rmp::decode::read_int(&mut cur)?; @@ -182,7 +185,65 @@ impl FileMeta { let versions_len: usize = rmp::decode::read_int(&mut cur)?; - Ok((versions_len, header_ver, meta_ver, cur.position())) + Ok((versions_len, header_ver, meta_ver, &buf[cur.position() as usize..])) + } + + fn decode_versions Result<()>>(buf: &[u8], versions: usize, mut fnc: F) -> Result<()> { + let mut cur: Cursor<&[u8]> = Cursor::new(buf); + + for i in 0..versions { + let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; + let start = cur.position() as usize; + let end = start + bin_len; + let header_buf = &buf[start..end]; + + cur.set_position(end as u64); + + let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize; + let start = cur.position() as usize; + let end = start + bin_len; + let ver_meta_buf = &buf[start..end]; + + cur.set_position(end as u64); + + if let Err(err) = fnc(i, header_buf, ver_meta_buf) { + if let Some(e) = err.downcast_ref::() { + if e == &StorageError::DoneForNow { + return Ok(()); + } + } + + return Err(err); + } + } + + Ok(()) + } + + pub fn is_latest_delete_marker(buf: &[u8]) -> bool { + let header = Self::decode_xl_headers(buf).ok(); + if let Some((versions, _hdr_v, _meta_v, meta)) = header { + if versions == 0 { + return false; + } + + let mut is_delete_marker = false; + + let _ = Self::decode_versions(meta, versions, |_: usize, hdr: &[u8], _: &[u8]| { + let mut header = FileMetaVersionHeader::default(); + if header.unmarshal_msg(hdr).is_err() { + return Err(Error::new(StorageError::DoneForNow)); + } + + is_delete_marker = header.version_type == VersionType::Delete; + + Err(Error::new(StorageError::DoneForNow)) + }); + + is_delete_marker + } else { + false + } } #[tracing::instrument] diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 1584878a..79c78eb9 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -3845,7 +3845,6 @@ impl StorageAPI for SetDisks { }; if vr.deleted { - error!("delete marker {:?}", &vr); del_objects[i] = DeletedObject { delete_marker: vr.deleted, delete_marker_version_id: vr.version_id.map(|v| v.to_string()), diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index bd27a35f..9014eaa3 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1366,17 +1366,18 @@ impl StorageAPI for ECStore { for (i, res) in results.into_iter().enumerate() { match res { Ok((pinfo, _)) => { - if pinfo.object_info.delete_marker && opts.version_id.is_none() { - del_objects[i] = DeletedObject { - delete_marker: pinfo.object_info.delete_marker, - delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()), - object_name: utils::path::decode_dir_object(&pinfo.object_info.name), - delete_marker_mtime: pinfo.object_info.mod_time, - ..Default::default() - }; - } - if let Some(obj) = objects.get(i) { + if pinfo.object_info.delete_marker && obj.version_id.is_none() { + del_objects[i] = DeletedObject { + delete_marker: pinfo.object_info.delete_marker, + delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()), + object_name: utils::path::decode_dir_object(&pinfo.object_info.name), + delete_marker_mtime: pinfo.object_info.mod_time, + ..Default::default() + }; + continue; + } + if !pool_obj_idx_map.contains_key(&pinfo.index) { pool_obj_idx_map.insert(pinfo.index, vec![obj.clone()]); } else if let Some(val) = pool_obj_idx_map.get_mut(&pinfo.index) { diff --git a/ecstore/src/store_err.rs b/ecstore/src/store_err.rs index d02406e4..74498c17 100644 --- a/ecstore/src/store_err.rs +++ b/ecstore/src/store_err.rs @@ -80,6 +80,9 @@ pub enum StorageError { #[error("Decommission not started")] DecommissionNotStarted, + + #[error("DoneForNow")] + DoneForNow, } impl StorageError { @@ -111,6 +114,7 @@ impl StorageError { StorageError::DecommissionNotStarted => 0x18, StorageError::InvalidPart(_, _, _) => 0x19, StorageError::VolumeNotFound(_) => 0x20, + StorageError::DoneForNow => 0x21, } } @@ -146,6 +150,7 @@ impl StorageError { 0x18 => Some(StorageError::DecommissionNotStarted), 0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())), 0x20 => Some(StorageError::VolumeNotFound(Default::default())), + 0x21 => Some(StorageError::DoneForNow), _ => None, } } diff --git a/ecstore/src/store_list_objects.rs b/ecstore/src/store_list_objects.rs index 1515c939..6bc16998 100644 --- a/ecstore/src/store_list_objects.rs +++ b/ecstore/src/store_list_objects.rs @@ -692,7 +692,7 @@ async fn gather_results( // TODO: isLatestDeletemarker if !opts.include_directories - && (entry.is_dir() || (!opts.versioned && entry.is_object() && entry.is_latest_deletemarker())) + && (entry.is_dir() || (!opts.versioned && entry.is_object() && entry.is_latest_delete_marker())) { continue; } @@ -713,7 +713,7 @@ async fn gather_results( } } - if !opts.incl_deleted && entry.is_object() && entry.is_latest_deletemarker() && entry.is_object_dir() { + if !opts.incl_deleted && entry.is_object() && entry.is_latest_delete_marker() && entry.is_object_dir() { continue; } diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index dcf65ce2..a3b08e1f 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -309,7 +309,7 @@ impl S3 for FS { async fn get_object(&self, req: S3Request) -> S3Result> { // mc get 3 - warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id); + // warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id); let GetObjectInput { bucket, key, version_id, .. diff --git a/rustfs/src/storage/error.rs b/rustfs/src/storage/error.rs index 43218d44..459a1b29 100644 --- a/rustfs/src/storage/error.rs +++ b/rustfs/src/storage/error.rs @@ -75,6 +75,7 @@ pub fn to_s3_error(err: Error) -> S3Error { version_id ) } + StorageError::DoneForNow => s3_error!(InternalError, "DoneForNow"), }; } diff --git a/scripts/run.sh b/scripts/run.sh index 4f1942d1..5b1326e6 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -18,8 +18,8 @@ fi export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB" -# DATA_DIR_ARG="./target/volume/test{0...4}" -DATA_DIR_ARG="./target/volume/test" +DATA_DIR_ARG="./target/volume/test{0...4}" +# DATA_DIR_ARG="./target/volume/test" if [ -n "$1" ]; then DATA_DIR_ARG="$1"