From 07c11b2097d85b2e52de4dc6427e736465b7fecb Mon Sep 17 00:00:00 2001 From: overtrue Date: Tue, 30 Jul 2024 16:54:11 +0800 Subject: [PATCH] feat: delete bucket --- .gitignore | 3 +- ecstore/src/disk.rs | 36 +++++++++++++++ ecstore/src/disk_api.rs | 1 + ecstore/src/peer.rs | 91 ++++++++++++++++++++++++++++++-------- ecstore/src/sets.rs | 4 ++ ecstore/src/store.rs | 7 +++ ecstore/src/store_api.rs | 2 + rustfs/src/storage/ecfs.rs | 4 +- 8 files changed, 128 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 212de442..2f791d6e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.DS_Store \ No newline at end of file +.DS_Store +.idea diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 7a933aed..c793d899 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -828,6 +828,14 @@ impl DiskAPI for LocalDisk { Ok(results) } + + async fn delete_volume(&self, volume: &str) -> Result<()> { + let p = self.get_bucket_path(&volume)?; + + fs::remove_dir_all(&p).await?; + + Ok(()) + } } #[cfg(test)] @@ -880,4 +888,32 @@ mod test { fs::remove_dir_all(&p).await.unwrap(); } + + #[tokio::test] + async fn test_delete_volume() { + let p = "./testv"; + fs::create_dir_all(&p).await.unwrap(); + + let ep = match Endpoint::try_from(p) { + Ok(e) => e, + Err(e) => { + println!("{e}"); + return; + } + }; + + let disk = LocalDisk::new(&ep, false).await.unwrap(); + + let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap(); + + println!("ppp :{:?}", &tmpp); + + let volumes = vec!["a", "b", "c"]; + + disk.make_volumes(volumes.clone()).await.unwrap(); + + disk.delete_volume("a").await.unwrap(); + + fs::remove_dir_all(&p).await.unwrap(); + } } diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index f9bdf702..01822872 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -40,6 +40,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { ) -> Result; async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>; + async fn delete_volume(&self, volume: &str) -> Result<()>; async fn list_volumes(&self) -> Result>; async fn make_volume(&self, volume: &str) -> Result<()>; async fn stat_volume(&self, volume: &str) -> Result; diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 5638436d..aed57746 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -17,6 +17,7 @@ type Client = Arc>; pub trait PeerS3Client: Debug + Sync + Send + 'static { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; async fn list_bucket(&self, opts: &BucketOptions) -> Result>; + async fn delete_bucket(&self, bucket: &str) -> Result<()>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; fn get_pools(&self) -> Vec; } @@ -57,8 +58,42 @@ impl S3PeerSys { #[async_trait] impl PeerS3Client for S3PeerSys { - fn get_pools(&self) -> Vec { - unimplemented!() + async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { + let mut futures = Vec::with_capacity(self.clients.len()); + for cli in self.clients.iter() { + futures.push(cli.make_bucket(bucket, opts)); + } + + let mut errors = Vec::with_capacity(self.clients.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(_) => { + errors.push(None); + } + Err(e) => { + errors.push(Some(e)); + } + } + } + + for i in 0..self.pools_count { + let mut per_pool_errs = Vec::with_capacity(self.clients.len()); + for (j, cli) in self.clients.iter().enumerate() { + let pools = cli.get_pools(); + let idx = i; + if pools.contains(&idx) { + per_pool_errs.push(errors[j].as_ref()); + } + + // TODO: reduceWriteQuorumErrs + } + } + + // TODO: + + Ok(()) } async fn list_bucket(&self, opts: &BucketOptions) -> Result> { let mut futures = Vec::with_capacity(self.clients.len()); @@ -105,15 +140,16 @@ impl PeerS3Client for S3PeerSys { Ok(buckets) } - async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> { + async fn delete_bucket(&self, bucket: &str) -> Result<()> { let mut futures = Vec::with_capacity(self.clients.len()); for cli in self.clients.iter() { - futures.push(cli.make_bucket(bucket, opts)); + futures.push(cli.delete_bucket(bucket)); } let mut errors = Vec::with_capacity(self.clients.len()); let results = join_all(futures).await; + for result in results { match result { Ok(_) => { @@ -125,20 +161,7 @@ impl PeerS3Client for S3PeerSys { } } - for i in 0..self.pools_count { - let mut per_pool_errs = Vec::with_capacity(self.clients.len()); - for (j, cli) in self.clients.iter().enumerate() { - let pools = cli.get_pools(); - let idx = i; - if pools.contains(&idx) { - per_pool_errs.push(errors[j].as_ref()); - } - - // TODO: reduceWriteQuorumErrs - } - } - - // TODO: + // TODO: reduceWriteQuorumErrs Ok(()) } @@ -182,6 +205,10 @@ impl PeerS3Client for S3PeerSys { .find_map(|op| op.as_ref().map(|v| v.clone())) .ok_or(Error::new(DiskError::VolumeNotFound)) } + + fn get_pools(&self) -> Vec { + unimplemented!() + } } #[derive(Debug)] @@ -282,6 +309,7 @@ impl PeerS3Client for LocalPeerS3Client { Ok(()) } + async fn get_bucket_info(&self, bucket: &str, _opts: &BucketOptions) -> Result { let mut futures = Vec::with_capacity(self.local_disks.len()); for disk in self.local_disks.iter() { @@ -319,6 +347,29 @@ impl PeerS3Client for LocalPeerS3Client { }) .ok_or(Error::new(DiskError::VolumeNotFound)) } + + async fn delete_bucket(&self, bucket: &str) -> Result<()> { + let mut futures = Vec::with_capacity(self.local_disks.len()); + + for disk in self.local_disks.iter() { + futures.push(disk.delete_volume(bucket)); + } + + let results = join_all(futures).await; + + let mut errs = Vec::new(); + + for res in results { + match res { + Ok(_) => errs.push(None), + Err(e) => errs.push(Some(e)), + } + } + + // TODO: reduceWriteQuorumErrs + + Ok(()) + } } #[derive(Debug)] @@ -348,4 +399,8 @@ impl PeerS3Client for RemotePeerS3Client { async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result { unimplemented!() } + + async fn delete_bucket(&self, bucket: &str) -> Result<()> { + unimplemented!() + } } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 79a69950..9fbd7e40 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -186,4 +186,8 @@ impl StorageAPI for Sets { .complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts) .await } + + async fn delete_bucket(&self, bucket: &str) -> Result<()> { + unimplemented!() + } } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 62776705..5011fc1b 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -4,6 +4,7 @@ use anyhow::{Error, Result}; use http::HeaderMap; use s3s::{dto::StreamingBlob, Body}; +use tracing::debug; use uuid::Uuid; use crate::{ @@ -239,4 +240,10 @@ impl StorageAPI for ECStore { } unimplemented!() } + + async fn delete_bucket(&self, bucket: &str) -> Result<()> { + self.peer_sys.delete_bucket(bucket).await?; + + Ok(()) + } } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 89cb283c..3bfda728 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -249,6 +249,7 @@ pub struct MakeBucketOptions { pub force_create: bool, } +#[derive(Debug)] pub struct PutObjReader { pub stream: StreamingBlob, pub content_length: usize, @@ -426,6 +427,7 @@ pub struct ObjectInfo { #[async_trait::async_trait] pub trait StorageAPI { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; + async fn delete_bucket(&self, bucket: &str) -> Result<()>; async fn list_bucket(&self, opts: &BucketOptions) -> Result>; async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result; async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 48a46f10..4680a330 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -83,7 +83,9 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { - let _input = req.input; + let input = req.input; + + try_!(self.store.delete_bucket(&input.bucket).await); Ok(S3Response::new(DeleteBucketOutput {})) }