优化并发删除对象

This commit is contained in:
JimChenWYU
2024-09-23 16:07:02 +08:00
parent 184d593d27
commit 3bc98eec16

View File

@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use futures::future::join_all;
use http::HeaderMap;
use tokio::sync::Semaphore;
use tracing::warn;
use uuid::Uuid;
@@ -227,19 +228,31 @@ impl StorageAPI for Sets {
}
}
// TODO: 并发
let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
let mut jhs = Vec::with_capacity(semaphore.available_permits());
for (k, v) in set_obj_map {
let disks = self.get_disks(k);
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
let semaphore = semaphore.clone();
let opts = opts.clone();
let bucket = bucket.to_string();
for (i, err) in errs.into_iter().enumerate() {
let obj = v.get(i).unwrap();
let jh = tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
disks.delete_objects(&bucket, objs, opts).await
});
jhs.push(jh);
}
del_errs[obj.orig_idx] = err;
let mut results = Vec::with_capacity(jhs.len());
for jh in jhs {
results.push(jh.await?.unwrap());
}
del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
}
for (dobjects, errs) in results {
del_objects.extend(dobjects);
del_errs.extend(errs);
}
Ok((del_objects, del_errs))