add delete_bucket opts

This commit is contained in:
weisd
2024-09-13 13:30:10 +08:00
parent dc6f485aca
commit 7f35685b35
7 changed files with 66 additions and 20 deletions

View File

@@ -31,6 +31,9 @@ pub enum DiskError {
#[error("volume not found")]
VolumeNotFound,
#[error("volume not empty")]
VolumeNotEmpty,
}
impl DiskError {

View File

@@ -972,7 +972,20 @@ impl DiskAPI for LocalDisk {
let p = self.get_bucket_path(volume)?;
// TODO: 不能用递归删除如果目录下面有文件返回errVolumeNotEmpty
fs::remove_dir_all(&p).await?;
if let Err(err) = fs::remove_dir_all(&p).await {
match err.kind() {
ErrorKind::NotFound => (),
// ErrorKind::DirectoryNotEmpty => (),
kind => {
if kind.to_string() == "directory not empty" {
return Err(Error::new(DiskError::VolumeNotEmpty));
}
return Err(Error::from(err));
}
}
}
Ok(())
}

View File

@@ -8,7 +8,7 @@ use crate::{
disk::{self, error::DiskError, DiskStore, VolumeInfo},
endpoints::{EndpointServerPools, Node},
error::{Error, Result},
store_api::{BucketInfo, BucketOptions, MakeBucketOptions},
store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions},
};
type Client = Arc<Box<dyn PeerS3Client>>;
@@ -17,7 +17,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
fn get_pools(&self) -> Vec<usize>;
}
@@ -140,10 +140,10 @@ impl PeerS3Client for S3PeerSys {
Ok(buckets)
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.delete_bucket(bucket));
futures.push(cli.delete_bucket(bucket, &opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
@@ -350,7 +350,7 @@ impl PeerS3Client for LocalPeerS3Client {
.ok_or(Error::new(DiskError::VolumeNotFound))
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
@@ -361,14 +361,34 @@ impl PeerS3Client for LocalPeerS3Client {
let mut errs = Vec::new();
let mut recreate = false;
for res in results {
match res {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
Err(e) => {
if DiskError::VolumeNotEmpty.is(&e) {
recreate = true;
}
errs.push(Some(e))
}
}
}
// TODO: errVolumeNotEmpty 不删除,把已经删除的重新创建
// errVolumeNotEmpty 不删除,把已经删除的重新创建
let mut idx = 0;
for err in errs {
if err.is_none() && recreate {
let _ = self.local_disks[idx].make_volume(bucket).await;
}
idx += 1;
}
if recreate {
return Err(Error::new(DiskError::VolumeNotEmpty));
}
// TODO: reduceWriteQuorumErrs
@@ -404,7 +424,7 @@ impl PeerS3Client for RemotePeerS3Client {
unimplemented!()
}
async fn delete_bucket(&self, _bucket: &str) -> Result<()> {
async fn delete_bucket(&self, _bucket: &str, _opts: &DeleteBucketOptions) -> Result<()> {
unimplemented!()
}
}

View File

@@ -13,8 +13,9 @@ use crate::{
error::{Error, Result},
set_disk::SetDisks,
store_api::{
BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsV2Info,
MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI,
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
PutObjReader, StorageAPI,
},
utils::hash,
};
@@ -297,7 +298,7 @@ impl StorageAPI for Sets {
.await
}
async fn delete_bucket(&self, _bucket: &str) -> Result<()> {
async fn delete_bucket(&self, _bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
unimplemented!()
}
}

View File

@@ -7,9 +7,9 @@ use crate::{
peer::{PeerS3Client, S3PeerSys},
sets::Sets,
store_api::{
BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsInfo,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
PutObjReader, StorageAPI,
BucketInfo, BucketOptions, CompletePart, DeleteBucketOptions, DeletedObject, GetObjectReader, HTTPRangeSpec,
ListObjectsInfo, ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete,
PartInfo, PutObjReader, StorageAPI,
},
store_init, utils,
};
@@ -630,8 +630,8 @@ impl StorageAPI for ECStore {
unimplemented!()
}
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
self.peer_sys.delete_bucket(bucket).await?;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()> {
self.peer_sys.delete_bucket(bucket, opts).await?;
// 删除meta
self.delete_all(RUSTFS_META_BUCKET, format!("{}/{}", BUCKET_META_PREFIX, bucket).as_str())

View File

@@ -243,6 +243,10 @@ pub struct MakeBucketOptions {
pub force_create: bool,
}
pub struct DeleteBucketOptions {
pub force: bool, // Force deletion
}
#[derive(Debug)]
pub struct PutObjReader {
pub stream: StreamingBlob,
@@ -488,7 +492,7 @@ pub struct DeletedObject {
#[async_trait::async_trait]
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
async fn delete_bucket(&self, bucket: &str, opts: &DeleteBucketOptions) -> Result<()>;
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo>;

View File

@@ -2,6 +2,7 @@ use bytes::Bytes;
use ecstore::disk::error::DiskError;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
use ecstore::store_api::DeleteBucketOptions;
use ecstore::store_api::HTTPRangeSpec;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
@@ -86,8 +87,12 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let input = req.input;
try_!(self.store.delete_bucket(&input.bucket).await);
// TODO: DeleteBucketInput 没有force参数
try_!(
self.store
.delete_bucket(&input.bucket, &DeleteBucketOptions { force: false })
.await
);
Ok(S3Response::new(DeleteBucketOutput {}))
}