mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
stash delete_object
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use futures::future::join_all;
|
||||
use http::HeaderMap;
|
||||
use uuid::Uuid;
|
||||
@@ -139,6 +141,12 @@ impl Sets {
|
||||
// pub default_parity_count: usize,
|
||||
// }
|
||||
|
||||
struct DelObj {
|
||||
// set_idx: usize,
|
||||
orig_idx: usize,
|
||||
obj: ObjectToDelete,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StorageAPI for Sets {
|
||||
async fn list_bucket(&self, _opts: &BucketOptions) -> Result<Vec<BucketInfo>> {
|
||||
@@ -153,12 +161,66 @@ impl StorageAPI for Sets {
|
||||
}
|
||||
async fn delete_objects(
|
||||
&self,
|
||||
_bucket: &str,
|
||||
_objects: Vec<ObjectToDelete>,
|
||||
_opts: ObjectOptions,
|
||||
bucket: &str,
|
||||
objects: Vec<ObjectToDelete>,
|
||||
opts: ObjectOptions,
|
||||
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)> {
|
||||
// FIXME:
|
||||
unimplemented!()
|
||||
// 默认返回值
|
||||
let mut del_objects = vec![DeletedObject::default(); objects.len()];
|
||||
|
||||
let mut del_errs = Vec::with_capacity(objects.len());
|
||||
for _ in 0..objects.len() {
|
||||
del_errs.push(None)
|
||||
}
|
||||
|
||||
let mut set_obj_map = HashMap::new();
|
||||
|
||||
// hash key
|
||||
let mut i = 0;
|
||||
for obj in objects.iter() {
|
||||
let idx = self.get_hashed_set_index(obj.object_name.as_str());
|
||||
|
||||
if !set_obj_map.contains_key(&idx) {
|
||||
set_obj_map.insert(
|
||||
idx,
|
||||
vec![DelObj {
|
||||
// set_idx: idx,
|
||||
orig_idx: i,
|
||||
obj: obj.clone(),
|
||||
}],
|
||||
);
|
||||
} else {
|
||||
if let Some(val) = set_obj_map.get_mut(&idx) {
|
||||
val.push(DelObj {
|
||||
// set_idx: idx,
|
||||
orig_idx: i,
|
||||
obj: obj.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
// TODO: 并发
|
||||
for (k, v) in set_obj_map {
|
||||
let disks = self.get_disks(k);
|
||||
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
|
||||
let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
|
||||
|
||||
let mut i = 0;
|
||||
for err in errs {
|
||||
let obj = v.get(i).unwrap();
|
||||
|
||||
del_errs[obj.orig_idx] = err;
|
||||
|
||||
del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
|
||||
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((del_objects, del_errs))
|
||||
}
|
||||
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
|
||||
if opts.delete_prefix {
|
||||
|
||||
Reference in New Issue
Block a user