diff --git a/Cargo.lock b/Cargo.lock
index 22b19bba..b8c76b3e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -447,6 +447,7 @@ dependencies = [
  "http",
  "lazy_static",
  "netif",
+ "num_cpus",
  "openssl",
  "path-absolutize",
  "path-clean",
diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml
index ada0fc7a..75f503ef 100644
--- a/ecstore/Cargo.toml
+++ b/ecstore/Cargo.toml
@@ -38,13 +38,14 @@ base64-simd = "0.8.0"
 sha2 = "0.10.8"
 hex-simd = "0.8.0"
 path-clean = "1.0.1"
-tokio = { workspace = true, features = ["io-util"] }
+tokio = { workspace = true, features = ["io-util", "sync"] }
 tokio-stream = "0.1.15"
 tonic.workspace = true
 tower.workspace = true
 rmp = "0.8.14"
 byteorder = "1.5.0"
 xxhash-rust = { version = "0.8.12", features = ["xxh64"] }
+num_cpus = "1.16"
 
 [target.'cfg(not(windows))'.dependencies]
 openssl = "0.10.66"
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index 9d1decf5..c0754384 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -1,7 +1,9 @@
-use std::collections::HashMap;
+#![allow(clippy::map_entry)]
+use std::{collections::HashMap, sync::Arc};
 
 use futures::future::join_all;
 use http::HeaderMap;
+use tokio::sync::Semaphore;
 use tracing::warn;
 use uuid::Uuid;
 
@@ -22,7 +24,7 @@ use crate::{
     utils::hash,
 };
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Sets {
     pub id: Uuid,
     // pub sets: Vec,
@@ -205,8 +207,7 @@ impl StorageAPI for Sets {
         let mut set_obj_map = HashMap::new();
 
         // hash key
-        let mut i = 0;
-        for obj in objects.iter() {
+        for (i, obj) in objects.iter().enumerate() {
             let idx = self.get_hashed_set_index(obj.object_name.as_str());
 
             if !set_obj_map.contains_key(&idx) {
@@ -218,35 +219,40 @@
                         obj: obj.clone(),
                     }],
                 );
-            } else {
-                if let Some(val) = set_obj_map.get_mut(&idx) {
-                    val.push(DelObj {
-                        // set_idx: idx,
-                        orig_idx: i,
-                        obj: obj.clone(),
-                    });
-                }
+            } else if let Some(val) = set_obj_map.get_mut(&idx) {
+                val.push(DelObj {
+                    // set_idx: idx,
+                    orig_idx: i,
+                    obj: obj.clone(),
+                });
             }
-
-            i += 1;
         }
 
-        // TODO: make this concurrent
+        let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
+        let mut jhs = Vec::with_capacity(semaphore.available_permits());
+
         for (k, v) in set_obj_map {
             let disks = self.get_disks(k);
-            let objs: Vec = v.iter().map(|v| v.obj.clone()).collect();
-            let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
+            let semaphore = semaphore.clone();
+            let opts = opts.clone();
+            let bucket = bucket.to_string();
 
-            let mut i = 0;
-            for err in errs {
-                let obj = v.get(i).unwrap();
+            let jh = tokio::spawn(async move {
+                let _permit = semaphore.acquire().await.unwrap();
+                let objs: Vec = v.iter().map(|v| v.obj.clone()).collect();
+                disks.delete_objects(&bucket, objs, opts).await
+            });
+            jhs.push(jh);
+        }
 
-                del_errs[obj.orig_idx] = err;
+        let mut results = Vec::with_capacity(jhs.len());
+        for jh in jhs {
+            results.push(jh.await?.unwrap());
+        }
 
-                del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
-
-                i += 1;
-            }
+        for (dobjects, errs) in results {
+            del_objects.extend(dobjects);
+            del_errs.extend(errs);
         }
 
         Ok((del_objects, del_errs))
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index b70e381e..6a5e18a9 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -1,3 +1,4 @@
+#![allow(clippy::map_entry)]
 use crate::{
     bucket_meta::BucketMetadata,
     disk::{
@@ -24,7 +25,10 @@ use std::{
     time::Duration,
 };
 use time::OffsetDateTime;
-use tokio::{fs, sync::RwLock};
+use tokio::{
+    fs,
+    sync::{RwLock, Semaphore},
+};
 use tracing::{debug, info, warn};
 use uuid::Uuid;
 
@@ -51,10 +55,11 @@ pub async fn update_erasure_type(setup_type: SetupType) {
     *is_erasure_sd = setup_type == SetupType::ErasureSD;
 }
 
+type TypeLocalDiskSetDrives = Vec>>>;
+
 lazy_static! {
     pub static ref GLOBAL_LOCAL_DISK_MAP: Arc>>> = Arc::new(RwLock::new(HashMap::new()));
-    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc>>>>> =
-        Arc::new(RwLock::new(Vec::new()));
+    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc> = Arc::new(RwLock::new(Vec::new()));
 }
 
 pub async fn find_local_disk(disk_path: &String) -> Option {
@@ -76,7 +81,7 @@ pub async fn find_local_disk(disk_path: &String) -> Option {
 
 pub async fn all_local_disk_path() -> Vec {
     let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
-    disk_map.keys().map(|v| v.clone()).collect()
+    disk_map.keys().cloned().collect()
 }
 
 pub async fn all_local_disk() -> Vec {
@@ -156,6 +161,7 @@ pub struct ECStore {
 }
 
 impl ECStore {
+    #[allow(clippy::new_ret_no_self)]
     pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<()> {
         // let layouts = DisksLayout::try_from(endpoints.as_slice())?;
 
@@ -318,8 +324,8 @@ impl ECStore {
             if entry.is_object() {
                 let fi = entry.to_fileinfo(&opts.bucket)?;
 
-                if fi.is_some() {
-                    ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false));
+                if let Some(f) = fi {
+                    ress.push(f.into_object_info(&opts.bucket, &entry.name, false));
                 }
                 continue;
             }
@@ -391,62 +397,68 @@ impl ECStore {
         object: &str,
         opts: &ObjectOptions,
     ) -> Result<(PoolObjInfo, Vec)> {
-        let mut futures = Vec::new();
-
-        for pool in self.pools.iter() {
-            futures.push(pool.get_object_info(bucket, object, opts));
-        }
-
-        let results = join_all(futures).await;
-
-        let mut ress = Vec::new();
-
-        let mut i = 0;
-
-        // join_all returns results in the same order as the inputs
-        for res in results {
-            let index = i;
-
-            match res {
-                Ok(r) => {
-                    ress.push(PoolObjInfo {
-                        index,
-                        object_info: r,
-                        err: None,
-                    });
-                }
-                Err(e) => {
-                    ress.push(PoolObjInfo {
-                        index,
-                        err: Some(e),
-                        ..Default::default()
-                    });
-                }
-            }
-            i += 1;
-        }
-
-        ress.sort_by(|a, b| {
-            let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
-            let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
-
-            at.cmp(&bt)
-        });
-
-        for res in ress {
-            // check
-            if res.err.is_none() {
-                // TODO: let errs = self.poolsWithObject()
-                return Ok((res, Vec::new()));
-            }
-        }
-
-        let ret = PoolObjInfo::default();
-
-        Ok((ret, Vec::new()))
+        internal_get_pool_info_existing_with_opts(&self.pools, bucket, object, opts).await
     }
 }
 
+async fn internal_get_pool_info_existing_with_opts(
+    pools: &[Sets],
+    bucket: &str,
+    object: &str,
+    opts: &ObjectOptions,
+) -> Result<(PoolObjInfo, Vec)> {
+    let mut futures = Vec::new();
+
+    for pool in pools.iter() {
+        futures.push(pool.get_object_info(bucket, object, opts));
+    }
+
+    let results = join_all(futures).await;
+
+    let mut ress = Vec::new();
+
+    // join_all returns results in the same order as the inputs
+    for (i, res) in results.into_iter().enumerate() {
+        let index = i;
+
+        match res {
+            Ok(r) => {
+                ress.push(PoolObjInfo {
+                    index,
+                    object_info: r,
+                    err: None,
+                });
+            }
+            Err(e) => {
+                ress.push(PoolObjInfo {
+                    index,
+                    err: Some(e),
+                    ..Default::default()
+                });
+            }
+        }
+    }
+
+    ress.sort_by(|a, b| {
+        let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
+        let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
+
+        at.cmp(&bt)
+    });
+
+    for res in ress {
+        // check
+        if res.err.is_none() {
+            // TODO: let errs = self.poolsWithObject()
+            return Ok((res, Vec::new()));
+        }
+    }
+
+    let ret = PoolObjInfo::default();
+
+    Ok((ret, Vec::new()))
+}
+
 #[derive(Debug, Default)]
 pub struct PoolObjInfo {
     pub index: usize,
@@ -559,21 +571,34 @@ impl StorageAPI for ECStore {
             del_errs.push(None)
         }
 
-        // TODO: limit the number of concurrent requests
-        let opt = ObjectOptions::default();
-        // fetch the PoolObjInfo for every object
-        let mut futures = Vec::new();
-        for obj in objects.iter() {
-            futures.push(self.get_pool_info_existing_with_opts(bucket, &obj.object_name, &opt));
-        }
+        let mut jhs = Vec::new();
+        let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
+        let pools = Arc::new(self.pools.clone());
 
-        let results = join_all(futures).await;
+        for obj in objects.iter() {
+            let (semaphore, pools, bucket, object_name, opt) = (
+                semaphore.clone(),
+                pools.clone(),
+                bucket.to_string(),
+                obj.object_name.to_string(),
+                ObjectOptions::default(),
+            );
+
+            let jh = tokio::spawn(async move {
+                let _permit = semaphore.acquire().await.unwrap();
+                internal_get_pool_info_existing_with_opts(pools.as_ref(), &bucket, &object_name, &opt).await
+            });
+            jhs.push(jh);
+        }
+        let mut results = Vec::new();
+        for jh in jhs {
+            results.push(jh.await.unwrap());
+        }
 
         // record which objects belong to which pool: pool_idx -> object indexes
        let mut pool_index_objects = HashMap::new();
 
-        let mut i = 0;
-        for res in results {
+        for (i, res) in results.into_iter().enumerate() {
             match res {
                 Ok((pinfo, _)) => {
                     if pinfo.object_info.delete_marker && opts.version_id.is_empty() {
@@ -601,8 +626,6 @@ impl StorageAPI for ECStore {
                     del_errs[i] = Some(e)
                 }
             }
-
-            i += 1;
         }
 
         if !pool_index_objects.is_empty() {
@@ -615,16 +638,7 @@
                 let obj_idxs = vals.unwrap();
                 // look up the corresponding objects; in theory none of them is None
-                let objs: Vec = obj_idxs
-                    .iter()
-                    .filter_map(|&idx| {
-                        if let Some(obj) = objects.get(idx) {
-                            Some(obj.clone())
-                        } else {
-                            None
-                        }
-                    })
-                    .collect();
+                let objs: Vec = obj_idxs.iter().filter_map(|&idx| objects.get(idx).cloned()).collect();
 
                 if objs.is_empty() {
                     continue;
                 }
@@ -633,8 +647,7 @@
                 let (pdel_objs, perrs) = sets.delete_objects(bucket, objs, opts.clone()).await?;
 
                 // the order of perrs should match the order of obj_idxs
-                let mut i = 0;
-                for err in perrs {
+                for (i, err) in perrs.into_iter().enumerate() {
                     let obj_idx = obj_idxs[i];
 
                     if err.is_some() {
@@ -645,8 +658,6 @@
                     dobj.object_name = utils::path::decode_dir_object(&dobj.object_name);
 
                     del_objects[obj_idx] = dobj;
-
-                    i += 1;
                 }
             }
         }
@@ -655,7 +666,7 @@
     }
 
     async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result {
         if opts.delete_prefix {
-            self.delete_prefix(bucket, &object).await?;
+            self.delete_prefix(bucket, object).await?;
             return Ok(ObjectInfo::default());
         }
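
The two rewritten delete paths above share one bounded-concurrency pattern: spawn a Tokio task per unit of work (a set's batch in `Sets::delete_objects`, a single object lookup in `ECStore::delete_objects`), gate each task behind a `Semaphore` sized to `num_cpus::get()`, and await the `JoinHandle`s in the order they were spawned. The sketch below is a minimal, standalone illustration of that pattern, not code from this PR; `run_bounded`, `process_one`, and the string payloads are made-up placeholders, and it assumes a crate with `tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }` and `num_cpus` as dependencies.

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinHandle;

// Stand-in for a per-task async operation such as `disks.delete_objects(...)`
// or `internal_get_pool_info_existing_with_opts(...)` in the diff above.
async fn process_one(item: String) -> Result<String, String> {
    Ok(format!("deleted {item}"))
}

async fn run_bounded(items: Vec<String>) -> Vec<Result<String, String>> {
    // At most `num_cpus::get()` tasks hold a permit at any given moment.
    let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
    let mut handles: Vec<JoinHandle<Result<String, String>>> = Vec::with_capacity(items.len());

    for item in items {
        let semaphore = semaphore.clone();
        handles.push(tokio::spawn(async move {
            // The permit is released when `_permit` is dropped at the end of the task.
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            process_one(item).await
        }));
    }

    // Awaiting the handles in spawn order yields results in the same order the
    // tasks were created, which is what `ECStore::delete_objects` relies on when
    // it indexes `del_errs[i]` positionally.
    let mut results = Vec::with_capacity(handles.len());
    for handle in handles {
        results.push(handle.await.expect("task panicked"));
    }
    results
}

#[tokio::main]
async fn main() {
    let out = run_bounded(vec!["a".into(), "b".into(), "c".into()]).await;
    println!("{out:?}");
}
```

The same constraint also appears to explain two structural choices in the diff: `tokio::spawn` accepts only `'static` futures, so the spawned work cannot borrow `&self`. Moving the body of `get_pool_info_existing_with_opts` into the free function `internal_get_pool_info_existing_with_opts(&[Sets], ...)` and cloning `self.pools` into an `Arc` (which is also why `Sets` now derives `Clone`) gives each task owned data to move into its closure.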