From fba201df3dde0581c393fac188719bb5cd587854 Mon Sep 17 00:00:00 2001
From: guojidan <63799833+guojidan@users.noreply.github.com>
Date: Thu, 11 Dec 2025 09:55:25 +0800
Subject: [PATCH] fix: harden data usage aggregation and cache handling (#1102)

Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: loverustfs
---
 Cargo.lock                                   |   1 +
 crates/ahm/src/scanner/data_scanner.rs       | 121 +++++++++---
 crates/ahm/src/scanner/local_scan/mod.rs     |  13 +-
 crates/ahm/src/scanner/stats_aggregator.rs   |   6 +-
 crates/ahm/tests/data_usage_fallback_test.rs |  97 ++++++++++
 crates/e2e_test/Cargo.toml                   |   3 +-
 crates/e2e_test/src/data_usage_test.rs       |  73 +++++++
 crates/e2e_test/src/lib.rs                   |   4 +
 crates/ecstore/src/data_usage.rs             | 191 +++++++++++++++----
 9 files changed, 443 insertions(+), 66 deletions(-)
 create mode 100644 crates/ahm/tests/data_usage_fallback_test.rs
 create mode 100644 crates/e2e_test/src/data_usage_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 94b0e800..aa41eac0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3053,6 +3053,7 @@ dependencies = [
  "rand 0.10.0-rc.5",
  "reqwest",
  "rmp-serde",
+ "rustfs-common",
  "rustfs-ecstore",
  "rustfs-filemeta",
  "rustfs-lock",
diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs
index ebb9dcbb..93ea5fec 100644
--- a/crates/ahm/src/scanner/data_scanner.rs
+++ b/crates/ahm/src/scanner/data_scanner.rs
@@ -29,7 +29,7 @@ use rustfs_ecstore::{
     self as ecstore, StorageAPI,
     bucket::versioning::VersioningApi,
     bucket::versioning_sys::BucketVersioningSys,
-    data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
+    data_usage::{aggregate_local_snapshots, compute_bucket_usage, store_data_usage_in_backend},
     disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
     set_disk::SetDisks,
     store_api::ObjectInfo,
@@ -137,6 +137,8 @@ pub struct Scanner {
     data_usage_stats: Arc<Mutex<HashMap<String, DataUsageInfo>>>,
     /// Last data usage statistics collection time
     last_data_usage_collection: Arc<RwLock<Option<SystemTime>>>,
+    /// Backoff timestamp for heavy fallback collection
+    fallback_backoff_until: Arc<RwLock<Option<SystemTime>>>,

     /// Heal manager for auto-heal integration
     heal_manager: Option<Arc<HealManager>>,
@@ -192,6 +194,7 @@ impl Scanner {
             disk_metrics: Arc::new(Mutex::new(HashMap::new())),
             data_usage_stats: Arc::new(Mutex::new(HashMap::new())),
             last_data_usage_collection: Arc::new(RwLock::new(None)),
+            fallback_backoff_until: Arc::new(RwLock::new(None)),
             heal_manager,
             node_scanner,
             stats_aggregator,
@@ -881,6 +884,7 @@ impl Scanner {
     /// Collect and persist data usage statistics
     async fn collect_and_persist_data_usage(&self) -> Result<()> {
         info!("Starting data usage collection and persistence");
+        let now = SystemTime::now();

         // Get ECStore instance
         let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() else {
@@ -888,6 +892,10 @@ impl Scanner {
             return Ok(());
         };

+        // Backoff state to avoid hammering the storage layer with repeated realtime scans.
+        let mut use_cached_on_backoff = false;
+        let fallback_backoff_secs = Duration::from_secs(300);
+
         // Run local usage scan and aggregate snapshots; fall back to on-demand build when necessary.
         let mut data_usage = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
             Ok(outcome) => {
@@ -909,16 +917,55 @@
                             "Failed to aggregate local data usage snapshots, falling back to realtime collection: {}",
                             e
                         );
-                        self.build_data_usage_from_ecstore(&ecstore).await?
+                        match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
+                            Some(usage) => usage,
+                            None => {
+                                use_cached_on_backoff = true;
+                                DataUsageInfo::default()
+                            }
+                        }
                     }
                 }
             }
             Err(e) => {
                 warn!("Local usage scan failed (using realtime collection instead): {}", e);
-                self.build_data_usage_from_ecstore(&ecstore).await?
+                match self.maybe_fallback_collection(now, fallback_backoff_secs, &ecstore).await? {
+                    Some(usage) => usage,
+                    None => {
+                        use_cached_on_backoff = true;
+                        DataUsageInfo::default()
+                    }
+                }
             }
         };

+        // If heavy fallback was skipped due to backoff, try to reuse cached stats to avoid empty responses.
+        if use_cached_on_backoff && data_usage.buckets_usage.is_empty() {
+            let cached = {
+                let guard = self.data_usage_stats.lock().await;
+                guard.values().next().cloned()
+            };
+            if let Some(cached_usage) = cached {
+                data_usage = cached_usage;
+            }
+
+            // If there is still no data, try backend before persisting zeros
+            if data_usage.buckets_usage.is_empty() {
+                if let Ok(existing) = rustfs_ecstore::data_usage::load_data_usage_from_backend(ecstore.clone()).await {
+                    if !existing.buckets_usage.is_empty() {
+                        info!("Using existing backend data usage during fallback backoff");
+                        data_usage = existing;
+                    }
+                }
+            }
+
+            // Avoid overwriting valid backend stats with zeros when fallback is throttled
+            if data_usage.buckets_usage.is_empty() {
+                warn!("Skipping data usage persistence: fallback throttled and no cached/backend data available");
+                return Ok(());
+            }
+        }
+
         // Make sure bucket counters reflect aggregated content
         data_usage.buckets_count = data_usage.buckets_usage.len() as u64;
         if data_usage.last_update.is_none() {
@@ -961,8 +1008,31 @@ impl Scanner {
         Ok(())
     }

+    async fn maybe_fallback_collection(
+        &self,
+        now: SystemTime,
+        backoff: Duration,
+        ecstore: &Arc<ECStore>,
+    ) -> Result<Option<DataUsageInfo>> {
+        let backoff_until = *self.fallback_backoff_until.read().await;
+        let within_backoff = backoff_until.map(|ts| now < ts).unwrap_or(false);
+
+        if within_backoff {
+            warn!(
+                "Skipping heavy data usage fallback within backoff window (until {:?}); using cached stats if available",
+                backoff_until
+            );
+            return Ok(None);
+        }
+
+        let usage = self.build_data_usage_from_ecstore(ecstore).await?;
+        let mut backoff_guard = self.fallback_backoff_until.write().await;
+        *backoff_guard = Some(now + backoff);
+        Ok(Some(usage))
+    }
+
     /// Build data usage statistics directly from ECStore
-    async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<ECStore>) -> Result<DataUsageInfo> {
+    pub async fn build_data_usage_from_ecstore(&self, ecstore: &Arc<ECStore>) -> Result<DataUsageInfo> {
         let mut data_usage = DataUsageInfo::default();

         // Get bucket list
@@ -975,6 +1045,8 @@
         data_usage.last_update = Some(SystemTime::now());

         let mut total_objects = 0u64;
+        let mut total_versions = 0u64;
+        let mut total_delete_markers = 0u64;
         let mut total_size = 0u64;

         for bucket_info in buckets {
@@ -982,37 +1054,26 @@
                 continue; // Skip system buckets
             }

-            // Try to get actual object count for this bucket
-            let (object_count, bucket_size) = match ecstore
-                .clone()
-                .list_objects_v2(
-                    &bucket_info.name,
-                    "",    // prefix
-                    None,  // continuation_token
-                    None,  // delimiter
-                    100,   // max_keys - small limit for performance
-                    false, // fetch_owner
-                    None,  // start_after
-                    false, // incl_deleted
-                )
-                .await
-            {
-                Ok(result) => {
-                    let count = result.objects.len() as u64;
-                    let size = result.objects.iter().map(|obj| obj.size as u64).sum();
-                    (count, size)
-                }
-                Err(_) => (0, 0),
-            };
+            // Use ecstore pagination helper to avoid truncating at 100 objects
+            let (object_count, bucket_size, versions_count, delete_markers) =
+                match compute_bucket_usage(ecstore.clone(), &bucket_info.name).await {
+                    Ok(usage) => (usage.objects_count, usage.size, usage.versions_count, usage.delete_markers_count),
+                    Err(e) => {
+                        warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
+                        (0, 0, 0, 0)
+                    }
+                };

             total_objects += object_count;
+            total_versions += versions_count;
+            total_delete_markers += delete_markers;
             total_size += bucket_size;

             let bucket_usage = rustfs_common::data_usage::BucketUsageInfo {
                 size: bucket_size,
                 objects_count: object_count,
-                versions_count: object_count, // Simplified
-                delete_markers_count: 0,
+                versions_count,
+                delete_markers_count: delete_markers,
                 ..Default::default()
             };

@@ -1022,7 +1083,8 @@

                 data_usage.objects_total_count = total_objects;
                 data_usage.objects_total_size = total_size;
-                data_usage.versions_total_count = total_objects;
+                data_usage.versions_total_count = total_versions;
+                data_usage.delete_markers_total_count = total_delete_markers;
             }
             Err(e) => {
                 warn!("Failed to list buckets for data usage collection: {}", e);
@@ -2556,6 +2618,7 @@ impl Scanner {
             disk_metrics: Arc::clone(&self.disk_metrics),
             data_usage_stats: Arc::clone(&self.data_usage_stats),
             last_data_usage_collection: Arc::clone(&self.last_data_usage_collection),
+            fallback_backoff_until: Arc::clone(&self.fallback_backoff_until),
             heal_manager: self.heal_manager.clone(),
             node_scanner: Arc::clone(&self.node_scanner),
             stats_aggregator: Arc::clone(&self.stats_aggregator),
diff --git a/crates/ahm/src/scanner/local_scan/mod.rs b/crates/ahm/src/scanner/local_scan/mod.rs
index 7e31d711..39387c24 100644
--- a/crates/ahm/src/scanner/local_scan/mod.rs
+++ b/crates/ahm/src/scanner/local_scan/mod.rs
@@ -84,6 +84,9 @@ pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result
             Some(id) => id.to_string(),
             None => {
diff --git a/crates/ahm/src/scanner/stats_aggregator.rs b/crates/ahm/src/scanner/stats_aggregator.rs
index ed56b549..0c019c3a 100644
--- a/crates/ahm/src/scanner/stats_aggregator.rs
+++ b/crates/ahm/src/scanner/stats_aggregator.rs
@@ -347,7 +347,8 @@ impl DecentralizedStatsAggregator {

         // update cache
         *self.cached_stats.write().await = Some(aggregated.clone());
-        *self.cache_timestamp.write().await = aggregation_timestamp;
+        // Use the time when aggregation completes as cache timestamp to avoid premature expiry during long runs
+        *self.cache_timestamp.write().await = SystemTime::now();

         Ok(aggregated)
     }
@@ -359,7 +360,8 @@ impl DecentralizedStatsAggregator {

         // update cache
         *self.cached_stats.write().await = Some(aggregated.clone());
-        *self.cache_timestamp.write().await = now;
+        // Cache timestamp should reflect completion time rather than aggregation start
+        *self.cache_timestamp.write().await = SystemTime::now();

         Ok(aggregated)
     }
diff --git a/crates/ahm/tests/data_usage_fallback_test.rs b/crates/ahm/tests/data_usage_fallback_test.rs
new file mode 100644
index 00000000..48fd5457
--- /dev/null
+++ b/crates/ahm/tests/data_usage_fallback_test.rs
@@ -0,0 +1,97 @@
+// Copyright 2024 RustFS Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![cfg(test)]
+
+use rustfs_ahm::scanner::data_scanner::Scanner;
+use rustfs_common::data_usage::DataUsageInfo;
+use rustfs_ecstore::GLOBAL_Endpoints;
+use rustfs_ecstore::bucket::metadata_sys::{BucketMetadataSys, GLOBAL_BucketMetadataSys};
+use rustfs_ecstore::endpoints::EndpointServerPools;
+use rustfs_ecstore::store::ECStore;
+use rustfs_ecstore::store_api::{ObjectIO, PutObjReader, StorageAPI};
+use std::sync::Arc;
+use tempfile::TempDir;
+use tokio::sync::RwLock;
+use tokio_util::sync::CancellationToken;
+
+/// Build a minimal single-node ECStore over a temp directory and populate objects.
+async fn create_store_with_objects(count: usize) -> (TempDir, std::sync::Arc<ECStore>) {
+    let temp_dir = TempDir::new().expect("temp dir");
+    let root = temp_dir.path().to_string_lossy().to_string();
+
+    // Create endpoints from the temp dir
+    let (endpoint_pools, _setup) = EndpointServerPools::from_volumes("127.0.0.1:0", vec![root])
+        .await
+        .expect("endpoint pools");
+
+    // Seed globals required by metadata sys if not already set
+    if GLOBAL_Endpoints.get().is_none() {
+        let _ = GLOBAL_Endpoints.set(endpoint_pools.clone());
+    }
+
+    let store = ECStore::new("127.0.0.1:0".parse().unwrap(), endpoint_pools, CancellationToken::new())
+        .await
+        .expect("create store");
+
+    if rustfs_ecstore::global::new_object_layer_fn().is_none() {
+        rustfs_ecstore::global::set_object_layer(store.clone()).await;
+    }
+
+    // Initialize metadata system before bucket operations
+    if GLOBAL_BucketMetadataSys.get().is_none() {
+        let mut sys = BucketMetadataSys::new(store.clone());
+        sys.init(Vec::new()).await;
+        let _ = GLOBAL_BucketMetadataSys.set(Arc::new(RwLock::new(sys)));
+    }
+
+    store
+        .make_bucket("fallback-bucket", &rustfs_ecstore::store_api::MakeBucketOptions::default())
+        .await
+        .expect("make bucket");
+
+    for i in 0..count {
+        let key = format!("obj-{i:04}");
+        let data = format!("payload-{i}");
+        let mut reader = PutObjReader::from_vec(data.into_bytes());
+        store
+            .put_object("fallback-bucket", &key, &mut reader, &rustfs_ecstore::store_api::ObjectOptions::default())
+            .await
+            .expect("put object");
+    }
+
+    (temp_dir, store)
+}
+
+#[tokio::test]
+async fn fallback_builds_full_counts_over_100_objects() {
+    let (_tmp, store) = create_store_with_objects(1000).await;
+    let scanner = Scanner::new(None, None);
+
+    // Directly call the fallback builder to ensure pagination works.
+    let usage: DataUsageInfo = scanner.build_data_usage_from_ecstore(&store).await.expect("fallback usage");
+
+    let bucket = usage.buckets_usage.get("fallback-bucket").expect("bucket usage present");
+
+    assert!(
+        usage.objects_total_count >= 1000,
+        "total objects should be >=1000, got {}",
+        usage.objects_total_count
+    );
+    assert!(
+        bucket.objects_count >= 1000,
+        "bucket objects should be >=1000, got {}",
+        bucket.objects_count
+    );
+}
diff --git a/crates/e2e_test/Cargo.toml b/crates/e2e_test/Cargo.toml
index 07e2b239..e1fcbe8a 100644
--- a/crates/e2e_test/Cargo.toml
+++ b/crates/e2e_test/Cargo.toml
@@ -25,6 +25,7 @@ workspace = true

 [dependencies]
 rustfs-ecstore.workspace = true
+rustfs-common.workspace = true
 flatbuffers.workspace = true
 futures.workspace = true
 rustfs-lock.workspace = true
@@ -49,4 +50,4 @@ uuid = { workspace = true }
 base64 = { workspace = true }
 rand = { workspace = true }
 chrono = { workspace = true }
-md5 = { workspace = true }
\ No newline at end of file
+md5 = { workspace = true }
diff --git a/crates/e2e_test/src/data_usage_test.rs b/crates/e2e_test/src/data_usage_test.rs
new file mode 100644
index 00000000..1121b366
--- /dev/null
+++ b/crates/e2e_test/src/data_usage_test.rs
@@ -0,0 +1,73 @@
+// Copyright 2024 RustFS Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use aws_sdk_s3::primitives::ByteStream;
+use rustfs_common::data_usage::DataUsageInfo;
+use serial_test::serial;
+
+use crate::common::{RustFSTestEnvironment, TEST_BUCKET, awscurl_get, init_logging};
+
+/// Regression test for data usage accuracy (issue #1012).
+/// Launches rustfs, writes 1000 objects, then asserts admin data usage reports the full count.
+#[tokio::test(flavor = "multi_thread")]
+#[serial]
+#[ignore = "Starts a rustfs server and requires awscurl; enable when running full E2E"]
+async fn data_usage_reports_all_objects() -> Result<(), Box<dyn std::error::Error>> {
+    init_logging();
+
+    let mut env = RustFSTestEnvironment::new().await?;
+    env.start_rustfs_server(vec![]).await?;
+
+    let client = env.create_s3_client();
+
+    // Create bucket and upload objects
+    client.create_bucket().bucket(TEST_BUCKET).send().await?;
+
+    for i in 0..1000 {
+        let key = format!("obj-{i:04}");
+        client
+            .put_object()
+            .bucket(TEST_BUCKET)
+            .key(key)
+            .body(ByteStream::from_static(b"hello-world"))
+            .send()
+            .await?;
+    }
+
+    // Query admin data usage API
+    let url = format!("{}/rustfs/admin/v3/datausageinfo", env.url);
+    let resp = awscurl_get(&url, &env.access_key, &env.secret_key).await?;
+    let usage: DataUsageInfo = serde_json::from_str(&resp)?;
+
+    // Assert total object count and per-bucket count are not truncated
+    let bucket_usage = usage
+        .buckets_usage
+        .get(TEST_BUCKET)
+        .cloned()
+        .expect("bucket usage should exist");
+
+    assert!(
+        usage.objects_total_count >= 1000,
+        "total object count should be at least 1000, got {}",
+        usage.objects_total_count
+    );
+    assert!(
+        bucket_usage.objects_count >= 1000,
+        "bucket object count should be at least 1000, got {}",
+        bucket_usage.objects_count
+    );
+
+    env.stop_server();
+    Ok(())
+}
diff --git a/crates/e2e_test/src/lib.rs b/crates/e2e_test/src/lib.rs
index ac6f3805..8a7a7ef4 100644
--- a/crates/e2e_test/src/lib.rs
+++ b/crates/e2e_test/src/lib.rs
@@ -18,6 +18,10 @@ mod reliant;
 #[cfg(test)]
 pub mod common;

+// Data usage regression tests
+#[cfg(test)]
+mod data_usage_test;
+
 // KMS-specific test modules
 #[cfg(test)]
 mod kms;
diff --git a/crates/ecstore/src/data_usage.rs b/crates/ecstore/src/data_usage.rs
index 822aaa38..4bfd1ea7 100644
--- a/crates/ecstore/src/data_usage.rs
+++ b/crates/ecstore/src/data_usage.rs
@@ -32,6 +32,7 @@ use rustfs_common::data_usage::{
     BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
 };
 use rustfs_utils::path::SLASH_SEPARATOR;
+use tokio::fs;
 use tracing::{error, info, warn};

 use crate::error::Error;
@@ -63,6 +64,21 @@ lazy_static::lazy_static! {
 /// Store data usage info to backend storage
 pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: Arc<ECStore>) -> Result<(), Error> {
+    // Prevent older data from overwriting newer persisted stats
+    if let Ok(buf) = read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await {
+        if let Ok(existing) = serde_json::from_slice::<DataUsageInfo>(&buf) {
+            if let (Some(new_ts), Some(existing_ts)) = (data_usage_info.last_update, existing.last_update) {
+                if new_ts <= existing_ts {
+                    info!(
+                        "Skip persisting data usage: incoming last_update {:?} <= existing {:?}",
+                        new_ts, existing_ts
+                    );
+                    return Ok(());
+                }
+            }
+        }
+    }
+
     let data = serde_json::to_vec(&data_usage_info)
         .map_err(|e| Error::other(format!("Failed to serialize data usage info: {e}")))?;
@@ -160,6 +176,39 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {

+fn merge_snapshot(aggregated: &mut DataUsageInfo, mut snapshot: LocalUsageSnapshot, latest_update: &mut Option<SystemTime>) {
+    if let Some(update) = snapshot.last_update {
+        if latest_update.is_none_or(|current| update > current) {
+            *latest_update = Some(update);
+        }
+    }
+
+    snapshot.recompute_totals();
+
+    aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
+    aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
+    aggregated.delete_markers_total_count = aggregated
+        .delete_markers_total_count
+        .saturating_add(snapshot.delete_markers_total_count);
+    aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
+
+    for (bucket, usage) in snapshot.buckets_usage.into_iter() {
+        let bucket_size = usage.size;
+        match aggregated.buckets_usage.entry(bucket.clone()) {
+            Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
+            Entry::Vacant(entry) => {
+                entry.insert(usage.clone());
+            }
+        }
+
+        aggregated
+            .bucket_sizes
+            .entry(bucket)
+            .and_modify(|size| *size = size.saturating_add(bucket_size))
+            .or_insert(bucket_size);
+    }
+}
+
 pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
     let mut aggregated = DataUsageInfo::default();
     let mut latest_update: Option<SystemTime> = None;
@@ -196,7 +245,24 @@ pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
-                if let Some(update) = snapshot.last_update {
-                    if latest_update.is_none_or(|current| update > current) {
-                        latest_update = Some(update);
-                    }
-                }
-
-                aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
-                aggregated.versions_total_count =
-                    aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
-                aggregated.delete_markers_total_count = aggregated
-                    .delete_markers_total_count
-                    .saturating_add(snapshot.delete_markers_total_count);
-                aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
-
-                for (bucket, usage) in snapshot.buckets_usage.into_iter() {
-                    let bucket_size = usage.size;
-                    match aggregated.buckets_usage.entry(bucket.clone()) {
-                        Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
-                        Entry::Vacant(entry) => {
-                            entry.insert(usage.clone());
-                        }
-                    }
-
-                    aggregated
-                        .bucket_sizes
-                        .entry(bucket)
-                        .and_modify(|size| *size = size.saturating_add(bucket_size))
-                        .or_insert(bucket_size);
-                }
+                merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
         }

         statuses.push(status);
@@ -549,3 +585,94 @@ pub async fn save_data_usage_cache(cache: &DataUsageCache, name: &str) -> crate::error::Result<()> {
     save_config(store, &name, buf).await?;
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use rustfs_common::data_usage::BucketUsageInfo;
+
+    fn aggregate_for_test(
+        inputs: Vec<(DiskUsageStatus, Result<Option<LocalUsageSnapshot>, Error>)>,
+    ) -> (Vec<DiskUsageStatus>, DataUsageInfo) {
+        let mut aggregated = DataUsageInfo::default();
+        let mut latest_update: Option<SystemTime> = None;
+        let mut statuses = Vec::new();
+
+        for (mut status, snapshot_result) in inputs {
+            if let Ok(Some(snapshot)) = snapshot_result {
+                status.snapshot_exists = true;
+                status.last_update = snapshot.last_update;
+                merge_snapshot(&mut aggregated, snapshot, &mut latest_update);
+            }
+            statuses.push(status);
+        }
+
+        aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
+        aggregated.last_update = latest_update;
+        aggregated.disk_usage_status = statuses.clone();
+
+        (statuses, aggregated)
+    }
+
+    #[test]
+    fn aggregate_skips_corrupted_snapshot_and_preserves_other_disks() {
+        let mut good_snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
+            disk_id: "good-disk".to_string(),
+            pool_index: Some(0),
+            set_index: Some(0),
+            disk_index: Some(0),
+        });
+        good_snapshot.last_update = Some(SystemTime::now());
+        good_snapshot.buckets_usage.insert(
+            "bucket-a".to_string(),
+            BucketUsageInfo {
+                objects_count: 3,
+                versions_count: 3,
+                size: 42,
+                ..Default::default()
+            },
+        );
+        good_snapshot.recompute_totals();
+
+        let bad_snapshot_err: Result<Option<LocalUsageSnapshot>, Error> = Err(Error::other("corrupted snapshot payload"));
+
+        let inputs = vec![
+            (
+                DiskUsageStatus {
+                    disk_id: "bad-disk".to_string(),
+                    pool_index: Some(0),
+                    set_index: Some(0),
+                    disk_index: Some(1),
+                    last_update: None,
+                    snapshot_exists: false,
+                },
+                bad_snapshot_err,
+            ),
+            (
+                DiskUsageStatus {
+                    disk_id: "good-disk".to_string(),
+                    pool_index: Some(0),
+                    set_index: Some(0),
+                    disk_index: Some(0),
+                    last_update: None,
+                    snapshot_exists: false,
+                },
+                Ok(Some(good_snapshot)),
+            ),
+        ];
+
+        let (statuses, aggregated) = aggregate_for_test(inputs);
+
+        // Bad disk stays non-existent, good disk is marked present
+        let bad_status = statuses.iter().find(|s| s.disk_id == "bad-disk").unwrap();
+        assert!(!bad_status.snapshot_exists);
+        let good_status = statuses.iter().find(|s| s.disk_id == "good-disk").unwrap();
+        assert!(good_status.snapshot_exists);
+
+        // Aggregated data is from good snapshot only
+        assert_eq!(aggregated.objects_total_count, 3);
+        assert_eq!(aggregated.objects_total_size, 42);
+        assert_eq!(aggregated.buckets_count, 1);
+        assert_eq!(aggregated.buckets_usage.get("bucket-a").map(|b| (b.objects_count, b.size)), Some((3, 42)));
+    }
+}