mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix: delete_objects
This commit is contained in:
@@ -228,31 +228,46 @@ impl StorageAPI for Sets {
|
||||
}
|
||||
}
|
||||
|
||||
let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
|
||||
let mut jhs = Vec::with_capacity(semaphore.available_permits());
|
||||
// let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
|
||||
// let mut jhs = Vec::with_capacity(semaphore.available_permits());
|
||||
|
||||
// for (k, v) in set_obj_map {
|
||||
// let disks = self.get_disks(k);
|
||||
// let semaphore = semaphore.clone();
|
||||
// let opts = opts.clone();
|
||||
// let bucket = bucket.to_string();
|
||||
|
||||
// let jh = tokio::spawn(async move {
|
||||
// let _permit = semaphore.acquire().await.unwrap();
|
||||
// let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
|
||||
// disks.delete_objects(&bucket, objs, opts).await
|
||||
// });
|
||||
// jhs.push(jh);
|
||||
// }
|
||||
|
||||
// let mut results = Vec::with_capacity(jhs.len());
|
||||
// for jh in jhs {
|
||||
// results.push(jh.await?.unwrap());
|
||||
// }
|
||||
|
||||
// for (dobjects, errs) in results {
|
||||
// del_objects.extend(dobjects);
|
||||
// del_errs.extend(errs);
|
||||
// }
|
||||
|
||||
// TODO: 并发
|
||||
for (k, v) in set_obj_map {
|
||||
let disks = self.get_disks(k);
|
||||
let semaphore = semaphore.clone();
|
||||
let opts = opts.clone();
|
||||
let bucket = bucket.to_string();
|
||||
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
|
||||
let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
|
||||
|
||||
let jh = tokio::spawn(async move {
|
||||
let _permit = semaphore.acquire().await.unwrap();
|
||||
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
|
||||
disks.delete_objects(&bucket, objs, opts).await
|
||||
});
|
||||
jhs.push(jh);
|
||||
}
|
||||
for (i, err) in errs.into_iter().enumerate() {
|
||||
let obj = v.get(i).unwrap();
|
||||
|
||||
let mut results = Vec::with_capacity(jhs.len());
|
||||
for jh in jhs {
|
||||
results.push(jh.await?.unwrap());
|
||||
}
|
||||
del_errs[obj.orig_idx] = err;
|
||||
|
||||
for (dobjects, errs) in results {
|
||||
del_objects.extend(dobjects);
|
||||
del_errs.extend(errs);
|
||||
del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
|
||||
}
|
||||
}
|
||||
|
||||
Ok((del_objects, del_errs))
|
||||
|
||||
Reference in New Issue
Block a user