mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
/target
|
||||
.DS_Store
|
||||
.DS_Store
|
||||
.idea
|
||||
|
||||
@@ -828,6 +828,14 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
async fn delete_volume(&self, volume: &str) -> Result<()> {
|
||||
let p = self.get_bucket_path(&volume)?;
|
||||
|
||||
fs::remove_dir_all(&p).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -880,4 +888,32 @@ mod test {
|
||||
|
||||
fs::remove_dir_all(&p).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_volume() {
|
||||
let p = "./testv";
|
||||
fs::create_dir_all(&p).await.unwrap();
|
||||
|
||||
let ep = match Endpoint::try_from(p) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
println!("{e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let disk = LocalDisk::new(&ep, false).await.unwrap();
|
||||
|
||||
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
|
||||
|
||||
println!("ppp :{:?}", &tmpp);
|
||||
|
||||
let volumes = vec!["a", "b", "c"];
|
||||
|
||||
disk.make_volumes(volumes.clone()).await.unwrap();
|
||||
|
||||
disk.delete_volume("a").await.unwrap();
|
||||
|
||||
fs::remove_dir_all(&p).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
|
||||
) -> Result<RenameDataResp>;
|
||||
|
||||
async fn make_volumes(&self, volume: Vec<&str>) -> Result<()>;
|
||||
async fn delete_volume(&self, volume: &str) -> Result<()>;
|
||||
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>>;
|
||||
async fn make_volume(&self, volume: &str) -> Result<()>;
|
||||
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo>;
|
||||
|
||||
@@ -17,6 +17,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
|
||||
pub trait PeerS3Client: Debug + Sync + Send + 'static {
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
|
||||
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
|
||||
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
|
||||
fn get_pools(&self) -> Vec<usize>;
|
||||
}
|
||||
@@ -57,8 +58,42 @@ impl S3PeerSys {
|
||||
|
||||
#[async_trait]
|
||||
impl PeerS3Client for S3PeerSys {
|
||||
fn get_pools(&self) -> Vec<usize> {
|
||||
unimplemented!()
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
|
||||
let mut futures = Vec::with_capacity(self.clients.len());
|
||||
for cli in self.clients.iter() {
|
||||
futures.push(cli.make_bucket(bucket, opts));
|
||||
}
|
||||
|
||||
let mut errors = Vec::with_capacity(self.clients.len());
|
||||
|
||||
let results = join_all(futures).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
errors.push(None);
|
||||
}
|
||||
Err(e) => {
|
||||
errors.push(Some(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i in 0..self.pools_count {
|
||||
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
|
||||
for (j, cli) in self.clients.iter().enumerate() {
|
||||
let pools = cli.get_pools();
|
||||
let idx = i;
|
||||
if pools.contains(&idx) {
|
||||
per_pool_errs.push(errors[j].as_ref());
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
|
||||
let mut futures = Vec::with_capacity(self.clients.len());
|
||||
@@ -105,15 +140,16 @@ impl PeerS3Client for S3PeerSys {
|
||||
|
||||
Ok(buckets)
|
||||
}
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
let mut futures = Vec::with_capacity(self.clients.len());
|
||||
for cli in self.clients.iter() {
|
||||
futures.push(cli.make_bucket(bucket, opts));
|
||||
futures.push(cli.delete_bucket(bucket));
|
||||
}
|
||||
|
||||
let mut errors = Vec::with_capacity(self.clients.len());
|
||||
|
||||
let results = join_all(futures).await;
|
||||
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
@@ -125,20 +161,7 @@ impl PeerS3Client for S3PeerSys {
|
||||
}
|
||||
}
|
||||
|
||||
for i in 0..self.pools_count {
|
||||
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
|
||||
for (j, cli) in self.clients.iter().enumerate() {
|
||||
let pools = cli.get_pools();
|
||||
let idx = i;
|
||||
if pools.contains(&idx) {
|
||||
per_pool_errs.push(errors[j].as_ref());
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -182,6 +205,10 @@ impl PeerS3Client for S3PeerSys {
|
||||
.find_map(|op| op.as_ref().map(|v| v.clone()))
|
||||
.ok_or(Error::new(DiskError::VolumeNotFound))
|
||||
}
|
||||
|
||||
fn get_pools(&self) -> Vec<usize> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -282,6 +309,7 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_bucket_info(&self, bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
|
||||
let mut futures = Vec::with_capacity(self.local_disks.len());
|
||||
for disk in self.local_disks.iter() {
|
||||
@@ -319,6 +347,29 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
})
|
||||
.ok_or(Error::new(DiskError::VolumeNotFound))
|
||||
}
|
||||
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
let mut futures = Vec::with_capacity(self.local_disks.len());
|
||||
|
||||
for disk in self.local_disks.iter() {
|
||||
futures.push(disk.delete_volume(bucket));
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
|
||||
let mut errs = Vec::new();
|
||||
|
||||
for res in results {
|
||||
match res {
|
||||
Ok(_) => errs.push(None),
|
||||
Err(e) => errs.push(Some(e)),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -348,4 +399,8 @@ impl PeerS3Client for RemotePeerS3Client {
|
||||
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,4 +186,8 @@ impl StorageAPI for Sets {
|
||||
.complete_multipart_upload(bucket, object, upload_id, uploaded_parts, opts)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use anyhow::{Error, Result};
|
||||
|
||||
use http::HeaderMap;
|
||||
use s3s::{dto::StreamingBlob, Body};
|
||||
use tracing::debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
@@ -239,4 +240,10 @@ impl StorageAPI for ECStore {
|
||||
}
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
self.peer_sys.delete_bucket(bucket).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,6 +249,7 @@ pub struct MakeBucketOptions {
|
||||
pub force_create: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PutObjReader {
|
||||
pub stream: StreamingBlob,
|
||||
pub content_length: usize,
|
||||
@@ -426,6 +427,7 @@ pub struct ObjectInfo {
|
||||
#[async_trait::async_trait]
|
||||
pub trait StorageAPI {
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
|
||||
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
|
||||
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
|
||||
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo>;
|
||||
|
||||
@@ -83,7 +83,9 @@ impl S3 for FS {
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self, req))]
|
||||
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
|
||||
let _input = req.input;
|
||||
let input = req.input;
|
||||
|
||||
try_!(self.store.delete_bucket(&input.bucket).await);
|
||||
|
||||
Ok(S3Response::new(DeleteBucketOutput {}))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user