Merge pull request #58 from rustfs/jimchen/perf-delete_objects

Optimize concurrent object deletion
loverustfs
2024-09-24 11:03:51 +08:00
committed by GitHub
4 changed files with 131 additions and 112 deletions

Cargo.lock generated

@@ -447,6 +447,7 @@ dependencies = [
"http",
"lazy_static",
"netif",
"num_cpus",
"openssl",
"path-absolutize",
"path-clean",

View File

@@ -38,13 +38,14 @@ base64-simd = "0.8.0"
sha2 = "0.10.8"
hex-simd = "0.8.0"
path-clean = "1.0.1"
tokio = { workspace = true, features = ["io-util"] }
tokio = { workspace = true, features = ["io-util", "sync"] }
tokio-stream = "0.1.15"
tonic.workspace = true
tower.workspace = true
rmp = "0.8.14"
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.12", features = ["xxh64"] }
num_cpus = "1.16"
[target.'cfg(not(windows))'.dependencies]
openssl = "0.10.66"


@@ -1,7 +1,9 @@
-use std::collections::HashMap;
+#![allow(clippy::map_entry)]
+use std::{collections::HashMap, sync::Arc};
use futures::future::join_all;
use http::HeaderMap;
+use tokio::sync::Semaphore;
use tracing::warn;
use uuid::Uuid;
@@ -22,7 +24,7 @@ use crate::{
utils::hash,
};
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
@@ -205,8 +207,7 @@ impl StorageAPI for Sets {
let mut set_obj_map = HashMap::new();
// hash key
-let mut i = 0;
-for obj in objects.iter() {
+for (i, obj) in objects.iter().enumerate() {
let idx = self.get_hashed_set_index(obj.object_name.as_str());
if !set_obj_map.contains_key(&idx) {
@@ -218,35 +219,40 @@ impl StorageAPI for Sets {
obj: obj.clone(),
}],
);
-            } else {
-                if let Some(val) = set_obj_map.get_mut(&idx) {
-                    val.push(DelObj {
-                        // set_idx: idx,
-                        orig_idx: i,
-                        obj: obj.clone(),
-                    });
-                }
-            }
-            i += 1;
+            } else if let Some(val) = set_obj_map.get_mut(&idx) {
+                val.push(DelObj {
+                    // set_idx: idx,
+                    orig_idx: i,
+                    obj: obj.clone(),
+                });
+            }
        }

-        // TODO: concurrency
+        let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
+        let mut jhs = Vec::with_capacity(semaphore.available_permits());
        for (k, v) in set_obj_map {
            let disks = self.get_disks(k);
-            let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
-            let (dobjects, errs) = disks.delete_objects(bucket, objs, opts.clone()).await?;
-            let mut i = 0;
-            for err in errs {
-                let obj = v.get(i).unwrap();
-                del_errs[obj.orig_idx] = err;
-                del_objects[obj.orig_idx] = dobjects.get(i).unwrap().clone();
-                i += 1;
-            }
+            let semaphore = semaphore.clone();
+            let opts = opts.clone();
+            let bucket = bucket.to_string();
+            let jh = tokio::spawn(async move {
+                let _permit = semaphore.acquire().await.unwrap();
+                let objs: Vec<ObjectToDelete> = v.iter().map(|v| v.obj.clone()).collect();
+                disks.delete_objects(&bucket, objs, opts).await
+            });
+            jhs.push(jh);
        }

+        let mut results = Vec::with_capacity(jhs.len());
+        for jh in jhs {
+            results.push(jh.await?.unwrap());
+        }
+        for (dobjects, errs) in results {
+            del_objects.extend(dobjects);
+            del_errs.extend(errs);
+        }

        Ok((del_objects, del_errs))
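The rewritten loop spawns one task per erasure set and joins them afterwards. Two error layers meet at the join point: `jh.await` fails with a `JoinError` if a task panicked or was cancelled (propagated here with `?`), while the inner `Result` from `delete_objects` is unwrapped. A reduced sketch of that fan-out/fan-in shape, with `delete_batch` standing in for `disks.delete_objects` (names and types simplified, not the PR's exact signatures):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for disks.delete_objects: deletes one batch, may fail as a whole.
async fn delete_batch(batch: Vec<String>) -> Result<Vec<String>, String> {
    Ok(batch)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
    let batches = vec![vec!["a".into(), "b".into()], vec!["c".into()]];

    let mut jhs = Vec::with_capacity(batches.len());
    for batch in batches {
        let semaphore = Arc::clone(&semaphore);
        jhs.push(tokio::spawn(async move {
            let _permit = semaphore.acquire().await.unwrap();
            delete_batch(batch).await
        }));
    }

    let mut deleted = Vec::new();
    for jh in jhs {
        // `?` surfaces a panicked/cancelled task; `unwrap` takes the inner
        // storage result, mirroring `jh.await?.unwrap()` in the diff above.
        deleted.extend(jh.await?.unwrap());
    }
    println!("{deleted:?}");
    Ok(())
}
```

Worth noting: the old code wrote each result back at `obj.orig_idx`, preserving the caller's input order, while the new code extends `del_objects`/`del_errs` batch by batch, so output ordering now follows the per-set grouping.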


@@ -1,3 +1,4 @@
+#![allow(clippy::map_entry)]
use crate::{
bucket_meta::BucketMetadata,
disk::{
@@ -24,7 +25,10 @@ use std::{
time::Duration,
};
use time::OffsetDateTime;
-use tokio::{fs, sync::RwLock};
+use tokio::{
+    fs,
+    sync::{RwLock, Semaphore},
+};
use tracing::{debug, info, warn};
use uuid::Uuid;
@@ -51,10 +55,11 @@ pub async fn update_erasure_type(setup_type: SetupType) {
*is_erasure_sd = setup_type == SetupType::ErasureSD;
}
+type TypeLocalDiskSetDrives = Vec<Vec<Vec<Option<DiskStore>>>>;
lazy_static! {
pub static ref GLOBAL_LOCAL_DISK_MAP: Arc<RwLock<HashMap<String, Option<DiskStore>>>> = Arc::new(RwLock::new(HashMap::new()));
-    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<Vec<Vec<Vec<Option<DiskStore>>>>>> =
-        Arc::new(RwLock::new(Vec::new()));
+    pub static ref GLOBAL_LOCAL_DISK_SET_DRIVES: Arc<RwLock<TypeLocalDiskSetDrives>> = Arc::new(RwLock::new(Vec::new()));
}
@@ -76,7 +81,7 @@ pub async fn find_local_disk(disk_path: &String) -> Option<DiskStore> {
pub async fn all_local_disk_path() -> Vec<String> {
let disk_map = GLOBAL_LOCAL_DISK_MAP.read().await;
-disk_map.keys().map(|v| v.clone()).collect()
+disk_map.keys().cloned().collect()
}
pub async fn all_local_disk() -> Vec<DiskStore> {
@@ -156,6 +161,7 @@ pub struct ECStore {
}
impl ECStore {
+    #[allow(clippy::new_ret_no_self)]
pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<()> {
// let layouts = DisksLayout::try_from(endpoints.as_slice())?;
@@ -318,8 +324,8 @@ impl ECStore {
if entry.is_object() {
let fi = entry.to_fileinfo(&opts.bucket)?;
-if fi.is_some() {
-    ress.push(fi.unwrap().into_object_info(&opts.bucket, &entry.name, false));
+if let Some(f) = fi {
+    ress.push(f.into_object_info(&opts.bucket, &entry.name, false));
}
continue;
}
@@ -391,62 +397,68 @@ impl ECStore {
object: &str,
opts: &ObjectOptions,
) -> Result<(PoolObjInfo, Vec<Error>)> {
-        let mut futures = Vec::new();
-        for pool in self.pools.iter() {
-            futures.push(pool.get_object_info(bucket, object, opts));
-        }
-        let results = join_all(futures).await;
-        let mut ress = Vec::new();
-        let mut i = 0;
-        // join_all returns results in the same order as the input futures
-        for res in results {
-            let index = i;
-            match res {
-                Ok(r) => {
-                    ress.push(PoolObjInfo {
-                        index,
-                        object_info: r,
-                        err: None,
-                    });
-                }
-                Err(e) => {
-                    ress.push(PoolObjInfo {
-                        index,
-                        err: Some(e),
-                        ..Default::default()
-                    });
-                }
-            }
-            i += 1;
-        }
-        ress.sort_by(|a, b| {
-            let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
-            let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
-            at.cmp(&bt)
-        });
-        for res in ress {
-            // check
-            if res.err.is_none() {
-                // TODO: let errs = self.poolsWithObject()
-                return Ok((res, Vec::new()));
-            }
-        }
-        let ret = PoolObjInfo::default();
-        Ok((ret, Vec::new()))
+        internal_get_pool_info_existing_with_opts(&self.pools, bucket, object, opts).await
    }
}

+async fn internal_get_pool_info_existing_with_opts(
+    pools: &[Sets],
+    bucket: &str,
+    object: &str,
+    opts: &ObjectOptions,
+) -> Result<(PoolObjInfo, Vec<Error>)> {
+    let mut futures = Vec::new();
+    for pool in pools.iter() {
+        futures.push(pool.get_object_info(bucket, object, opts));
+    }
+    let results = join_all(futures).await;
+    let mut ress = Vec::new();
+    // join_all returns results in the same order as the input futures
+    for (i, res) in results.into_iter().enumerate() {
+        let index = i;
+        match res {
+            Ok(r) => {
+                ress.push(PoolObjInfo {
+                    index,
+                    object_info: r,
+                    err: None,
+                });
+            }
+            Err(e) => {
+                ress.push(PoolObjInfo {
+                    index,
+                    err: Some(e),
+                    ..Default::default()
+                });
+            }
+        }
+    }
+    ress.sort_by(|a, b| {
+        let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
+        let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
+        at.cmp(&bt)
+    });
+    for res in ress {
+        // check
+        if res.err.is_none() {
+            // TODO: let errs = self.poolsWithObject()
+            return Ok((res, Vec::new()));
+        }
+    }
+    let ret = PoolObjInfo::default();
+    Ok((ret, Vec::new()))
+}
#[derive(Debug, Default)]
pub struct PoolObjInfo {
pub index: usize,
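Hoisting the method body into a free function is what makes the spawns in `delete_objects` below possible: `tokio::spawn` requires a `'static` future, so a spawned task cannot borrow `&self`; a free function over `&[Sets]` can instead be handed an `Arc` clone owned by the task (which is also why `Sets` now derives `Clone`). A minimal sketch of the borrow problem and its `Arc` workaround, with `Pool` and `lookup` as illustrative stand-ins:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Pool; // stand-in for Sets

// Free function over a slice: callable from a &self method and,
// via an Arc clone, from a 'static spawned task.
async fn lookup(pools: &[Pool], _object: &str) -> usize {
    pools.len()
}

#[tokio::main]
async fn main() {
    let pools = Arc::new(vec![Pool, Pool]);

    let task_pools = Arc::clone(&pools);
    let jh = tokio::spawn(async move {
        // The task owns its Arc, satisfying tokio::spawn's 'static bound;
        // borrowing `&self.pools` here would not compile.
        lookup(task_pools.as_ref(), "some/key").await
    });

    assert_eq!(jh.await.unwrap(), 2);
}
```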
@@ -559,21 +571,34 @@ impl StorageAPI for ECStore {
del_errs.push(None)
}
-        // TODO: limit the number of concurrent requests
-        let opt = ObjectOptions::default();
        // fetch all PoolObjInfo
-        let mut futures = Vec::new();
-        for obj in objects.iter() {
-            futures.push(self.get_pool_info_existing_with_opts(bucket, &obj.object_name, &opt));
-        }
-        let results = join_all(futures).await;
+        let mut jhs = Vec::new();
+        let semaphore = Arc::new(Semaphore::new(num_cpus::get()));
+        let pools = Arc::new(self.pools.clone());
+        for obj in objects.iter() {
+            let (semaphore, pools, bucket, object_name, opt) = (
+                semaphore.clone(),
+                pools.clone(),
+                bucket.to_string(),
+                obj.object_name.to_string(),
+                ObjectOptions::default(),
+            );
+            let jh = tokio::spawn(async move {
+                let _permit = semaphore.acquire().await.unwrap();
+                internal_get_pool_info_existing_with_opts(pools.as_ref(), &bucket, &object_name, &opt).await
+            });
+            jhs.push(jh);
+        }
+        let mut results = Vec::new();
+        for jh in jhs {
+            results.push(jh.await.unwrap());
+        }

        // map pool index to its objects: pool_idx -> objects idx
        let mut pool_index_objects = HashMap::new();
-        let mut i = 0;
-        for res in results {
+        for (i, res) in results.into_iter().enumerate() {
match res {
Ok((pinfo, _)) => {
if pinfo.object_info.delete_marker && opts.version_id.is_empty() {
@@ -601,8 +626,6 @@ impl StorageAPI for ECStore {
del_errs[i] = Some(e)
}
}
-i += 1;
}
if !pool_index_objects.is_empty() {
@@ -615,16 +638,7 @@ impl StorageAPI for ECStore {
let obj_idxs = vals.unwrap();
// fetch the corresponding obj; in theory it is never None
let objs: Vec<ObjectToDelete> = obj_idxs
.iter()
.filter_map(|&idx| {
if let Some(obj) = objects.get(idx) {
Some(obj.clone())
} else {
None
}
})
.collect();
let objs: Vec<ObjectToDelete> = obj_idxs.iter().filter_map(|&idx| objects.get(idx).cloned()).collect();
if objs.is_empty() {
continue;
@@ -633,8 +647,7 @@ impl StorageAPI for ECStore {
let (pdel_objs, perrs) = sets.delete_objects(bucket, objs, opts.clone()).await?;
// in theory perrs is in the same order as obj_idxs
-let mut i = 0;
-for err in perrs {
+for (i, err) in perrs.into_iter().enumerate() {
let obj_idx = obj_idxs[i];
if err.is_some() {
@@ -645,8 +658,6 @@ impl StorageAPI for ECStore {
dobj.object_name = utils::path::decode_dir_object(&dobj.object_name);
del_objects[obj_idx] = dobj;
-i += 1;
}
}
}
@@ -655,7 +666,7 @@ impl StorageAPI for ECStore {
}
async fn delete_object(&self, bucket: &str, object: &str, opts: ObjectOptions) -> Result<ObjectInfo> {
if opts.delete_prefix {
-self.delete_prefix(bucket, &object).await?;
+self.delete_prefix(bucket, object).await?;
return Ok(ObjectInfo::default());
}