diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index b9adb1f9..a21b1714 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,3 +1,4 @@ +use futures::future::join_all; use http::HeaderMap; use uuid::Uuid; @@ -110,6 +111,22 @@ impl Sets { // ) -> Vec> { // unimplemented!() // } + + async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> { + let mut futures = Vec::new(); + let opt = ObjectOptions { + delete_prefix: true, + ..Default::default() + }; + + for set in self.disk_set.iter() { + futures.push(set.delete_object(bucket, object, opt.clone())); + } + + let _results = join_all(futures).await; + + Ok(()) + } } // #[derive(Debug)] @@ -134,7 +151,14 @@ impl StorageAPI for Sets { async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result { unimplemented!() } + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { + if opts.delete_prefix { + self.delete_prefix(bucket, object).await?; + return Ok(ObjectInfo::default()); + } + self.get_disks_by_key(object).delete_object(bucket, object, opts).await + } async fn list_objects_v2( &self, _bucket: &str, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index bbcc49fa..44a43223 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -16,6 +16,7 @@ use futures::future::join_all; use http::HeaderMap; use s3s::{dto::StreamingBlob, Body}; use std::collections::{HashMap, HashSet}; +use time::OffsetDateTime; use tracing::{debug, warn}; use uuid::Uuid; @@ -227,6 +228,77 @@ impl ECStore { Ok(()) } + async fn delete_prefix(&self, _bucket: &str, _object: &str) -> Result<()> { + unimplemented!() + } + + async fn get_pool_info_existing_with_opts( + &self, + bucket: &str, + object: &str, + opts: &ObjectOptions, + ) -> Result<(PoolObjInfo, Vec)> { + let mut futures = Vec::new(); + + for pool in self.pools.iter() { + futures.push(pool.get_object_info(bucket, object, opts)); + } + + let results = join_all(futures).await; + + let mut ress = Vec::new(); + + let mut i = 0; + + // join_all结果跟输入顺序一致 + for res in results { + let index = i; + + match res { + Ok(r) => { + ress.push(PoolObjInfo { + index, + object_info: r, + err: None, + }); + } + Err(e) => { + ress.push(PoolObjInfo { + index, + err: Some(e), + ..Default::default() + }); + } + } + i += 1; + } + + ress.sort_by(|a, b| { + let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); + let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); + + at.cmp(&bt) + }); + + for res in ress { + // check + if res.err.is_none() { + // TODO: let errs = self.poolsWithObject() + return Ok((res, Vec::new())); + } + } + + let ret = PoolObjInfo::default(); + + Ok((ret, Vec::new())) + } +} + +#[derive(Debug, Default)] +pub struct PoolObjInfo { + pub index: usize, + pub object_info: ObjectInfo, + pub err: Option, } #[derive(Debug, Default)] @@ -302,6 +374,31 @@ impl StorageAPI for ECStore { Ok(info) } + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result { + if opts.delete_prefix { + self.delete_prefix(bucket, &object).await?; + return Ok(ObjectInfo::default()); + } + + let object = utils::path::encode_dir_object(object); + let object = object.as_str(); + + // 查询在哪个pool + let (mut pinfo, errs) = self.get_pool_info_existing_with_opts(bucket, object, &opts).await?; + if pinfo.object_info.delete_marker && opts.version_id.is_empty() { + pinfo.object_info.name = utils::path::decode_dir_object(object); + return Ok(pinfo.object_info); + } + + if !errs.is_empty() { + // TODO: deleteObjectFromAllPools + } + + let mut obj = self.pools[pinfo.index].delete_object(bucket, object, opts.clone()).await?; + obj.name = utils::path::decode_dir_object(object); + + Ok(obj) + } async fn list_objects_v2( &self, bucket: &str, diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 3aa50f56..0228338f 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -86,7 +86,7 @@ impl FileInfo { parity_blocks: self.erasure.parity_blocks, data_blocks: self.erasure.data_blocks, version_id: self.version_id, - deleted: self.deleted, + delete_marker: self.deleted, mod_time: self.mod_time, size: self.size, parts: self.parts.clone(), @@ -358,12 +358,15 @@ impl HTTPRangeSpec { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ObjectOptions { // Use the maximum parity (N/2), used when saving server configuration files pub max_parity: bool, pub mod_time: Option, pub part_number: usize, + + pub delete_prefix: bool, + pub version_id: String, } // impl Default for ObjectOptions { @@ -413,13 +416,13 @@ impl From for CompletePart { pub struct ObjectInfo { pub bucket: String, pub name: String, + pub mod_time: Option, + pub size: usize, pub is_dir: bool, pub parity_blocks: usize, pub data_blocks: usize, pub version_id: Uuid, - pub deleted: bool, - pub mod_time: Option, - pub size: usize, + pub delete_marker: bool, pub parts: Vec, pub is_latest: bool, } @@ -474,7 +477,7 @@ pub trait StorageAPI { async fn delete_bucket(&self, bucket: &str) -> Result<()>; async fn list_bucket(&self, opts: &BucketOptions) -> Result>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; - + async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result; async fn list_objects_v2( &self, bucket: &str,