diff --git a/Cargo.lock b/Cargo.lock index dd42ff16..8773cca8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1168,6 +1168,7 @@ dependencies = [ "tracing-error", "tracing-subscriber", "transform-stream", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9420e9b3..92b3b1c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = ["rustfs", "ecstore"] [workspace.package] edition = "2021" -license = "MIT OR Apache-2.0" +license = "Apache-2.0" repository = "https://github.com/rustfs/rustfs" rust-version = "1.75" diff --git a/TODO.md b/TODO.md index 2a9e5ba5..e405b8d3 100644 --- a/TODO.md +++ b/TODO.md @@ -29,7 +29,7 @@ - [x] 提交完成 CompleteMultipartUpload - [x] 取消上传 AbortMultipartUpload - [x] 下载 GetObject - - [ ] 删除 DeleteObjects + - [x] 删除 DeleteObjects - [ ] 版本控制 - [ ] 对象锁 - [ ] 复制 CopyObject diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 9de369ac..3e8524aa 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -1,7 +1,7 @@ use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; use super::{ - DeleteOptions, DiskAPI, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, ReadOptions, - RenameDataResp, VolumeInfo, WalkDirOptions, + DeleteOptions, DiskAPI, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq, ReadMultipleResp, + ReadOptions, RenameDataResp, VolumeInfo, WalkDirOptions, }; use crate::disk::STORAGE_FORMAT_FILE; use crate::{ @@ -138,8 +138,34 @@ impl LocalDisk { Ok(()) } + pub async fn move_to_trash(&self, delete_path: &PathBuf, _recursive: bool, _immediate_purge: bool) -> Result<()> { + let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + // TODO: 清空回收站 + if let Err(err) = fs::rename(&delete_path, &trash_path).await { + match err.kind() { + ErrorKind::NotFound => (), + _ => { + warn!("delete_file rename {:?} err {:?}", &delete_path, &err); + return Err(Error::from(err)); + } + } + } + + // FIXME: 先清空回收站吧,有时间再添加判断逻辑 + let _ = fs::remove_dir_all(&trash_path).await; + + // TODO: immediate + Ok(()) + } + // #[tracing::instrument(skip(self))] - pub async fn delete_file(&self, base_path: &PathBuf, delete_path: &PathBuf, recursive: bool, _immediate: bool) -> Result<()> { + pub async fn delete_file( + &self, + base_path: &PathBuf, + delete_path: &PathBuf, + recursive: bool, + immediate_purge: bool, + ) -> Result<()> { debug!("delete_file {:?}\n base_path:{:?}", &delete_path, &base_path); if is_root_path(base_path) || is_root_path(delete_path) { @@ -153,29 +179,7 @@ impl LocalDisk { } if recursive { - let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; - - if let Some(dir_path) = trash_path.parent() { - fs::create_dir_all(dir_path).await?; - } - - debug!("delete_file ranme to trash {:?} to {:?}", &delete_path, &trash_path); - - // TODO: 清空回收站 - if let Err(err) = fs::rename(&delete_path, &trash_path).await { - match err.kind() { - ErrorKind::NotFound => (), - _ => { - warn!("delete_file rename {:?} err {:?}", &delete_path, &err); - return Err(Error::from(err)); - } - } - } - - // FIXME: 先清空回收站吧,有时间再添加判断逻辑 - let _ = fs::remove_dir_all(&trash_path).await; - - // TODO: immediate + self.move_to_trash(delete_path, recursive, immediate_purge).await?; } else { if delete_path.is_dir() { if let Err(err) = fs::remove_dir(&delete_path).await { @@ -253,6 +257,52 @@ impl LocalDisk { Ok((data, modtime)) } + + async fn delete_versions_internal(&self, volume: &str, path: &str, fis: &Vec) -> Result<()> { + let volume_dir = self.get_bucket_path(volume)?; + let xlpath = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; + + let (data, _) = match self.read_all_data(volume, volume_dir.as_path(), &xlpath).await { + Ok(res) => res, + Err(_err) => { + // TODO: check if not found return err + + (Vec::new(), OffsetDateTime::UNIX_EPOCH) + } + }; + + if data.is_empty() { + return Err(Error::new(DiskError::FileNotFound)); + } + + let mut fm = FileMeta::default(); + + fm.unmarshal_msg(&data)?; + + for fi in fis { + let data_dir = fm.delete_version(fi)?; + + if data_dir.is_some() { + let dir_path = self.get_object_path(volume, format!("{}/{}", path, data_dir.unwrap().to_string()).as_str())?; + + self.move_to_trash(&dir_path, true, false).await?; + } + } + + // 没有版本了,删除xl.meta + if fm.versions.is_empty() { + self.delete_file(&volume_dir, &xlpath, true, false).await?; + return Ok(()); + } + + // 更新xl.meta + let buf = fm.marshal_msg()?; + + self.write_all(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str(), buf) + .await?; + + Ok(()) + } } fn is_root_path(path: impl AsRef) -> bool { @@ -600,7 +650,7 @@ impl DiskAPI for LocalDisk { let (src_data_path, dst_data_path) = { let mut data_dir = String::new(); if !fi.is_remote() { - data_dir = utils::path::retain_slash(fi.data_dir.to_string().as_str()); + data_dir = utils::path::retain_slash(fi.data_dir.unwrap_or(Uuid::nil()).to_string().as_str()); } if !data_dir.is_empty() { @@ -639,10 +689,9 @@ impl DiskAPI for LocalDisk { let old_data_dir = meta .find_version(fi.version_id) .map(|(_, version)| { - version.get_data_dir().filter(|data_dir| { - warn!("get data dir {}", &data_dir); - meta.shard_data_dir_count(&fi.version_id, data_dir) == 0 - }) + version + .get_data_dir() + .filter(|data_dir| meta.shard_data_dir_count(&fi.version_id, &Some(data_dir.clone())) == 0) }) .unwrap_or_default(); @@ -815,6 +864,27 @@ impl DiskAPI for LocalDisk { Ok(RawFileInfo { buf }) } + async fn delete_versions( + &self, + volume: &str, + versions: Vec, + _opts: DeleteOptions, + ) -> Result>> { + let mut errs = Vec::with_capacity(versions.len()); + for _ in 0..versions.len() { + errs.push(None); + } + + for (i, ver) in versions.iter().enumerate() { + if let Err(e) = self.delete_versions_internal(volume, ver.name.as_str(), &ver.versions).await { + errs[i] = Some(e); + } else { + errs[i] = None; + } + } + + Ok(errs) + } async fn read_multiple(&self, req: ReadMultipleReq) -> Result> { let mut results = Vec::new(); let mut found = 0; diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index b694f99b..71701467 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -13,7 +13,7 @@ const STORAGE_FORMAT_FILE: &str = "xl.meta"; use crate::{ erasure::ReadAt, - error::Result, + error::{Error, Result}, file_meta::FileMeta, store_api::{FileInfo, RawFileInfo}, }; @@ -79,9 +79,31 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { opts: &ReadOptions, ) -> Result; async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result; + async fn delete_versions( + &self, + volume: &str, + versions: Vec, + opts: DeleteOptions, + ) -> Result>>; async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } +#[derive(Debug, Default, Clone)] +pub struct FileInfoVersions { + // Name of the volume. + pub volume: String, + + // Name of the file. + pub name: String, + + // Represents the latest mod time of the + // latest version. + pub latest_mod_time: Option, + + pub versions: Vec, + pub free_versions: Vec, +} + #[derive(Debug, Default, Clone)] pub struct WalkDirOptions { // Bucket to scanner diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index b9adb1f9..63815089 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; + +use futures::future::join_all; use http::HeaderMap; use uuid::Uuid; @@ -7,11 +10,11 @@ use crate::{ DiskStore, }, endpoints::PoolEndpoints, - error::Result, + error::{Error, Result}, set_disk::SetDisks, store_api::{ - BucketInfo, BucketOptions, CompletePart, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info, MakeBucketOptions, - MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo, PutObjReader, StorageAPI, + BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info, + MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, }, utils::hash, }; @@ -110,6 +113,22 @@ impl Sets { // ) -> Vec> { // unimplemented!() // } + + async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> { + let mut futures = Vec::new(); + let opt = ObjectOptions { + delete_prefix: true, + ..Default::default() + }; + + for set in self.disk_set.iter() { + futures.push(set.delete_object(bucket, object, opt.clone())); + } + + let _results = join_all(futures).await; + + Ok(()) + } } // #[derive(Debug)] @@ -122,6 +141,12 @@ impl Sets { // pub default_parity_count: usize, // } +struct DelObj { + // set_idx: usize, + orig_idx: usize, + obj: ObjectToDelete, +} + #[async_trait::async_trait] impl StorageAPI for Sets { async fn list_bucket(&self, _opts: &BucketOptions) -> Result> { @@ -134,7 +159,77 @@ impl StorageAPI for Sets { async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result { unimplemented!() } + async fn delete_objects( + &self, + bucket: &str, + objects: Vec, + opts: ObjectOptions, + ) -> Result<(Vec, Vec>)> { + // 默认返回值 + let mut del_objects = vec![DeletedObject::default(); objects.len()]; + let mut del_errs = Vec::with_capacity(objects.len()); + for _ in 0..objects.len() { + del_errs.push(None) + } + + let mut set_obj_map = HashMap::new(); + + // hash key + let mut i = 0; + for obj in objects.iter() { + let idx = self.get_hashed_set_index(obj.object_name.as_str()); + + if !set_obj_map.contains_key(&idx) { + set_obj_map.insert( + idx, + vec![DelObj { + // set_idx: idx, + orig_idx: i, + obj: obj.clone(), + }], + ); + } else { + if let Some(val) = set_obj_map.get_mut(&idx) { + val.push(DelObj { + // set_idx: idx, + orig_idx: i, + obj: obj.clone(), + }); + } + } + + i += 1; + } + + // TODO: 并发 + for (k, v) in set_obj_map { + let disks = self.get_disks(k); + let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); + let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?; + + let mut i = 0; + for err in errs { + let obj = v.get(i).unwrap(); + + del_errs[obj.orig_idx] = err; + + del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone(); + + i += 1; + } + } + + Ok((del_objects, del_errs)) + } + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { + if opts.delete_prefix { + self.delete_prefix(bucket, object).await?; + return Ok(ObjectInfo::default()); + } + + self.get_disks_by_key(object).delete_object(bucket, object, opts).await + } async fn list_objects_v2( &self, _bucket: &str, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index bbcc49fa..8fdcc2fb 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -7,8 +7,9 @@ use crate::{ peer::{PeerS3Client, S3PeerSys}, sets::Sets, store_api::{ - BucketInfo, BucketOptions, CompletePart, GetObjectReader, HTTPRangeSpec, ListObjectsInfo, ListObjectsV2Info, - MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, PartInfo, PutObjReader, StorageAPI, + BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsInfo, + ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, + PutObjReader, StorageAPI, }, store_init, utils, }; @@ -16,6 +17,7 @@ use futures::future::join_all; use http::HeaderMap; use s3s::{dto::StreamingBlob, Body}; use std::collections::{HashMap, HashSet}; +use time::OffsetDateTime; use tracing::{debug, warn}; use uuid::Uuid; @@ -227,6 +229,77 @@ impl ECStore { Ok(()) } + async fn delete_prefix(&self, _bucket: &str, _object: &str) -> Result<()> { + unimplemented!() + } + + async fn get_pool_info_existing_with_opts( + &self, + bucket: &str, + object: &str, + opts: &ObjectOptions, + ) -> Result<(PoolObjInfo, Vec)> { + let mut futures = Vec::new(); + + for pool in self.pools.iter() { + futures.push(pool.get_object_info(bucket, object, opts)); + } + + let results = join_all(futures).await; + + let mut ress = Vec::new(); + + let mut i = 0; + + // join_all结果跟输入顺序一致 + for res in results { + let index = i; + + match res { + Ok(r) => { + ress.push(PoolObjInfo { + index, + object_info: r, + err: None, + }); + } + Err(e) => { + ress.push(PoolObjInfo { + index, + err: Some(e), + ..Default::default() + }); + } + } + i += 1; + } + + ress.sort_by(|a, b| { + let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); + let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); + + at.cmp(&bt) + }); + + for res in ress { + // check + if res.err.is_none() { + // TODO: let errs = self.poolsWithObject() + return Ok((res, Vec::new())); + } + } + + let ret = PoolObjInfo::default(); + + Ok((ret, Vec::new())) + } +} + +#[derive(Debug, Default)] +pub struct PoolObjInfo { + pub index: usize, + pub object_info: ObjectInfo, + pub err: Option, } #[derive(Debug, Default)] @@ -301,7 +374,149 @@ impl StorageAPI for ECStore { Ok(info) } + async fn delete_objects( + &self, + bucket: &str, + objects: Vec, + opts: ObjectOptions, + ) -> Result<(Vec, Vec>)> { + // encode object name + let objects: Vec = objects + .iter() + .map(|v| { + let mut v = v.clone(); + v.object_name = utils::path::encode_dir_object(v.object_name.as_str()); + v + }) + .collect(); + // 默认返回值 + let mut del_objects = vec![DeletedObject::default(); objects.len()]; + + let mut del_errs = Vec::with_capacity(objects.len()); + for _ in 0..objects.len() { + del_errs.push(None) + } + + // TODO: limte 限制并发数量 + let opt = ObjectOptions::default(); + // 取所有poolObjInfo + let mut futures = Vec::new(); + for obj in objects.iter() { + futures.push(self.get_pool_info_existing_with_opts(bucket, &obj.object_name, &opt)); + } + + let results = join_all(futures).await; + + // 记录pool Index 对应的objects pool_idx -> objects idx + let mut pool_index_objects = HashMap::new(); + + let mut i = 0; + for res in results { + match res { + Ok((pinfo, _)) => { + if pinfo.object_info.delete_marker && opts.version_id.is_empty() { + del_objects[i] = DeletedObject { + delete_marker: pinfo.object_info.delete_marker, + delete_marker_version_id: pinfo.object_info.version_id.map(|v| v.to_string()), + object_name: utils::path::decode_dir_object(&pinfo.object_info.name), + delete_marker_mtime: pinfo.object_info.mod_time, + ..Default::default() + }; + } + + if !pool_index_objects.contains_key(&pinfo.index) { + pool_index_objects.insert(pinfo.index, vec![i]); + } else { + // let mut vals = pool_index_objects. + if let Some(val) = pool_index_objects.get_mut(&pinfo.index) { + val.push(i); + } + } + } + Err(e) => { + //TODO: check not found + + del_errs[i] = Some(e) + } + } + + i += 1; + } + + if !pool_index_objects.is_empty() { + for sets in self.pools.iter() { + // 取pool idx 对应的 objects index + let vals = pool_index_objects.get(&sets.pool_idx); + if vals.is_none() { + continue; + } + + let obj_idxs = vals.unwrap(); + // 取对应obj,理论上不会none + let objs: Vec = obj_idxs + .iter() + .filter_map(|&idx| { + if let Some(obj) = objects.get(idx) { + Some(obj.clone()) + } else { + None + } + }) + .collect(); + + if objs.is_empty() { + continue; + } + + let (pdel_objs, perrs) = sets.delete_objects(bucket, objs, opts.clone()).await?; + + // perrs的顺序理论上跟obj_idxs顺序一致 + let mut i = 0; + for err in perrs { + let obj_idx = obj_idxs[i]; + + if err.is_some() { + del_errs[obj_idx] = err; + } + + let mut dobj = pdel_objs.get(i).unwrap().clone(); + dobj.object_name = utils::path::decode_dir_object(&dobj.object_name); + + del_objects[obj_idx] = dobj; + + i += 1; + } + } + } + + Ok((del_objects, del_errs)) + } + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { + if opts.delete_prefix { + self.delete_prefix(bucket, &object).await?; + return Ok(ObjectInfo::default()); + } + + let object = utils::path::encode_dir_object(object); + let object = object.as_str(); + + // 查询在哪个pool + let (mut pinfo, errs) = self.get_pool_info_existing_with_opts(bucket, object, &opts).await?; + if pinfo.object_info.delete_marker && opts.version_id.is_empty() { + pinfo.object_info.name = utils::path::decode_dir_object(object); + return Ok(pinfo.object_info); + } + + if !errs.is_empty() { + // TODO: deleteObjectFromAllPools + } + + let mut obj = self.pools[pinfo.index].delete_object(bucket, object, opts.clone()).await?; + obj.name = utils::path::decode_dir_object(object); + + Ok(obj) + } async fn list_objects_v2( &self, bucket: &str, diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 3aa50f56..a330f3e3 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -10,15 +10,15 @@ pub const ERASURE_ALGORITHM: &str = "rs-vandermonde"; pub const BLOCK_SIZE_V2: usize = 1048576; // 1M // #[derive(Debug, Clone)] -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct FileInfo { pub name: String, pub volume: String, - pub version_id: Uuid, + pub version_id: Option, pub erasure: ErasureInfo, pub deleted: bool, // DataDir of the file - pub data_dir: Uuid, + pub data_dir: Option, pub mod_time: Option, pub size: usize, pub data: Option>, @@ -27,7 +27,66 @@ pub struct FileInfo { pub is_latest: bool, } +// impl Default for FileInfo { +// fn default() -> Self { +// Self { +// version_id: Default::default(), +// erasure: Default::default(), +// deleted: Default::default(), +// data_dir: Default::default(), +// mod_time: None, +// size: Default::default(), +// data: Default::default(), +// fresh: Default::default(), +// name: Default::default(), +// volume: Default::default(), +// parts: Default::default(), +// is_latest: Default::default(), +// } +// } +// } + impl FileInfo { + pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self { + let indexs = { + let cardinality = data_blocks + parity_blocks; + let mut nums = vec![0; cardinality]; + let key_crc = crc32fast::hash(object.as_bytes()); + + let start = key_crc as usize % cardinality; + for i in 1..=cardinality { + nums[i - 1] = 1 + ((start + i) % cardinality); + } + + nums + }; + Self { + erasure: ErasureInfo { + algorithm: String::from(ERASURE_ALGORITHM), + data_blocks, + parity_blocks, + block_size: BLOCK_SIZE_V2, + distribution: indexs, + ..Default::default() + }, + ..Default::default() + } + } + + pub fn is_valid(&self) -> bool { + if self.deleted { + return true; + } + + let data_blocks = self.erasure.data_blocks; + let parity_blocks = self.erasure.parity_blocks; + + (data_blocks >= parity_blocks) + && (data_blocks > 0) + && (self.erasure.index > 0 + && self.erasure.index <= data_blocks + parity_blocks + && self.erasure.distribution.len() == (data_blocks + parity_blocks)) + } pub fn is_remote(&self) -> bool { // TODO: when lifecycle false @@ -86,7 +145,7 @@ impl FileInfo { parity_blocks: self.erasure.parity_blocks, data_blocks: self.erasure.data_blocks, version_id: self.version_id, - deleted: self.deleted, + delete_marker: self.deleted, mod_time: self.mod_time, size: self.size, parts: self.parts.clone(), @@ -113,68 +172,6 @@ impl FileInfo { } } -impl Default for FileInfo { - fn default() -> Self { - Self { - version_id: Uuid::nil(), - erasure: Default::default(), - deleted: Default::default(), - data_dir: Uuid::nil(), - mod_time: None, - size: Default::default(), - data: Default::default(), - fresh: Default::default(), - name: Default::default(), - volume: Default::default(), - parts: Default::default(), - is_latest: Default::default(), - } - } -} - -impl FileInfo { - pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self { - let indexs = { - let cardinality = data_blocks + parity_blocks; - let mut nums = vec![0; cardinality]; - let key_crc = crc32fast::hash(object.as_bytes()); - - let start = key_crc as usize % cardinality; - for i in 1..=cardinality { - nums[i - 1] = 1 + ((start + i) % cardinality); - } - - nums - }; - Self { - erasure: ErasureInfo { - algorithm: String::from(ERASURE_ALGORITHM), - data_blocks, - parity_blocks, - block_size: BLOCK_SIZE_V2, - distribution: indexs, - ..Default::default() - }, - ..Default::default() - } - } - - pub fn is_valid(&self) -> bool { - if self.deleted { - return true; - } - - let data_blocks = self.erasure.data_blocks; - let parity_blocks = self.erasure.parity_blocks; - - (data_blocks >= parity_blocks) - && (data_blocks > 0) - && (self.erasure.index > 0 - && self.erasure.index <= data_blocks + parity_blocks - && self.erasure.distribution.len() == (data_blocks + parity_blocks)) - } -} - #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct ObjectPartInfo { // pub etag: Option, @@ -358,12 +355,15 @@ impl HTTPRangeSpec { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ObjectOptions { // Use the maximum parity (N/2), used when saving server configuration files pub max_parity: bool, pub mod_time: Option, pub part_number: usize, + + pub delete_prefix: bool, + pub version_id: String, } // impl Default for ObjectOptions { @@ -413,13 +413,13 @@ impl From for CompletePart { pub struct ObjectInfo { pub bucket: String, pub name: String, + pub mod_time: Option, + pub size: usize, pub is_dir: bool, pub parity_blocks: usize, pub data_blocks: usize, - pub version_id: Uuid, - pub deleted: bool, - pub mod_time: Option, - pub size: usize, + pub version_id: Option, + pub delete_marker: bool, pub parts: Vec, pub is_latest: bool, } @@ -468,13 +468,36 @@ pub struct ListObjectsV2Info { pub prefixes: Vec, } +#[derive(Debug, Default, Clone)] +pub struct ObjectToDelete { + pub object_name: String, + pub version_id: Option, +} +#[derive(Debug, Default, Clone)] +pub struct DeletedObject { + pub delete_marker: bool, + pub delete_marker_version_id: Option, + pub object_name: String, + pub version_id: Option, + // MTime of DeleteMarker on source that needs to be propagated to replica + pub delete_marker_mtime: Option, + // to support delete marker replication + // pub replication_state: ReplicationState, +} + #[async_trait::async_trait] pub trait StorageAPI { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn delete_bucket(&self, bucket: &str) -> Result<()>; async fn list_bucket(&self, opts: &BucketOptions) -> Result>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; - + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result; + async fn delete_objects( + &self, + bucket: &str, + objects: Vec, + opts: ObjectOptions, + ) -> Result<(Vec, Vec>)>; async fn list_objects_v2( &self, bucket: &str, diff --git a/rustfs-inner.zip b/rustfs-inner.zip new file mode 100644 index 00000000..f5aad0ed Binary files /dev/null and b/rustfs-inner.zip differ diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 5e5414f0..f08b977e 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -23,7 +23,7 @@ http.workspace = true bytes.workspace = true futures.workspace = true futures-util.workspace = true - +uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] } ecstore = { path = "../ecstore" } s3s = "0.10.0" clap = { version = "4.5.7", features = ["derive"] } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index d43b4ab5..54aeccac 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -31,10 +31,13 @@ fn setup_tracing() { } fn main() -> Result<()> { + //解析获得到的参数 let opt = config::Opt::parse(); + //设置trace setup_tracing(); + //运行参数 run(opt) } @@ -42,7 +45,9 @@ fn main() -> Result<()> { async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); + //监听地址,端口从参数中获取 let listener = TcpListener::bind(opt.address.clone()).await?; + //获取监听地址 let local_addr: SocketAddr = listener.local_addr()?; // let mut domain_name = { @@ -62,9 +67,11 @@ async fn run(opt: config::Opt) -> Result<()> { // }; // Setup S3 service + // 本项目使用s3s库来实现s3服务 let service = { let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?); - + //设置AK和SK + //其中部份内容从config配置文件中读取 let mut access_key = String::from_str(config::DEFAULT_ACCESS_KEY).unwrap(); let mut secret_key = String::from_str(config::DEFAULT_SECRET_KEY).unwrap(); @@ -73,7 +80,7 @@ async fn run(opt: config::Opt) -> Result<()> { access_key = ak; secret_key = sk; } - + //显示info信息 info!("authentication is enabled {}, {}", &access_key, &secret_key); b.set_auth(SimpleAuth::from_single(access_key, secret_key)); diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 8e8f53db..0117cd83 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -6,6 +6,7 @@ use ecstore::store_api::HTTPRangeSpec; use ecstore::store_api::MakeBucketOptions; use ecstore::store_api::MultipartUploadResult; use ecstore::store_api::ObjectOptions; +use ecstore::store_api::ObjectToDelete; use ecstore::store_api::PutObjReader; use ecstore::store_api::StorageAPI; use futures::pin_mut; @@ -21,6 +22,7 @@ use s3s::{S3Request, S3Response}; use std::fmt::Debug; use std::str::FromStr; use transform_stream::AsyncTryStream; +use uuid::Uuid; use ecstore::error::Result; use ecstore::store::ECStore; @@ -91,17 +93,111 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_object(&self, req: S3Request) -> S3Result> { - let _input = req.input; + let DeleteObjectInput { + bucket, key, version_id, .. + } = req.input; - let output = DeleteObjectOutput::default(); + let version_id = version_id + .as_ref() + .map(|v| match Uuid::parse_str(v) { + Ok(id) => Some(id), + Err(_) => None, + }) + .unwrap_or_default(); + let dobj = ObjectToDelete { + object_name: key, + version_id, + }; + + let objects: Vec = vec![dobj]; + + let (dobjs, _errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await); + + // TODO: let errors; + + let (delete_marker, version_id) = { + if let Some((a, b)) = dobjs + .iter() + .map(|v| { + let delete_marker = { + if v.delete_marker { + Some(true) + } else { + None + } + }; + + let version_id = v.version_id.clone(); + + (delete_marker, version_id) + }) + .next() + { + (a, b) + } else { + (None, None) + } + }; + + let output = DeleteObjectOutput { + delete_marker, + version_id, + ..Default::default() + }; Ok(S3Response::new(output)) } #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_objects(&self, req: S3Request) -> S3Result> { - let _input = req.input; + // info!("delete_objects args {:?}", req.input); - let output = DeleteObjectsOutput { ..Default::default() }; + let DeleteObjectsInput { bucket, delete, .. } = req.input; + + let objects: Vec = delete + .objects + .iter() + .map(|v| { + let version_id = v + .version_id + .as_ref() + .map(|v| match Uuid::parse_str(v) { + Ok(id) => Some(id), + Err(_) => None, + }) + .unwrap_or_default(); + ObjectToDelete { + object_name: v.key.clone(), + version_id: version_id, + } + }) + .collect(); + + let (dobjs, _errs) = try_!(self.store.delete_objects(&bucket, objects, ObjectOptions::default()).await); + // info!("delete_objects res {:?} {:?}", &dobjs, errs); + + let deleted = dobjs + .iter() + .map(|v| DeletedObject { + delete_marker: { + if v.delete_marker { + Some(true) + } else { + None + } + }, + delete_marker_version_id: v.delete_marker_version_id.clone(), + key: Some(v.object_name.clone()), + version_id: v.version_id.clone(), + }) + .collect(); + + // TODO: let errors; + + let output = DeleteObjectsOutput { + deleted: Some(deleted), + // errors, + ..Default::default() + }; Ok(S3Response::new(output)) }