diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 9de369ac..407c779e 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1,7 +1,7 @@ use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; use super::{ - DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, - RenameDataResp, VolumeInfo, WalkDirOptions, + DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, + ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions, }; use crate::disk::STORAGE_FORMAT_FILE; use crate::{ @@ -138,8 +138,34 @@ impl LocalDisk { Ok(()) } + pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> { + let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + // TODO: 清空回收站 + if let Err(err) = fs::rename(&delete_path, &trash_path).await { + match err.kind() { + ErrorKind::NotFound => (), + _ => { + warn!("delete_file rename {:?} err {:?}", &delete_path, &err); + return Err(Error::from(err)); + } + } + } + + // FIXME: 先清空回收站吧,有时间再添加判断逻辑 + let _ = fs::remove_dir_all(&trash_path).await; + + // TODO: immediate + Ok(()) + } + // #[tracing::instrument(skip(self))] - pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> { + pub async fn delete_file( + &self, + base_path: &PathBuf, + delete_path: &PathBuf, + recursive: bool, + immediate_purge: bool, + ) -> Result<()> { debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path); if is_root_path(base_path) || is_root_path(delete_path) { @@ -153,29 +179,7 @@ impl LocalDisk { } if recursive { - let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; - - if let Some(dir_path) = trash_path.parent() { - fs::create_dir_all(dir_path).await?; - } - - debug!("delete_file ranme to trash {:?} to {:?}", &delete_path, &trash_path); - - // TODO: 清空回收站 - if let Err(err) = fs::rename(&delete_path, &trash_path).await { - match err.kind() { - ErrorKind::NotFound => (), - _ => { - warn!("delete_file rename {:?} err {:?}", &delete_path, &err); - return Err(Error::from(err)); - } - } - } - - // FIXME: 先清空回收站吧,有时间再添加判断逻辑 - let _ = fs::remove_dir_all(&trash_path).await; - - // TODO: immediate + self.move_to_trash(delete_path, recursive, immediate_purge).await?; } else { if delete_path.is_dir() { if let Err(err) = fs::remove_dir(&delete_path).await { @@ -253,6 +257,51 @@ impl LocalDisk { Ok((data, modtime)) } + + async fn delete_versions_internal(&self, volume: &str, path: &str, fis: &Vec) -> Result<()> { + let volume_dir = self.get_bucket_path(volume)?; + let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; + + let (data, _) = match self.read_all_data(volume, volume_dir.as_path(), &xlpath).await { + Ok(res) => res, + Err(_err) => { + // TODO: check if not found return err + + (Vec::new(), OffsetDateTime::UNIX_EPOCH) + } + }; + + if data.is_empty() { + return Err(Error::new(DiskError::FileNotFound)); + } + + let mut fm = FileMeta::default(); + + fm.unmarshal_msg(&data)?; + + for fi in fis { + let data_dir = fm.delete_version(fi)?; + + if data_dir.is_some() { + let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap().to_string()).as_str())?; + + self.move_to_trash(&dir_path, true, false).await?; + } + } + + // 没有版本了,删除xl.meta + if fm.versions.is_empty() { + self.delete_file(&volume_dir, &xlpath, true, false).await?; + } + + // 更新xl.meta + let buf = fm.marshal_msg()?; + + self.write_all(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str(), buf) + .await?; + + Ok(()) + } } fn is_root_path(path: impl AsRef) -> bool { @@ -815,6 +864,24 @@ impl DiskAPI for LocalDisk { Ok(RawFileInfo { buf }) } + async fn delete_versions( + &self, + volume: &str, + versions: Vec, + _opts: DeleteOptions, + ) -> Result>> { + let mut errs = Vec::with_capacity(versions.len()); + + for (i, ver) in versions.iter().enumerate() { + if let Err(e) = self.delete_versions_internal(volume, ver.name.as_str(), &ver.versions).await { + errs[i] = Some(e); + } else { + errs[i] = None; + } + } + + Ok(errs) + } async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { let mut results = Vec::new(); let mut found = 0; diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index b694f99b..71701467 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -13,7 +13,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta"; use crate::{ erasure::ReadAt, - error::Result, + error::{Error, Result}, file_meta::FileMeta, store_api::{FileInfo, RawFileInfo}, }; @@ -79,9 +79,31 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { opts: &ReadOptions, ) -> Result; async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result; + async fn delete_versions( + &self, + volume: &str, + versions: Vec, + opts: DeleteOptions, + ) -> Result>>; async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } +#[derive(Debug, Default, Clone)] +pub struct FileInfoVersions { + // Name of the volume. + pub volume: String, + + // Name of the file. + pub name: String, + + // Represents the latest mod time of the + // latest version. + pub latest_mod_time: Option, + + pub versions: Vec, + pub free_versions: Vec, +} + #[derive(Debug, Default, Clone)] pub struct WalkDirOptions { // Bucket to scanner diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 39f9a523..328d3f8f 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -27,7 +27,66 @@ pub struct FileInfo { pub is_latest: bool, } +impl Default for FileInfo { + fn default() -> Self { + Self { + version_id: Uuid::nil(), + erasure: Default::default(), + deleted: Default::default(), + data_dir: Uuid::nil(), + mod_time: None, + size: Default::default(), + data: Default::default(), + fresh: Default::default(), + name: Default::default(), + volume: Default::default(), + parts: Default::default(), + is_latest: Default::default(), + } + } +} + impl FileInfo { + pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self { + let indexs = { + let cardinality = data_blocks + parity_blocks; + let mut nums = vec![0; cardinality]; + let key_crc = crc32fast::hash(object.as_bytes()); + + let start = key_crc as usize % cardinality; + for i in 1..=cardinality { + nums[i - 1] = 1 + ((start + i) % cardinality); + } + + nums + }; + Self { + erasure: ErasureInfo { + algorithm: String::from(ERASURE_ALGORITHM), + data_blocks, + parity_blocks, + block_size: BLOCK_SIZE_V2, + distribution: indexs, + ..Default::default() + }, + ..Default::default() + } + } + + pub fn is_valid(&self) -> bool { + if self.deleted { + return true; + } + + let data_blocks = self.erasure.data_blocks; + let parity_blocks = self.erasure.parity_blocks; + + (data_blocks >= parity_blocks) + && (data_blocks > 0) + && (self.erasure.index > 0 + && self.erasure.index <= data_blocks + parity_blocks + && self.erasure.distribution.len() == (data_blocks + parity_blocks)) + } pub fn is_remote(&self) -> bool { // TODO: when lifecycle false @@ -113,68 +172,6 @@ impl FileInfo { } } -impl Default for FileInfo { - fn default() -> Self { - Self { - version_id: Uuid::nil(), - erasure: Default::default(), - deleted: Default::default(), - data_dir: Uuid::nil(), - mod_time: None, - size: Default::default(), - data: Default::default(), - fresh: Default::default(), - name: Default::default(), - volume: Default::default(), - parts: Default::default(), - is_latest: Default::default(), - } - } -} - -impl FileInfo { - pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self { - let indexs = { - let cardinality = data_blocks + parity_blocks; - let mut nums = vec![0; cardinality]; - let key_crc = crc32fast::hash(object.as_bytes()); - - let start = key_crc as usize % cardinality; - for i in 1..=cardinality { - nums[i - 1] = 1 + ((start + i) % cardinality); - } - - nums - }; - Self { - erasure: ErasureInfo { - algorithm: String::from(ERASURE_ALGORITHM), - data_blocks, - parity_blocks, - block_size: BLOCK_SIZE_V2, - distribution: indexs, - ..Default::default() - }, - ..Default::default() - } - } - - pub fn is_valid(&self) -> bool { - if self.deleted { - return true; - } - - let data_blocks = self.erasure.data_blocks; - let parity_blocks = self.erasure.parity_blocks; - - (data_blocks >= parity_blocks) - && (data_blocks > 0) - && (self.erasure.index > 0 - && self.erasure.index <= data_blocks + parity_blocks - && self.erasure.distribution.len() == (data_blocks + parity_blocks)) - } -} - #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct ObjectPartInfo { // pub etag: Option, @@ -474,6 +471,7 @@ pub struct ListObjectsV2Info { #[derive(Debug, Default, Clone)] pub struct ObjectToDelete { pub object_name: String, + pub version_id: Option, } #[derive(Debug, Default, Clone)] pub struct DeletedObject {