From 1d069fd35154502140a8cf2d766b7a583fd94fd2 Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 23 Oct 2025 17:21:55 +0800 Subject: [PATCH] Improve the peer client (#693) --- crates/ecstore/src/rpc/peer_s3_client.rs | 168 +++++++++++++++-------- 1 file changed, 111 insertions(+), 57 deletions(-) diff --git a/crates/ecstore/src/rpc/peer_s3_client.rs b/crates/ecstore/src/rpc/peer_s3_client.rs index 9518132d..c7f36eac 100644 --- a/crates/ecstore/src/rpc/peer_s3_client.rs +++ b/crates/ecstore/src/rpc/peer_s3_client.rs @@ -94,11 +94,11 @@ impl S3PeerSys { let mut pool_errs = Vec::new(); for pool_idx in 0..self.pools_count { - let mut per_pool_errs = Vec::new(); + let mut per_pool_errs = vec![None; self.clients.len()]; for (i, client) in self.clients.iter().enumerate() { if let Some(v) = client.get_pools() { if v.contains(&pool_idx) { - per_pool_errs.push(errs[i].clone()); + per_pool_errs[i] = errs[i].clone(); } } } @@ -129,20 +129,28 @@ impl S3PeerSys { let errs = join_all(futures).await; for pool_idx in 0..self.pools_count { - let mut per_pool_errs = Vec::new(); + let mut per_pool_errs = vec![None; self.clients.len()]; for (i, client) in self.clients.iter().enumerate() { if let Some(v) = client.get_pools() { if v.contains(&pool_idx) { - per_pool_errs.push(errs[i].clone()); + per_pool_errs[i] = errs[i].clone(); } } } let qu = per_pool_errs.len() / 2; if let Some(pool_err) = reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, qu) { + tracing::error!("heal_bucket per_pool_errs: {per_pool_errs:?}"); + tracing::error!("heal_bucket reduce_write_quorum_errs: {pool_err}"); return Err(pool_err); } } + if let Some(err) = reduce_write_quorum_errs(&errs, BUCKET_OP_IGNORED_ERRS, (errs.len() / 2) + 1) { + tracing::error!("heal_bucket errs: {errs:?}"); + tracing::error!("heal_bucket reduce_write_quorum_errs: {err}"); + return Err(err); + } + for (i, err) in errs.iter().enumerate() { if err.is_none() { return Ok(heal_bucket_results.read().await[i].clone()); @@ -157,34 +165,36 @@ impl S3PeerSys { futures.push(cli.make_bucket(bucket, opts)); } - let mut errors = Vec::with_capacity(self.clients.len()); + let mut errors = vec![None; self.clients.len()]; let results = join_all(futures).await; - for result in results { + for (i, result) in results.into_iter().enumerate() { match result { Ok(_) => { - errors.push(None); + errors[i] = None; } Err(e) => { - errors.push(Some(e)); + errors[i] = Some(e); } } } for i in 0..self.pools_count { - let mut per_pool_errs = Vec::with_capacity(self.clients.len()); + let mut per_pool_errs = vec![None; self.clients.len()]; for (j, cli) in self.clients.iter().enumerate() { let pools = cli.get_pools(); let idx = i; if pools.unwrap_or_default().contains(&idx) { - per_pool_errs.push(errors[j].clone()); + per_pool_errs[j] = errors[j].clone(); } + } - if let Some(pool_err) = - reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1) - { - return Err(pool_err); - } + if let Some(pool_err) = + reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1) + { + tracing::error!("make_bucket per_pool_errs: {per_pool_errs:?}"); + tracing::error!("make_bucket reduce_write_quorum_errs: {pool_err}"); + return Err(pool_err); } } @@ -196,42 +206,74 @@ impl S3PeerSys { futures.push(cli.list_bucket(opts)); } - let mut errors = Vec::with_capacity(self.clients.len()); - let mut ress = Vec::with_capacity(self.clients.len()); + let mut errors = vec![None; self.clients.len()]; + let mut node_buckets = vec![None; self.clients.len()]; let results = join_all(futures).await; - for result in results { + for (i, result) in results.into_iter().enumerate() { match result { Ok(res) => { - ress.push(Some(res)); - errors.push(None); + node_buckets[i] = Some(res); + errors[i] = None; } Err(e) => { - ress.push(None); - errors.push(Some(e)); - } - } - } - // TODO: reduceWriteQuorumErrs - // for i in 0..self.pools_count {} - - let mut uniq_map: HashMap<&String, &BucketInfo> = HashMap::new(); - - for res in ress.iter() { - if res.is_none() { - continue; - } - - let buckets = res.as_ref().unwrap(); - - for bucket in buckets.iter() { - if !uniq_map.contains_key(&bucket.name) { - uniq_map.insert(&bucket.name, bucket); + node_buckets[i] = None; + errors[i] = Some(e); } } } - let buckets: Vec = uniq_map.values().map(|&v| v.clone()).collect(); + let mut result_map: HashMap<&String, BucketInfo> = HashMap::new(); + for i in 0..self.pools_count { + let mut per_pool_errs = vec![None; self.clients.len()]; + for (j, cli) in self.clients.iter().enumerate() { + let pools = cli.get_pools(); + let idx = i; + if pools.unwrap_or_default().contains(&idx) { + per_pool_errs[j] = errors[j].clone(); + } + } + + let quorum = per_pool_errs.len() / 2; + + if let Some(pool_err) = reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, quorum) { + tracing::error!("list_bucket per_pool_errs: {per_pool_errs:?}"); + tracing::error!("list_bucket reduce_write_quorum_errs: {pool_err}"); + return Err(pool_err); + } + + let mut bucket_map: HashMap<&String, usize> = HashMap::new(); + for (j, node_bucket) in node_buckets.iter().enumerate() { + if let Some(buckets) = node_bucket.as_ref() { + if buckets.is_empty() { + continue; + } + + if !self.clients[j].get_pools().unwrap_or_default().contains(&i) { + continue; + } + + for bucket in buckets.iter() { + if result_map.contains_key(&bucket.name) { + continue; + } + + // incr bucket_map count create if not exists + let count = bucket_map.entry(&bucket.name).or_insert(0usize); + *count += 1; + + if *count >= quorum { + result_map.insert(&bucket.name, bucket.clone()); + } + } + } + } + // TODO: MRF + } + + let mut buckets: Vec = result_map.into_values().collect(); + + buckets.sort_by_key(|b| b.name.clone()); Ok(buckets) } @@ -241,22 +283,27 @@ impl S3PeerSys { futures.push(cli.delete_bucket(bucket, opts)); } - let mut errors = Vec::with_capacity(self.clients.len()); + let mut errors = vec![None; self.clients.len()]; let results = join_all(futures).await; - for result in results { + for (i, result) in results.into_iter().enumerate() { match result { Ok(_) => { - errors.push(None); + errors[i] = None; } Err(e) => { - errors.push(Some(e)); + errors[i] = Some(e); } } } - // TODO: reduceWriteQuorumErrs + if let Some(err) = reduce_write_quorum_errs(&errors, BUCKET_OP_IGNORED_ERRS, (errors.len() / 2) + 1) { + if !Error::is_err_object_not_found(&err) && !opts.no_recreate { + let _ = self.make_bucket(bucket, &MakeBucketOptions::default()).await; + } + return Err(err); + } Ok(()) } @@ -266,37 +313,44 @@ impl S3PeerSys { futures.push(cli.get_bucket_info(bucket, opts)); } - let mut ress = Vec::with_capacity(self.clients.len()); - let mut errors = Vec::with_capacity(self.clients.len()); + let mut ress = vec![None; self.clients.len()]; + let mut errors = vec![None; self.clients.len()]; let results = join_all(futures).await; - for result in results { + for (i, result) in results.into_iter().enumerate() { match result { Ok(res) => { - ress.push(Some(res)); - errors.push(None); + ress[i] = Some(res); + errors[i] = None; } Err(e) => { - ress.push(None); - errors.push(Some(e)); + ress[i] = None; + errors[i] = Some(e); } } } for i in 0..self.pools_count { - let mut per_pool_errs = Vec::with_capacity(self.clients.len()); + let mut per_pool_errs = vec![None; self.clients.len()]; for (j, cli) in self.clients.iter().enumerate() { let pools = cli.get_pools(); let idx = i; if pools.unwrap_or_default().contains(&idx) { - per_pool_errs.push(errors[j].as_ref()); + per_pool_errs[j] = errors[j].clone(); } + } - // TODO: reduceWriteQuorumErrs + if let Some(pool_err) = + reduce_write_quorum_errs(&per_pool_errs, BUCKET_OP_IGNORED_ERRS, (per_pool_errs.len() / 2) + 1) + { + return Err(pool_err); } } - ress.iter().find_map(|op| op.clone()).ok_or(Error::VolumeNotFound) + ress.into_iter() + .filter(|op| op.is_some()) + .find_map(|op| op.clone()) + .ok_or(Error::VolumeNotFound) } pub fn get_pools(&self) -> Option> {