Improve the peer client (#693)

weisd
2025-10-23 17:21:55 +08:00
committed by GitHub
parent 416d3ad5b7
commit 1d069fd351


@@ -94,11 +94,11 @@ impl S3PeerSys {
let mut pool_errs = Vec::new();
for pool_idx in 0..self.pools_count {
let mut per_pool_errs = Vec::new();
let mut per_pool_errs = vec![None; self.clients.len()];
for (i, client) in self.clients.iter().enumerate() {
if let Some(v) = client.get_pools() {
if v.contains(&pool_idx) {
per_pool_errs.push(errs[i].clone());
per_pool_errs[i] = errs[i].clone();
}
}
}
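
The recurring change in this commit is to stop pushing errors into a growable Vec and instead write them into a fixed-size vec![None; self.clients.len()] indexed by client position, so every peer keeps its slot (as None) even when it does not serve the current pool, and the length used for quorum math always equals the client count. A minimal sketch of the pattern, with String standing in for the crate's error type and a plain slice standing in for client.get_pools():

    // Minimal sketch of the indexed-slot pattern. `String` stands in for the
    // crate's error type; `client_pools[i]` stands in for clients[i].get_pools().
    fn per_pool_error_slots(
        errs: &[Option<String>],             // one outcome per peer client
        client_pools: &[Option<Vec<usize>>], // which pools each client serves
        pool_idx: usize,
    ) -> Vec<Option<String>> {
        // Pre-filled with None: the length (and therefore any quorum derived
        // from it) no longer depends on how many clients belong to this pool.
        let mut per_pool_errs = vec![None; errs.len()];
        for (i, pools) in client_pools.iter().enumerate() {
            if let Some(v) = pools {
                if v.contains(&pool_idx) {
                    per_pool_errs[i] = errs[i].clone();
                }
            }
        }
        per_pool_errs
    }
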
@@ -129,20 +129,28 @@ impl S3PeerSys {
let errs = join_all(futures).await;
for pool_idx in 0..self.pools_count {
let mut per_pool_errs = Vec::new();
let mut per_pool_errs = vec![None; self.clients.len()];
for (i, client) in self.clients.iter().enumerate() {
if let Some(v) = client.get_pools() {
if v.contains(&pool_idx) {
per_pool_errs.push(errs[i].clone());
per_pool_errs[i] = errs[i].clone();
}
}
}
let qu = per_pool_errs.len() / 2;
if let Some(pool_err) = reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, qu) {
tracing::error!("heal_bucket per_pool_errs: {per_pool_errs:?}");
tracing::error!("heal_bucket reduce_write_quorum_errs: {pool_err}");
return Err(pool_err);
}
}
if let Some(err) = reduce_write_quorum_errs(&errs, BUCKET_OP_IGNORED_ERRS, (errs.len() / 2) + 1) {
tracing::error!("heal_bucket errs: {errs:?}");
tracing::error!("heal_bucket reduce_write_quorum_errs: {err}");
return Err(err);
}
for (i, err) in errs.iter().enumerate() {
if err.is_none() {
return Ok(heal_bucket_results.read().await[i].clone());
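
heal_bucket now runs the quorum reduction twice: once per pool over that pool's client slots (with quorum per_pool_errs.len() / 2) and once over all peers (with errs.len() / 2 + 1), before returning the result of the first peer that succeeded. The helper itself is not part of this diff; the sketch below is only a guess at its semantics from the call sites (pick the most frequent outcome and report it if it reaches the quorum), not the crate's reduce_write_quorum_errs, and it ignores the BUCKET_OP_IGNORED_ERRS parameter.

    use std::collections::HashMap;

    // Hypothetical, simplified stand-in for reduce_write_quorum_errs.
    // None in the input means "that peer succeeded"; None in the output means
    // "a quorum of peers agree on success".
    fn reduce_quorum(errs: &[Option<String>], quorum: usize) -> Option<String> {
        let mut counts: HashMap<Option<&String>, usize> = HashMap::new();
        for e in errs {
            *counts.entry(e.as_ref()).or_insert(0) += 1;
        }
        let (top, n) = counts.into_iter().max_by_key(|(_, n)| *n)?;
        if n >= quorum {
            top.cloned() // the winning outcome reached quorum
        } else {
            Some("write quorum not reached".to_string())
        }
    }
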
@@ -157,34 +165,36 @@ impl S3PeerSys {
futures.push(cli.make_bucket(bucket, opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let mut errors = vec![None; self.clients.len()];
let results = join_all(futures).await;
for result in results {
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(_) => {
errors.push(None);
errors[i] = None;
}
Err(e) => {
errors.push(Some(e));
errors[i] = Some(e);
}
}
}
for i in 0..self.pools_count {
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
let mut per_pool_errs = vec![None; self.clients.len()];
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs.push(errors[j].clone());
per_pool_errs[j] = errors[j].clone();
}
}
if let Some(pool_err) =
reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1)
{
return Err(pool_err);
}
if let Some(pool_err) =
reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1)
{
tracing::error!("make_bucket per_pool_errs: {per_pool_errs:?}");
tracing::error!("make_bucket reduce_write_quorum_errs: {pool_err}");
return Err(pool_err);
}
}
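
make_bucket follows the same fan-out shape: fire one future per peer, collect the outcomes into index-aligned error slots, then apply the per-pool quorum check. A compact model of that shape with a dummy peer type in place of the real client; the result.err() shorthand is equivalent to the match in the diff.

    use futures::future::join_all;

    // Dummy peer standing in for the real peer client.
    struct FakePeer {
        fail: bool,
    }

    impl FakePeer {
        async fn make_bucket(&self, _bucket: &str) -> Result<(), String> {
            if self.fail { Err("peer unreachable".to_string()) } else { Ok(()) }
        }
    }

    // Fan the call out to every peer and record the outcome in the slot that
    // matches the peer's index; Ok(_) simply leaves the slot as None.
    async fn make_bucket_on_all(peers: &[FakePeer], bucket: &str) -> Vec<Option<String>> {
        let futures: Vec<_> = peers.iter().map(|p| p.make_bucket(bucket)).collect();
        let mut errors = vec![None; peers.len()];
        for (i, result) in join_all(futures).await.into_iter().enumerate() {
            errors[i] = result.err();
        }
        errors
    }

The rest of make_bucket then reuses the per-pool slot pattern from the first hunk before deciding whether the operation met write quorum.
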
@@ -196,42 +206,74 @@ impl S3PeerSys {
futures.push(cli.list_bucket(opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let mut ress = Vec::with_capacity(self.clients.len());
let mut errors = vec![None; self.clients.len()];
let mut node_buckets = vec![None; self.clients.len()];
let results = join_all(futures).await;
for result in results {
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(res) => {
ress.push(Some(res));
errors.push(None);
node_buckets[i] = Some(res);
errors[i] = None;
}
Err(e) => {
ress.push(None);
errors.push(Some(e));
}
}
}
// TODO: reduceWriteQuorumErrs
// for i in 0..self.pools_count {}
let mut uniq_map: HashMap<&String, &BucketInfo> = HashMap::new();
for res in ress.iter() {
if res.is_none() {
continue;
}
let buckets = res.as_ref().unwrap();
for bucket in buckets.iter() {
if !uniq_map.contains_key(&bucket.name) {
uniq_map.insert(&bucket.name, bucket);
node_buckets[i] = None;
errors[i] = Some(e);
}
}
}
let buckets: Vec<BucketInfo> = uniq_map.values().map(|&v| v.clone()).collect();
let mut result_map: HashMap<&String, BucketInfo> = HashMap::new();
for i in 0..self.pools_count {
let mut per_pool_errs = vec![None; self.clients.len()];
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs[j] = errors[j].clone();
}
}
let quorum = per_pool_errs.len() / 2;
if let Some(pool_err) = reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, quorum) {
tracing::error!("list_bucket per_pool_errs: {per_pool_errs:?}");
tracing::error!("list_bucket reduce_write_quorum_errs: {pool_err}");
return Err(pool_err);
}
let mut bucket_map: HashMap<&String, usize> = HashMap::new();
for (j, node_bucket) in node_buckets.iter().enumerate() {
if let Some(buckets) = node_bucket.as_ref() {
if buckets.is_empty() {
continue;
}
if !self.clients[j].get_pools().unwrap_or_default().contains(&i) {
continue;
}
for bucket in buckets.iter() {
if result_map.contains_key(&bucket.name) {
continue;
}
// incr bucket_map count create if not exists
let count = bucket_map.entry(&bucket.name).or_insert(0usize);
*count += 1;
if *count >= quorum {
result_map.insert(&bucket.name, bucket.clone());
}
}
}
}
// TODO: MRF
}
let mut buckets: Vec<BucketInfo> = result_map.into_values().collect();
buckets.sort_by_key(|b| b.name.clone());
Ok(buckets)
}
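
The listing merge is the largest behavioural change: instead of returning the union of every bucket any peer reported (the old uniq_map), a bucket now has to be reported by at least quorum peers of a pool before it is included, and the final list is sorted by name. A simplified model of that counting merge, using plain bucket names and ignoring the per-pool filtering and the MRF TODO:

    use std::collections::{HashMap, HashSet};

    // `node_buckets[j]` is the listing returned by peer j, or None if its call
    // failed. A name is accepted once `quorum` peers have reported it.
    fn merge_listings(node_buckets: &[Option<Vec<String>>], quorum: usize) -> Vec<String> {
        let mut counts: HashMap<&String, usize> = HashMap::new();
        let mut accepted: HashSet<&String> = HashSet::new();
        for listing in node_buckets.iter().flatten() {
            for name in listing {
                if accepted.contains(name) {
                    continue; // mirrors the contains_key guard in the diff
                }
                let seen = counts.entry(name).or_insert(0);
                *seen += 1;
                if *seen >= quorum {
                    accepted.insert(name);
                }
            }
        }
        let mut result: Vec<String> = accepted.into_iter().cloned().collect();
        result.sort();
        result
    }
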
@@ -241,22 +283,27 @@ impl S3PeerSys {
futures.push(cli.delete_bucket(bucket, opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let mut errors = vec![None; self.clients.len()];
let results = join_all(futures).await;
for result in results {
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(_) => {
errors.push(None);
errors[i] = None;
}
Err(e) => {
errors.push(Some(e));
errors[i] = Some(e);
}
}
}
// TODO: reduceWriteQuorumErrs
if let Some(err) = reduce_write_quorum_errs(&errors, BUCKET_OP_IGNORED_ERRS, (errors.len() / 2) + 1) {
if !Error::is_err_object_not_found(&err) && !opts.no_recreate {
let _ = self.make_bucket(bucket, &MakeBucketOptions::default()).await;
}
return Err(err);
}
Ok(())
}
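
delete_bucket gains a rollback path: when the fan-out delete fails to reach write quorum, the bucket is recreated on a best-effort basis (the result of make_bucket is discarded) unless the caller set no_recreate or the quorum error says the bucket was not found in the first place. The shape of that decision, with dummy types and callbacks standing in for the real error type, Error::is_err_object_not_found and self.make_bucket:

    // `quorum_err` is the outcome of the quorum reduction over the per-peer
    // delete errors; `recreate` stands in for the best-effort make_bucket call.
    fn finish_delete(
        quorum_err: Option<String>,
        no_recreate: bool,
        is_not_found: impl Fn(&str) -> bool,
        mut recreate: impl FnMut(),
    ) -> Result<(), String> {
        match quorum_err {
            None => Ok(()),
            Some(err) => {
                // Only roll back when it makes sense: the bucket still exists
                // somewhere and the caller did not opt out of recreation.
                if !is_not_found(&err) && !no_recreate {
                    recreate(); // errors here are deliberately ignored
                }
                Err(err)
            }
        }
    }
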
@@ -266,37 +313,44 @@ impl S3PeerSys {
futures.push(cli.get_bucket_info(bucket, opts));
}
let mut ress = Vec::with_capacity(self.clients.len());
let mut errors = Vec::with_capacity(self.clients.len());
let mut ress = vec![None; self.clients.len()];
let mut errors = vec![None; self.clients.len()];
let results = join_all(futures).await;
for result in results {
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(res) => {
ress.push(Some(res));
errors.push(None);
ress[i] = Some(res);
errors[i] = None;
}
Err(e) => {
ress.push(None);
errors.push(Some(e));
ress[i] = None;
errors[i] = Some(e);
}
}
}
for i in 0..self.pools_count {
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
let mut per_pool_errs = vec![None; self.clients.len()];
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i;
if pools.unwrap_or_default().contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
per_pool_errs[j] = errors[j].clone();
}
}
// TODO: reduceWriteQuorumErrs
if let Some(pool_err) =
reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1)
{
return Err(pool_err);
}
}
ress.iter().find_map(|op| op.clone()).ok_or(Error::VolumeNotFound)
ress.into_iter()
.filter(|op| op.is_some())
.find_map(|op| op.clone())
.ok_or(Error::VolumeNotFound)
}
pub fn get_pools(&self) -> Option<Vec<usize>> {
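
get_bucket_info applies the same per-pool quorum gate and then returns the first peer response that is present, falling back to Error::VolumeNotFound when every slot is None. As a side observation (not part of the commit), the owned iterator already makes the filter and the clone unnecessary:

    // Equivalent tail for picking the first successful response once the
    // quorum checks have passed; `T` stands in for the bucket-info type.
    fn first_response<T>(ress: Vec<Option<T>>) -> Option<T> {
        // find_map over owned Options skips the Nones on its own, so no
        // filter(is_some) or clone is needed.
        ress.into_iter().find_map(|op| op)
    }
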