Merge pull request #198 from rustfs/list-objects

fix: #137 #186 skip delete marker objects when list object
This commit is contained in:
weisd
2024-12-31 17:02:20 +08:00
committed by GitHub
11 changed files with 128 additions and 32 deletions

View File

@@ -355,6 +355,10 @@ pub fn is_err_file_not_found(err: &Error) -> bool {
matches!(err.downcast_ref::<DiskError>(), Some(DiskError::FileNotFound))
}
pub fn is_err_file_version_not_found(err: &Error) -> bool {
matches!(err.downcast_ref::<DiskError>(), Some(DiskError::FileVersionNotFound))
}
pub fn is_err_volume_not_found(err: &Error) -> bool {
matches!(err.downcast_ref::<DiskError>(), Some(DiskError::VolumeNotFound))
}

View File

@@ -1,5 +1,6 @@
use super::error::{
is_err_file_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files, os_is_not_exist, os_is_permission,
is_err_file_not_found, is_err_file_version_not_found, is_sys_err_io, is_sys_err_not_empty, is_sys_err_too_many_files,
os_is_not_exist, os_is_permission,
};
use super::os::{is_root_disk, rename_all};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
@@ -560,10 +561,22 @@ impl LocalDisk {
let volume_dir = self.get_bucket_path(volume)?;
let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
let data = self
.read_all_data(volume, volume_dir.as_path(), &xlpath)
.await
.unwrap_or_default();
let data = match self.read_all_data_with_dmtime(volume, volume_dir.as_path(), &xlpath).await {
Ok((data, _)) => data,
Err(err) => {
if is_err_file_not_found(&err) && !skip_access_checks(volume) {
if let Err(er) = access(&volume_dir).await {
if os_is_not_exist(&er) {
return Err(Error::new(DiskError::VolumeNotFound));
}
}
return Err(Error::new(DiskError::FileNotFound));
}
return Err(err);
}
};
if data.is_empty() {
return Err(Error::new(DiskError::FileNotFound));
@@ -574,10 +587,22 @@ impl LocalDisk {
fm.unmarshal_msg(&data)?;
for fi in fis {
let data_dir = fm.delete_version(fi)?;
let data_dir = match fm.delete_version(fi) {
Ok(res) => res,
Err(err) => {
if !fi.deleted && (is_err_file_not_found(&err) || is_err_file_version_not_found(&err)) {
continue;
}
if data_dir.is_some() {
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap()).as_str())?;
return Err(err);
}
};
if let Some(dir) = data_dir {
let vid = fi.version_id.unwrap_or(Uuid::nil());
let _ = fm.data.remove(vec![vid, dir]);
let dir_path = self.get_object_path(volume, format!("{}/{}", path, dir).as_str())?;
self.move_to_trash(&dir_path, true, false).await?;
}
}
@@ -761,7 +786,7 @@ impl LocalDisk {
Ok(res) => res,
Err(e) => {
if !DiskError::VolumeNotFound.is(&e) && !is_err_file_not_found(&e) {
warn!("scan list_dir {}, err {:?}", &current, &e);
info!("scan list_dir {}, err {:?}", &current, &e);
}
if opts.report_notfound && is_err_file_not_found(&e) && current == &opts.base_dir {

View File

@@ -662,7 +662,7 @@ impl MetaCacheEntry {
!self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR)
}
pub fn is_latest_deletemarker(&mut self) -> bool {
pub fn is_latest_delete_marker(&mut self) -> bool {
if let Some(cached) = &self.cached {
if cached.versions.is_empty() {
return true;
@@ -678,7 +678,7 @@ impl MetaCacheEntry {
match FileMeta::check_xl2_v1(&self.metadata) {
Ok((meta, _, _)) => {
if !meta.is_empty() {
// TODO: IsLatestDeleteMarker
return FileMeta::is_latest_delete_marker(meta);
}
}
Err(_) => return true,

View File

@@ -14,6 +14,7 @@ use xxhash_rust::xxh64;
use crate::disk::FileInfoVersions;
use crate::file_meta_inline::InlineData;
use crate::store_api::RawFileInfo;
use crate::store_err::StorageError;
use crate::{
disk::error::DiskError,
error::{Error, Result},
@@ -39,6 +40,8 @@ const _XL_FLAG_INLINE_DATA: u8 = 1 << 2;
const META_DATA_READ_DEFAULT: usize = 4 << 10;
const MSGP_UINT32_SIZE: usize = 5;
// type ScanHeaderVersionFn = Box<dyn Fn(usize, &[u8], &[u8]) -> Result<()>>;
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct FileMeta {
pub versions: Vec<FileMetaShallowVersion>,
@@ -128,9 +131,9 @@ impl FileMeta {
// 解析meta
if !meta.is_empty() {
let (versions_len, _, meta_ver, read_size) = Self::decode_head_ver(meta)?;
let (versions_len, _, meta_ver, meta) = Self::decode_xl_headers(meta)?;
let (_, meta) = meta.split_at(read_size as usize);
// let (_, meta) = meta.split_at(read_size as usize);
self.meta_ver = meta_ver;
@@ -164,9 +167,9 @@ impl FileMeta {
Ok(i)
}
// decode_head_ver 解析 meta 头,返回 (versions数量xl_header_version, xl_meta_version, 已读数据长度)
// decode_xl_headers 解析 meta 头,返回 (versions数量xl_header_version, xl_meta_version, 已读数据长度)
#[tracing::instrument]
fn decode_head_ver(buf: &[u8]) -> Result<(usize, u8, u8, u64)> {
fn decode_xl_headers(buf: &[u8]) -> Result<(usize, u8, u8, &[u8])> {
let mut cur = Cursor::new(buf);
let header_ver: u8 = rmp::decode::read_int(&mut cur)?;
@@ -182,7 +185,65 @@ impl FileMeta {
let versions_len: usize = rmp::decode::read_int(&mut cur)?;
Ok((versions_len, header_ver, meta_ver, cur.position()))
Ok((versions_len, header_ver, meta_ver, &buf[cur.position() as usize..]))
}
fn decode_versions<F: FnMut(usize, &[u8], &[u8]) -> Result<()>>(buf: &[u8], versions: usize, mut fnc: F) -> Result<()> {
let mut cur: Cursor<&[u8]> = Cursor::new(buf);
for i in 0..versions {
let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
let start = cur.position() as usize;
let end = start + bin_len;
let header_buf = &buf[start..end];
cur.set_position(end as u64);
let bin_len = rmp::decode::read_bin_len(&mut cur)? as usize;
let start = cur.position() as usize;
let end = start + bin_len;
let ver_meta_buf = &buf[start..end];
cur.set_position(end as u64);
if let Err(err) = fnc(i, header_buf, ver_meta_buf) {
if let Some(e) = err.downcast_ref::<StorageError>() {
if e == &StorageError::DoneForNow {
return Ok(());
}
}
return Err(err);
}
}
Ok(())
}
pub fn is_latest_delete_marker(buf: &[u8]) -> bool {
let header = Self::decode_xl_headers(buf).ok();
if let Some((versions, _hdr_v, _meta_v, meta)) = header {
if versions == 0 {
return false;
}
let mut is_delete_marker = false;
let _ = Self::decode_versions(meta, versions, |_: usize, hdr: &[u8], _: &[u8]| {
let mut header = FileMetaVersionHeader::default();
if header.unmarshal_msg(hdr).is_err() {
return Err(Error::new(StorageError::DoneForNow));
}
is_delete_marker = header.version_type == VersionType::Delete;
Err(Error::new(StorageError::DoneForNow))
});
is_delete_marker
} else {
false
}
}
#[tracing::instrument]

View File

@@ -3845,7 +3845,6 @@ impl StorageAPI for SetDisks {
};
if vr.deleted {
error!("delete marker {:?}", &vr);
del_objects[i] = DeletedObject {
delete_marker: vr.deleted,
delete_marker_version_id: vr.version_id.map(|v| v.to_string()),

View File

@@ -1366,17 +1366,18 @@ impl StorageAPI for ECStore {
for (i, res) in results.into_iter().enumerate() {
match res {
Ok((pinfo, _)) => {
if pinfo.object_info.delete_marker && opts.version_id.is_none() {
del_objects[i] = DeletedObject {
delete_marker: pinfo.object_info.delete_marker,
delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()),
object_name: utils::path::decode_dir_object(&pinfo.object_info.name),
delete_marker_mtime: pinfo.object_info.mod_time,
..Default::default()
};
}
if let Some(obj) = objects.get(i) {
if pinfo.object_info.delete_marker && obj.version_id.is_none() {
del_objects[i] = DeletedObject {
delete_marker: pinfo.object_info.delete_marker,
delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()),
object_name: utils::path::decode_dir_object(&pinfo.object_info.name),
delete_marker_mtime: pinfo.object_info.mod_time,
..Default::default()
};
continue;
}
if !pool_obj_idx_map.contains_key(&pinfo.index) {
pool_obj_idx_map.insert(pinfo.index, vec![obj.clone()]);
} else if let Some(val) = pool_obj_idx_map.get_mut(&pinfo.index) {

View File

@@ -80,6 +80,9 @@ pub enum StorageError {
#[error("Decommission not started")]
DecommissionNotStarted,
#[error("DoneForNow")]
DoneForNow,
}
impl StorageError {
@@ -111,6 +114,7 @@ impl StorageError {
StorageError::DecommissionNotStarted => 0x18,
StorageError::InvalidPart(_, _, _) => 0x19,
StorageError::VolumeNotFound(_) => 0x20,
StorageError::DoneForNow => 0x21,
}
}
@@ -146,6 +150,7 @@ impl StorageError {
0x18 => Some(StorageError::DecommissionNotStarted),
0x19 => Some(StorageError::InvalidPart(Default::default(), Default::default(), Default::default())),
0x20 => Some(StorageError::VolumeNotFound(Default::default())),
0x21 => Some(StorageError::DoneForNow),
_ => None,
}
}

View File

@@ -692,7 +692,7 @@ async fn gather_results(
// TODO: isLatestDeletemarker
if !opts.include_directories
&& (entry.is_dir() || (!opts.versioned && entry.is_object() && entry.is_latest_deletemarker()))
&& (entry.is_dir() || (!opts.versioned && entry.is_object() && entry.is_latest_delete_marker()))
{
continue;
}
@@ -713,7 +713,7 @@ async fn gather_results(
}
}
if !opts.incl_deleted && entry.is_object() && entry.is_latest_deletemarker() && entry.is_object_dir() {
if !opts.incl_deleted && entry.is_object() && entry.is_latest_delete_marker() && entry.is_object_dir() {
continue;
}

View File

@@ -309,7 +309,7 @@ impl S3 for FS {
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
// mc get 3
warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id);
// warn!("get_object input {:?}, vid {:?}", &req.input, req.input.version_id);
let GetObjectInput {
bucket, key, version_id, ..

View File

@@ -75,6 +75,7 @@ pub fn to_s3_error(err: Error) -> S3Error {
version_id
)
}
StorageError::DoneForNow => s3_error!(InternalError, "DoneForNow"),
};
}

View File

@@ -18,8 +18,8 @@ fi
export RUSTFS_STORAGE_CLASS_INLINE_BLOCK="512 KB"
# DATA_DIR_ARG="./target/volume/test{0...4}"
DATA_DIR_ARG="./target/volume/test"
DATA_DIR_ARG="./target/volume/test{0...4}"
# DATA_DIR_ARG="./target/volume/test"
if [ -n "$1" ]; then
DATA_DIR_ARG="$1"