Fix collect (#586)

* fix: fix datausageinfo

Signed-off-by: junxiang Mu <1948535941@qq.com>

* feat(data-usage): implement local disk snapshot aggregation for data usage statistics

Signed-off-by: junxiang Mu <1948535941@qq.com>

* feat(scanner): improve data usage collection with local scan aggregation

Signed-off-by: junxiang Mu <1948535941@qq.com>

* refactor: improve object existence check and code style

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
Author: guojidan
Date: 2025-09-24 02:48:23 -07:00
Committed by: GitHub
Parent: ef0dbaaeb5
Commit: 12ecb36c6d
15 changed files with 1626 additions and 376 deletions

View File

@@ -1,19 +1,22 @@
# Repository Guidelines
## Project Structure & Module Organization
The workspace root defines shared dependencies in `Cargo.toml`. The service binary lives in `rustfs/` with entrypoints under `src/main.rs`. Crypto, IAM, and KMS components sit in `crates/` (notably `crates/crypto`, `crates/iam`, `crates/kms`). End-to-end fixtures are in `crates/e2e_test/` and `test_standalone/`. Operational tooling resides in `scripts/`, deployment manifests are under `deploy/`, and Docker assets sit at the repository root. Before editing, skim a crate's README or module-level docs to confirm its responsibility.
## Build, Test, and Development Commands
Use `cargo build --release` for optimized binaries, or run `./build-rustfs.sh --dev` when iterating locally; `make build` mirrors the release pipeline. Cross-platform builds rely on `./build-rustfs.sh --platform <target>` or `make build-cross-all`. Validate code early with `cargo check --all-targets`. Run unit coverage using `cargo test --workspace --exclude e2e_test` and prefer `cargo nextest run --all --exclude e2e_test` if installed. Execute `make pre-commit` (fmt, clippy, check, test) before every push. After generating code, always run `make clippy` and ensure it completes successfully before proceeding.
## Coding Style & Naming Conventions
Formatting follows `rustfmt.toml` (130-column width, async-friendly wrapping). Adopt `snake_case` for items, `PascalCase` for types, and `SCREAMING_SNAKE_CASE` for constants. Avoid `unwrap()` and `expect()` outside tests; propagate errors with `Result` and crate-specific `thiserror` types. Keep async code non-blocking—use `tokio::task::spawn_blocking` if CPU-heavy work is unavoidable. Document public APIs with focused `///` comments that cover parameters, errors, and examples.
## Testing Guidelines
Co-locate unit tests with modules and use behavior-led names such as `handles_expired_token`. Place integration suites in `tests/` folders and exhaustive flows in `crates/e2e_test/`. For KMS validation, clear proxies and run `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY= cargo test --package e2e_test kms:: -- --nocapture --test-threads=1`. Always finish by running `cargo test --all` to ensure coverage across crates.
## Commit & Pull Request Guidelines
Create feature branches (`feat/...`, `fix/...`, `refactor/...`) after syncing `main`; never commit directly to the protected branch. Commits must align with Conventional Commits (e.g., `feat: add kms key rotation`) and remain under 72 characters. Each commit should compile, format cleanly, pass clippy with `-D warnings`, and include relevant tests. Open pull requests with `gh pr create`, provide a concise summary, list verification commands, and wait for reviewer approval before merging.
## Communication Rules
- Respond to the user in Chinese; use English in all other contexts.
## Project Structure & Module Organization
The workspace root hosts shared dependencies in `Cargo.toml`. The service binary lives under `rustfs/src/main.rs`, while reusable crates sit in `crates/` (`crypto`, `iam`, `kms`, and `e2e_test`). Local fixtures for standalone flows reside in `test_standalone/`, deployment manifests are under `deploy/`, Docker assets sit at the root, and automation lives in `scripts/`. Skim each crate's README or module docs before contributing changes.
## Build, Test, and Development Commands
Run `cargo check --all-targets` for fast validation. Build release binaries via `cargo build --release` or the pipeline-aligned `make build`. Use `./build-rustfs.sh --dev` for iterative development and `./build-rustfs.sh --platform <target>` for cross-compiles. Prefer `make pre-commit` before pushing to cover formatting, clippy, checks, and tests.
## Coding Style & Naming Conventions
Formatting follows the repo `rustfmt.toml` (130-column width). Use `snake_case` for items, `PascalCase` for types, and `SCREAMING_SNAKE_CASE` for constants. Avoid `unwrap()` or `expect()` outside tests; bubble errors with `Result` and crate-specific `thiserror` types. Keep async code non-blocking and offload CPU-heavy work with `tokio::task::spawn_blocking` when necessary.
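For illustration, a minimal sketch of this error-handling and blocking-offload style, assuming `tokio` and `thiserror` are available; `ScanError` and `checksum_file` are hypothetical names, not types in this codebase:

```rust
// Hypothetical crate-local error type built with `thiserror`.
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ScanError {
    #[error("io failure: {0}")]
    Io(#[from] std::io::Error),
    #[error("join failure: {0}")]
    Join(#[from] tokio::task::JoinError),
}

/// Checksum a file without blocking the async runtime: errors are propagated
/// through `Result` instead of `unwrap()`, and the heavy work runs on a
/// blocking thread via `tokio::task::spawn_blocking`.
pub async fn checksum_file(path: std::path::PathBuf) -> Result<u64, ScanError> {
    let sum = tokio::task::spawn_blocking(move || -> Result<u64, std::io::Error> {
        let bytes = std::fs::read(&path)?;
        // Toy checksum for the sketch; real code would use a proper digest.
        Ok(bytes.iter().map(|b| *b as u64).sum())
    })
    .await??; // first `?` handles JoinError, second handles io::Error
    Ok(sum)
}
```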
## Testing Guidelines
Co-locate unit tests with their modules and give behavior-led names such as `handles_expired_token`. Integration suites belong in each crate's `tests/` directory, while exhaustive end-to-end scenarios live in `crates/e2e_test/`. Run `cargo test --workspace --exclude e2e_test` during iteration, `cargo nextest run --all --exclude e2e_test` when available, and finish with `cargo test --all` before requesting review. Use `NO_PROXY=127.0.0.1,localhost HTTP_PROXY= HTTPS_PROXY=` for KMS e2e tests.
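As a small illustration of the naming convention, a self-contained test sketch; the `Token` type and `is_expired` method here are hypothetical stand-ins, not RustFS APIs:

```rust
#[cfg(test)]
mod tests {
    // Hypothetical token type used only to demonstrate a behavior-led test name.
    struct Token {
        expires_at: std::time::SystemTime,
    }

    impl Token {
        fn is_expired(&self, now: std::time::SystemTime) -> bool {
            now >= self.expires_at
        }
    }

    #[test]
    fn handles_expired_token() {
        let issued = std::time::SystemTime::UNIX_EPOCH;
        let token = Token { expires_at: issued + std::time::Duration::from_secs(60) };
        let later = issued + std::time::Duration::from_secs(120);
        assert!(token.is_expired(later), "token past its expiry must be rejected");
    }
}
```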
## Commit & Pull Request Guidelines
Work on feature branches (e.g., `feat/...`) after syncing `main`. Follow Conventional Commits under 72 characters (e.g., `feat: add kms key rotation`). Each commit must compile, format cleanly, and pass `make pre-commit`. Open PRs with a concise summary, note verification commands, link relevant issues, and wait for reviewer approval.
## Security & Configuration Tips
Do not commit secrets or cloud credentials; prefer environment variables or vault tooling. Review IAM- and KMS-related changes with a second maintainer. Confirm proxy settings before running sensitive tests to avoid leaking traffic outside localhost.
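A minimal sketch of reading a credential from the environment rather than hard-coding it; `RUSTFS_KMS_TOKEN` is an illustrative variable name, not a real configuration key:

```rust
// Read a secret from the environment; never bake it into source or config files.
fn kms_token() -> Result<String, std::env::VarError> {
    std::env::var("RUSTFS_KMS_TOKEN")
}

fn main() {
    match kms_token() {
        Ok(_token) => println!("KMS token loaded from environment"),
        // Fail closed: refuse to continue instead of falling back to a built-in secret.
        Err(_) => eprintln!("RUSTFS_KMS_TOKEN not set; aborting"),
    }
}
```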

View File

@@ -33,10 +33,10 @@ chrono = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
tempfile = { workspace = true }
walkdir = "2.5.0"
[dev-dependencies]
serde_json = { workspace = true }
serial_test = "3.2.0"
tracing-subscriber = { workspace = true }
walkdir = "2.5.0"
tempfile = { workspace = true }

View File

@@ -17,12 +17,17 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use time::OffsetDateTime;
use ecstore::{
disk::{Disk, DiskAPI, DiskStore, WalkDirOptions},
set_disk::SetDisks,
};
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_ecstore::{
self as ecstore, StorageAPI,
data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
};
use rustfs_filemeta::{MetacacheReader, VersionType};
use tokio::sync::{Mutex, RwLock};
use tokio_util::sync::CancellationToken;
@@ -34,13 +39,14 @@ use super::stats_aggregator::{DecentralizedStatsAggregator, DecentralizedStatsAg
// IO throttling component is integrated into NodeScanner
use crate::heal::HealManager;
use crate::scanner::lifecycle::ScannerItem;
use crate::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome};
use crate::{
HealRequest,
error::{Error, Result},
get_ahm_services_cancel_token,
};
use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo, SizeSummary};
use rustfs_common::data_usage::{DataUsageInfo, SizeSummary};
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
@@ -261,6 +267,15 @@ impl Scanner {
let enable_healing = config.enable_healing;
drop(config);
let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => outcome,
Err(err) => {
warn!("Local usage scan failed: {}", err);
LocalScanOutcome::default()
}
};
let bucket_objects_map = &scan_outcome.bucket_objects;
// List all buckets
debug!("Listing buckets");
match ecstore
@@ -289,10 +304,20 @@ impl Scanner {
enabled: bucket_info.versioning,
});
// Count objects in this bucket
let actual_objects = self.count_objects_in_bucket(&ecstore, bucket_name).await;
total_objects_scanned += actual_objects;
debug!("Counted {} objects in bucket {} (actual count)", actual_objects, bucket_name);
let records = match bucket_objects_map.get(bucket_name) {
Some(records) => records,
None => {
debug!(
"No local snapshot entries found for bucket {}; skipping lifecycle/integrity",
bucket_name
);
continue;
}
};
let live_objects = records.iter().filter(|record| record.usage.has_live_object).count() as u64;
total_objects_scanned = total_objects_scanned.saturating_add(live_objects);
debug!("Counted {} objects in bucket {} using local snapshots", live_objects, bucket_name);
// Process objects for lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config {
@@ -303,9 +328,8 @@ impl Scanner {
Some(versioning_config.clone()),
);
// List objects in bucket and apply lifecycle actions
match self
.process_bucket_objects_for_lifecycle(&ecstore, bucket_name, &mut scanner_item)
.process_bucket_objects_for_lifecycle(bucket_name, &mut scanner_item, records)
.await
{
Ok(processed_count) => {
@@ -320,11 +344,16 @@ impl Scanner {
// If deep scan is enabled, verify each object's integrity
if enable_deep_scan && enable_healing {
debug!("Deep scan enabled, verifying object integrity in bucket {}", bucket_name);
if let Err(e) = self.deep_scan_bucket_objects(&ecstore, bucket_name).await {
if let Err(e) = self
.deep_scan_bucket_objects_with_records(&ecstore, bucket_name, records)
.await
{
warn!("Deep scan failed for bucket {}: {}", bucket_name, e);
}
}
}
self.update_data_usage_statistics(&scan_outcome, &ecstore).await;
}
Err(e) => {
error!("Failed to list buckets: {}", e);
@@ -336,9 +365,6 @@ impl Scanner {
// Update metrics directly
self.metrics.increment_objects_scanned(total_objects_scanned);
debug!("Updated metrics with {} objects", total_objects_scanned);
// Also update data usage statistics
self.update_data_usage_statistics(total_objects_scanned, &ecstore).await;
} else {
warn!("No objects found during basic test scan");
}
@@ -350,91 +376,138 @@ impl Scanner {
}
/// Update data usage statistics based on scan results
async fn update_data_usage_statistics(&self, total_objects: u64, ecstore: &std::sync::Arc<rustfs_ecstore::store::ECStore>) {
debug!("Updating data usage statistics with {} objects", total_objects);
async fn update_data_usage_statistics(
&self,
outcome: &LocalScanOutcome,
ecstore: &std::sync::Arc<rustfs_ecstore::store::ECStore>,
) {
let enabled = {
let cfg = self.config.read().await;
cfg.enable_data_usage_stats
};
// Get buckets list to update data usage
match ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions::default())
.await
{
Ok(buckets) => {
let buckets_len = buckets.len(); // Store length before moving
let mut data_usage_guard = self.data_usage_stats.lock().await;
if !enabled {
debug!("Data usage statistics disabled; skipping refresh");
return;
}
for bucket_info in buckets {
let bucket_name = &bucket_info.name;
if outcome.snapshots.is_empty() {
warn!("No local usage snapshots available; skipping data usage aggregation");
return;
}
// Skip system buckets
if bucket_name.starts_with('.') {
continue;
}
let mut aggregated = DataUsageInfo::default();
let mut latest_update: Option<SystemTime> = None;
// Get object count for this bucket
let bucket_objects = self.count_objects_in_bucket(ecstore, bucket_name).await;
// Create or update bucket data usage info
let bucket_data = data_usage_guard.entry(bucket_name.clone()).or_insert_with(|| {
let mut info = DataUsageInfo::new();
info.objects_total_count = bucket_objects;
info.buckets_count = 1;
// Add bucket to buckets_usage
let bucket_usage = BucketUsageInfo {
size: bucket_objects * 1024, // Estimate 1KB per object
objects_count: bucket_objects,
object_size_histogram: HashMap::new(),
..Default::default()
};
info.buckets_usage.insert(bucket_name.clone(), bucket_usage);
info
});
// Update existing bucket data
bucket_data.objects_total_count = bucket_objects;
if let Some(bucket_usage) = bucket_data.buckets_usage.get_mut(bucket_name) {
bucket_usage.objects_count = bucket_objects;
bucket_usage.size = bucket_objects * 1024; // Estimate 1KB per object
}
debug!("Updated data usage for bucket {}: {} objects", bucket_name, bucket_objects);
}
debug!("Data usage statistics updated for {} buckets", buckets_len);
debug!("Current data_usage_guard size after update: {}", data_usage_guard.len());
// Also persist consolidated data to backend
drop(data_usage_guard); // Release the lock
debug!("About to get consolidated data usage info for persistence");
if let Ok(consolidated_info) = self.get_consolidated_data_usage_info().await {
debug!("Got consolidated info with {} objects total", consolidated_info.objects_total_count);
let config = self.config.read().await;
if config.enable_data_usage_stats {
debug!("Data usage stats enabled, proceeding to store to backend");
if let Some(store) = rustfs_ecstore::new_object_layer_fn() {
debug!("ECStore available, spawning background storage task");
let data_clone = consolidated_info.clone();
tokio::spawn(async move {
if let Err(e) = store_data_usage_in_backend(data_clone, store).await {
error!("Failed to store consolidated data usage to backend: {}", e);
} else {
debug!("Successfully stored consolidated data usage to backend");
}
});
} else {
warn!("ECStore not available");
}
} else {
warn!("Data usage stats not enabled");
}
} else {
error!("Failed to get consolidated data usage info");
for snapshot in &outcome.snapshots {
if let Some(update) = snapshot.last_update {
if latest_update.is_none_or(|current| update > current) {
latest_update = Some(update);
}
}
Err(e) => {
error!("Failed to update data usage statistics: {}", e);
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
aggregated.versions_total_count = aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
aggregated.delete_markers_total_count = aggregated
.delete_markers_total_count
.saturating_add(snapshot.delete_markers_total_count);
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
for (bucket, usage) in &snapshot.buckets_usage {
let size = usage.size;
match aggregated.buckets_usage.entry(bucket.clone()) {
std::collections::hash_map::Entry::Occupied(mut entry) => entry.get_mut().merge(usage),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(usage.clone());
}
}
aggregated
.bucket_sizes
.entry(bucket.clone())
.and_modify(|existing| *existing = existing.saturating_add(size))
.or_insert(size);
}
}
aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
aggregated.last_update = latest_update;
self.node_scanner.update_data_usage(aggregated.clone()).await;
let local_stats = self.node_scanner.get_stats_summary().await;
self.stats_aggregator.set_local_stats(local_stats).await;
let mut guard = self.data_usage_stats.lock().await;
guard.clear();
for (bucket, usage) in &aggregated.buckets_usage {
let mut bucket_data = DataUsageInfo::new();
bucket_data.last_update = aggregated.last_update;
bucket_data.buckets_count = 1;
bucket_data.objects_total_count = usage.objects_count;
bucket_data.versions_total_count = usage.versions_count;
bucket_data.delete_markers_total_count = usage.delete_markers_count;
bucket_data.objects_total_size = usage.size;
bucket_data.bucket_sizes.insert(bucket.clone(), usage.size);
bucket_data.buckets_usage.insert(bucket.clone(), usage.clone());
guard.insert(bucket.clone(), bucket_data);
}
drop(guard);
let info_clone = aggregated.clone();
let store_clone = ecstore.clone();
tokio::spawn(async move {
if let Err(err) = store_data_usage_in_backend(info_clone, store_clone).await {
warn!("Failed to persist aggregated usage: {}", err);
}
});
}
fn convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo {
if let Some(info) = &record.object_info {
return info.clone();
}
let usage = &record.usage;
ObjectInfo {
bucket: usage.bucket.clone(),
name: usage.object.clone(),
size: usage.total_size as i64,
delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
mod_time: usage.last_modified_ns.and_then(Self::ns_to_offset_datetime),
..Default::default()
}
}
fn ns_to_offset_datetime(ns: i128) -> Option<OffsetDateTime> {
OffsetDateTime::from_unix_timestamp_nanos(ns).ok()
}
async fn deep_scan_bucket_objects_with_records(
&self,
ecstore: &std::sync::Arc<rustfs_ecstore::store::ECStore>,
bucket_name: &str,
records: &[LocalObjectRecord],
) -> Result<()> {
if records.is_empty() {
return self.deep_scan_bucket_objects(ecstore, bucket_name).await;
}
for record in records {
if !record.usage.has_live_object {
continue;
}
let object_name = &record.usage.object;
if let Err(err) = self.verify_object_integrity(bucket_name, object_name).await {
warn!(
"Object integrity verification failed for {}/{} during deep scan: {}",
bucket_name, object_name, err
);
}
}
Ok(())
}
/// Deep scan objects in a bucket for integrity verification
@@ -484,130 +557,29 @@ impl Scanner {
Ok(())
}
async fn count_objects_in_bucket(&self, ecstore: &std::sync::Arc<rustfs_ecstore::store::ECStore>, bucket_name: &str) -> u64 {
// Use filesystem scanning approach
// Get first disk path for scanning
let mut total_objects = 0u64;
for pool in &ecstore.pools {
for set_disks in &pool.disk_set {
let (disks, _) = set_disks.get_online_disks_with_healing(false).await;
if let Some(disk) = disks.first() {
let bucket_path = disk.path().join(bucket_name);
if bucket_path.exists() {
if let Ok(entries) = std::fs::read_dir(&bucket_path) {
let object_count = entries
.filter_map(|entry| entry.ok())
.filter(|entry| {
if let Ok(file_type) = entry.file_type() {
file_type.is_dir()
} else {
false
}
})
.filter(|entry| {
// Skip hidden/system directories
if let Some(name) = entry.file_name().to_str() {
!name.starts_with('.')
} else {
false
}
})
.count() as u64;
debug!(
"Filesystem scan found {} objects in bucket {} on disk {:?}",
object_count,
bucket_name,
disk.path()
);
total_objects = object_count; // Use count from first disk
break;
}
}
}
}
if total_objects > 0 {
break;
}
}
if total_objects == 0 {
// Fallback: assume 1 object if bucket exists
debug!("Using fallback count of 1 for bucket {}", bucket_name);
1
} else {
total_objects
}
}
/// Process bucket objects for lifecycle actions
async fn process_bucket_objects_for_lifecycle(
&self,
ecstore: &std::sync::Arc<rustfs_ecstore::store::ECStore>,
bucket_name: &str,
scanner_item: &mut ScannerItem,
records: &[LocalObjectRecord],
) -> Result<u64> {
info!("Processing objects for lifecycle in bucket: {}", bucket_name);
let mut processed_count = 0u64;
// Instead of filesystem scanning, use ECStore's list_objects_v2 method to get correct object names
let mut continuation_token = None;
loop {
let list_result = match ecstore
.clone()
.list_objects_v2(
bucket_name,
"", // prefix
continuation_token.clone(),
None, // delimiter
1000, // max_keys
false, // fetch_owner
None, // start_after
)
.await
{
Ok(result) => result,
Err(e) => {
warn!("Failed to list objects in bucket {}: {}", bucket_name, e);
break;
}
};
info!("Found {} objects in bucket {}", list_result.objects.len(), bucket_name);
for obj in list_result.objects {
info!("Processing lifecycle for object: {}/{}", bucket_name, obj.name);
// Create ObjectInfo for lifecycle processing
let object_info = rustfs_ecstore::store_api::ObjectInfo {
bucket: bucket_name.to_string(),
name: obj.name.clone(),
version_id: None,
mod_time: obj.mod_time,
size: obj.size,
user_defined: std::collections::HashMap::new(),
..Default::default()
};
// Create SizeSummary for tracking
let mut size_summary = SizeSummary::default();
// Apply lifecycle actions
let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await;
if deleted {
info!("Object {}/{} was deleted by lifecycle action", bucket_name, obj.name);
}
processed_count += 1;
for record in records {
if !record.usage.has_live_object {
continue;
}
// Check if there are more objects to list
if list_result.is_truncated {
continuation_token = list_result.next_continuation_token.clone();
} else {
break;
let object_info = Self::convert_record_to_object_info(record);
let mut size_summary = SizeSummary::default();
let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await;
if deleted {
info!("Object {}/{} was deleted by lifecycle action", bucket_name, object_info.name);
}
processed_count = processed_count.saturating_add(1);
}
info!("Processed {} objects for lifecycle in bucket {}", processed_count, bucket_name);
@@ -644,6 +616,21 @@ impl Scanner {
}
});
// Trigger an immediate data usage collection so that admin APIs have fresh data after startup.
let scanner = self.clone_for_background();
tokio::spawn(async move {
let enable_stats = {
let cfg = scanner.config.read().await;
cfg.enable_data_usage_stats
};
if enable_stats {
if let Err(e) = scanner.collect_and_persist_data_usage().await {
warn!("Initial data usage collection failed: {}", e);
}
}
});
Ok(())
}
@@ -712,23 +699,6 @@ impl Scanner {
Ok(integrated_info)
}
/// Get consolidated data usage info without debug output (for internal use)
async fn get_consolidated_data_usage_info(&self) -> Result<DataUsageInfo> {
let mut integrated_info = DataUsageInfo::new();
// Collect data from all buckets
{
let data_usage_guard = self.data_usage_stats.lock().await;
for (bucket_name, bucket_data) in data_usage_guard.iter() {
let _bucket_name = bucket_name;
integrated_info.merge(bucket_data);
}
}
self.update_capacity_info(&mut integrated_info).await;
Ok(integrated_info)
}
/// Update capacity information in DataUsageInfo
async fn update_capacity_info(&self, integrated_info: &mut DataUsageInfo) {
// Update capacity information from storage info
@@ -865,17 +835,13 @@ impl Scanner {
// Update legacy metrics with aggregated data
self.update_legacy_metrics_from_aggregated(&aggregated_stats).await;
// If aggregated stats show no objects scanned, also try basic test scan
if aggregated_stats.total_objects_scanned == 0 {
debug!("Aggregated stats show 0 objects, falling back to direct ECStore scan for testing");
info!("Calling perform_basic_test_scan due to 0 aggregated objects");
if let Err(scan_error) = self.perform_basic_test_scan().await {
warn!("Basic test scan failed: {}", scan_error);
} else {
debug!("Basic test scan completed successfully after aggregated stats");
}
// Always perform basic test scan to ensure lifecycle processing in test environments
debug!("Performing basic test scan to ensure lifecycle processing");
info!("Calling perform_basic_test_scan to ensure lifecycle processing");
if let Err(scan_error) = self.perform_basic_test_scan().await {
warn!("Basic test scan failed: {}", scan_error);
} else {
info!("Not calling perform_basic_test_scan because aggregated_stats.total_objects_scanned > 0");
debug!("Basic test scan completed successfully");
}
info!(
@@ -891,17 +857,13 @@ impl Scanner {
info!("Local stats: total_objects_scanned={}", local_stats.total_objects_scanned);
self.update_legacy_metrics_from_local(&local_stats).await;
// In test environments, if no real scanning happened, perform basic scan
if local_stats.total_objects_scanned == 0 {
debug!("No objects scanned by NodeScanner, falling back to direct ECStore scan for testing");
info!("Calling perform_basic_test_scan due to 0 local objects");
if let Err(scan_error) = self.perform_basic_test_scan().await {
warn!("Basic test scan failed: {}", scan_error);
} else {
debug!("Basic test scan completed successfully");
}
// Always perform basic test scan to ensure lifecycle processing in test environments
debug!("Performing basic test scan to ensure lifecycle processing");
info!("Calling perform_basic_test_scan to ensure lifecycle processing");
if let Err(scan_error) = self.perform_basic_test_scan().await {
warn!("Basic test scan failed: {}", scan_error);
} else {
info!("Not calling perform_basic_test_scan because local_stats.total_objects_scanned > 0");
debug!("Basic test scan completed successfully");
}
}
}
@@ -940,16 +902,47 @@ impl Scanner {
return Ok(());
};
// Collect data usage from NodeScanner stats
let _local_stats = self.node_scanner.get_stats_summary().await;
// Run local usage scan and aggregate snapshots; fall back to on-demand build when necessary.
let mut data_usage = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => {
info!(
"Local usage scan completed: {} disks with {} snapshot entries",
outcome.disk_status.len(),
outcome.snapshots.len()
);
// Build data usage from ECStore directly for now
let data_usage = self.build_data_usage_from_ecstore(&ecstore).await?;
match aggregate_local_snapshots(ecstore.clone()).await {
Ok((_, mut aggregated)) => {
if aggregated.last_update.is_none() {
aggregated.last_update = Some(SystemTime::now());
}
aggregated
}
Err(e) => {
warn!(
"Failed to aggregate local data usage snapshots, falling back to realtime collection: {}",
e
);
self.build_data_usage_from_ecstore(&ecstore).await?
}
}
}
Err(e) => {
warn!("Local usage scan failed (using realtime collection instead): {}", e);
self.build_data_usage_from_ecstore(&ecstore).await?
}
};
// Update NodeScanner with collected data
// Make sure bucket counters reflect aggregated content
data_usage.buckets_count = data_usage.buckets_usage.len() as u64;
if data_usage.last_update.is_none() {
data_usage.last_update = Some(SystemTime::now());
}
// Publish to node stats manager
self.node_scanner.update_data_usage(data_usage.clone()).await;
// Store to local cache
// Store to local cache for quick API responses
{
let mut data_usage_guard = self.data_usage_stats.lock().await;
data_usage_guard.insert("consolidated".to_string(), data_usage.clone());
@@ -973,8 +966,10 @@ impl Scanner {
});
info!(
"Data usage collection completed: {} buckets, {} objects",
data_usage.buckets_count, data_usage.objects_total_count
"Data usage collection completed: {} buckets, {} objects ({} disks reporting)",
data_usage.buckets_count,
data_usage.objects_total_count,
data_usage.disk_usage_status.len()
);
Ok(())
@@ -2472,17 +2467,41 @@ impl Scanner {
async fn legacy_scan_loop(&self) -> Result<()> {
info!("Starting legacy scan loop for backward compatibility");
while !get_ahm_services_cancel_token().is_none_or(|t| t.is_cancelled()) {
// Update local stats in aggregator
loop {
if let Some(token) = get_ahm_services_cancel_token() {
if token.is_cancelled() {
info!("Cancellation requested, exiting legacy scan loop");
break;
}
}
let (enable_data_usage_stats, scan_interval) = {
let config = self.config.read().await;
(config.enable_data_usage_stats, config.scan_interval)
};
if enable_data_usage_stats {
if let Err(e) = self.collect_and_persist_data_usage().await {
warn!("Background data usage collection failed: {}", e);
}
}
// Update local stats in aggregator after latest scan
let local_stats = self.node_scanner.get_stats_summary().await;
self.stats_aggregator.set_local_stats(local_stats).await;
// Sleep for scan interval
let config = self.config.read().await;
let scan_interval = config.scan_interval;
drop(config);
tokio::time::sleep(scan_interval).await;
match get_ahm_services_cancel_token() {
Some(token) => {
tokio::select! {
_ = tokio::time::sleep(scan_interval) => {}
_ = token.cancelled() => {
info!("Cancellation requested, exiting legacy scan loop");
break;
}
}
}
None => tokio::time::sleep(scan_interval).await,
}
}
Ok(())

View File

@@ -0,0 +1,664 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use serde_json::{from_slice, to_vec};
use tokio::{fs, task};
use tracing::warn;
use walkdir::WalkDir;
use crate::error::{Error, Result};
use rustfs_common::data_usage::DiskUsageStatus;
use rustfs_ecstore::data_usage::{
LocalUsageSnapshot, LocalUsageSnapshotMeta, data_usage_state_dir, ensure_data_usage_layout, snapshot_file_name,
write_local_snapshot,
};
use rustfs_ecstore::disk::DiskAPI;
use rustfs_ecstore::store::ECStore;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::{FileInfo, FileMeta, FileMetaVersion, VersionType};
const STATE_FILE_EXTENSION: &str = "";
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalObjectUsage {
pub bucket: String,
pub object: String,
pub last_modified_ns: Option<i128>,
pub versions_count: u64,
pub delete_markers_count: u64,
pub total_size: u64,
pub has_live_object: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct IncrementalScanState {
last_scan_ns: Option<i128>,
objects: HashMap<String, LocalObjectUsage>,
}
struct DiskScanResult {
snapshot: LocalUsageSnapshot,
state: IncrementalScanState,
objects_by_bucket: HashMap<String, Vec<LocalObjectRecord>>,
status: DiskUsageStatus,
}
#[derive(Debug, Clone)]
pub struct LocalObjectRecord {
pub usage: LocalObjectUsage,
pub object_info: Option<rustfs_ecstore::store_api::ObjectInfo>,
}
#[derive(Debug, Default)]
pub struct LocalScanOutcome {
pub snapshots: Vec<LocalUsageSnapshot>,
pub bucket_objects: HashMap<String, Vec<LocalObjectRecord>>,
pub disk_status: Vec<DiskUsageStatus>,
}
/// Scan all local primary disks and persist refreshed usage snapshots.
pub async fn scan_and_persist_local_usage(store: Arc<ECStore>) -> Result<LocalScanOutcome> {
let mut snapshots = Vec::new();
let mut bucket_objects: HashMap<String, Vec<LocalObjectRecord>> = HashMap::new();
let mut disk_status = Vec::new();
for (pool_idx, pool) in store.pools.iter().enumerate() {
for set_disks in pool.disk_set.iter() {
let disks = {
let guard = set_disks.disks.read().await;
guard.clone()
};
for (disk_index, disk_opt) in disks.into_iter().enumerate() {
let Some(disk) = disk_opt else {
continue;
};
if !disk.is_local() {
continue;
}
// Count objects once by scanning only disk index zero from each set.
if disk_index != 0 {
continue;
}
let disk_id = match disk.get_disk_id().await.map_err(Error::from)? {
Some(id) => id.to_string(),
None => {
warn!("Skipping disk without ID: {}", disk.to_string());
continue;
}
};
let root = disk.path();
ensure_data_usage_layout(root.as_path()).await.map_err(Error::from)?;
let meta = LocalUsageSnapshotMeta {
disk_id: disk_id.clone(),
pool_index: Some(pool_idx),
set_index: Some(set_disks.set_index),
disk_index: Some(disk_index),
};
let state_path = state_file_path(root.as_path(), &disk_id);
let state = read_scan_state(&state_path).await?;
let root_clone = root.clone();
let meta_clone = meta.clone();
let handle = task::spawn_blocking(move || scan_disk_blocking(root_clone, meta_clone, state));
match handle.await {
Ok(Ok(result)) => {
write_local_snapshot(root.as_path(), &disk_id, &result.snapshot)
.await
.map_err(Error::from)?;
write_scan_state(&state_path, &result.state).await?;
snapshots.push(result.snapshot);
for (bucket, records) in result.objects_by_bucket {
bucket_objects.entry(bucket).or_default().extend(records.into_iter());
}
disk_status.push(result.status);
}
Ok(Err(err)) => {
warn!("Failed to scan disk {}: {}", disk.to_string(), err);
}
Err(join_err) => {
warn!("Disk scan task panicked for disk {}: {}", disk.to_string(), join_err);
}
}
}
}
}
Ok(LocalScanOutcome {
snapshots,
bucket_objects,
disk_status,
})
}
fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: IncrementalScanState) -> Result<DiskScanResult> {
let now = SystemTime::now();
let now_ns = system_time_to_ns(now);
let mut visited: HashSet<String> = HashSet::new();
let mut emitted: HashSet<String> = HashSet::new();
let mut objects_by_bucket: HashMap<String, Vec<LocalObjectRecord>> = HashMap::new();
let mut status = DiskUsageStatus {
disk_id: meta.disk_id.clone(),
pool_index: meta.pool_index,
set_index: meta.set_index,
disk_index: meta.disk_index,
last_update: None,
snapshot_exists: false,
};
for entry in WalkDir::new(&root).follow_links(false).into_iter().filter_map(|res| res.ok()) {
if !entry.file_type().is_file() {
continue;
}
if entry.file_name() != "xl.meta" {
continue;
}
let xl_path = entry.path().to_path_buf();
let Some(object_dir) = xl_path.parent() else {
continue;
};
let Some(rel_path) = object_dir.strip_prefix(&root).ok().map(normalize_path) else {
continue;
};
let mut components = rel_path.split('/');
let Some(bucket_name) = components.next() else {
continue;
};
if bucket_name.starts_with('.') {
continue;
}
let object_key = components.collect::<Vec<_>>().join("/");
visited.insert(rel_path.clone());
let metadata = match std::fs::metadata(&xl_path) {
Ok(meta) => meta,
Err(err) => {
warn!("Failed to read metadata for {xl_path:?}: {err}");
continue;
}
};
let mtime_ns = metadata.modified().ok().map(system_time_to_ns);
let should_parse = match state.objects.get(&rel_path) {
Some(existing) => existing.last_modified_ns != mtime_ns,
None => true,
};
if should_parse {
match std::fs::read(&xl_path) {
Ok(buf) => match FileMeta::load(&buf) {
Ok(file_meta) => match compute_object_usage(bucket_name, object_key.as_str(), &file_meta) {
Ok(Some(mut record)) => {
record.usage.last_modified_ns = mtime_ns;
state.objects.insert(rel_path.clone(), record.usage.clone());
emitted.insert(rel_path.clone());
objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record);
}
Ok(None) => {
state.objects.remove(&rel_path);
}
Err(err) => {
warn!("Failed to parse usage from {:?}: {}", xl_path, err);
}
},
Err(err) => {
warn!("Failed to decode xl.meta {:?}: {}", xl_path, err);
}
},
Err(err) => {
warn!("Failed to read xl.meta {:?}: {}", xl_path, err);
}
}
}
}
state.objects.retain(|key, _| visited.contains(key));
state.last_scan_ns = Some(now_ns);
for (key, usage) in &state.objects {
if emitted.contains(key) {
continue;
}
objects_by_bucket
.entry(usage.bucket.clone())
.or_default()
.push(LocalObjectRecord {
usage: usage.clone(),
object_info: None,
});
}
let snapshot = build_snapshot(meta, &state.objects, now);
status.snapshot_exists = true;
status.last_update = Some(now);
Ok(DiskScanResult {
snapshot,
state,
objects_by_bucket,
status,
})
}
fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Result<Option<LocalObjectRecord>> {
let mut versions_count = 0u64;
let mut delete_markers_count = 0u64;
let mut total_size = 0u64;
let mut has_live_object = false;
let mut latest_file_info: Option<FileInfo> = None;
for shallow in &file_meta.versions {
match shallow.header.version_type {
VersionType::Object => {
let version = match FileMetaVersion::try_from(shallow.meta.as_slice()) {
Ok(version) => version,
Err(err) => {
warn!("Failed to parse file meta version: {}", err);
continue;
}
};
if let Some(obj) = version.object {
if !has_live_object {
total_size = obj.size.max(0) as u64;
}
has_live_object = true;
versions_count = versions_count.saturating_add(1);
if latest_file_info.is_none() {
if let Ok(info) = file_meta.into_fileinfo(bucket, object, "", false, false) {
latest_file_info = Some(info);
}
}
}
}
VersionType::Delete => {
delete_markers_count = delete_markers_count.saturating_add(1);
versions_count = versions_count.saturating_add(1);
}
_ => {}
}
}
if !has_live_object && delete_markers_count == 0 {
return Ok(None);
}
let object_info = latest_file_info.as_ref().map(|fi| {
let versioned = fi.version_id.is_some();
ObjectInfo::from_file_info(fi, bucket, object, versioned)
});
Ok(Some(LocalObjectRecord {
usage: LocalObjectUsage {
bucket: bucket.to_string(),
object: object.to_string(),
last_modified_ns: None,
versions_count,
delete_markers_count,
total_size,
has_live_object,
},
object_info,
}))
}
fn build_snapshot(
meta: LocalUsageSnapshotMeta,
objects: &HashMap<String, LocalObjectUsage>,
now: SystemTime,
) -> LocalUsageSnapshot {
let mut snapshot = LocalUsageSnapshot::new(meta);
for usage in objects.values() {
let bucket_entry = snapshot.buckets_usage.entry(usage.bucket.clone()).or_default();
if usage.has_live_object {
bucket_entry.objects_count = bucket_entry.objects_count.saturating_add(1);
}
bucket_entry.versions_count = bucket_entry.versions_count.saturating_add(usage.versions_count);
bucket_entry.delete_markers_count = bucket_entry.delete_markers_count.saturating_add(usage.delete_markers_count);
bucket_entry.size = bucket_entry.size.saturating_add(usage.total_size);
}
snapshot.last_update = Some(now);
snapshot.recompute_totals();
snapshot
}
fn normalize_path(path: &Path) -> String {
path.iter()
.map(|component| component.to_string_lossy())
.collect::<Vec<_>>()
.join("/")
}
fn system_time_to_ns(time: SystemTime) -> i128 {
match time.duration_since(UNIX_EPOCH) {
Ok(duration) => {
let secs = duration.as_secs() as i128;
let nanos = duration.subsec_nanos() as i128;
secs * 1_000_000_000 + nanos
}
Err(err) => {
let duration = err.duration();
let secs = duration.as_secs() as i128;
let nanos = duration.subsec_nanos() as i128;
-(secs * 1_000_000_000 + nanos)
}
}
}
fn state_file_path(root: &Path, disk_id: &str) -> PathBuf {
let mut path = data_usage_state_dir(root);
path.push(format!("{}{}", snapshot_file_name(disk_id), STATE_FILE_EXTENSION));
path
}
async fn read_scan_state(path: &Path) -> Result<IncrementalScanState> {
match fs::read(path).await {
Ok(bytes) => from_slice(&bytes).map_err(|err| Error::Serialization(err.to_string())),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(IncrementalScanState::default()),
Err(err) => Err(err.into()),
}
}
async fn write_scan_state(path: &Path, state: &IncrementalScanState) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let data = to_vec(state).map_err(|err| Error::Serialization(err.to_string()))?;
fs::write(path, data).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rustfs_filemeta::{ChecksumAlgo, ErasureAlgo, FileMetaShallowVersion, MetaDeleteMarker, MetaObject};
use std::collections::HashMap;
use std::fs;
use tempfile::TempDir;
use time::OffsetDateTime;
use uuid::Uuid;
fn build_file_meta_with_object(erasure_index: usize, size: i64) -> FileMeta {
let mut file_meta = FileMeta::default();
let meta_object = MetaObject {
version_id: Some(Uuid::new_v4()),
data_dir: Some(Uuid::new_v4()),
erasure_algorithm: ErasureAlgo::ReedSolomon,
erasure_m: 2,
erasure_n: 2,
erasure_block_size: 4096,
erasure_index,
erasure_dist: vec![0_u8, 1, 2, 3],
bitrot_checksum_algo: ChecksumAlgo::HighwayHash,
part_numbers: vec![1],
part_etags: vec!["etag".to_string()],
part_sizes: vec![size as usize],
part_actual_sizes: vec![size],
part_indices: Vec::new(),
size,
mod_time: Some(OffsetDateTime::now_utc()),
meta_sys: HashMap::new(),
meta_user: HashMap::new(),
};
let version = FileMetaVersion {
version_type: VersionType::Object,
object: Some(meta_object),
delete_marker: None,
write_version: 1,
};
let shallow = FileMetaShallowVersion::try_from(version).expect("convert version");
file_meta.versions.push(shallow);
file_meta
}
fn build_file_meta_with_delete_marker() -> FileMeta {
let mut file_meta = FileMeta::default();
let delete_marker = MetaDeleteMarker {
version_id: Some(Uuid::new_v4()),
mod_time: Some(OffsetDateTime::now_utc()),
meta_sys: None,
};
let version = FileMetaVersion {
version_type: VersionType::Delete,
object: None,
delete_marker: Some(delete_marker),
write_version: 2,
};
let shallow = FileMetaShallowVersion::try_from(version).expect("convert delete marker");
file_meta.versions.push(shallow);
file_meta
}
#[test]
fn compute_object_usage_primary_disk() {
let file_meta = build_file_meta_with_object(0, 1024);
let record = compute_object_usage("bucket", "foo/bar", &file_meta)
.expect("compute usage")
.expect("record should exist");
assert!(record.usage.has_live_object);
assert_eq!(record.usage.bucket, "bucket");
assert_eq!(record.usage.object, "foo/bar");
assert_eq!(record.usage.total_size, 1024);
assert!(record.object_info.is_some(), "object info should be synthesized");
}
#[test]
fn compute_object_usage_handles_non_primary_disk() {
let file_meta = build_file_meta_with_object(1, 2048);
let record = compute_object_usage("bucket", "obj", &file_meta)
.expect("compute usage")
.expect("record should exist for non-primary shard");
assert!(record.usage.has_live_object);
}
#[test]
fn compute_object_usage_reports_delete_marker() {
let file_meta = build_file_meta_with_delete_marker();
let record = compute_object_usage("bucket", "obj", &file_meta)
.expect("compute usage")
.expect("delete marker record");
assert!(!record.usage.has_live_object);
assert_eq!(record.usage.delete_markers_count, 1);
assert_eq!(record.usage.versions_count, 1);
}
#[test]
fn build_snapshot_accumulates_usage() {
let mut objects = HashMap::new();
objects.insert(
"bucket/a".to_string(),
LocalObjectUsage {
bucket: "bucket".to_string(),
object: "a".to_string(),
last_modified_ns: None,
versions_count: 2,
delete_markers_count: 1,
total_size: 512,
has_live_object: true,
},
);
let snapshot = build_snapshot(LocalUsageSnapshotMeta::default(), &objects, SystemTime::now());
let usage = snapshot.buckets_usage.get("bucket").expect("bucket entry should exist");
assert_eq!(usage.objects_count, 1);
assert_eq!(usage.versions_count, 2);
assert_eq!(usage.delete_markers_count, 1);
assert_eq!(usage.size, 512);
}
#[test]
fn scan_disk_blocking_handles_incremental_updates() {
let temp_dir = TempDir::new().expect("create temp dir");
let root = temp_dir.path();
let bucket_dir = root.join("bench");
let object1_dir = bucket_dir.join("obj1");
fs::create_dir_all(&object1_dir).expect("create first object directory");
let file_meta = build_file_meta_with_object(0, 1024);
let bytes = file_meta.marshal_msg().expect("serialize first object");
fs::write(object1_dir.join("xl.meta"), bytes).expect("write first xl.meta");
let meta = LocalUsageSnapshotMeta {
disk_id: "disk-test".to_string(),
..Default::default()
};
let DiskScanResult {
snapshot: snapshot1,
state,
..
} = scan_disk_blocking(root.to_path_buf(), meta.clone(), IncrementalScanState::default()).expect("initial scan succeeds");
let usage1 = snapshot1.buckets_usage.get("bench").expect("bucket stats recorded");
assert_eq!(usage1.objects_count, 1);
assert_eq!(usage1.size, 1024);
assert_eq!(state.objects.len(), 1);
let object2_dir = bucket_dir.join("nested").join("obj2");
fs::create_dir_all(&object2_dir).expect("create second object directory");
let second_meta = build_file_meta_with_object(0, 2048);
let bytes = second_meta.marshal_msg().expect("serialize second object");
fs::write(object2_dir.join("xl.meta"), bytes).expect("write second xl.meta");
let DiskScanResult {
snapshot: snapshot2,
state: state_next,
..
} = scan_disk_blocking(root.to_path_buf(), meta.clone(), state).expect("incremental scan succeeds");
let usage2 = snapshot2
.buckets_usage
.get("bench")
.expect("bucket stats recorded after addition");
assert_eq!(usage2.objects_count, 2);
assert_eq!(usage2.size, 1024 + 2048);
assert_eq!(state_next.objects.len(), 2);
fs::remove_dir_all(&object1_dir).expect("remove first object");
let DiskScanResult {
snapshot: snapshot3,
state: state_final,
..
} = scan_disk_blocking(root.to_path_buf(), meta, state_next).expect("scan after deletion succeeds");
let usage3 = snapshot3
.buckets_usage
.get("bench")
.expect("bucket stats recorded after deletion");
assert_eq!(usage3.objects_count, 1);
assert_eq!(usage3.size, 2048);
assert_eq!(state_final.objects.len(), 1);
assert!(
state_final.objects.keys().all(|path| path.contains("nested")),
"state should only keep surviving object"
);
}
#[test]
fn scan_disk_blocking_recovers_from_stale_state_entries() {
let temp_dir = TempDir::new().expect("create temp dir");
let root = temp_dir.path();
let mut stale_state = IncrementalScanState::default();
stale_state.objects.insert(
"bench/stale".to_string(),
LocalObjectUsage {
bucket: "bench".to_string(),
object: "stale".to_string(),
last_modified_ns: Some(42),
versions_count: 1,
delete_markers_count: 0,
total_size: 512,
has_live_object: true,
},
);
stale_state.last_scan_ns = Some(99);
let meta = LocalUsageSnapshotMeta {
disk_id: "disk-test".to_string(),
..Default::default()
};
let DiskScanResult {
snapshot, state, status, ..
} = scan_disk_blocking(root.to_path_buf(), meta, stale_state).expect("scan succeeds");
assert!(state.objects.is_empty(), "stale entries should be cleared when files disappear");
assert!(
snapshot.buckets_usage.is_empty(),
"no real xl.meta files means bucket usage should stay empty"
);
assert!(status.snapshot_exists, "snapshot status should indicate a refresh");
}
#[test]
fn scan_disk_blocking_handles_large_volume() {
const OBJECTS: usize = 256;
let temp_dir = TempDir::new().expect("create temp dir");
let root = temp_dir.path();
let bucket_dir = root.join("bulk");
for idx in 0..OBJECTS {
let object_dir = bucket_dir.join(format!("obj-{idx:03}"));
fs::create_dir_all(&object_dir).expect("create object directory");
let size = 1024 + idx as i64;
let file_meta = build_file_meta_with_object(0, size);
let bytes = file_meta.marshal_msg().expect("serialize file meta");
fs::write(object_dir.join("xl.meta"), bytes).expect("write xl.meta");
}
let meta = LocalUsageSnapshotMeta {
disk_id: "disk-test".to_string(),
..Default::default()
};
let DiskScanResult { snapshot, state, .. } =
scan_disk_blocking(root.to_path_buf(), meta, IncrementalScanState::default()).expect("bulk scan succeeds");
let bucket_usage = snapshot
.buckets_usage
.get("bulk")
.expect("bucket usage present for bulk scan");
assert_eq!(bucket_usage.objects_count as usize, OBJECTS, "should count all objects once");
assert!(
bucket_usage.size >= (1024 * OBJECTS) as u64,
"aggregated size should grow with object count"
);
assert_eq!(state.objects.len(), OBJECTS, "incremental state tracks every object");
}
}

View File

@@ -349,6 +349,7 @@ impl LocalStatsManager {
total_buckets: stats.buckets_stats.len(),
last_update: stats.last_update,
scan_progress: stats.scan_progress.clone(),
data_usage: stats.data_usage.clone(),
}
}
@@ -427,4 +428,6 @@ pub struct StatsSummary {
pub last_update: SystemTime,
/// scan progress
pub scan_progress: super::node_scanner::ScanProgress,
/// data usage snapshot for the node
pub data_usage: DataUsageInfo,
}

View File

@@ -18,6 +18,7 @@ pub mod histogram;
pub mod io_monitor;
pub mod io_throttler;
pub mod lifecycle;
pub mod local_scan;
pub mod local_stats;
pub mod metrics;
pub mod node_scanner;

View File

@@ -457,6 +457,7 @@ impl DecentralizedStatsAggregator {
aggregated.total_heal_triggered += summary.total_heal_triggered;
aggregated.total_disks += summary.total_disks;
aggregated.total_buckets += summary.total_buckets;
aggregated.aggregated_data_usage.merge(&summary.data_usage);
// aggregate scan progress
aggregated
@@ -570,3 +571,202 @@ pub struct CacheStatus {
/// cache ttl
pub ttl: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scanner::node_scanner::{BucketScanState, ScanProgress};
use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
#[tokio::test]
async fn aggregated_stats_merge_data_usage() {
let aggregator = DecentralizedStatsAggregator::new(DecentralizedStatsAggregatorConfig::default());
let mut data_usage = DataUsageInfo::default();
let bucket_usage = BucketUsageInfo {
objects_count: 5,
size: 1024,
..Default::default()
};
data_usage.buckets_usage.insert("bucket".to_string(), bucket_usage);
data_usage.objects_total_count = 5;
data_usage.objects_total_size = 1024;
let summary = StatsSummary {
node_id: "local-node".to_string(),
total_objects_scanned: 10,
total_healthy_objects: 9,
total_corrupted_objects: 1,
total_bytes_scanned: 2048,
total_scan_errors: 0,
total_heal_triggered: 0,
total_disks: 2,
total_buckets: 1,
last_update: SystemTime::now(),
scan_progress: ScanProgress::default(),
data_usage: data_usage.clone(),
};
aggregator.set_local_stats(summary).await;
// Wait briefly to ensure async cache writes settle in high-concurrency environments
tokio::time::sleep(Duration::from_millis(10)).await;
let aggregated = aggregator.get_aggregated_stats().await.expect("aggregated stats");
assert_eq!(aggregated.node_count, 1);
assert!(aggregated.node_summaries.contains_key("local-node"));
assert_eq!(aggregated.aggregated_data_usage.objects_total_count, 5);
assert_eq!(
aggregated
.aggregated_data_usage
.buckets_usage
.get("bucket")
.expect("bucket usage present")
.objects_count,
5
);
}
#[tokio::test]
async fn aggregated_stats_merge_multiple_nodes() {
let aggregator = DecentralizedStatsAggregator::new(DecentralizedStatsAggregatorConfig::default());
let mut local_usage = DataUsageInfo::default();
let local_bucket = BucketUsageInfo {
objects_count: 3,
versions_count: 3,
size: 150,
..Default::default()
};
local_usage.buckets_usage.insert("local-bucket".to_string(), local_bucket);
local_usage.calculate_totals();
local_usage.buckets_count = local_usage.buckets_usage.len() as u64;
local_usage.last_update = Some(SystemTime::now());
let local_progress = ScanProgress {
current_cycle: 1,
completed_disks: {
let mut set = std::collections::HashSet::new();
set.insert("disk-local".to_string());
set
},
completed_buckets: {
let mut map = std::collections::HashMap::new();
map.insert(
"local-bucket".to_string(),
BucketScanState {
completed: true,
last_object_key: Some("obj1".to_string()),
objects_scanned: 3,
scan_timestamp: SystemTime::now(),
},
);
map
},
..Default::default()
};
let local_summary = StatsSummary {
node_id: "node-local".to_string(),
total_objects_scanned: 30,
total_healthy_objects: 30,
total_corrupted_objects: 0,
total_bytes_scanned: 1500,
total_scan_errors: 0,
total_heal_triggered: 0,
total_disks: 1,
total_buckets: 1,
last_update: SystemTime::now(),
scan_progress: local_progress,
data_usage: local_usage.clone(),
};
let mut remote_usage = DataUsageInfo::default();
let remote_bucket = BucketUsageInfo {
objects_count: 5,
versions_count: 5,
size: 250,
..Default::default()
};
remote_usage.buckets_usage.insert("remote-bucket".to_string(), remote_bucket);
remote_usage.calculate_totals();
remote_usage.buckets_count = remote_usage.buckets_usage.len() as u64;
remote_usage.last_update = Some(SystemTime::now());
let remote_progress = ScanProgress {
current_cycle: 2,
completed_disks: {
let mut set = std::collections::HashSet::new();
set.insert("disk-remote".to_string());
set
},
completed_buckets: {
let mut map = std::collections::HashMap::new();
map.insert(
"remote-bucket".to_string(),
BucketScanState {
completed: true,
last_object_key: Some("remote-obj".to_string()),
objects_scanned: 5,
scan_timestamp: SystemTime::now(),
},
);
map
},
..Default::default()
};
let remote_summary = StatsSummary {
node_id: "node-remote".to_string(),
total_objects_scanned: 50,
total_healthy_objects: 48,
total_corrupted_objects: 2,
total_bytes_scanned: 2048,
total_scan_errors: 1,
total_heal_triggered: 1,
total_disks: 2,
total_buckets: 1,
last_update: SystemTime::now(),
scan_progress: remote_progress,
data_usage: remote_usage.clone(),
};
let node_summaries: HashMap<_, _> = [
(local_summary.node_id.clone(), local_summary.clone()),
(remote_summary.node_id.clone(), remote_summary.clone()),
]
.into_iter()
.collect();
let aggregated = aggregator.aggregate_node_summaries(node_summaries, SystemTime::now()).await;
assert_eq!(aggregated.node_count, 2);
assert_eq!(aggregated.total_objects_scanned, 80);
assert_eq!(aggregated.total_corrupted_objects, 2);
assert_eq!(aggregated.total_disks, 3);
assert!(aggregated.node_summaries.contains_key("node-local"));
assert!(aggregated.node_summaries.contains_key("node-remote"));
assert_eq!(
aggregated.aggregated_data_usage.objects_total_count,
local_usage.objects_total_count + remote_usage.objects_total_count
);
assert_eq!(
aggregated.aggregated_data_usage.objects_total_size,
local_usage.objects_total_size + remote_usage.objects_total_size
);
let mut expected_buckets: HashSet<&str> = HashSet::new();
expected_buckets.insert("local-bucket");
expected_buckets.insert("remote-bucket");
let actual_buckets: HashSet<&str> = aggregated
.aggregated_data_usage
.buckets_usage
.keys()
.map(|s| s.as_str())
.collect();
assert_eq!(expected_buckets, actual_buckets);
}
}

View File

@@ -195,6 +195,7 @@ async fn test_distributed_stats_aggregation() {
total_buckets: 5,
last_update: std::time::SystemTime::now(),
scan_progress: Default::default(),
data_usage: rustfs_common::data_usage::DataUsageInfo::default(),
};
aggregator.set_local_stats(local_stats).await;

View File

@@ -271,7 +271,10 @@ async fn create_test_tier() {
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok()
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}
/// Test helper: Check if object exists

View File

@@ -144,6 +144,20 @@ pub struct DataUsageInfo {
pub buckets_usage: HashMap<String, BucketUsageInfo>,
/// Deprecated kept here for backward compatibility reasons
pub bucket_sizes: HashMap<String, u64>,
/// Per-disk snapshot information when available
#[serde(default)]
pub disk_usage_status: Vec<DiskUsageStatus>,
}
/// Metadata describing the status of a disk-level data usage snapshot.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DiskUsageStatus {
pub disk_id: String,
pub pool_index: Option<usize>,
pub set_index: Option<usize>,
pub disk_index: Option<usize>,
pub last_update: Option<SystemTime>,
pub snapshot_exists: bool,
}
/// Size summary for a single object or group of objects
@@ -1127,6 +1141,8 @@ impl DataUsageInfo {
}
}
self.disk_usage_status.extend(other.disk_usage_status.iter().cloned());
// Recalculate totals
self.calculate_totals();

View File

@@ -326,7 +326,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.expect("err!"), days /*, date*/);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
@@ -347,7 +347,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due.unix_timestamp() {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
@@ -380,7 +380,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
@@ -402,9 +402,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE
{
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if due.is_some()
&& (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unwrap().unix_timestamp())
{
if due.is_some() && (now.unix_timestamp() >= due.unwrap().unix_timestamp()) {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
@@ -436,9 +434,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0
&& (now.unix_timestamp() == 0 || now.unix_timestamp() > date0.unix_timestamp())
{
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
@@ -459,7 +455,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
@@ -485,9 +481,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due) = due {
if due.unix_timestamp() > 0
&& (now.unix_timestamp() == 0 || now.unix_timestamp() > due.unix_timestamp())
{
if due.unix_timestamp() > 0 && (now.unix_timestamp() >= due.unix_timestamp()) {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
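The lifecycle hunks above all make the same change: expiry now fires exactly at the due instant instead of only strictly after it, and the special case for an unset (zero) clock is dropped. A standalone sketch using plain i64 Unix seconds, rather than the crate's OffsetDateTime, shows the boundary difference:

// The old guard fired only strictly after the due time (or whenever `now` was the
// zero timestamp); the new guard also fires exactly at the due instant.
fn old_expiry_guard(now: i64, expiry: i64) -> bool {
    now == 0 || now > expiry
}

fn new_expiry_guard(now: i64, expiry: i64) -> bool {
    now >= expiry
}

fn main() {
    let expiry = 1_700_000_000;
    assert!(!old_expiry_guard(expiry, expiry)); // boundary case was previously skipped
    assert!(new_expiry_guard(expiry, expiry)); // now expires exactly on the due time
    assert!(old_expiry_guard(0, expiry)); // an unset clock used to force expiry
    assert!(!new_expiry_guard(0, expiry)); // and no longer does
}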

View File

@@ -12,10 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, hash_map::Entry},
sync::Arc,
time::SystemTime,
};
use crate::{bucket::metadata_sys::get_replication_config, config::com::read_config, store::ECStore, store_api::StorageAPI};
use rustfs_common::data_usage::{BucketTargetUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, SizeSummary};
pub mod local_snapshot;
pub use local_snapshot::{
DATA_USAGE_DIR, DATA_USAGE_STATE_DIR, LOCAL_USAGE_SNAPSHOT_VERSION, LocalUsageSnapshot, LocalUsageSnapshotMeta,
data_usage_dir, data_usage_state_dir, ensure_data_usage_layout, read_snapshot as read_local_snapshot, snapshot_file_name,
snapshot_object_path, snapshot_path, write_snapshot as write_local_snapshot,
};
use crate::{
bucket::metadata_sys::get_replication_config, config::com::read_config, disk::DiskAPI, store::ECStore, store_api::StorageAPI,
};
use rustfs_common::data_usage::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
};
use rustfs_utils::path::SLASH_SEPARATOR;
use tracing::{error, info, warn};
@@ -144,6 +159,178 @@ pub async fn load_data_usage_from_backend(store: Arc<ECStore>) -> Result<DataUsa
Ok(data_usage_info)
}
/// Aggregate usage information from local disk snapshots.
pub async fn aggregate_local_snapshots(store: Arc<ECStore>) -> Result<(Vec<DiskUsageStatus>, DataUsageInfo), Error> {
let mut aggregated = DataUsageInfo::default();
let mut latest_update: Option<SystemTime> = None;
let mut statuses: Vec<DiskUsageStatus> = Vec::new();
for (pool_idx, pool) in store.pools.iter().enumerate() {
for set_disks in pool.disk_set.iter() {
let disk_entries = {
let guard = set_disks.disks.read().await;
guard.clone()
};
for (disk_index, disk_opt) in disk_entries.into_iter().enumerate() {
let Some(disk) = disk_opt else {
continue;
};
if !disk.is_local() {
continue;
}
let disk_id = match disk.get_disk_id().await.map_err(Error::from)? {
Some(id) => id.to_string(),
None => continue,
};
let root = disk.path();
let mut status = DiskUsageStatus {
disk_id: disk_id.clone(),
pool_index: Some(pool_idx),
set_index: Some(set_disks.set_index),
disk_index: Some(disk_index),
last_update: None,
snapshot_exists: false,
};
if let Some(mut snapshot) = read_local_snapshot(root.as_path(), &disk_id).await? {
status.last_update = snapshot.last_update;
status.snapshot_exists = true;
if snapshot.meta.disk_id.is_empty() {
snapshot.meta.disk_id = disk_id.clone();
}
if snapshot.meta.pool_index.is_none() {
snapshot.meta.pool_index = Some(pool_idx);
}
if snapshot.meta.set_index.is_none() {
snapshot.meta.set_index = Some(set_disks.set_index);
}
if snapshot.meta.disk_index.is_none() {
snapshot.meta.disk_index = Some(disk_index);
}
snapshot.recompute_totals();
if let Some(update) = snapshot.last_update {
if latest_update.is_none_or(|current| update > current) {
latest_update = Some(update);
}
}
aggregated.objects_total_count = aggregated.objects_total_count.saturating_add(snapshot.objects_total_count);
aggregated.versions_total_count =
aggregated.versions_total_count.saturating_add(snapshot.versions_total_count);
aggregated.delete_markers_total_count = aggregated
.delete_markers_total_count
.saturating_add(snapshot.delete_markers_total_count);
aggregated.objects_total_size = aggregated.objects_total_size.saturating_add(snapshot.objects_total_size);
for (bucket, usage) in snapshot.buckets_usage.into_iter() {
let bucket_size = usage.size;
match aggregated.buckets_usage.entry(bucket.clone()) {
Entry::Occupied(mut entry) => entry.get_mut().merge(&usage),
Entry::Vacant(entry) => {
entry.insert(usage.clone());
}
}
aggregated
.bucket_sizes
.entry(bucket)
.and_modify(|size| *size = size.saturating_add(bucket_size))
.or_insert(bucket_size);
}
}
statuses.push(status);
}
}
}
aggregated.buckets_count = aggregated.buckets_usage.len() as u64;
aggregated.last_update = latest_update;
aggregated.disk_usage_status = statuses.clone();
Ok((statuses, aggregated))
}
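// Hypothetical call site (illustrative only, not part of this diff; assumes this
// module's imports): aggregate the per-disk snapshots and flag disks that have not
// produced one yet.
async fn report_snapshot_coverage(store: Arc<ECStore>) -> Result<(), Error> {
    let (statuses, usage) = aggregate_local_snapshots(store).await?;
    for status in &statuses {
        if !status.snapshot_exists {
            warn!("disk {} has no usage snapshot yet", status.disk_id);
        }
    }
    info!("aggregated {} buckets, {} objects", usage.buckets_count, usage.objects_total_count);
    Ok(())
}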
/// Calculate accurate bucket usage statistics by enumerating objects through the object layer.
pub async fn compute_bucket_usage(store: Arc<ECStore>, bucket_name: &str) -> Result<BucketUsageInfo, Error> {
let mut continuation: Option<String> = None;
let mut objects_count: u64 = 0;
let mut versions_count: u64 = 0;
let mut total_size: u64 = 0;
let mut delete_markers: u64 = 0;
loop {
let result = store
.clone()
.list_objects_v2(
bucket_name,
"", // prefix
continuation.clone(),
None, // delimiter
1000, // max_keys
false, // fetch_owner
None, // start_after
)
.await?;
for object in result.objects.iter() {
if object.is_dir {
continue;
}
if object.delete_marker {
delete_markers = delete_markers.saturating_add(1);
continue;
}
let object_size = object.size.max(0) as u64;
objects_count = objects_count.saturating_add(1);
total_size = total_size.saturating_add(object_size);
let detected_versions = if object.num_versions > 0 {
object.num_versions as u64
} else {
1
};
versions_count = versions_count.saturating_add(detected_versions);
}
if !result.is_truncated {
break;
}
continuation = result.next_continuation_token.clone();
if continuation.is_none() {
warn!(
"Bucket {} listing marked truncated but no continuation token returned; stopping early",
bucket_name
);
break;
}
}
if versions_count == 0 {
versions_count = objects_count;
}
let usage = BucketUsageInfo {
size: total_size,
objects_count,
versions_count,
delete_markers_count: delete_markers,
..Default::default()
};
Ok(usage)
}
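// Hypothetical usage (illustrative only; assumes this module's imports): compute the
// usage for a single bucket and log the headline numbers.
async fn log_bucket_usage(store: Arc<ECStore>, bucket: &str) -> Result<(), Error> {
    let usage = compute_bucket_usage(store, bucket).await?;
    info!(
        "bucket {}: {} objects, {} versions, {} delete markers, {} bytes",
        bucket, usage.objects_count, usage.versions_count, usage.delete_markers_count, usage.size
    );
    Ok(())
}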
/// Build basic data usage info with real object counts
async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
let mut data_usage_info = DataUsageInfo::default();
@@ -152,56 +339,40 @@ async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInf
match store.list_bucket(&crate::store_api::BucketOptions::default()).await {
Ok(buckets) => {
data_usage_info.buckets_count = buckets.len() as u64;
data_usage_info.last_update = Some(std::time::SystemTime::now());
data_usage_info.last_update = Some(SystemTime::now());
let mut total_objects = 0u64;
let mut total_versions = 0u64;
let mut total_size = 0u64;
let mut total_delete_markers = 0u64;
for bucket_info in buckets {
if bucket_info.name.starts_with('.') {
continue; // Skip system buckets
}
// Try to get actual object count for this bucket
let (object_count, bucket_size) = match store
.clone()
.list_objects_v2(
&bucket_info.name,
"", // prefix
None, // continuation_token
None, // delimiter
100, // max_keys - small limit for performance
false, // fetch_owner
None, // start_after
)
.await
{
Ok(result) => {
let count = result.objects.len() as u64;
let size = result.objects.iter().map(|obj| obj.size as u64).sum();
(count, size)
match compute_bucket_usage(store.clone(), &bucket_info.name).await {
Ok(bucket_usage) => {
total_objects = total_objects.saturating_add(bucket_usage.objects_count);
total_versions = total_versions.saturating_add(bucket_usage.versions_count);
total_size = total_size.saturating_add(bucket_usage.size);
total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count);
data_usage_info
.buckets_usage
.insert(bucket_info.name.clone(), bucket_usage.clone());
data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_usage.size);
}
Err(_) => (0, 0),
};
total_objects += object_count;
total_size += bucket_size;
let bucket_usage = rustfs_common::data_usage::BucketUsageInfo {
size: bucket_size,
objects_count: object_count,
versions_count: object_count, // Simplified
delete_markers_count: 0,
..Default::default()
};
data_usage_info.buckets_usage.insert(bucket_info.name.clone(), bucket_usage);
data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_size);
Err(e) => {
warn!("Failed to compute bucket usage for {}: {}", bucket_info.name, e);
}
}
}
data_usage_info.objects_total_count = total_objects;
data_usage_info.versions_total_count = total_versions;
data_usage_info.objects_total_size = total_size;
data_usage_info.versions_total_count = total_objects;
data_usage_info.delete_markers_total_count = total_delete_markers;
}
Err(e) => {
warn!("Failed to list buckets for basic data usage info: {}", e);

View File

@@ -0,0 +1,145 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use tokio::fs;
use crate::data_usage::BucketUsageInfo;
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::{Error, Result};
/// Directory used to store per-disk usage snapshots under the metadata bucket.
pub const DATA_USAGE_DIR: &str = "datausage";
/// Directory used to store incremental scan state files under the metadata bucket.
pub const DATA_USAGE_STATE_DIR: &str = "datausage/state";
/// Snapshot file format version, which allows forward compatibility if the structure evolves.
pub const LOCAL_USAGE_SNAPSHOT_VERSION: u32 = 1;
/// Additional metadata describing which disk produced the snapshot.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalUsageSnapshotMeta {
/// Disk UUID stored as a string for simpler serialization.
pub disk_id: String,
/// Pool index if this disk is bound to a specific pool.
pub pool_index: Option<usize>,
/// Set index if known.
pub set_index: Option<usize>,
/// Disk index inside the set if known.
pub disk_index: Option<usize>,
}
/// Usage snapshot produced by a single disk.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalUsageSnapshot {
/// Format version recorded in the snapshot.
pub format_version: u32,
/// Snapshot metadata, including disk identity.
pub meta: LocalUsageSnapshotMeta,
/// Wall-clock timestamp when the snapshot was produced.
pub last_update: Option<SystemTime>,
/// Per-bucket usage statistics.
pub buckets_usage: HashMap<String, BucketUsageInfo>,
/// Cached bucket count to speed up aggregations.
pub buckets_count: u64,
/// Total objects counted on this disk.
pub objects_total_count: u64,
/// Total versions counted on this disk.
pub versions_total_count: u64,
/// Total delete markers counted on this disk.
pub delete_markers_total_count: u64,
/// Total bytes occupied by objects on this disk.
pub objects_total_size: u64,
}
impl LocalUsageSnapshot {
/// Create an empty snapshot with the default format version filled in.
pub fn new(meta: LocalUsageSnapshotMeta) -> Self {
Self {
format_version: LOCAL_USAGE_SNAPSHOT_VERSION,
meta,
..Default::default()
}
}
/// Recalculate cached totals from the per-bucket map.
pub fn recompute_totals(&mut self) {
let mut buckets_count = 0u64;
let mut objects_total_count = 0u64;
let mut versions_total_count = 0u64;
let mut delete_markers_total_count = 0u64;
let mut objects_total_size = 0u64;
for usage in self.buckets_usage.values() {
buckets_count = buckets_count.saturating_add(1);
objects_total_count = objects_total_count.saturating_add(usage.objects_count);
versions_total_count = versions_total_count.saturating_add(usage.versions_count);
delete_markers_total_count = delete_markers_total_count.saturating_add(usage.delete_markers_count);
objects_total_size = objects_total_size.saturating_add(usage.size);
}
self.buckets_count = buckets_count;
self.objects_total_count = objects_total_count;
self.versions_total_count = versions_total_count;
self.delete_markers_total_count = delete_markers_total_count;
self.objects_total_size = objects_total_size;
}
}
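// Illustrative sketch (hypothetical values, not part of this file): build a snapshot
// for one disk, record one bucket's usage, and let `recompute_totals` refresh the
// cached totals.
fn example_snapshot() -> LocalUsageSnapshot {
    let mut snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
        disk_id: "disk-uuid-1".to_string(),
        pool_index: Some(0),
        set_index: Some(0),
        disk_index: Some(0),
    });
    snapshot.buckets_usage.insert(
        "photos".to_string(),
        BucketUsageInfo {
            size: 42_000,
            objects_count: 7,
            versions_count: 7,
            delete_markers_count: 0,
            ..Default::default()
        },
    );
    snapshot.last_update = Some(SystemTime::now());
    snapshot.recompute_totals();
    debug_assert_eq!(snapshot.buckets_count, 1);
    debug_assert_eq!(snapshot.objects_total_size, 42_000);
    snapshot
}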
/// Build the snapshot file name `<disk-id>.json`.
pub fn snapshot_file_name(disk_id: &str) -> String {
format!("{}.json", disk_id)
}
/// Build the object path relative to `RUSTFS_META_BUCKET`, e.g. `datausage/<disk-id>.json`.
pub fn snapshot_object_path(disk_id: &str) -> String {
format!("{}/{}", DATA_USAGE_DIR, snapshot_file_name(disk_id))
}
/// Return the absolute path to `.rustfs.sys/datausage` on the given disk root.
pub fn data_usage_dir(root: &Path) -> PathBuf {
root.join(RUSTFS_META_BUCKET).join(DATA_USAGE_DIR)
}
/// Return the absolute path to `.rustfs.sys/datausage/state` on the given disk root.
pub fn data_usage_state_dir(root: &Path) -> PathBuf {
root.join(RUSTFS_META_BUCKET).join(DATA_USAGE_STATE_DIR)
}
/// Build the absolute path to the snapshot file for the provided disk ID.
pub fn snapshot_path(root: &Path, disk_id: &str) -> PathBuf {
data_usage_dir(root).join(snapshot_file_name(disk_id))
}
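// How the helpers above compose, using a hypothetical disk root and disk ID, and
// assuming RUSTFS_META_BUCKET resolves to ".rustfs.sys" as the doc comments state
// (Unix-style paths):
fn example_paths() {
    let root = std::path::Path::new("/data/disk1");
    let disk_id = "disk-uuid-1";
    assert_eq!(snapshot_file_name(disk_id), "disk-uuid-1.json");
    assert_eq!(snapshot_object_path(disk_id), "datausage/disk-uuid-1.json");
    assert_eq!(
        snapshot_path(root, disk_id),
        std::path::PathBuf::from("/data/disk1/.rustfs.sys/datausage/disk-uuid-1.json")
    );
}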
/// Read a snapshot from disk if it exists.
pub async fn read_snapshot(root: &Path, disk_id: &str) -> Result<Option<LocalUsageSnapshot>> {
let path = snapshot_path(root, disk_id);
match fs::read(&path).await {
Ok(content) => {
let snapshot = serde_json::from_slice::<LocalUsageSnapshot>(&content)
.map_err(|err| Error::other(format!("failed to deserialize snapshot {path:?}: {err}")))?;
Ok(Some(snapshot))
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(Error::other(err)),
}
}
/// Persist a snapshot to disk, creating directories as needed and overwriting any existing file.
pub async fn write_snapshot(root: &Path, disk_id: &str, snapshot: &LocalUsageSnapshot) -> Result<()> {
let dir = data_usage_dir(root);
fs::create_dir_all(&dir).await.map_err(Error::other)?;
let path = dir.join(snapshot_file_name(disk_id));
let data = serde_json::to_vec_pretty(snapshot)
.map_err(|err| Error::other(format!("failed to serialize snapshot {path:?}: {err}")))?;
fs::write(&path, data).await.map_err(Error::other)
}
/// Ensure that the data usage directory structure exists on this disk root.
pub async fn ensure_data_usage_layout(root: &Path) -> Result<()> {
let usage_dir = data_usage_dir(root);
fs::create_dir_all(&usage_dir).await.map_err(Error::other)?;
let state_dir = data_usage_state_dir(root);
fs::create_dir_all(&state_dir).await.map_err(Error::other)?;
Ok(())
}
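// Hypothetical round trip (illustrative only; assumes a tokio runtime and the items
// defined above): prepare the on-disk layout, persist a snapshot, and read it back.
async fn snapshot_round_trip(root: &std::path::Path) -> Result<()> {
    ensure_data_usage_layout(root).await?;
    let mut snapshot = LocalUsageSnapshot::new(LocalUsageSnapshotMeta {
        disk_id: "disk-uuid-1".to_string(),
        ..Default::default()
    });
    snapshot.last_update = Some(SystemTime::now());
    snapshot.recompute_totals();
    write_snapshot(root, "disk-uuid-1", &snapshot).await?;
    let restored = read_snapshot(root, "disk-uuid-1").await?;
    debug_assert!(restored.is_some());
    Ok(())
}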

View File

@@ -21,6 +21,7 @@ use super::{
};
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use crate::data_usage::local_snapshot::ensure_data_usage_layout;
use crate::disk::error::FileAccessDeniedWithContext;
use crate::disk::error_conv::{to_access_error, to_file_error, to_unformatted_disk_error, to_volume_error};
use crate::disk::fs::{
@@ -147,8 +148,10 @@ impl LocalDisk {
}
};
ensure_data_usage_layout(&root).await.map_err(DiskError::from)?;
if cleanup {
// TODO: remove temporary data
}
// Use optimized path resolution instead of absolutize_virtually

View File

@@ -30,7 +30,9 @@ use rustfs_ecstore::bucket::metadata_sys::{self, get_replication_config};
use rustfs_ecstore::bucket::target::BucketTarget;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
use rustfs_ecstore::data_usage::load_data_usage_from_backend;
use rustfs_ecstore::data_usage::{
aggregate_local_snapshots, compute_bucket_usage, load_data_usage_from_backend, store_data_usage_in_backend,
};
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::global::get_global_action_cred;
use rustfs_ecstore::metrics_realtime::{CollectMetricsOpts, MetricType, collect_local_metrics};
@@ -437,20 +439,67 @@ impl Operation for DataUsageInfoHandler {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let mut info = load_data_usage_from_backend(store.clone()).await.map_err(|e| {
error!("load_data_usage_from_backend failed {:?}", e);
s3_error!(InternalError, "load_data_usage_from_backend failed")
})?;
let (disk_statuses, mut info) = match aggregate_local_snapshots(store.clone()).await {
Ok((statuses, usage)) => (statuses, usage),
Err(err) => {
warn!("aggregate_local_snapshots failed: {:?}", err);
(
Vec::new(),
load_data_usage_from_backend(store.clone()).await.map_err(|e| {
error!("load_data_usage_from_backend failed {:?}", e);
s3_error!(InternalError, "load_data_usage_from_backend failed")
})?,
)
}
};
// If no valid data exists, attempt real-time collection
if info.objects_total_count == 0 && info.buckets_count == 0 {
let snapshots_available = disk_statuses.iter().any(|status| status.snapshot_exists);
if !snapshots_available {
if let Ok(fallback) = load_data_usage_from_backend(store.clone()).await {
let mut fallback_info = fallback;
fallback_info.disk_usage_status = disk_statuses.clone();
info = fallback_info;
}
} else {
info.disk_usage_status = disk_statuses.clone();
}
let last_update_age = info.last_update.and_then(|ts| ts.elapsed().ok());
let data_missing = info.objects_total_count == 0 && info.buckets_count == 0;
let stale = last_update_age
.map(|elapsed| elapsed > std::time::Duration::from_secs(300))
.unwrap_or(true);
if data_missing {
info!("No data usage statistics found, attempting real-time collection");
if let Err(e) = collect_realtime_data_usage(&mut info, store.clone()).await {
warn!("Failed to collect real-time data usage: {}", e);
} else if let Err(e) = store_data_usage_in_backend(info.clone(), store.clone()).await {
warn!("Failed to persist refreshed data usage: {}", e);
}
} else if stale {
info!(
"Data usage statistics are stale (last update {:?} ago), refreshing asynchronously",
last_update_age
);
let mut info_for_refresh = info.clone();
let store_for_refresh = store.clone();
tokio::spawn(async move {
if let Err(e) = collect_realtime_data_usage(&mut info_for_refresh, store_for_refresh.clone()).await {
warn!("Background data usage refresh failed: {}", e);
return;
}
if let Err(e) = store_data_usage_in_backend(info_for_refresh, store_for_refresh).await {
warn!("Background data usage persistence failed: {}", e);
}
});
}
info.disk_usage_status = disk_statuses;
// Set capacity information
let sinfo = store.storage_info().await;
info.total_capacity = get_total_usable_capacity(&sinfo.disks, &sinfo) as u64;
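// Sketch of the staleness gate used above (hypothetical helper, not part of this
// diff): data is refreshed synchronously when nothing exists at all, and refreshed in
// a background task once the aggregate is older than five minutes or has no timestamp.
fn needs_refresh(last_update: Option<std::time::SystemTime>) -> bool {
    last_update
        .and_then(|ts| ts.elapsed().ok())
        .map(|elapsed| elapsed > std::time::Duration::from_secs(300))
        .unwrap_or(true)
}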
@@ -1192,9 +1241,18 @@ async fn collect_realtime_data_usage(
info.buckets_count = buckets.len() as u64;
info.last_update = Some(std::time::SystemTime::now());
info.buckets_usage.clear();
info.bucket_sizes.clear();
info.disk_usage_status.clear();
info.objects_total_count = 0;
info.objects_total_size = 0;
info.versions_total_count = 0;
info.delete_markers_total_count = 0;
let mut total_objects = 0u64;
let mut total_versions = 0u64;
let mut total_size = 0u64;
let mut total_delete_markers = 0u64;
// For each bucket, try to get object count
for bucket_info in buckets {
@@ -1205,61 +1263,30 @@ async fn collect_realtime_data_usage(
continue;
}
// Try to count objects in this bucket
let (object_count, bucket_size) = count_bucket_objects(&store, bucket_name).await.unwrap_or((0, 0));
match compute_bucket_usage(store.clone(), bucket_name).await {
Ok(bucket_usage) => {
total_objects = total_objects.saturating_add(bucket_usage.objects_count);
total_versions = total_versions.saturating_add(bucket_usage.versions_count);
total_size = total_size.saturating_add(bucket_usage.size);
total_delete_markers = total_delete_markers.saturating_add(bucket_usage.delete_markers_count);
total_objects += object_count;
total_size += bucket_size;
let bucket_usage = rustfs_common::data_usage::BucketUsageInfo {
objects_count: object_count,
size: bucket_size,
versions_count: object_count, // Simplified: assume 1 version per object
..Default::default()
};
info.buckets_usage.insert(bucket_name.clone(), bucket_usage);
info.bucket_sizes.insert(bucket_name.clone(), bucket_size);
info.buckets_usage.insert(bucket_name.clone(), bucket_usage.clone());
info.bucket_sizes.insert(bucket_name.clone(), bucket_usage.size);
}
Err(e) => {
warn!("Failed to compute bucket usage for {}: {}", bucket_name, e);
}
}
}
info.objects_total_count = total_objects;
info.objects_total_size = total_size;
info.versions_total_count = total_objects; // Simplified
info.versions_total_count = total_versions;
info.delete_markers_total_count = total_delete_markers;
Ok(())
}
/// Helper function to count objects in a bucket
async fn count_bucket_objects(
store: &Arc<rustfs_ecstore::store::ECStore>,
bucket_name: &str,
) -> Result<(u64, u64), Box<dyn std::error::Error + Send + Sync>> {
// Use list_objects_v2 to get actual object count
match store
.clone()
.list_objects_v2(
bucket_name,
"", // prefix
None, // continuation_token
None, // delimiter
1000, // max_keys - limit for performance
false, // fetch_owner
None, // start_after
)
.await
{
Ok(result) => {
let object_count = result.objects.len() as u64;
let total_size = result.objects.iter().map(|obj| obj.size as u64).sum();
Ok((object_count, total_size))
}
Err(e) => {
warn!("Failed to list objects in bucket {}: {}", bucket_name, e);
Ok((0, 0))
}
}
}
pub struct ProfileHandler {}
#[async_trait::async_trait]
impl Operation for ProfileHandler {