diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index c9f77e8e..819cbd1e 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,7 +1,8 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use futures::future::join_all; use http::HeaderMap; +use tokio::sync::Semaphore; use tracing::warn; use uuid::Uuid; @@ -227,19 +228,31 @@ impl StorageAPI for Sets { } } - // TODO: 并发 + let semaphore = Arc::new(Semaphore::new(num_cpus::get())); + let mut jhs = Vec::with_capacity(semaphore.available_permits()); + for (k, v) in set_obj_map { let disks = self.get_disks(k); - let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); - let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?; + let semaphore = semaphore.clone(); + let opts = opts.clone(); + let bucket = bucket.to_string(); - for (i, err) in errs.into_iter().enumerate() { - let obj = v.get(i).unwrap(); + let jh = tokio::spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); + disks.delete_objects(&bucket, objs, opts).await + }); + jhs.push(jh); + } - del_errs[obj.orig_idx] = err; + let mut results = Vec::with_capacity(jhs.len()); + for jh in jhs { + results.push(jh.await?.unwrap()); + } - del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone(); - } + for (dobjects, errs) in results { + del_objects.extend(dobjects); + del_errs.extend(errs); } Ok((del_objects, del_errs))