mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 09:40:32 +00:00
stash delete_object
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use futures::future::join_all;
|
||||
use http::HeaderMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -110,6 +111,22 @@ impl Sets {
|
||||
// ) -> Vec<Option<Error>> {
|
||||
// unimplemented!()
|
||||
// }
|
||||
|
||||
async fn delete_prefix(&self, bucket: &str, object: &str) -> Result<()> {
|
||||
let mut futures = Vec::new();
|
||||
let opt = ObjectOptions {
|
||||
delete_prefix: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
for set in self.disk_set.iter() {
|
||||
futures.push(set.delete_object(bucket, object, opt.clone()));
|
||||
}
|
||||
|
||||
let _results = join_all(futures).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// #[derive(Debug)]
|
||||
@@ -134,7 +151,14 @@ impl StorageAPI for Sets {
|
||||
async fn get_bucket_info(&self, _bucket: &str, _opts: &BucketOptions) -> Result<BucketInfo> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
|
||||
if opts.delete_prefix {
|
||||
self.delete_prefix(bucket, object).await?;
|
||||
return Ok(ObjectInfo::default());
|
||||
}
|
||||
|
||||
self.get_disks_by_key(object).delete_object(bucket, object, opts).await
|
||||
}
|
||||
async fn list_objects_v2(
|
||||
&self,
|
||||
_bucket: &str,
|
||||
|
||||
@@ -16,6 +16,7 @@ use futures::future::join_all;
|
||||
use http::HeaderMap;
|
||||
use s3s::{dto::StreamingBlob, Body};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{debug, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -227,6 +228,77 @@ impl ECStore {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn delete_prefix(&self, _bucket: &str, _object: &str) -> Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn get_pool_info_existing_with_opts(
|
||||
&self,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
opts: &ObjectOptions,
|
||||
) -> Result<(PoolObjInfo, Vec<Error>)> {
|
||||
let mut futures = Vec::new();
|
||||
|
||||
for pool in self.pools.iter() {
|
||||
futures.push(pool.get_object_info(bucket, object, opts));
|
||||
}
|
||||
|
||||
let results = join_all(futures).await;
|
||||
|
||||
let mut ress = Vec::new();
|
||||
|
||||
let mut i = 0;
|
||||
|
||||
// join_all结果跟输入顺序一致
|
||||
for res in results {
|
||||
let index = i;
|
||||
|
||||
match res {
|
||||
Ok(r) => {
|
||||
ress.push(PoolObjInfo {
|
||||
index,
|
||||
object_info: r,
|
||||
err: None,
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
ress.push(PoolObjInfo {
|
||||
index,
|
||||
err: Some(e),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
ress.sort_by(|a, b| {
|
||||
let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
|
||||
let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
|
||||
|
||||
at.cmp(&bt)
|
||||
});
|
||||
|
||||
for res in ress {
|
||||
// check
|
||||
if res.err.is_none() {
|
||||
// TODO: let errs = self.poolsWithObject()
|
||||
return Ok((res, Vec::new()));
|
||||
}
|
||||
}
|
||||
|
||||
let ret = PoolObjInfo::default();
|
||||
|
||||
Ok((ret, Vec::new()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PoolObjInfo {
|
||||
pub index: usize,
|
||||
pub object_info: ObjectInfo,
|
||||
pub err: Option<Error>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -302,6 +374,31 @@ impl StorageAPI for ECStore {
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
|
||||
if opts.delete_prefix {
|
||||
self.delete_prefix(bucket, &object).await?;
|
||||
return Ok(ObjectInfo::default());
|
||||
}
|
||||
|
||||
let object = utils::path::encode_dir_object(object);
|
||||
let object = object.as_str();
|
||||
|
||||
// 查询在哪个pool
|
||||
let (mut pinfo, errs) = self.get_pool_info_existing_with_opts(bucket, object, &opts).await?;
|
||||
if pinfo.object_info.delete_marker && opts.version_id.is_empty() {
|
||||
pinfo.object_info.name = utils::path::decode_dir_object(object);
|
||||
return Ok(pinfo.object_info);
|
||||
}
|
||||
|
||||
if !errs.is_empty() {
|
||||
// TODO: deleteObjectFromAllPools
|
||||
}
|
||||
|
||||
let mut obj = self.pools[pinfo.index].delete_object(bucket, object, opts.clone()).await?;
|
||||
obj.name = utils::path::decode_dir_object(object);
|
||||
|
||||
Ok(obj)
|
||||
}
|
||||
async fn list_objects_v2(
|
||||
&self,
|
||||
bucket: &str,
|
||||
|
||||
@@ -86,7 +86,7 @@ impl FileInfo {
|
||||
parity_blocks: self.erasure.parity_blocks,
|
||||
data_blocks: self.erasure.data_blocks,
|
||||
version_id: self.version_id,
|
||||
deleted: self.deleted,
|
||||
delete_marker: self.deleted,
|
||||
mod_time: self.mod_time,
|
||||
size: self.size,
|
||||
parts: self.parts.clone(),
|
||||
@@ -358,12 +358,15 @@ impl HTTPRangeSpec {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ObjectOptions {
|
||||
// Use the maximum parity (N/2), used when saving server configuration files
|
||||
pub max_parity: bool,
|
||||
pub mod_time: Option<OffsetDateTime>,
|
||||
pub part_number: usize,
|
||||
|
||||
pub delete_prefix: bool,
|
||||
pub version_id: String,
|
||||
}
|
||||
|
||||
// impl Default for ObjectOptions {
|
||||
@@ -413,13 +416,13 @@ impl From<s3s::dto::CompletedPart> for CompletePart {
|
||||
pub struct ObjectInfo {
|
||||
pub bucket: String,
|
||||
pub name: String,
|
||||
pub mod_time: Option<OffsetDateTime>,
|
||||
pub size: usize,
|
||||
pub is_dir: bool,
|
||||
pub parity_blocks: usize,
|
||||
pub data_blocks: usize,
|
||||
pub version_id: Uuid,
|
||||
pub deleted: bool,
|
||||
pub mod_time: Option<OffsetDateTime>,
|
||||
pub size: usize,
|
||||
pub delete_marker: bool,
|
||||
pub parts: Vec<ObjectPartInfo>,
|
||||
pub is_latest: bool,
|
||||
}
|
||||
@@ -474,7 +477,7 @@ pub trait StorageAPI {
|
||||
async fn delete_bucket(&self, bucket: &str) -> Result<()>;
|
||||
async fn list_bucket(&self, opts: &BucketOptions) -> Result<Vec<BucketInfo>>;
|
||||
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
|
||||
|
||||
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo>;
|
||||
async fn list_objects_v2(
|
||||
&self,
|
||||
bucket: &str,
|
||||
|
||||
Reference in New Issue
Block a user