Merge pull request #10 from rustfs/feat/delete-bucket

feat: 删除桶
This commit is contained in:
weisd
2024-07-30 17:54:19 +08:00
committed by GitHub
8 changed files with 128 additions and 20 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/target
.DS_Store
.DS_Store
.idea

View File

@@ -828,6 +828,14 @@ impl DiskAPI for LocalDisk {
Ok(results)
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
let p = self.get_bucket_path(&volume)?;
fs::remove_dir_all(&p).await?;
Ok(())
}
}
#[cfg(test)]
@@ -880,4 +888,32 @@ mod test {
fs::remove_dir_all(&p).await.unwrap();
}
#[tokio::test]
async fn test_delete_volume() {
let p = "./testv";
fs::create_dir_all(&p).await.unwrap();
let ep = match Endpoint::try_from(p) {
Ok(e) => e,
Err(e) => {
println!("{e}");
return;
}
};
let disk = LocalDisk::new(&ep, false).await.unwrap();
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
println!("ppp :{:?}", &tmpp);
let volumes = vec!["a", "b", "c"];
disk.make_volumes(volumes.clone()).await.unwrap();
disk.delete_volume("a").await.unwrap();
fs::remove_dir_all(&p).await.unwrap();
}
}

View File

@@ -40,6 +40,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
) -> Result<RenameDataResp>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
async fn delete_volume(&self, volume: &str) -> Result<()>;
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>>;
async fn make_volume(&self, volume: &str) -> Result<()>;
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo>;

View File

@@ -17,6 +17,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
fn get_pools(&self) -> Vec<usize>;
}
@@ -57,8 +58,42 @@ impl S3PeerSys {
#[async_trait]
impl PeerS3Client for S3PeerSys {
fn get_pools(&self) -> Vec<usize> {
unimplemented!()
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.make_bucket(bucket, opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
for i in 0..self.pools_count {
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
}
// TODO: reduceWriteQuorumErrs
}
}
// TODO:
Ok(())
}
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
let mut futures = Vec::with_capacity(self.clients.len());
@@ -105,15 +140,16 @@ impl PeerS3Client for S3PeerSys {
Ok(buckets)
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.make_bucket(bucket, opts));
futures.push(cli.delete_bucket(bucket));
}
let mut errors = Vec::with_capacity(self.clients.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
@@ -125,20 +161,7 @@ impl PeerS3Client for S3PeerSys {
}
}
for i in 0..self.pools_count {
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
}
// TODO: reduceWriteQuorumErrs
}
}
// TODO:
// TODO: reduceWriteQuorumErrs
Ok(())
}
@@ -182,6 +205,10 @@ impl PeerS3Client for S3PeerSys {
.find_map(|op| op.as_ref().map(|v| v.clone()))
.ok_or(Error::new(DiskError::VolumeNotFound))
}
fn get_pools(&self) -> Vec<usize> {
unimplemented!()
}
}
#[derive(Debug)]
@@ -282,6 +309,7 @@ impl PeerS3Client for LocalPeerS3Client {
Ok(())
}
async fn get_bucket_info(&self, bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
@@ -319,6 +347,29 @@ impl PeerS3Client for LocalPeerS3Client {
})
.ok_or(Error::new(DiskError::VolumeNotFound))
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
futures.push(disk.delete_volume(bucket));
}
let results = join_all(futures).await;
let mut errs = Vec::new();
for res in results {
match res {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
}
// TODO: reduceWriteQuorumErrs
Ok(())
}
}
#[derive(Debug)]
@@ -348,4 +399,8 @@ impl PeerS3Client for RemotePeerS3Client {
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
unimplemented!()
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
unimplemented!()
}
}

View File

@@ -186,4 +186,8 @@ impl StorageAPI for Sets {
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
.await
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
unimplemented!()
}
}

View File

@@ -4,6 +4,7 @@ use anyhow::{Error, Result};
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use tracing::debug;
use uuid::Uuid;
use crate::{
@@ -239,4 +240,10 @@ impl StorageAPI for ECStore {
}
unimplemented!()
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
self.peer_sys.delete_bucket(bucket).await?;
Ok(())
}
}

View File

@@ -249,6 +249,7 @@ pub struct MakeBucketOptions {
pub force_create: bool,
}
#[derive(Debug)]
pub struct PutObjReader {
pub stream: StreamingBlob,
pub content_length: usize,
@@ -426,6 +427,7 @@ pub struct ObjectInfo {
#[async_trait::async_trait]
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;

View File

@@ -83,7 +83,9 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let _input = req.input;
let input = req.input;
try_!(self.store.delete_bucket(&input.bucket).await);
Ok(S3Response::new(DeleteBucketOutput {}))
}