From 72201049f0f0854e61361fb1f81abbe874c5e4b0 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 25 Sep 2024 16:54:48 +0800 Subject: [PATCH] fix: delete_objects --- ecstore/src/sets.rs | 53 +++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index c0754384..81aaefca 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -228,31 +228,46 @@ impl StorageAPI for Sets { } } - let semaphore = Arc::new(Semaphore::new(num_cpus::get())); - let mut jhs = Vec::with_capacity(semaphore.available_permits()); + // let semaphore = Arc::new(Semaphore::new(num_cpus::get())); + // let mut jhs = Vec::with_capacity(semaphore.available_permits()); + // for (k, v) in set_obj_map { + // let disks = self.get_disks(k); + // let semaphore = semaphore.clone(); + // let opts = opts.clone(); + // let bucket = bucket.to_string(); + + // let jh = tokio::spawn(async move { + // let _permit = semaphore.acquire().await.unwrap(); + // let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); + // disks.delete_objects(&bucket, objs, opts).await + // }); + // jhs.push(jh); + // } + + // let mut results = Vec::with_capacity(jhs.len()); + // for jh in jhs { + // results.push(jh.await?.unwrap()); + // } + + // for (dobjects, errs) in results { + // del_objects.extend(dobjects); + // del_errs.extend(errs); + // } + + // TODO: 并发 for (k, v) in set_obj_map { let disks = self.get_disks(k); - let semaphore = semaphore.clone(); - let opts = opts.clone(); - let bucket = bucket.to_string(); + let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); + let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?; - let jh = tokio::spawn(async move { - let _permit = semaphore.acquire().await.unwrap(); - let objs: Vec = v.iter().map(|v| v.obj.clone()).collect(); - disks.delete_objects(&bucket, objs, opts).await - }); - jhs.push(jh); - } + for (i, err) in errs.into_iter().enumerate() { + let obj = v.get(i).unwrap(); - let mut results = Vec::with_capacity(jhs.len()); - for jh in jhs { - results.push(jh.await?.unwrap()); - } + del_errs[obj.orig_idx] = err; - for (dobjects, errs) in results { - del_objects.extend(dobjects); - del_errs.extend(errs); + del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone(); + } } Ok((del_objects, del_errs))