stash delete_object

This commit is contained in:
weisd
2024-08-23 17:48:29 +08:00
parent 7f8854d7f5
commit 2058f765b0
3 changed files with 176 additions and 89 deletions

View File

@@ -1,7 +1,7 @@
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions,
RenameDataResp, VolumeInfo, WalkDirOptions,
DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp,
ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions,
};
use crate::disk::STORAGE_FORMAT_FILE;
use crate::{
@@ -138,8 +138,34 @@ impl LocalDisk {
Ok(())
}
pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> {
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
// TODO: 清空回收站
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file rename {:?} err {:?}", &delete_path, &err);
return Err(Error::from(err));
}
}
}
// FIXME: 先清空回收站吧,有时间再添加判断逻辑
let _ = fs::remove_dir_all(&trash_path).await;
// TODO: immediate
Ok(())
}
// #[tracing::instrument(skip(self))]
pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> {
pub async fn delete_file(
&self,
base_path: &PathBuf,
delete_path: &PathBuf,
recursive: bool,
immediate_purge: bool,
) -> Result<()> {
debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path);
if is_root_path(base_path) || is_root_path(delete_path) {
@@ -153,29 +179,7 @@ impl LocalDisk {
}
if recursive {
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
if let Some(dir_path) = trash_path.parent() {
fs::create_dir_all(dir_path).await?;
}
debug!("delete_file ranme to trash {:?} to {:?}", &delete_path, &trash_path);
// TODO: 清空回收站
if let Err(err) = fs::rename(&delete_path, &trash_path).await {
match err.kind() {
ErrorKind::NotFound => (),
_ => {
warn!("delete_file rename {:?} err {:?}", &delete_path, &err);
return Err(Error::from(err));
}
}
}
// FIXME: 先清空回收站吧,有时间再添加判断逻辑
let _ = fs::remove_dir_all(&trash_path).await;
// TODO: immediate
self.move_to_trash(delete_path, recursive, immediate_purge).await?;
} else {
if delete_path.is_dir() {
if let Err(err) = fs::remove_dir(&delete_path).await {
@@ -253,6 +257,51 @@ impl LocalDisk {
Ok((data, modtime))
}
async fn delete_versions_internal(&self, volume: &str, path: &str, fis: &Vec<FileInfo>) -> Result<()> {
let volume_dir = self.get_bucket_path(volume)?;
let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
let (data, _) = match self.read_all_data(volume, volume_dir.as_path(), &xlpath).await {
Ok(res) => res,
Err(_err) => {
// TODO: check if not found return err
(Vec::new(), OffsetDateTime::UNIX_EPOCH)
}
};
if data.is_empty() {
return Err(Error::new(DiskError::FileNotFound));
}
let mut fm = FileMeta::default();
fm.unmarshal_msg(&data)?;
for fi in fis {
let data_dir = fm.delete_version(fi)?;
if data_dir.is_some() {
let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap().to_string()).as_str())?;
self.move_to_trash(&dir_path, true, false).await?;
}
}
// 没有版本了删除xl.meta
if fm.versions.is_empty() {
self.delete_file(&volume_dir, &xlpath, true, false).await?;
}
// 更新xl.meta
let buf = fm.marshal_msg()?;
self.write_all(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str(), buf)
.await?;
Ok(())
}
}
fn is_root_path(path: impl AsRef<Path>) -> bool {
@@ -815,6 +864,24 @@ impl DiskAPI for LocalDisk {
Ok(RawFileInfo { buf })
}
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
_opts: DeleteOptions,
) -> Result<Vec<Option<Error>>> {
let mut errs = Vec::with_capacity(versions.len());
for (i, ver) in versions.iter().enumerate() {
if let Err(e) = self.delete_versions_internal(volume, ver.name.as_str(), &ver.versions).await {
errs[i] = Some(e);
} else {
errs[i] = None;
}
}
Ok(errs)
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
let mut results = Vec::new();
let mut found = 0;

View File

@@ -13,7 +13,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta";
use crate::{
erasure::ReadAt,
error::Result,
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
};
@@ -79,9 +79,31 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
opts: &ReadOptions,
) -> Result<FileInfo>;
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo>;
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>>;
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
#[derive(Debug, Default, Clone)]
pub struct FileInfoVersions {
// Name of the volume.
pub volume: String,
// Name of the file.
pub name: String,
// Represents the latest mod time of the
// latest version.
pub latest_mod_time: Option<OffsetDateTime>,
pub versions: Vec<FileInfo>,
pub free_versions: Vec<FileInfo>,
}
#[derive(Debug, Default, Clone)]
pub struct WalkDirOptions {
// Bucket to scanner

View File

@@ -27,7 +27,66 @@ pub struct FileInfo {
pub is_latest: bool,
}
impl Default for FileInfo {
fn default() -> Self {
Self {
version_id: Uuid::nil(),
erasure: Default::default(),
deleted: Default::default(),
data_dir: Uuid::nil(),
mod_time: None,
size: Default::default(),
data: Default::default(),
fresh: Default::default(),
name: Default::default(),
volume: Default::default(),
parts: Default::default(),
is_latest: Default::default(),
}
}
}
impl FileInfo {
pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
let indexs = {
let cardinality = data_blocks + parity_blocks;
let mut nums = vec![0; cardinality];
let key_crc = crc32fast::hash(object.as_bytes());
let start = key_crc as usize % cardinality;
for i in 1..=cardinality {
nums[i - 1] = 1 + ((start + i) % cardinality);
}
nums
};
Self {
erasure: ErasureInfo {
algorithm: String::from(ERASURE_ALGORITHM),
data_blocks,
parity_blocks,
block_size: BLOCK_SIZE_V2,
distribution: indexs,
..Default::default()
},
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
if self.deleted {
return true;
}
let data_blocks = self.erasure.data_blocks;
let parity_blocks = self.erasure.parity_blocks;
(data_blocks >= parity_blocks)
&& (data_blocks > 0)
&& (self.erasure.index > 0
&& self.erasure.index <= data_blocks + parity_blocks
&& self.erasure.distribution.len() == (data_blocks + parity_blocks))
}
pub fn is_remote(&self) -> bool {
// TODO: when lifecycle
false
@@ -113,68 +172,6 @@ impl FileInfo {
}
}
impl Default for FileInfo {
fn default() -> Self {
Self {
version_id: Uuid::nil(),
erasure: Default::default(),
deleted: Default::default(),
data_dir: Uuid::nil(),
mod_time: None,
size: Default::default(),
data: Default::default(),
fresh: Default::default(),
name: Default::default(),
volume: Default::default(),
parts: Default::default(),
is_latest: Default::default(),
}
}
}
impl FileInfo {
pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
let indexs = {
let cardinality = data_blocks + parity_blocks;
let mut nums = vec![0; cardinality];
let key_crc = crc32fast::hash(object.as_bytes());
let start = key_crc as usize % cardinality;
for i in 1..=cardinality {
nums[i - 1] = 1 + ((start + i) % cardinality);
}
nums
};
Self {
erasure: ErasureInfo {
algorithm: String::from(ERASURE_ALGORITHM),
data_blocks,
parity_blocks,
block_size: BLOCK_SIZE_V2,
distribution: indexs,
..Default::default()
},
..Default::default()
}
}
pub fn is_valid(&self) -> bool {
if self.deleted {
return true;
}
let data_blocks = self.erasure.data_blocks;
let parity_blocks = self.erasure.parity_blocks;
(data_blocks >= parity_blocks)
&& (data_blocks > 0)
&& (self.erasure.index > 0
&& self.erasure.index <= data_blocks + parity_blocks
&& self.erasure.distribution.len() == (data_blocks + parity_blocks))
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ObjectPartInfo {
// pub etag: Option<String>,
@@ -474,6 +471,7 @@ pub struct ListObjectsV2Info {
#[derive(Debug, Default, Clone)]
pub struct ObjectToDelete {
pub object_name: String,
pub version_id: Option<Uuid>,
}
#[derive(Debug, Default, Clone)]
pub struct DeletedObject {