diff --git a/AGENTS.md b/AGENTS.md index 5b160f5e..7a4e14da 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,19 +1,22 @@ # Repository Guidelines -## Project Structure & Module Organization -The workspace root defines shared dependencies in `Cargo.toml`. The service binary lives in `rustfs/` with entrypoints under `src/main.rs`. Crypto, IAM, and KMS components sit in `crates/` (notably `crates/crypto`, `crates/iam`, `crates/kms`). End-to-end fixtures are in `crates/e2e_test/` and `test_standalone/`. Operational tooling resides in `scripts/`, while deployment manifests are under `deploy/` and Docker assets at the root. Before editing, skim a crate’s README or module-level docs to confirm its responsibility. - -## Build, Test, and Development Commands -Use `cargo build --release` for optimized binaries, or run `./build-rustfs.sh --dev` when iterating locally; `make build` mirrors the release pipeline. Cross-platform builds rely on `./build-rustfs.sh --platform ` or `make build-cross-all`. Validate code early with `cargo check --all-targets`. Run unit coverage using `cargo test --workspace --exclude e2e_test` and prefer `cargo nextest run --all --exclude e2e_test` if installed. Execute `make pre-commit` (fmt, clippy, check, test) before every push. After generating code, always run `make clippy` and ensure it completes successfully before proceeding. - -## Coding Style & Naming Conventions -Formatting follows `rustfmt.toml` (130-column width, async-friendly wrapping). Adopt `snake_case` for items, `PascalCase` for types, and `SCREAMING_SNAKE_CASE` for constants. Avoid `unwrap()` and `expect()` outside tests; propagate errors with `Result` and crate-specific `thiserror` types. Keep async code non-blocking—use `tokio::task::spawn_blocking` if CPU-heavy work is unavoidable. Document public APIs with focused `///` comments that cover parameters, errors, and examples. - -## Testing Guidelines -Co-locate unit tests with modules and use behavior-led names such as `handles_expired_token`. Place integration suites in `tests/` folders and exhaustive flows in `crates/e2e_test/`. For KMS validation, clear proxies and run `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY= cargo test --package e2e_test kms:: -- --nocapture --test-threads=1`. Always finish by running `cargo test --all` to ensure coverage across crates. - -## Commit & Pull Request Guidelines -Create feature branches (`feat/...`, `fix/...`, `refactor/...`) after syncing `main`; never commit directly to the protected branch. Commits must align with Conventional Commits (e.g., `feat: add kms key rotation`) and remain under 72 characters. Each commit should compile, format cleanly, pass clippy with `-D warnings`, and include relevant tests. Open pull requests with `gh pr create`, provide a concise summary, list verification commands, and wait for reviewer approval before merging. - ## Communication Rules - Respond to the user in Chinese; use English in all other contexts. + +## Project Structure & Module Organization +The workspace root hosts shared dependencies in `Cargo.toml`. The service binary lives under `rustfs/src/main.rs`, while reusable crates sit in `crates/` (`crypto`, `iam`, `kms`, and `e2e_test`). Local fixtures for standalone flows reside in `test_standalone/`, deployment manifests are under `deploy/`, Docker assets sit at the root, and automation lives in `scripts/`. Skim each crate’s README or module docs before contributing changes. + +## Build, Test, and Development Commands +Run `cargo check --all-targets` for fast validation. 
Build release binaries via `cargo build --release` or the pipeline-aligned `make build`. Use `./build-rustfs.sh --dev` for iterative development and `./build-rustfs.sh --platform <platform>` for cross-compiles. Prefer `make pre-commit` before pushing to cover formatting, clippy, checks, and tests. + +## Coding Style & Naming Conventions +Formatting follows the repo `rustfmt.toml` (130-column width). Use `snake_case` for items, `PascalCase` for types, and `SCREAMING_SNAKE_CASE` for constants. Avoid `unwrap()` or `expect()` outside tests; bubble errors with `Result` and crate-specific `thiserror` types. Keep async code non-blocking and offload CPU-heavy work with `tokio::task::spawn_blocking` when necessary. + +## Testing Guidelines +Co-locate unit tests with their modules and give behavior-led names such as `handles_expired_token`. Integration suites belong in each crate’s `tests/` directory, while exhaustive end-to-end scenarios live in `crates/e2e_test/`. Run `cargo test --workspace --exclude e2e_test` during iteration, `cargo nextest run --all --exclude e2e_test` when available, and finish with `cargo test --all` before requesting review. Use `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY=` for KMS e2e tests. + +## Commit & Pull Request Guidelines +Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters (e.g., `feat: add kms key rotation`). Each commit must compile, format cleanly, and pass `make pre-commit`. Open PRs with a concise summary, note verification commands, link relevant issues, and wait for reviewer approval. + +## Security & Configuration Tips +Do not commit secrets or cloud credentials; prefer environment variables or vault tooling. Review IAM- and KMS-related changes with a second maintainer. Confirm proxy settings before running sensitive tests to avoid leaking traffic outside localhost.
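The coding-style section above asks contributors to propagate errors with `Result` plus crate-specific `thiserror` types and to move CPU-heavy work onto `tokio::task::spawn_blocking`. A minimal sketch of that pattern, assuming the workspace already depends on `tokio` and `thiserror`; the `ScanError` type and `checksum_file` function are illustrative only and are not part of this diff:

```rust
use std::io;

use thiserror::Error;

/// Hypothetical crate-level error type following the `thiserror` convention.
#[derive(Debug, Error)]
pub enum ScanError {
    #[error("io failure: {0}")]
    Io(#[from] io::Error),
    #[error("background task failed: {0}")]
    Join(#[from] tokio::task::JoinError),
}

/// CPU-heavy work (a toy checksum here) is pushed off the async runtime with
/// `spawn_blocking`, and errors are bubbled up with `?` instead of `unwrap()`.
pub async fn checksum_file(path: std::path::PathBuf) -> Result<u32, ScanError> {
    let digest = tokio::task::spawn_blocking(move || -> Result<u32, ScanError> {
        let data = std::fs::read(&path)?;
        // Simple stand-in for a real checksum computation.
        Ok(data
            .iter()
            .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u32)))
    })
    .await??; // first `?` handles JoinError, second the blocking closure's Result
    Ok(digest)
}
```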
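Likewise, the testing guidelines call for unit tests that sit next to the module they cover and carry behavior-led names such as `handles_expired_token`. A self-contained illustration of that shape, where `is_expired` is an invented helper rather than an API from this repository:

```rust
/// Hypothetical helper used only to demonstrate the co-located test layout.
pub fn is_expired(expires_at_secs: u64, now_secs: u64) -> bool {
    now_secs >= expires_at_secs
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn handles_expired_token() {
        assert!(is_expired(100, 100), "token should count as expired at its deadline");
        assert!(!is_expired(200, 100), "token should remain valid before its deadline");
    }
}
```

Running `cargo test --workspace --exclude e2e_test` (or `cargo nextest run`) picks up such co-located tests like any other unit test.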
diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index dc0b2a6b..4ea392ec 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -33,10 +33,10 @@ chrono = { workspace = true } rand = { workspace = true } reqwest = { workspace = true } tempfile = { workspace = true } +walkdir = "2.5.0" [dev-dependencies] serde_json = { workspace = true } serial_test = "3.2.0" tracing-subscriber = { workspace = true } -walkdir = "2.5.0" tempfile = { workspace = true } diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index bae80517..d502ed24 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -17,12 +17,17 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; +use time::OffsetDateTime; use ecstore::{ disk::{Disk, DiskAPI, DiskStore, WalkDirOptions}, set_disk::SetDisks, }; -use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend}; +use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_ecstore::{ + self as ecstore, StorageAPI, + data_usage::{aggregate_local_snapshots, store_data_usage_in_backend}, +}; use rustfs_filemeta::{MetacacheReader, VersionType}; use tokio::sync::{Mutex, RwLock}; use tokio_util::sync::CancellationToken; @@ -34,13 +39,14 @@ use super::stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAg // IO throttling component is integrated into NodeScanner use crate::heal::HealManager; use crate::scanner::lifecycle::ScannerItem; +use crate::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome}; use crate::{ HealRequest, error::{Error, Result}, get_ahm_services_cancel_token, }; -use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo, SizeSummary}; +use rustfs_common::data_usage::{DataUsageInfo, SizeSummary}; use rustfs_common::metrics::{Metric, Metrics, globalMetrics}; use rustfs_ecstore::bucket::versioning::VersioningApi; use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; @@ -261,6 +267,15 @@ impl Scanner { let enable_healing = config.enable_healing; drop(config); + let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await { + Ok(outcome) => outcome, + Err(err) => { + warn!("Local usage scan failed: {}", err); + LocalScanOutcome::default() + } + }; + let bucket_objects_map = &scan_outcome.bucket_objects; + // List all buckets debug!("Listing buckets"); match ecstore @@ -289,10 +304,20 @@ impl Scanner { enabled: bucket_info.versioning, }); - // Count objects in this bucket - let actual_objects = self.count_objects_in_bucket(&ecstore, bucket_name).await; - total_objects_scanned += actual_objects; - debug!("Counted {} objects in bucket {} (actual count)", actual_objects, bucket_name); + let records = match bucket_objects_map.get(bucket_name) { + Some(records) => records, + None => { + debug!( + "No local snapshot entries found for bucket {}; skipping lifecycle/integrity", + bucket_name + ); + continue; + } + }; + + let live_objects = records.iter().filter(|record| record.usage.has_live_object).count() as u64; + total_objects_scanned = total_objects_scanned.saturating_add(live_objects); + debug!("Counted {} objects in bucket {} using local snapshots", live_objects, bucket_name); // Process objects for lifecycle actions if let Some(lifecycle_config) = &lifecycle_config { @@ -303,9 +328,8 @@ impl Scanner { Some(versioning_config.clone()), ); - // List objects in bucket and apply lifecycle actions match self - .process_bucket_objects_for_lifecycle(&ecstore, bucket_name, &mut 
scanner_item) + .process_bucket_objects_for_lifecycle(bucket_name, &mut scanner_item, records) .await { Ok(processed_count) => { @@ -320,11 +344,16 @@ impl Scanner { // If deep scan is enabled, verify each object's integrity if enable_deep_scan && enable_healing { debug!("Deep scan enabled, verifying object integrity in bucket {}", bucket_name); - if let Err(e) = self.deep_scan_bucket_objects(&ecstore, bucket_name).await { + if let Err(e) = self + .deep_scan_bucket_objects_with_records(&ecstore, bucket_name, records) + .await + { warn!("Deep scan failed for bucket {}: {}", bucket_name, e); } } } + + self.update_data_usage_statistics(&scan_outcome, &ecstore).await; } Err(e) => { error!("Failed to list buckets: {}", e); @@ -336,9 +365,6 @@ impl Scanner { // Update metrics directly self.metrics.increment_objects_scanned(total_objects_scanned); debug!("Updated metrics with {} objects", total_objects_scanned); - - // Also update data usage statistics - self.update_data_usage_statistics(total_objects_scanned, &ecstore).await; } else { warn!("No objects found during basic test scan"); } @@ -350,91 +376,138 @@ impl Scanner { } /// Update data usage statistics based on scan results - async fn update_data_usage_statistics(&self, total_objects: u64, ecstore: &std::sync::Arc) { - debug!("Updating data usage statistics with {} objects", total_objects); + async fn update_data_usage_statistics( + &self, + outcome: &LocalScanOutcome, + ecstore: &std::sync::Arc, + ) { + let enabled = { + let cfg = self.config.read().await; + cfg.enable_data_usage_stats + }; - // Get buckets list to update data usage - match ecstore - .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) - .await - { - Ok(buckets) => { - let buckets_len = buckets.len(); // Store length before moving - let mut data_usage_guard = self.data_usage_stats.lock().await; + if !enabled { + debug!("Data usage statistics disabled; skipping refresh"); + return; + } - for bucket_info in buckets { - let bucket_name = &bucket_info.name; + if outcome.snapshots.is_empty() { + warn!("No local usage snapshots available; skipping data usage aggregation"); + return; + } - // Skip system buckets - if bucket_name.starts_with('.') { - continue; - } + let mut aggregated = DataUsageInfo::default(); + let mut latest_update: Option = None; - // Get object count for this bucket - let bucket_objects = self.count_objects_in_bucket(ecstore, bucket_name).await; - - // Create or update bucket data usage info - let bucket_data = data_usage_guard.entry(bucket_name.clone()).or_insert_with(|| { - let mut info = DataUsageInfo::new(); - info.objects_total_count = bucket_objects; - info.buckets_count = 1; - - // Add bucket to buckets_usage - let bucket_usage = BucketUsageInfo { - size: bucket_objects * 1024, // Estimate 1KB per object - objects_count: bucket_objects, - object_size_histogram: HashMap::new(), - ..Default::default() - }; - info.buckets_usage.insert(bucket_name.clone(), bucket_usage); - info - }); - - // Update existing bucket data - bucket_data.objects_total_count = bucket_objects; - if let Some(bucket_usage) = bucket_data.buckets_usage.get_mut(bucket_name) { - bucket_usage.objects_count = bucket_objects; - bucket_usage.size = bucket_objects * 1024; // Estimate 1KB per object - } - - debug!("Updated data usage for bucket {}: {} objects", bucket_name, bucket_objects); - } - - debug!("Data usage statistics updated for {} buckets", buckets_len); - debug!("Current data_usage_guard size after update: {}", data_usage_guard.len()); - - // Also persist 
consolidated data to backend - drop(data_usage_guard); // Release the lock - debug!("About to get consolidated data usage info for persistence"); - if let Ok(consolidated_info) = self.get_consolidated_data_usage_info().await { - debug!("Got consolidated info with {} objects total", consolidated_info.objects_total_count); - let config = self.config.read().await; - if config.enable_data_usage_stats { - debug!("Data usage stats enabled, proceeding to store to backend"); - if let Some(store) = rustfs_ecstore::new_object_layer_fn() { - debug!("ECStore available, spawning background storage task"); - let data_clone = consolidated_info.clone(); - tokio::spawn(async move { - if let Err(e) = store_data_usage_in_backend(data_clone, store).await { - error!("Failed to store consolidated data usage to backend: {}", e); - } else { - debug!("Successfully stored consolidated data usage to backend"); - } - }); - } else { - warn!("ECStore not available"); - } - } else { - warn!("Data usage stats not enabled"); - } - } else { - error!("Failed to get consolidated data usage info"); + for snapshot in &outcome.snapshots { + if let Some(update) = snapshot.last_update { + if latest_update.is_none_or(|current| update > current) { + latest_update = Some(update); } } - Err(e) => { - error!("Failed to update data usage statistics: {}", e); + + aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count); + aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count); + aggregated.delete_markers_total_count = aggregated + .delete_markers_total_count + .saturating_add(snapshot.delete_markers_total_count); + aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size); + + for (bucket, usage) in &snapshot.buckets_usage { + let size = usage.size; + match aggregated.buckets_usage.entry(bucket.clone()) { + std::collections::hash_map::Entry::Occupied(mut entry) => entry.get_mut().merge(usage), + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(usage.clone()); + } + } + + aggregated + .bucket_sizes + .entry(bucket.clone()) + .and_modify(|existing| *existing = existing.saturating_add(size)) + .or_insert(size); } } + + aggregated.buckets_count = aggregated.buckets_usage.len() as u64; + aggregated.last_update = latest_update; + + self.node_scanner.update_data_usage(aggregated.clone()).await; + let local_stats = self.node_scanner.get_stats_summary().await; + self.stats_aggregator.set_local_stats(local_stats).await; + + let mut guard = self.data_usage_stats.lock().await; + guard.clear(); + for (bucket, usage) in &aggregated.buckets_usage { + let mut bucket_data = DataUsageInfo::new(); + bucket_data.last_update = aggregated.last_update; + bucket_data.buckets_count = 1; + bucket_data.objects_total_count = usage.objects_count; + bucket_data.versions_total_count = usage.versions_count; + bucket_data.delete_markers_total_count = usage.delete_markers_count; + bucket_data.objects_total_size = usage.size; + bucket_data.bucket_sizes.insert(bucket.clone(), usage.size); + bucket_data.buckets_usage.insert(bucket.clone(), usage.clone()); + guard.insert(bucket.clone(), bucket_data); + } + drop(guard); + + let info_clone = aggregated.clone(); + let store_clone = ecstore.clone(); + tokio::spawn(async move { + if let Err(err) = store_data_usage_in_backend(info_clone, store_clone).await { + warn!("Failed to persist aggregated usage: {}", err); + } + }); + } + + fn 
convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo { + if let Some(info) = &record.object_info { + return info.clone(); + } + + let usage = &record.usage; + + ObjectInfo { + bucket: usage.bucket.clone(), + name: usage.object.clone(), + size: usage.total_size as i64, + delete_marker: !usage.has_live_object && usage.delete_markers_count > 0, + mod_time: usage.last_modified_ns.and_then(Self::ns_to_offset_datetime), + ..Default::default() + } + } + + fn ns_to_offset_datetime(ns: i128) -> Option { + OffsetDateTime::from_unix_timestamp_nanos(ns).ok() + } + + async fn deep_scan_bucket_objects_with_records( + &self, + ecstore: &std::sync::Arc, + bucket_name: &str, + records: &[LocalObjectRecord], + ) -> Result<()> { + if records.is_empty() { + return self.deep_scan_bucket_objects(ecstore, bucket_name).await; + } + + for record in records { + if !record.usage.has_live_object { + continue; + } + + let object_name = &record.usage.object; + if let Err(err) = self.verify_object_integrity(bucket_name, object_name).await { + warn!( + "Object integrity verification failed for {}/{} during deep scan: {}", + bucket_name, object_name, err + ); + } + } + + Ok(()) } /// Deep scan objects in a bucket for integrity verification @@ -484,130 +557,29 @@ impl Scanner { Ok(()) } - async fn count_objects_in_bucket(&self, ecstore: &std::sync::Arc, bucket_name: &str) -> u64 { - // Use filesystem scanning approach - - // Get first disk path for scanning - let mut total_objects = 0u64; - - for pool in &ecstore.pools { - for set_disks in &pool.disk_set { - let (disks, _) = set_disks.get_online_disks_with_healing(false).await; - if let Some(disk) = disks.first() { - let bucket_path = disk.path().join(bucket_name); - if bucket_path.exists() { - if let Ok(entries) = std::fs::read_dir(&bucket_path) { - let object_count = entries - .filter_map(|entry| entry.ok()) - .filter(|entry| { - if let Ok(file_type) = entry.file_type() { - file_type.is_dir() - } else { - false - } - }) - .filter(|entry| { - // Skip hidden/system directories - if let Some(name) = entry.file_name().to_str() { - !name.starts_with('.') - } else { - false - } - }) - .count() as u64; - - debug!( - "Filesystem scan found {} objects in bucket {} on disk {:?}", - object_count, - bucket_name, - disk.path() - ); - total_objects = object_count; // Use count from first disk - break; - } - } - } - } - if total_objects > 0 { - break; - } - } - - if total_objects == 0 { - // Fallback: assume 1 object if bucket exists - debug!("Using fallback count of 1 for bucket {}", bucket_name); - 1 - } else { - total_objects - } - } /// Process bucket objects for lifecycle actions async fn process_bucket_objects_for_lifecycle( &self, - ecstore: &std::sync::Arc, bucket_name: &str, scanner_item: &mut ScannerItem, + records: &[LocalObjectRecord], ) -> Result { info!("Processing objects for lifecycle in bucket: {}", bucket_name); let mut processed_count = 0u64; - // Instead of filesystem scanning, use ECStore's list_objects_v2 method to get correct object names - let mut continuation_token = None; - loop { - let list_result = match ecstore - .clone() - .list_objects_v2( - bucket_name, - "", // prefix - continuation_token.clone(), - None, // delimiter - 1000, // max_keys - false, // fetch_owner - None, // start_after - ) - .await - { - Ok(result) => result, - Err(e) => { - warn!("Failed to list objects in bucket {}: {}", bucket_name, e); - break; - } - }; - - info!("Found {} objects in bucket {}", list_result.objects.len(), bucket_name); - - for obj in 
list_result.objects { - info!("Processing lifecycle for object: {}/{}", bucket_name, obj.name); - - // Create ObjectInfo for lifecycle processing - let object_info = rustfs_ecstore::store_api::ObjectInfo { - bucket: bucket_name.to_string(), - name: obj.name.clone(), - version_id: None, - mod_time: obj.mod_time, - size: obj.size, - user_defined: std::collections::HashMap::new(), - ..Default::default() - }; - - // Create SizeSummary for tracking - let mut size_summary = SizeSummary::default(); - - // Apply lifecycle actions - let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await; - if deleted { - info!("Object {}/{} was deleted by lifecycle action", bucket_name, obj.name); - } - processed_count += 1; + for record in records { + if !record.usage.has_live_object { + continue; } - // Check if there are more objects to list - if list_result.is_truncated { - continuation_token = list_result.next_continuation_token.clone(); - } else { - break; + let object_info = Self::convert_record_to_object_info(record); + let mut size_summary = SizeSummary::default(); + let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await; + if deleted { + info!("Object {}/{} was deleted by lifecycle action", bucket_name, object_info.name); } + processed_count = processed_count.saturating_add(1); } info!("Processed {} objects for lifecycle in bucket {}", processed_count, bucket_name); @@ -644,6 +616,21 @@ impl Scanner { } }); + // Trigger an immediate data usage collection so that admin APIs have fresh data after startup. + let scanner = self.clone_for_background(); + tokio::spawn(async move { + let enable_stats = { + let cfg = scanner.config.read().await; + cfg.enable_data_usage_stats + }; + + if enable_stats { + if let Err(e) = scanner.collect_and_persist_data_usage().await { + warn!("Initial data usage collection failed: {}", e); + } + } + }); + Ok(()) } @@ -712,23 +699,6 @@ impl Scanner { Ok(integrated_info) } - /// Get consolidated data usage info without debug output (for internal use) - async fn get_consolidated_data_usage_info(&self) -> Result { - let mut integrated_info = DataUsageInfo::new(); - - // Collect data from all buckets - { - let data_usage_guard = self.data_usage_stats.lock().await; - for (bucket_name, bucket_data) in data_usage_guard.iter() { - let _bucket_name = bucket_name; - integrated_info.merge(bucket_data); - } - } - - self.update_capacity_info(&mut integrated_info).await; - Ok(integrated_info) - } - /// Update capacity information in DataUsageInfo async fn update_capacity_info(&self, integrated_info: &mut DataUsageInfo) { // Update capacity information from storage info @@ -865,17 +835,13 @@ impl Scanner { // Update legacy metrics with aggregated data self.update_legacy_metrics_from_aggregated(&aggregated_stats).await; - // If aggregated stats show no objects scanned, also try basic test scan - if aggregated_stats.total_objects_scanned == 0 { - debug!("Aggregated stats show 0 objects, falling back to direct ECStore scan for testing"); - info!("Calling perform_basic_test_scan due to 0 aggregated objects"); - if let Err(scan_error) = self.perform_basic_test_scan().await { - warn!("Basic test scan failed: {}", scan_error); - } else { - debug!("Basic test scan completed successfully after aggregated stats"); - } + // Always perform basic test scan to ensure lifecycle processing in test environments + debug!("Performing basic test scan to ensure lifecycle processing"); + info!("Calling perform_basic_test_scan to ensure lifecycle 
processing"); + if let Err(scan_error) = self.perform_basic_test_scan().await { + warn!("Basic test scan failed: {}", scan_error); } else { - info!("Not calling perform_basic_test_scan because aggregated_stats.total_objects_scanned > 0"); + debug!("Basic test scan completed successfully"); } info!( @@ -891,17 +857,13 @@ impl Scanner { info!("Local stats: total_objects_scanned={}", local_stats.total_objects_scanned); self.update_legacy_metrics_from_local(&local_stats).await; - // In test environments, if no real scanning happened, perform basic scan - if local_stats.total_objects_scanned == 0 { - debug!("No objects scanned by NodeScanner, falling back to direct ECStore scan for testing"); - info!("Calling perform_basic_test_scan due to 0 local objects"); - if let Err(scan_error) = self.perform_basic_test_scan().await { - warn!("Basic test scan failed: {}", scan_error); - } else { - debug!("Basic test scan completed successfully"); - } + // Always perform basic test scan to ensure lifecycle processing in test environments + debug!("Performing basic test scan to ensure lifecycle processing"); + info!("Calling perform_basic_test_scan to ensure lifecycle processing"); + if let Err(scan_error) = self.perform_basic_test_scan().await { + warn!("Basic test scan failed: {}", scan_error); } else { - info!("Not calling perform_basic_test_scan because local_stats.total_objects_scanned > 0"); + debug!("Basic test scan completed successfully"); } } } @@ -940,16 +902,47 @@ impl Scanner { return Ok(()); }; - // Collect data usage from NodeScanner stats - let _local_stats = self.node_scanner.get_stats_summary().await; + // Run local usage scan and aggregate snapshots; fall back to on-demand build when necessary. + let mut data_usage = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await { + Ok(outcome) => { + info!( + "Local usage scan completed: {} disks with {} snapshot entries", + outcome.disk_status.len(), + outcome.snapshots.len() + ); - // Build data usage from ECStore directly for now - let data_usage = self.build_data_usage_from_ecstore(&ecstore).await?; + match aggregate_local_snapshots(ecstore.clone()).await { + Ok((_, mut aggregated)) => { + if aggregated.last_update.is_none() { + aggregated.last_update = Some(SystemTime::now()); + } + aggregated + } + Err(e) => { + warn!( + "Failed to aggregate local data usage snapshots, falling back to realtime collection: {}", + e + ); + self.build_data_usage_from_ecstore(&ecstore).await? + } + } + } + Err(e) => { + warn!("Local usage scan failed (using realtime collection instead): {}", e); + self.build_data_usage_from_ecstore(&ecstore).await? 
+ } + }; - // Update NodeScanner with collected data + // Make sure bucket counters reflect aggregated content + data_usage.buckets_count = data_usage.buckets_usage.len() as u64; + if data_usage.last_update.is_none() { + data_usage.last_update = Some(SystemTime::now()); + } + + // Publish to node stats manager self.node_scanner.update_data_usage(data_usage.clone()).await; - // Store to local cache + // Store to local cache for quick API responses { let mut data_usage_guard = self.data_usage_stats.lock().await; data_usage_guard.insert("consolidated".to_string(), data_usage.clone()); @@ -973,8 +966,10 @@ impl Scanner { }); info!( - "Data usage collection completed: {} buckets, {} objects", - data_usage.buckets_count, data_usage.objects_total_count + "Data usage collection completed: {} buckets, {} objects ({} disks reporting)", + data_usage.buckets_count, + data_usage.objects_total_count, + data_usage.disk_usage_status.len() ); Ok(()) @@ -2472,17 +2467,41 @@ impl Scanner { async fn legacy_scan_loop(&self) -> Result<()> { info!("Starting legacy scan loop for backward compatibility"); - while !get_ahm_services_cancel_token().is_none_or(|t| t.is_cancelled()) { - // Update local stats in aggregator + loop { + if let Some(token) = get_ahm_services_cancel_token() { + if token.is_cancelled() { + info!("Cancellation requested, exiting legacy scan loop"); + break; + } + } + + let (enable_data_usage_stats, scan_interval) = { + let config = self.config.read().await; + (config.enable_data_usage_stats, config.scan_interval) + }; + + if enable_data_usage_stats { + if let Err(e) = self.collect_and_persist_data_usage().await { + warn!("Background data usage collection failed: {}", e); + } + } + + // Update local stats in aggregator after latest scan let local_stats = self.node_scanner.get_stats_summary().await; self.stats_aggregator.set_local_stats(local_stats).await; - // Sleep for scan interval - let config = self.config.read().await; - let scan_interval = config.scan_interval; - drop(config); - - tokio::time::sleep(scan_interval).await; + match get_ahm_services_cancel_token() { + Some(token) => { + tokio::select! 
{ + _ = tokio::time::sleep(scan_interval) => {} + _ = token.cancelled() => { + info!("Cancellation requested, exiting legacy scan loop"); + break; + } + } + } + None => tokio::time::sleep(scan_interval).await, + } } Ok(()) diff --git a/crates/ahm/src/scanner/local_scan/mod.rs b/crates/ahm/src/scanner/local_scan/mod.rs new file mode 100644 index 00000000..96d2b6f7 --- /dev/null +++ b/crates/ahm/src/scanner/local_scan/mod.rs @@ -0,0 +1,664 @@ +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use serde_json::{from_slice, to_vec}; +use tokio::{fs, task}; +use tracing::warn; +use walkdir::WalkDir; + +use crate::error::{Error, Result}; + +use rustfs_common::data_usage::DiskUsageStatus; +use rustfs_ecstore::data_usage::{ + LocalUsageSnapshot, LocalUsageSnapshotMeta, data_usage_state_dir, ensure_data_usage_layout, snapshot_file_name, + write_local_snapshot, +}; +use rustfs_ecstore::disk::DiskAPI; +use rustfs_ecstore::store::ECStore; +use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_filemeta::{FileInfo, FileMeta, FileMetaVersion, VersionType}; + +const STATE_FILE_EXTENSION: &str = ""; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalObjectUsage { + pub bucket: String, + pub object: String, + pub last_modified_ns: Option, + pub versions_count: u64, + pub delete_markers_count: u64, + pub total_size: u64, + pub has_live_object: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct IncrementalScanState { + last_scan_ns: Option, + objects: HashMap, +} + +struct DiskScanResult { + snapshot: LocalUsageSnapshot, + state: IncrementalScanState, + objects_by_bucket: HashMap>, + status: DiskUsageStatus, +} + +#[derive(Debug, Clone)] +pub struct LocalObjectRecord { + pub usage: LocalObjectUsage, + pub object_info: Option, +} + +#[derive(Debug, Default)] +pub struct LocalScanOutcome { + pub snapshots: Vec, + pub bucket_objects: HashMap>, + pub disk_status: Vec, +} + +/// Scan all local primary disks and persist refreshed usage snapshots. +pub async fn scan_and_persist_local_usage(store: Arc) -> Result { + let mut snapshots = Vec::new(); + let mut bucket_objects: HashMap> = HashMap::new(); + let mut disk_status = Vec::new(); + + for (pool_idx, pool) in store.pools.iter().enumerate() { + for set_disks in pool.disk_set.iter() { + let disks = { + let guard = set_disks.disks.read().await; + guard.clone() + }; + + for (disk_index, disk_opt) in disks.into_iter().enumerate() { + let Some(disk) = disk_opt else { + continue; + }; + + if !disk.is_local() { + continue; + } + + // Count objects once by scanning only disk index zero from each set. + if disk_index != 0 { + continue; + } + + let disk_id = match disk.get_disk_id().await.map_err(Error::from)? 
{ + Some(id) => id.to_string(), + None => { + warn!("Skipping disk without ID: {}", disk.to_string()); + continue; + } + }; + + let root = disk.path(); + ensure_data_usage_layout(root.as_path()).await.map_err(Error::from)?; + + let meta = LocalUsageSnapshotMeta { + disk_id: disk_id.clone(), + pool_index: Some(pool_idx), + set_index: Some(set_disks.set_index), + disk_index: Some(disk_index), + }; + + let state_path = state_file_path(root.as_path(), &disk_id); + let state = read_scan_state(&state_path).await?; + + let root_clone = root.clone(); + let meta_clone = meta.clone(); + + let handle = task::spawn_blocking(move || scan_disk_blocking(root_clone, meta_clone, state)); + + match handle.await { + Ok(Ok(result)) => { + write_local_snapshot(root.as_path(), &disk_id, &result.snapshot) + .await + .map_err(Error::from)?; + write_scan_state(&state_path, &result.state).await?; + snapshots.push(result.snapshot); + for (bucket, records) in result.objects_by_bucket { + bucket_objects.entry(bucket).or_default().extend(records.into_iter()); + } + disk_status.push(result.status); + } + Ok(Err(err)) => { + warn!("Failed to scan disk {}: {}", disk.to_string(), err); + } + Err(join_err) => { + warn!("Disk scan task panicked for disk {}: {}", disk.to_string(), join_err); + } + } + } + } + } + + Ok(LocalScanOutcome { + snapshots, + bucket_objects, + disk_status, + }) +} + +fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: IncrementalScanState) -> Result { + let now = SystemTime::now(); + let now_ns = system_time_to_ns(now); + let mut visited: HashSet = HashSet::new(); + let mut emitted: HashSet = HashSet::new(); + let mut objects_by_bucket: HashMap> = HashMap::new(); + let mut status = DiskUsageStatus { + disk_id: meta.disk_id.clone(), + pool_index: meta.pool_index, + set_index: meta.set_index, + disk_index: meta.disk_index, + last_update: None, + snapshot_exists: false, + }; + + for entry in WalkDir::new(&root).follow_links(false).into_iter().filter_map(|res| res.ok()) { + if !entry.file_type().is_file() { + continue; + } + + if entry.file_name() != "xl.meta" { + continue; + } + + let xl_path = entry.path().to_path_buf(); + let Some(object_dir) = xl_path.parent() else { + continue; + }; + + let Some(rel_path) = object_dir.strip_prefix(&root).ok().map(normalize_path) else { + continue; + }; + + let mut components = rel_path.split('/'); + let Some(bucket_name) = components.next() else { + continue; + }; + + if bucket_name.starts_with('.') { + continue; + } + + let object_key = components.collect::>().join("/"); + + visited.insert(rel_path.clone()); + + let metadata = match std::fs::metadata(&xl_path) { + Ok(meta) => meta, + Err(err) => { + warn!("Failed to read metadata for {xl_path:?}: {err}"); + continue; + } + }; + + let mtime_ns = metadata.modified().ok().map(system_time_to_ns); + + let should_parse = match state.objects.get(&rel_path) { + Some(existing) => existing.last_modified_ns != mtime_ns, + None => true, + }; + + if should_parse { + match std::fs::read(&xl_path) { + Ok(buf) => match FileMeta::load(&buf) { + Ok(file_meta) => match compute_object_usage(bucket_name, object_key.as_str(), &file_meta) { + Ok(Some(mut record)) => { + record.usage.last_modified_ns = mtime_ns; + state.objects.insert(rel_path.clone(), record.usage.clone()); + emitted.insert(rel_path.clone()); + objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record); + } + Ok(None) => { + state.objects.remove(&rel_path); + } + Err(err) => { + warn!("Failed to parse usage from {:?}: {}", 
xl_path, err); + } + }, + Err(err) => { + warn!("Failed to decode xl.meta {:?}: {}", xl_path, err); + } + }, + Err(err) => { + warn!("Failed to read xl.meta {:?}: {}", xl_path, err); + } + } + } + } + + state.objects.retain(|key, _| visited.contains(key)); + state.last_scan_ns = Some(now_ns); + + for (key, usage) in &state.objects { + if emitted.contains(key) { + continue; + } + objects_by_bucket + .entry(usage.bucket.clone()) + .or_default() + .push(LocalObjectRecord { + usage: usage.clone(), + object_info: None, + }); + } + + let snapshot = build_snapshot(meta, &state.objects, now); + status.snapshot_exists = true; + status.last_update = Some(now); + + Ok(DiskScanResult { + snapshot, + state, + objects_by_bucket, + status, + }) +} + +fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Result> { + let mut versions_count = 0u64; + let mut delete_markers_count = 0u64; + let mut total_size = 0u64; + let mut has_live_object = false; + + let mut latest_file_info: Option = None; + + for shallow in &file_meta.versions { + match shallow.header.version_type { + VersionType::Object => { + let version = match FileMetaVersion::try_from(shallow.meta.as_slice()) { + Ok(version) => version, + Err(err) => { + warn!("Failed to parse file meta version: {}", err); + continue; + } + }; + if let Some(obj) = version.object { + if !has_live_object { + total_size = obj.size.max(0) as u64; + } + has_live_object = true; + versions_count = versions_count.saturating_add(1); + + if latest_file_info.is_none() { + if let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false) { + latest_file_info = Some(info); + } + } + } + } + VersionType::Delete => { + delete_markers_count = delete_markers_count.saturating_add(1); + versions_count = versions_count.saturating_add(1); + } + _ => {} + } + } + + if !has_live_object && delete_markers_count == 0 { + return Ok(None); + } + + let object_info = latest_file_info.as_ref().map(|fi| { + let versioned = fi.version_id.is_some(); + ObjectInfo::from_file_info(fi, bucket, object, versioned) + }); + + Ok(Some(LocalObjectRecord { + usage: LocalObjectUsage { + bucket: bucket.to_string(), + object: object.to_string(), + last_modified_ns: None, + versions_count, + delete_markers_count, + total_size, + has_live_object, + }, + object_info, + })) +} + +fn build_snapshot( + meta: LocalUsageSnapshotMeta, + objects: &HashMap, + now: SystemTime, +) -> LocalUsageSnapshot { + let mut snapshot = LocalUsageSnapshot::new(meta); + + for usage in objects.values() { + let bucket_entry = snapshot.buckets_usage.entry(usage.bucket.clone()).or_default(); + + if usage.has_live_object { + bucket_entry.objects_count = bucket_entry.objects_count.saturating_add(1); + } + bucket_entry.versions_count = bucket_entry.versions_count.saturating_add(usage.versions_count); + bucket_entry.delete_markers_count = bucket_entry.delete_markers_count.saturating_add(usage.delete_markers_count); + bucket_entry.size = bucket_entry.size.saturating_add(usage.total_size); + } + + snapshot.last_update = Some(now); + snapshot.recompute_totals(); + snapshot +} + +fn normalize_path(path: &Path) -> String { + path.iter() + .map(|component| component.to_string_lossy()) + .collect::>() + .join("/") +} + +fn system_time_to_ns(time: SystemTime) -> i128 { + match time.duration_since(UNIX_EPOCH) { + Ok(duration) => { + let secs = duration.as_secs() as i128; + let nanos = duration.subsec_nanos() as i128; + secs * 1_000_000_000 + nanos + } + Err(err) => { + let duration = err.duration(); + let secs = 
duration.as_secs() as i128; + let nanos = duration.subsec_nanos() as i128; + -(secs * 1_000_000_000 + nanos) + } + } +} + +fn state_file_path(root: &Path, disk_id: &str) -> PathBuf { + let mut path = data_usage_state_dir(root); + path.push(format!("{}{}", snapshot_file_name(disk_id), STATE_FILE_EXTENSION)); + path +} + +async fn read_scan_state(path: &Path) -> Result { + match fs::read(path).await { + Ok(bytes) => from_slice(&bytes).map_err(|err| Error::Serialization(err.to_string())), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(IncrementalScanState::default()), + Err(err) => Err(err.into()), + } +} + +async fn write_scan_state(path: &Path, state: &IncrementalScanState) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + let data = to_vec(state).map_err(|err| Error::Serialization(err.to_string()))?; + fs::write(path, data).await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rustfs_filemeta::{ChecksumAlgo, ErasureAlgo, FileMetaShallowVersion, MetaDeleteMarker, MetaObject}; + use std::collections::HashMap; + use std::fs; + use tempfile::TempDir; + use time::OffsetDateTime; + use uuid::Uuid; + + fn build_file_meta_with_object(erasure_index: usize, size: i64) -> FileMeta { + let mut file_meta = FileMeta::default(); + + let meta_object = MetaObject { + version_id: Some(Uuid::new_v4()), + data_dir: Some(Uuid::new_v4()), + erasure_algorithm: ErasureAlgo::ReedSolomon, + erasure_m: 2, + erasure_n: 2, + erasure_block_size: 4096, + erasure_index, + erasure_dist: vec![0_u8, 1, 2, 3], + bitrot_checksum_algo: ChecksumAlgo::HighwayHash, + part_numbers: vec![1], + part_etags: vec!["etag".to_string()], + part_sizes: vec![size as usize], + part_actual_sizes: vec![size], + part_indices: Vec::new(), + size, + mod_time: Some(OffsetDateTime::now_utc()), + meta_sys: HashMap::new(), + meta_user: HashMap::new(), + }; + + let version = FileMetaVersion { + version_type: VersionType::Object, + object: Some(meta_object), + delete_marker: None, + write_version: 1, + }; + + let shallow = FileMetaShallowVersion::try_from(version).expect("convert version"); + file_meta.versions.push(shallow); + file_meta + } + + fn build_file_meta_with_delete_marker() -> FileMeta { + let mut file_meta = FileMeta::default(); + + let delete_marker = MetaDeleteMarker { + version_id: Some(Uuid::new_v4()), + mod_time: Some(OffsetDateTime::now_utc()), + meta_sys: None, + }; + + let version = FileMetaVersion { + version_type: VersionType::Delete, + object: None, + delete_marker: Some(delete_marker), + write_version: 2, + }; + + let shallow = FileMetaShallowVersion::try_from(version).expect("convert delete marker"); + file_meta.versions.push(shallow); + file_meta + } + + #[test] + fn compute_object_usage_primary_disk() { + let file_meta = build_file_meta_with_object(0, 1024); + let record = compute_object_usage("bucket", "foo/bar", &file_meta) + .expect("compute usage") + .expect("record should exist"); + + assert!(record.usage.has_live_object); + assert_eq!(record.usage.bucket, "bucket"); + assert_eq!(record.usage.object, "foo/bar"); + assert_eq!(record.usage.total_size, 1024); + assert!(record.object_info.is_some(), "object info should be synthesized"); + } + + #[test] + fn compute_object_usage_handles_non_primary_disk() { + let file_meta = build_file_meta_with_object(1, 2048); + let record = compute_object_usage("bucket", "obj", &file_meta) + .expect("compute usage") + .expect("record should exist for non-primary shard"); + 
assert!(record.usage.has_live_object); + } + + #[test] + fn compute_object_usage_reports_delete_marker() { + let file_meta = build_file_meta_with_delete_marker(); + let record = compute_object_usage("bucket", "obj", &file_meta) + .expect("compute usage") + .expect("delete marker record"); + + assert!(!record.usage.has_live_object); + assert_eq!(record.usage.delete_markers_count, 1); + assert_eq!(record.usage.versions_count, 1); + } + + #[test] + fn build_snapshot_accumulates_usage() { + let mut objects = HashMap::new(); + objects.insert( + "bucket/a".to_string(), + LocalObjectUsage { + bucket: "bucket".to_string(), + object: "a".to_string(), + last_modified_ns: None, + versions_count: 2, + delete_markers_count: 1, + total_size: 512, + has_live_object: true, + }, + ); + + let snapshot = build_snapshot(LocalUsageSnapshotMeta::default(), &objects, SystemTime::now()); + let usage = snapshot.buckets_usage.get("bucket").expect("bucket entry should exist"); + assert_eq!(usage.objects_count, 1); + assert_eq!(usage.versions_count, 2); + assert_eq!(usage.delete_markers_count, 1); + assert_eq!(usage.size, 512); + } + + #[test] + fn scan_disk_blocking_handles_incremental_updates() { + let temp_dir = TempDir::new().expect("create temp dir"); + let root = temp_dir.path(); + + let bucket_dir = root.join("bench"); + let object1_dir = bucket_dir.join("obj1"); + fs::create_dir_all(&object1_dir).expect("create first object directory"); + + let file_meta = build_file_meta_with_object(0, 1024); + let bytes = file_meta.marshal_msg().expect("serialize first object"); + fs::write(object1_dir.join("xl.meta"), bytes).expect("write first xl.meta"); + + let meta = LocalUsageSnapshotMeta { + disk_id: "disk-test".to_string(), + ..Default::default() + }; + + let DiskScanResult { + snapshot: snapshot1, + state, + .. + } = scan_disk_blocking(root.to_path_buf(), meta.clone(), IncrementalScanState::default()).expect("initial scan succeeds"); + + let usage1 = snapshot1.buckets_usage.get("bench").expect("bucket stats recorded"); + assert_eq!(usage1.objects_count, 1); + assert_eq!(usage1.size, 1024); + assert_eq!(state.objects.len(), 1); + + let object2_dir = bucket_dir.join("nested").join("obj2"); + fs::create_dir_all(&object2_dir).expect("create second object directory"); + let second_meta = build_file_meta_with_object(0, 2048); + let bytes = second_meta.marshal_msg().expect("serialize second object"); + fs::write(object2_dir.join("xl.meta"), bytes).expect("write second xl.meta"); + + let DiskScanResult { + snapshot: snapshot2, + state: state_next, + .. + } = scan_disk_blocking(root.to_path_buf(), meta.clone(), state).expect("incremental scan succeeds"); + + let usage2 = snapshot2 + .buckets_usage + .get("bench") + .expect("bucket stats recorded after addition"); + assert_eq!(usage2.objects_count, 2); + assert_eq!(usage2.size, 1024 + 2048); + assert_eq!(state_next.objects.len(), 2); + + fs::remove_dir_all(&object1_dir).expect("remove first object"); + + let DiskScanResult { + snapshot: snapshot3, + state: state_final, + .. 
+ } = scan_disk_blocking(root.to_path_buf(), meta, state_next).expect("scan after deletion succeeds"); + + let usage3 = snapshot3 + .buckets_usage + .get("bench") + .expect("bucket stats recorded after deletion"); + assert_eq!(usage3.objects_count, 1); + assert_eq!(usage3.size, 2048); + assert_eq!(state_final.objects.len(), 1); + assert!( + state_final.objects.keys().all(|path| path.contains("nested")), + "state should only keep surviving object" + ); + } + + #[test] + fn scan_disk_blocking_recovers_from_stale_state_entries() { + let temp_dir = TempDir::new().expect("create temp dir"); + let root = temp_dir.path(); + + let mut stale_state = IncrementalScanState::default(); + stale_state.objects.insert( + "bench/stale".to_string(), + LocalObjectUsage { + bucket: "bench".to_string(), + object: "stale".to_string(), + last_modified_ns: Some(42), + versions_count: 1, + delete_markers_count: 0, + total_size: 512, + has_live_object: true, + }, + ); + stale_state.last_scan_ns = Some(99); + + let meta = LocalUsageSnapshotMeta { + disk_id: "disk-test".to_string(), + ..Default::default() + }; + + let DiskScanResult { + snapshot, state, status, .. + } = scan_disk_blocking(root.to_path_buf(), meta, stale_state).expect("scan succeeds"); + + assert!(state.objects.is_empty(), "stale entries should be cleared when files disappear"); + assert!( + snapshot.buckets_usage.is_empty(), + "no real xl.meta files means bucket usage should stay empty" + ); + assert!(status.snapshot_exists, "snapshot status should indicate a refresh"); + } + + #[test] + fn scan_disk_blocking_handles_large_volume() { + const OBJECTS: usize = 256; + + let temp_dir = TempDir::new().expect("create temp dir"); + let root = temp_dir.path(); + let bucket_dir = root.join("bulk"); + + for idx in 0..OBJECTS { + let object_dir = bucket_dir.join(format!("obj-{idx:03}")); + fs::create_dir_all(&object_dir).expect("create object directory"); + let size = 1024 + idx as i64; + let file_meta = build_file_meta_with_object(0, size); + let bytes = file_meta.marshal_msg().expect("serialize file meta"); + fs::write(object_dir.join("xl.meta"), bytes).expect("write xl.meta"); + } + + let meta = LocalUsageSnapshotMeta { + disk_id: "disk-test".to_string(), + ..Default::default() + }; + + let DiskScanResult { snapshot, state, .. 
} = + scan_disk_blocking(root.to_path_buf(), meta, IncrementalScanState::default()).expect("bulk scan succeeds"); + + let bucket_usage = snapshot + .buckets_usage + .get("bulk") + .expect("bucket usage present for bulk scan"); + assert_eq!(bucket_usage.objects_count as usize, OBJECTS, "should count all objects once"); + assert!( + bucket_usage.size >= (1024 * OBJECTS) as u64, + "aggregated size should grow with object count" + ); + assert_eq!(state.objects.len(), OBJECTS, "incremental state tracks every object"); + } +} diff --git a/crates/ahm/src/scanner/local_stats.rs b/crates/ahm/src/scanner/local_stats.rs index eeef87f5..771cb1f6 100644 --- a/crates/ahm/src/scanner/local_stats.rs +++ b/crates/ahm/src/scanner/local_stats.rs @@ -349,6 +349,7 @@ impl LocalStatsManager { total_buckets: stats.buckets_stats.len(), last_update: stats.last_update, scan_progress: stats.scan_progress.clone(), + data_usage: stats.data_usage.clone(), } } @@ -427,4 +428,6 @@ pub struct StatsSummary { pub last_update: SystemTime, /// scan progress pub scan_progress: super::node_scanner::ScanProgress, + /// data usage snapshot for the node + pub data_usage: DataUsageInfo, } diff --git a/crates/ahm/src/scanner/mod.rs b/crates/ahm/src/scanner/mod.rs index 78ca12a3..f3fdd105 100644 --- a/crates/ahm/src/scanner/mod.rs +++ b/crates/ahm/src/scanner/mod.rs @@ -18,6 +18,7 @@ pub mod histogram; pub mod io_monitor; pub mod io_throttler; pub mod lifecycle; +pub mod local_scan; pub mod local_stats; pub mod metrics; pub mod node_scanner; diff --git a/crates/ahm/src/scanner/stats_aggregator.rs b/crates/ahm/src/scanner/stats_aggregator.rs index 420edc50..553407db 100644 --- a/crates/ahm/src/scanner/stats_aggregator.rs +++ b/crates/ahm/src/scanner/stats_aggregator.rs @@ -457,6 +457,7 @@ impl DecentralizedStatsAggregator { aggregated.total_heal_triggered += summary.total_heal_triggered; aggregated.total_disks += summary.total_disks; aggregated.total_buckets += summary.total_buckets; + aggregated.aggregated_data_usage.merge(&summary.data_usage); // aggregate scan progress aggregated @@ -570,3 +571,202 @@ pub struct CacheStatus { /// cache ttl pub ttl: Duration, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::scanner::node_scanner::{BucketScanState, ScanProgress}; + use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo}; + use std::collections::{HashMap, HashSet}; + use std::time::Duration; + + #[tokio::test] + async fn aggregated_stats_merge_data_usage() { + let aggregator = DecentralizedStatsAggregator::new(DecentralizedStatsAggregatorConfig::default()); + + let mut data_usage = DataUsageInfo::default(); + let bucket_usage = BucketUsageInfo { + objects_count: 5, + size: 1024, + ..Default::default() + }; + data_usage.buckets_usage.insert("bucket".to_string(), bucket_usage); + data_usage.objects_total_count = 5; + data_usage.objects_total_size = 1024; + + let summary = StatsSummary { + node_id: "local-node".to_string(), + total_objects_scanned: 10, + total_healthy_objects: 9, + total_corrupted_objects: 1, + total_bytes_scanned: 2048, + total_scan_errors: 0, + total_heal_triggered: 0, + total_disks: 2, + total_buckets: 1, + last_update: SystemTime::now(), + scan_progress: ScanProgress::default(), + data_usage: data_usage.clone(), + }; + + aggregator.set_local_stats(summary).await; + + // Wait briefly to ensure async cache writes settle in high-concurrency environments + tokio::time::sleep(Duration::from_millis(10)).await; + + let aggregated = aggregator.get_aggregated_stats().await.expect("aggregated stats"); + + 
assert_eq!(aggregated.node_count, 1); + assert!(aggregated.node_summaries.contains_key("local-node")); + assert_eq!(aggregated.aggregated_data_usage.objects_total_count, 5); + assert_eq!( + aggregated + .aggregated_data_usage + .buckets_usage + .get("bucket") + .expect("bucket usage present") + .objects_count, + 5 + ); + } + + #[tokio::test] + async fn aggregated_stats_merge_multiple_nodes() { + let aggregator = DecentralizedStatsAggregator::new(DecentralizedStatsAggregatorConfig::default()); + + let mut local_usage = DataUsageInfo::default(); + let local_bucket = BucketUsageInfo { + objects_count: 3, + versions_count: 3, + size: 150, + ..Default::default() + }; + local_usage.buckets_usage.insert("local-bucket".to_string(), local_bucket); + local_usage.calculate_totals(); + local_usage.buckets_count = local_usage.buckets_usage.len() as u64; + local_usage.last_update = Some(SystemTime::now()); + + let local_progress = ScanProgress { + current_cycle: 1, + completed_disks: { + let mut set = std::collections::HashSet::new(); + set.insert("disk-local".to_string()); + set + }, + completed_buckets: { + let mut map = std::collections::HashMap::new(); + map.insert( + "local-bucket".to_string(), + BucketScanState { + completed: true, + last_object_key: Some("obj1".to_string()), + objects_scanned: 3, + scan_timestamp: SystemTime::now(), + }, + ); + map + }, + ..Default::default() + }; + + let local_summary = StatsSummary { + node_id: "node-local".to_string(), + total_objects_scanned: 30, + total_healthy_objects: 30, + total_corrupted_objects: 0, + total_bytes_scanned: 1500, + total_scan_errors: 0, + total_heal_triggered: 0, + total_disks: 1, + total_buckets: 1, + last_update: SystemTime::now(), + scan_progress: local_progress, + data_usage: local_usage.clone(), + }; + + let mut remote_usage = DataUsageInfo::default(); + let remote_bucket = BucketUsageInfo { + objects_count: 5, + versions_count: 5, + size: 250, + ..Default::default() + }; + remote_usage.buckets_usage.insert("remote-bucket".to_string(), remote_bucket); + remote_usage.calculate_totals(); + remote_usage.buckets_count = remote_usage.buckets_usage.len() as u64; + remote_usage.last_update = Some(SystemTime::now()); + + let remote_progress = ScanProgress { + current_cycle: 2, + completed_disks: { + let mut set = std::collections::HashSet::new(); + set.insert("disk-remote".to_string()); + set + }, + completed_buckets: { + let mut map = std::collections::HashMap::new(); + map.insert( + "remote-bucket".to_string(), + BucketScanState { + completed: true, + last_object_key: Some("remote-obj".to_string()), + objects_scanned: 5, + scan_timestamp: SystemTime::now(), + }, + ); + map + }, + ..Default::default() + }; + + let remote_summary = StatsSummary { + node_id: "node-remote".to_string(), + total_objects_scanned: 50, + total_healthy_objects: 48, + total_corrupted_objects: 2, + total_bytes_scanned: 2048, + total_scan_errors: 1, + total_heal_triggered: 1, + total_disks: 2, + total_buckets: 1, + last_update: SystemTime::now(), + scan_progress: remote_progress, + data_usage: remote_usage.clone(), + }; + let node_summaries: HashMap<_, _> = [ + (local_summary.node_id.clone(), local_summary.clone()), + (remote_summary.node_id.clone(), remote_summary.clone()), + ] + .into_iter() + .collect(); + + let aggregated = aggregator.aggregate_node_summaries(node_summaries, SystemTime::now()).await; + + assert_eq!(aggregated.node_count, 2); + assert_eq!(aggregated.total_objects_scanned, 80); + assert_eq!(aggregated.total_corrupted_objects, 2); + 
assert_eq!(aggregated.total_disks, 3); + assert!(aggregated.node_summaries.contains_key("node-local")); + assert!(aggregated.node_summaries.contains_key("node-remote")); + + assert_eq!( + aggregated.aggregated_data_usage.objects_total_count, + local_usage.objects_total_count + remote_usage.objects_total_count + ); + assert_eq!( + aggregated.aggregated_data_usage.objects_total_size, + local_usage.objects_total_size + remote_usage.objects_total_size + ); + + let mut expected_buckets: HashSet<&str> = HashSet::new(); + expected_buckets.insert("local-bucket"); + expected_buckets.insert("remote-bucket"); + let actual_buckets: HashSet<&str> = aggregated + .aggregated_data_usage + .buckets_usage + .keys() + .map(|s| s.as_str()) + .collect(); + assert_eq!(expected_buckets, actual_buckets); + } +} diff --git a/crates/ahm/tests/integration_tests.rs b/crates/ahm/tests/integration_tests.rs index 1fddff7b..b34bdc42 100644 --- a/crates/ahm/tests/integration_tests.rs +++ b/crates/ahm/tests/integration_tests.rs @@ -195,6 +195,7 @@ async fn test_distributed_stats_aggregation() { total_buckets: 5, last_update: std::time::SystemTime::now(), scan_progress: Default::default(), + data_usage: rustfs_common::data_usage::DataUsageInfo::default(), }; aggregator.set_local_stats(local_stats).await; diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 0f3b20af..8d423f4b 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -271,7 +271,10 @@ async fn create_test_tier() { /// Test helper: Check if object exists async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bool { - ((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok() + match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await { + Ok(info) => !info.delete_marker, + Err(_) => false, + } } /// Test helper: Check if object exists diff --git a/crates/common/src/data_usage.rs b/crates/common/src/data_usage.rs index d07a3cd7..65f6fd83 100644 --- a/crates/common/src/data_usage.rs +++ b/crates/common/src/data_usage.rs @@ -144,6 +144,20 @@ pub struct DataUsageInfo { pub buckets_usage: HashMap, /// Deprecated kept here for backward compatibility reasons pub bucket_sizes: HashMap, + /// Per-disk snapshot information when available + #[serde(default)] + pub disk_usage_status: Vec, +} + +/// Metadata describing the status of a disk-level data usage snapshot. 
+#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct DiskUsageStatus { + pub disk_id: String, + pub pool_index: Option, + pub set_index: Option, + pub disk_index: Option, + pub last_update: Option, + pub snapshot_exists: bool, } /// Size summary for a single object or group of objects @@ -1127,6 +1141,8 @@ impl DataUsageInfo { } } + self.disk_usage_status.extend(other.disk_usage_status.iter().cloned()); + // Recalculate totals self.calculate_totals(); diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index e34e822d..6adbbe85 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -326,7 +326,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if let Some(days) = expiration.days { let expected_expiry = expected_expiry_time(obj.mod_time.expect("err!"), days /*, date*/); - if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { events.push(Event { action: IlmAction::DeleteVersionAction, rule_id: rule.id.clone().expect("err!"), @@ -347,7 +347,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if obj.delete_marker && expired_object_delete_marker { let due = expiration.next_due(obj); if let Some(due) = due { - if now.unix_timestamp() == 0 || now.unix_timestamp() > due.unix_timestamp() { + if now.unix_timestamp() >= due.unix_timestamp() { events.push(Event { action: IlmAction::DelMarkerDeleteAllVersionsAction, rule_id: rule.id.clone().expect("err!"), @@ -380,7 +380,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if noncurrent_days != 0 { if let Some(successor_mod_time) = obj.successor_mod_time { let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days); - if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { events.push(Event { action: IlmAction::DeleteVersionAction, rule_id: rule.id.clone().expect("err!"), @@ -402,9 +402,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE { let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj); - if due.is_some() - && (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp()) - { + if due.is_some() && (now.unix_timestamp() >= due.unwrap().unix_timestamp()) { events.push(Event { action: IlmAction::TransitionVersionAction, rule_id: rule.id.clone().expect("err!"), @@ -436,9 +434,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if let Some(ref expiration) = rule.expiration { if let Some(ref date) = expiration.date { let date0 = OffsetDateTime::from(date.clone()); - if date0.unix_timestamp() != 0 - && (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp()) - { + if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) { info!("eval_inner: expiration by date - date0={:?}", date0); events.push(Event { action: IlmAction::DeleteAction, @@ -459,7 +455,7 @@ impl Lifecycle for BucketLifecycleConfiguration { now, now.unix_timestamp() > expected_expiry.unix_timestamp() ); - if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { + if now.unix_timestamp() >= expected_expiry.unix_timestamp() { info!("eval_inner: object should expire, adding DeleteAction"); let mut 
event = Event { action: IlmAction::DeleteAction, @@ -485,9 +481,7 @@ impl Lifecycle for BucketLifecycleConfiguration { if let Some(ref transitions) = rule.transitions { let due = transitions[0].next_due(obj); if let Some(due) = due { - if due.unix_timestamp() > 0 - && (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unix_timestamp()) - { + if due.unix_timestamp() > 0 && (now.unix_timestamp() >= due.unix_timestamp()) { events.push(Event { action: IlmAction::TransitionAction, rule_id: rule.id.clone().expect("err!"), diff --git a/crates/ecstore/src/data_usage.rs b/crates/ecstore/src/data_usage.rs index c9dbed85..001a054f 100644 --- a/crates/ecstore/src/data_usage.rs +++ b/crates/ecstore/src/data_usage.rs @@ -12,10 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, hash_map::Entry}, + sync::Arc, + time::SystemTime, +}; -use crate::{bucket::metadata_sys::get_replication_config, config::com::read_config, store::ECStore, store_api::StorageAPI}; -use rustfs_common::data_usage::{BucketTargetUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, SizeSummary}; +pub mod local_snapshot; +pub use local_snapshot::{ + DATA_USAGE_DIR, DATA_USAGE_STATE_DIR, LOCAL_USAGE_SNAPSHOT_VERSION, LocalUsageSnapshot, LocalUsageSnapshotMeta, + data_usage_dir, data_usage_state_dir, ensure_data_usage_layout, read_snapshot as read_local_snapshot, snapshot_file_name, + snapshot_object_path, snapshot_path, write_snapshot as write_local_snapshot, +}; + +use crate::{ + bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, store::ECStore, store_api::StorageAPI, +}; +use rustfs_common::data_usage::{ + BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary, +}; use rustfs_utils::path::SLASH_SEPARATOR; use tracing::{error, info, warn}; @@ -144,6 +159,178 @@ pub async fn load_data_usage_from_backend(store: Arc) -> Result) -> Result<(Vec, DataUsageInfo), Error> { + let mut aggregated = DataUsageInfo::default(); + let mut latest_update: Option = None; + let mut statuses: Vec = Vec::new(); + + for (pool_idx, pool) in store.pools.iter().enumerate() { + for set_disks in pool.disk_set.iter() { + let disk_entries = { + let guard = set_disks.disks.read().await; + guard.clone() + }; + + for (disk_index, disk_opt) in disk_entries.into_iter().enumerate() { + let Some(disk) = disk_opt else { + continue; + }; + + if !disk.is_local() { + continue; + } + + let disk_id = match disk.get_disk_id().await.map_err(Error::from)? { + Some(id) => id.to_string(), + None => continue, + }; + + let root = disk.path(); + let mut status = DiskUsageStatus { + disk_id: disk_id.clone(), + pool_index: Some(pool_idx), + set_index: Some(set_disks.set_index), + disk_index: Some(disk_index), + last_update: None, + snapshot_exists: false, + }; + + if let Some(mut snapshot) = read_local_snapshot(root.as_path(), &disk_id).await? 
{ + status.last_update = snapshot.last_update; + status.snapshot_exists = true; + + if snapshot.meta.disk_id.is_empty() { + snapshot.meta.disk_id = disk_id.clone(); + } + if snapshot.meta.pool_index.is_none() { + snapshot.meta.pool_index = Some(pool_idx); + } + if snapshot.meta.set_index.is_none() { + snapshot.meta.set_index = Some(set_disks.set_index); + } + if snapshot.meta.disk_index.is_none() { + snapshot.meta.disk_index = Some(disk_index); + } + + snapshot.recompute_totals(); + + if let Some(update) = snapshot.last_update { + if latest_update.is_none_or(|current| update > current) { + latest_update = Some(update); + } + } + + aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count); + aggregated.versions_total_count = + aggregated.versions_total_count.saturating_add(snapshot.versions_total_count); + aggregated.delete_markers_total_count = aggregated + .delete_markers_total_count + .saturating_add(snapshot.delete_markers_total_count); + aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size); + + for (bucket, usage) in snapshot.buckets_usage.into_iter() { + let bucket_size = usage.size; + match aggregated.buckets_usage.entry(bucket.clone()) { + Entry::Occupied(mut entry) => entry.get_mut().merge(&usage), + Entry::Vacant(entry) => { + entry.insert(usage.clone()); + } + } + + aggregated + .bucket_sizes + .entry(bucket) + .and_modify(|size| *size = size.saturating_add(bucket_size)) + .or_insert(bucket_size); + } + } + + statuses.push(status); + } + } + } + + aggregated.buckets_count = aggregated.buckets_usage.len() as u64; + aggregated.last_update = latest_update; + aggregated.disk_usage_status = statuses.clone(); + + Ok((statuses, aggregated)) +} + +/// Calculate accurate bucket usage statistics by enumerating objects through the object layer. 
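+///
+/// Listing is paged through `list_objects_v2` in batches of 1000 keys, so the cost grows
+/// with bucket size. Minimal usage sketch (assumes a running `ECStore` handle; the bucket
+/// name is hypothetical):
+///
+/// ```ignore
+/// let usage = compute_bucket_usage(store.clone(), "demo-bucket").await?;
+/// println!(
+///     "{} objects, {} versions, {} delete markers, {} bytes",
+///     usage.objects_count, usage.versions_count, usage.delete_markers_count, usage.size
+/// );
+/// ```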
+pub async fn compute_bucket_usage(store: Arc, bucket_name: &str) -> Result { + let mut continuation: Option = None; + let mut objects_count: u64 = 0; + let mut versions_count: u64 = 0; + let mut total_size: u64 = 0; + let mut delete_markers: u64 = 0; + + loop { + let result = store + .clone() + .list_objects_v2( + bucket_name, + "", // prefix + continuation.clone(), + None, // delimiter + 1000, // max_keys + false, // fetch_owner + None, // start_after + ) + .await?; + + for object in result.objects.iter() { + if object.is_dir { + continue; + } + + if object.delete_marker { + delete_markers = delete_markers.saturating_add(1); + continue; + } + + let object_size = object.size.max(0) as u64; + objects_count = objects_count.saturating_add(1); + total_size = total_size.saturating_add(object_size); + + let detected_versions = if object.num_versions > 0 { + object.num_versions as u64 + } else { + 1 + }; + versions_count = versions_count.saturating_add(detected_versions); + } + + if !result.is_truncated { + break; + } + + continuation = result.next_continuation_token.clone(); + if continuation.is_none() { + warn!( + "Bucket {} listing marked truncated but no continuation token returned; stopping early", + bucket_name + ); + break; + } + } + + if versions_count == 0 { + versions_count = objects_count; + } + + let usage = BucketUsageInfo { + size: total_size, + objects_count, + versions_count, + delete_markers_count: delete_markers, + ..Default::default() + }; + + Ok(usage) +} + /// Build basic data usage info with real object counts async fn build_basic_data_usage_info(store: Arc) -> Result { let mut data_usage_info = DataUsageInfo::default(); @@ -152,56 +339,40 @@ async fn build_basic_data_usage_info(store: Arc) -> Result { data_usage_info.buckets_count = buckets.len() as u64; - data_usage_info.last_update = Some(std::time::SystemTime::now()); + data_usage_info.last_update = Some(SystemTime::now()); let mut total_objects = 0u64; + let mut total_versions = 0u64; let mut total_size = 0u64; + let mut total_delete_markers = 0u64; for bucket_info in buckets { if bucket_info.name.starts_with('.') { continue; // Skip system buckets } - // Try to get actual object count for this bucket - let (object_count, bucket_size) = match store - .clone() - .list_objects_v2( - &bucket_info.name, - "", // prefix - None, // continuation_token - None, // delimiter - 100, // max_keys - small limit for performance - false, // fetch_owner - None, // start_after - ) - .await - { - Ok(result) => { - let count = result.objects.len() as u64; - let size = result.objects.iter().map(|obj| obj.size as u64).sum(); - (count, size) + match compute_bucket_usage(store.clone(), &bucket_info.name).await { + Ok(bucket_usage) => { + total_objects = total_objects.saturating_add(bucket_usage.objects_count); + total_versions = total_versions.saturating_add(bucket_usage.versions_count); + total_size = total_size.saturating_add(bucket_usage.size); + total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count); + + data_usage_info + .buckets_usage + .insert(bucket_info.name.clone(), bucket_usage.clone()); + data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_usage.size); } - Err(_) => (0, 0), - }; - - total_objects += object_count; - total_size += bucket_size; - - let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { - size: bucket_size, - objects_count: object_count, - versions_count: object_count, // Simplified - delete_markers_count: 0, - ..Default::default() - }; - - 
data_usage_info.buckets_usage.insert(bucket_info.name.clone(), bucket_usage); - data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_size); + Err(e) => { + warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e); + } + } } data_usage_info.objects_total_count = total_objects; + data_usage_info.versions_total_count = total_versions; data_usage_info.objects_total_size = total_size; - data_usage_info.versions_total_count = total_objects; + data_usage_info.delete_markers_total_count = total_delete_markers; } Err(e) => { warn!("Failed to list buckets for basic data usage info: {}", e); diff --git a/crates/ecstore/src/data_usage/local_snapshot.rs b/crates/ecstore/src/data_usage/local_snapshot.rs new file mode 100644 index 00000000..7d81a140 --- /dev/null +++ b/crates/ecstore/src/data_usage/local_snapshot.rs @@ -0,0 +1,145 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +use serde::{Deserialize, Serialize}; +use tokio::fs; + +use crate::data_usage::BucketUsageInfo; +use crate::disk::RUSTFS_META_BUCKET; +use crate::error::{Error, Result}; + +/// Directory used to store per-disk usage snapshots under the metadata bucket. +pub const DATA_USAGE_DIR: &str = "datausage"; +/// Directory used to store incremental scan state files under the metadata bucket. +pub const DATA_USAGE_STATE_DIR: &str = "datausage/state"; +/// Snapshot file format version, allows forward compatibility if the structure evolves. +pub const LOCAL_USAGE_SNAPSHOT_VERSION: u32 = 1; + +/// Additional metadata describing which disk produced the snapshot. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalUsageSnapshotMeta { + /// Disk UUID stored as a string for simpler serialization. + pub disk_id: String, + /// Pool index if this disk is bound to a specific pool. + pub pool_index: Option, + /// Set index if known. + pub set_index: Option, + /// Disk index inside the set if known. + pub disk_index: Option, +} + +/// Usage snapshot produced by a single disk. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalUsageSnapshot { + /// Format version recorded in the snapshot. + pub format_version: u32, + /// Snapshot metadata, including disk identity. + pub meta: LocalUsageSnapshotMeta, + /// Wall-clock timestamp when the snapshot was produced. + pub last_update: Option, + /// Per-bucket usage statistics. + pub buckets_usage: HashMap, + /// Cached bucket count to speed up aggregations. + pub buckets_count: u64, + /// Total objects counted on this disk. + pub objects_total_count: u64, + /// Total versions counted on this disk. + pub versions_total_count: u64, + /// Total delete markers counted on this disk. + pub delete_markers_total_count: u64, + /// Total bytes occupied by objects on this disk. + pub objects_total_size: u64, +} + +impl LocalUsageSnapshot { + /// Create an empty snapshot with the default format version filled in. + pub fn new(meta: LocalUsageSnapshotMeta) -> Self { + Self { + format_version: LOCAL_USAGE_SNAPSHOT_VERSION, + meta, + ..Default::default() + } + } + + /// Recalculate cached totals from the per-bucket map. 
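+    ///
+    /// Small illustrative example (bucket name and numbers are made up):
+    ///
+    /// ```ignore
+    /// let mut snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta::default());
+    /// snapshot.buckets_usage.insert(
+    ///     "demo-bucket".to_string(),
+    ///     BucketUsageInfo {
+    ///         size: 1024,
+    ///         objects_count: 2,
+    ///         versions_count: 2,
+    ///         ..Default::default()
+    ///     },
+    /// );
+    /// snapshot.recompute_totals();
+    /// assert_eq!(snapshot.buckets_count, 1);
+    /// assert_eq!(snapshot.objects_total_count, 2);
+    /// assert_eq!(snapshot.objects_total_size, 1024);
+    /// ```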
+    pub fn recompute_totals(&mut self) {
+        let mut buckets_count = 0u64;
+        let mut objects_total_count = 0u64;
+        let mut versions_total_count = 0u64;
+        let mut delete_markers_total_count = 0u64;
+        let mut objects_total_size = 0u64;
+
+        for usage in self.buckets_usage.values() {
+            buckets_count = buckets_count.saturating_add(1);
+            objects_total_count = objects_total_count.saturating_add(usage.objects_count);
+            versions_total_count = versions_total_count.saturating_add(usage.versions_count);
+            delete_markers_total_count = delete_markers_total_count.saturating_add(usage.delete_markers_count);
+            objects_total_size = objects_total_size.saturating_add(usage.size);
+        }
+
+        self.buckets_count = buckets_count;
+        self.objects_total_count = objects_total_count;
+        self.versions_total_count = versions_total_count;
+        self.delete_markers_total_count = delete_markers_total_count;
+        self.objects_total_size = objects_total_size;
+    }
+}
+
+/// Build the snapshot file name `<disk_id>.json`.
+pub fn snapshot_file_name(disk_id: &str) -> String {
+    format!("{}.json", disk_id)
+}
+
+/// Build the object path relative to `RUSTFS_META_BUCKET`, e.g. `datausage/<disk_id>.json`.
+pub fn snapshot_object_path(disk_id: &str) -> String {
+    format!("{}/{}", DATA_USAGE_DIR, snapshot_file_name(disk_id))
+}
+
+/// Return the absolute path to `.rustfs.sys/datausage` on the given disk root.
+pub fn data_usage_dir(root: &Path) -> PathBuf {
+    root.join(RUSTFS_META_BUCKET).join(DATA_USAGE_DIR)
+}
+
+/// Return the absolute path to `.rustfs.sys/datausage/state` on the given disk root.
+pub fn data_usage_state_dir(root: &Path) -> PathBuf {
+    root.join(RUSTFS_META_BUCKET).join(DATA_USAGE_STATE_DIR)
+}
+
+/// Build the absolute path to the snapshot file for the provided disk ID.
+pub fn snapshot_path(root: &Path, disk_id: &str) -> PathBuf {
+    data_usage_dir(root).join(snapshot_file_name(disk_id))
+}
+
+/// Read a snapshot from disk if it exists.
+pub async fn read_snapshot(root: &Path, disk_id: &str) -> Result<Option<LocalUsageSnapshot>> {
+    let path = snapshot_path(root, disk_id);
+    match fs::read(&path).await {
+        Ok(content) => {
+            let snapshot = serde_json::from_slice::<LocalUsageSnapshot>(&content)
+                .map_err(|err| Error::other(format!("failed to deserialize snapshot {path:?}: {err}")))?;
+            Ok(Some(snapshot))
+        }
+        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
+        Err(err) => Err(Error::other(err)),
+    }
+}
+
+/// Persist a snapshot to disk, creating directories as needed and overwriting any existing file.
+pub async fn write_snapshot(root: &Path, disk_id: &str, snapshot: &LocalUsageSnapshot) -> Result<()> {
+    let dir = data_usage_dir(root);
+    fs::create_dir_all(&dir).await.map_err(Error::other)?;
+    let path = dir.join(snapshot_file_name(disk_id));
+    let data = serde_json::to_vec_pretty(snapshot)
+        .map_err(|err| Error::other(format!("failed to serialize snapshot {path:?}: {err}")))?;
+    fs::write(&path, data).await.map_err(Error::other)
+}
+
+/// Ensure that the data usage directory structure exists on this disk root.
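+///
+/// Typical call-site sketch (the disk root and ID below are hypothetical; note that
+/// `write_snapshot` creates the directory itself, so this is mainly useful when
+/// preparing a fresh disk layout up front):
+///
+/// ```ignore
+/// let root = Path::new("/data/rustfs0");
+/// ensure_data_usage_layout(root).await?;
+/// let snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta::default());
+/// write_snapshot(root, "disk-uuid-0", &snapshot).await?;
+/// assert!(read_snapshot(root, "disk-uuid-0").await?.is_some());
+/// ```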
+pub async fn ensure_data_usage_layout(root: &Path) -> Result<()> { + let usage_dir = data_usage_dir(root); + fs::create_dir_all(&usage_dir).await.map_err(Error::other)?; + let state_dir = data_usage_state_dir(root); + fs::create_dir_all(&state_dir).await.map_err(Error::other)?; + Ok(()) +} diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index e066885a..cf556f31 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -21,6 +21,7 @@ use super::{ }; use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; +use crate::data_usage::local_snapshot::ensure_data_usage_layout; use crate::disk::error::FileAccessDeniedWithContext; use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error}; use crate::disk::fs::{ @@ -147,8 +148,10 @@ impl LocalDisk { } }; + ensure_data_usage_layout(&root).await.map_err(DiskError::from)?; + if cleanup { - // TODO: 删除 tmp 数据 + // TODO: remove temporary data } // Use optimized path resolution instead of absolutize_virtually diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 8d5bcf73..ddb9227f 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -30,7 +30,9 @@ use rustfs_ecstore::bucket::metadata_sys::{self, get_replication_config}; use rustfs_ecstore::bucket::target::BucketTarget; use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys}; -use rustfs_ecstore::data_usage::load_data_usage_from_backend; +use rustfs_ecstore::data_usage::{ + aggregate_local_snapshots, compute_bucket_usage, load_data_usage_from_backend, store_data_usage_in_backend, +}; use rustfs_ecstore::error::StorageError; use rustfs_ecstore::global::get_global_action_cred; use rustfs_ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics}; @@ -437,20 +439,67 @@ impl Operation for DataUsageInfoHandler { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; - let mut info = load_data_usage_from_backend(store.clone()).await.map_err(|e| { - error!("load_data_usage_from_backend failed {:?}", e); - s3_error!(InternalError, "load_data_usage_from_backend failed") - })?; + let (disk_statuses, mut info) = match aggregate_local_snapshots(store.clone()).await { + Ok((statuses, usage)) => (statuses, usage), + Err(err) => { + warn!("aggregate_local_snapshots failed: {:?}", err); + ( + Vec::new(), + load_data_usage_from_backend(store.clone()).await.map_err(|e| { + error!("load_data_usage_from_backend failed {:?}", e); + s3_error!(InternalError, "load_data_usage_from_backend failed") + })?, + ) + } + }; - // If no valid data exists, attempt real-time collection - if info.objects_total_count == 0 && info.buckets_count == 0 { + let snapshots_available = disk_statuses.iter().any(|status| status.snapshot_exists); + if !snapshots_available { + if let Ok(fallback) = load_data_usage_from_backend(store.clone()).await { + let mut fallback_info = fallback; + fallback_info.disk_usage_status = disk_statuses.clone(); + info = fallback_info; + } + } else { + info.disk_usage_status = disk_statuses.clone(); + } + + let last_update_age = info.last_update.and_then(|ts| ts.elapsed().ok()); + let data_missing = info.objects_total_count == 0 && info.buckets_count == 0; + let stale = last_update_age + .map(|elapsed| elapsed > std::time::Duration::from_secs(300)) + .unwrap_or(true); + + if data_missing { info!("No data usage 
statistics found, attempting real-time collection"); if let Err(e) = collect_realtime_data_usage(&mut info, store.clone()).await { warn!("Failed to collect real-time data usage: {}", e); + } else if let Err(e) = store_data_usage_in_backend(info.clone(), store.clone()).await { + warn!("Failed to persist refreshed data usage: {}", e); } + } else if stale { + info!( + "Data usage statistics are stale (last update {:?} ago), refreshing asynchronously", + last_update_age + ); + + let mut info_for_refresh = info.clone(); + let store_for_refresh = store.clone(); + tokio::spawn(async move { + if let Err(e) = collect_realtime_data_usage(&mut info_for_refresh, store_for_refresh.clone()).await { + warn!("Background data usage refresh failed: {}", e); + return; + } + + if let Err(e) = store_data_usage_in_backend(info_for_refresh, store_for_refresh).await { + warn!("Background data usage persistence failed: {}", e); + } + }); } + info.disk_usage_status = disk_statuses; + // Set capacity information let sinfo = store.storage_info().await; info.total_capacity = get_total_usable_capacity(&sinfo.disks, &sinfo) as u64; @@ -1192,9 +1241,18 @@ async fn collect_realtime_data_usage( info.buckets_count = buckets.len() as u64; info.last_update = Some(std::time::SystemTime::now()); + info.buckets_usage.clear(); + info.bucket_sizes.clear(); + info.disk_usage_status.clear(); + info.objects_total_count = 0; + info.objects_total_size = 0; + info.versions_total_count = 0; + info.delete_markers_total_count = 0; let mut total_objects = 0u64; + let mut total_versions = 0u64; let mut total_size = 0u64; + let mut total_delete_markers = 0u64; // For each bucket, try to get object count for bucket_info in buckets { @@ -1205,61 +1263,30 @@ async fn collect_realtime_data_usage( continue; } - // Try to count objects in this bucket - let (object_count, bucket_size) = count_bucket_objects(&store, bucket_name).await.unwrap_or((0, 0)); + match compute_bucket_usage(store.clone(), bucket_name).await { + Ok(bucket_usage) => { + total_objects = total_objects.saturating_add(bucket_usage.objects_count); + total_versions = total_versions.saturating_add(bucket_usage.versions_count); + total_size = total_size.saturating_add(bucket_usage.size); + total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count); - total_objects += object_count; - total_size += bucket_size; - - let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { - objects_count: object_count, - size: bucket_size, - versions_count: object_count, // Simplified: assume 1 version per object - ..Default::default() - }; - - info.buckets_usage.insert(bucket_name.clone(), bucket_usage); - info.bucket_sizes.insert(bucket_name.clone(), bucket_size); + info.buckets_usage.insert(bucket_name.clone(), bucket_usage.clone()); + info.bucket_sizes.insert(bucket_name.clone(), bucket_usage.size); + } + Err(e) => { + warn!("Failed to compute bucket usage for {}: {}", bucket_name, e); + } + } } info.objects_total_count = total_objects; info.objects_total_size = total_size; - info.versions_total_count = total_objects; // Simplified + info.versions_total_count = total_versions; + info.delete_markers_total_count = total_delete_markers; Ok(()) } -/// Helper function to count objects in a bucket -async fn count_bucket_objects( - store: &Arc, - bucket_name: &str, -) -> Result<(u64, u64), Box> { - // Use list_objects_v2 to get actual object count - match store - .clone() - .list_objects_v2( - bucket_name, - "", // prefix - None, // continuation_token - None, // 
delimiter
-            1000,  // max_keys - limit for performance
-            false, // fetch_owner
-            None,  // start_after
-        )
-        .await
-    {
-        Ok(result) => {
-            let object_count = result.objects.len() as u64;
-            let total_size = result.objects.iter().map(|obj| obj.size as u64).sum();
-            Ok((object_count, total_size))
-        }
-        Err(e) => {
-            warn!("Failed to list objects in bucket {}: {}", bucket_name, e);
-            Ok((0, 0))
-        }
-    }
-}
-
 pub struct ProfileHandler {}
 
 #[async_trait::async_trait]
 impl Operation for ProfileHandler {