mirror of
https://github.com/rustfs/rustfs.git
synced 2026-03-17 14:24:08 +00:00
fix(common): remove panic paths in runtime helpers (#2116)
Co-authored-by: houseme <housemecn@gmail.com> Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com>
This commit is contained in:
@@ -458,10 +458,7 @@ impl DataUsageEntry {
|
||||
self.size += other.size;
|
||||
|
||||
if let Some(o_rep) = &other.replication_stats {
|
||||
if self.replication_stats.is_none() {
|
||||
self.replication_stats = Some(ReplicationAllStats::default());
|
||||
}
|
||||
let s_rep = self.replication_stats.as_mut().unwrap();
|
||||
let s_rep = self.replication_stats.get_or_insert_with(ReplicationAllStats::default);
|
||||
s_rep.targets.clear();
|
||||
s_rep.replica_size += o_rep.replica_size;
|
||||
s_rep.replica_count += o_rep.replica_count;
|
||||
@@ -586,7 +583,7 @@ impl DataUsageCache {
|
||||
return Some(root);
|
||||
}
|
||||
let mut flat = self.flatten(&root);
|
||||
if flat.replication_stats.is_some() && flat.replication_stats.as_ref().unwrap().empty() {
|
||||
if flat.replication_stats.as_ref().is_some_and(|stats| stats.empty()) {
|
||||
flat.replication_stats = None;
|
||||
}
|
||||
Some(flat)
|
||||
@@ -679,7 +676,9 @@ impl DataUsageCache {
|
||||
leaves.sort_by(|a, b| a.objects.cmp(&b.objects));
|
||||
|
||||
while remove > 0 && !leaves.is_empty() {
|
||||
let e = leaves.first().unwrap();
|
||||
let Some(e) = leaves.first() else {
|
||||
break;
|
||||
};
|
||||
let candidate = e.path.clone();
|
||||
if candidate == *path && !compact_self {
|
||||
break;
|
||||
@@ -703,12 +702,9 @@ impl DataUsageCache {
|
||||
}
|
||||
|
||||
pub fn total_children_rec(&self, path: &str) -> usize {
|
||||
let root = self.find(path);
|
||||
|
||||
if root.is_none() {
|
||||
let Some(root) = self.find(path) else {
|
||||
return 0;
|
||||
}
|
||||
let root = root.unwrap();
|
||||
};
|
||||
if root.children.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
@@ -721,31 +717,36 @@ impl DataUsageCache {
|
||||
}
|
||||
|
||||
pub fn merge(&mut self, o: &DataUsageCache) {
|
||||
let mut existing_root = self.root();
|
||||
let other_root = o.root();
|
||||
if existing_root.is_none() && other_root.is_none() {
|
||||
let Some(mut existing_root) = self.root() else {
|
||||
if o.root().is_none() {
|
||||
return;
|
||||
}
|
||||
if other_root.is_none() {
|
||||
return;
|
||||
}
|
||||
if existing_root.is_none() {
|
||||
*self = o.clone();
|
||||
return;
|
||||
}
|
||||
if o.info.last_update.gt(&self.info.last_update) {
|
||||
};
|
||||
|
||||
let Some(other_root) = o.root() else {
|
||||
return;
|
||||
};
|
||||
|
||||
if o.info.last_update > self.info.last_update {
|
||||
self.info.last_update = o.info.last_update;
|
||||
}
|
||||
|
||||
existing_root.as_mut().unwrap().merge(other_root.as_ref().unwrap());
|
||||
self.cache.insert(hash_path(&self.info.name).key(), existing_root.unwrap());
|
||||
let e_hash = self.root_hash();
|
||||
for key in other_root.as_ref().unwrap().children.iter() {
|
||||
let entry = &o.cache[key];
|
||||
existing_root.merge(&other_root);
|
||||
self.cache.insert(hash_path(&self.info.name).key(), existing_root);
|
||||
|
||||
let root_hash = self.root_hash();
|
||||
for key in other_root.children.iter() {
|
||||
let Some(entry) = o.cache.get(key) else {
|
||||
continue;
|
||||
};
|
||||
let flat = o.flatten(entry);
|
||||
let mut existing = self.cache[key].clone();
|
||||
if let Some(existing) = self.cache.get_mut(key) {
|
||||
existing.merge(&flat);
|
||||
self.replace_hashed(&DataUsageHash(key.clone()), &Some(e_hash.clone()), &existing);
|
||||
} else {
|
||||
self.replace_hashed(&DataUsageHash(key.clone()), &Some(root_hash.clone()), &flat);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1141,10 +1142,12 @@ impl DataUsageInfo {
|
||||
self.buckets_count = self.buckets_usage.len() as u64;
|
||||
|
||||
// Update last update time
|
||||
if let Some(other_update) = other.last_update
|
||||
&& (self.last_update.is_none() || other_update > self.last_update.unwrap())
|
||||
{
|
||||
self.last_update = Some(other_update);
|
||||
if let Some(other_update) = other.last_update {
|
||||
match self.last_update {
|
||||
None => self.last_update = Some(other_update),
|
||||
Some(self_update) if other_update > self_update => self.last_update = Some(other_update),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1285,4 +1288,59 @@ mod tests {
|
||||
assert_eq!(summary1.total_size, 300);
|
||||
assert_eq!(summary1.versions, 15);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_data_usage_cache_merge_adds_missing_child() {
|
||||
let mut base = DataUsageCache::default();
|
||||
base.info.name = "bucket".to_string();
|
||||
base.replace("bucket", "", DataUsageEntry::default());
|
||||
|
||||
let mut other = DataUsageCache::default();
|
||||
other.info.name = "bucket".to_string();
|
||||
let child = DataUsageEntry {
|
||||
size: 42,
|
||||
..Default::default()
|
||||
};
|
||||
other.replace("bucket/child", "bucket", child);
|
||||
|
||||
base.merge(&other);
|
||||
|
||||
let root = base.find("bucket").expect("root bucket should exist");
|
||||
assert_eq!(root.size, 0);
|
||||
let child_entry = base.find("bucket/child").expect("merged child should be added");
|
||||
assert_eq!(child_entry.size, 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_data_usage_cache_merge_accumulates_existing_child() {
|
||||
let mut base = DataUsageCache::default();
|
||||
base.info.name = "bucket".to_string();
|
||||
base.replace(
|
||||
"bucket/child",
|
||||
"bucket",
|
||||
DataUsageEntry {
|
||||
size: 10,
|
||||
objects: 1,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
let mut other = DataUsageCache::default();
|
||||
other.info.name = "bucket".to_string();
|
||||
other.replace(
|
||||
"bucket/child",
|
||||
"bucket",
|
||||
DataUsageEntry {
|
||||
size: 20,
|
||||
objects: 2,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
base.merge(&other);
|
||||
|
||||
let child_entry = base.find("bucket/child").expect("child should remain after merge");
|
||||
assert_eq!(child_entry.size, 30);
|
||||
assert_eq!(child_entry.objects, 3);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,12 +296,13 @@ type HealResponseSender = broadcast::Sender<HealChannelResponse>;
|
||||
static GLOBAL_HEAL_RESPONSE_SENDER: OnceLock<HealResponseSender> = OnceLock::new();
|
||||
|
||||
/// Initialize global heal channel
|
||||
pub fn init_heal_channel() -> HealChannelReceiver {
|
||||
pub fn init_heal_channel() -> Result<HealChannelReceiver, &'static str> {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
GLOBAL_HEAL_CHANNEL_SENDER
|
||||
.set(tx)
|
||||
.expect("Heal channel sender already initialized");
|
||||
rx
|
||||
if GLOBAL_HEAL_CHANNEL_SENDER.set(tx).is_ok() {
|
||||
Ok(rx)
|
||||
} else {
|
||||
Err("Heal channel sender already initialized")
|
||||
}
|
||||
}
|
||||
|
||||
/// Get global heal channel sender
|
||||
|
||||
@@ -155,10 +155,7 @@ impl LastMinuteLatency {
|
||||
}
|
||||
|
||||
pub fn add(&mut self, t: &Duration) {
|
||||
let sec = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
let sec = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
|
||||
self.forward_to(sec);
|
||||
let win_idx = sec % 60;
|
||||
self.totals[win_idx as usize].add(t);
|
||||
@@ -174,10 +171,7 @@ impl LastMinuteLatency {
|
||||
|
||||
pub fn get_total(&mut self) -> AccElem {
|
||||
let mut res = AccElem::default();
|
||||
let sec = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs();
|
||||
let sec = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
|
||||
self.forward_to(sec);
|
||||
for elem in self.totals.iter() {
|
||||
res.merge(elem);
|
||||
|
||||
@@ -73,7 +73,7 @@ pub async fn init_heal_manager(
|
||||
.map_err(|_| Error::Config("Heal manager already initialized".to_string()))?;
|
||||
|
||||
// Initialize heal channel
|
||||
let channel_receiver = rustfs_common::heal_channel::init_heal_channel();
|
||||
let channel_receiver = rustfs_common::heal_channel::init_heal_channel().map_err(|err| Error::Config(err.to_string()))?;
|
||||
|
||||
// Create channel processor
|
||||
let channel_processor = HealChannelProcessor::new(heal_manager.clone());
|
||||
|
||||
Reference in New Issue
Block a user