diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 6c8532d2..3363f907 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -840,6 +840,16 @@ impl Scanner { warn!("Failed to save checkpoint: {}", e); } + // Always trigger data usage collection during scan cycle + let config = self.config.read().await; + if config.enable_data_usage_stats { + info!("Data usage stats enabled, collecting data"); + if let Err(e) = self.collect_and_persist_data_usage().await { + error!("Failed to collect data usage during scan cycle: {}", e); + } + } + drop(config); + // Get aggregated statistics from all nodes debug!("About to get aggregated stats"); match self.stats_aggregator.get_aggregated_stats().await { @@ -920,6 +930,126 @@ impl Scanner { Ok(()) } + /// Collect and persist data usage statistics + async fn collect_and_persist_data_usage(&self) -> Result<()> { + info!("Starting data usage collection and persistence"); + + // Get ECStore instance + let Some(ecstore) = rustfs_ecstore::new_object_layer_fn() else { + warn!("ECStore not available for data usage collection"); + return Ok(()); + }; + + // Collect data usage from NodeScanner stats + let _local_stats = self.node_scanner.get_stats_summary().await; + + // Build data usage from ECStore directly for now + let data_usage = self.build_data_usage_from_ecstore(&ecstore).await?; + + // Update NodeScanner with collected data + self.node_scanner.update_data_usage(data_usage.clone()).await; + + // Store to local cache + { + let mut data_usage_guard = self.data_usage_stats.lock().await; + data_usage_guard.insert("consolidated".to_string(), data_usage.clone()); + } + + // Update last collection time + { + let mut last_collection = self.last_data_usage_collection.write().await; + *last_collection = Some(SystemTime::now()); + } + + // Persist to backend asynchronously + let data_clone = data_usage.clone(); + let store_clone = ecstore.clone(); + tokio::spawn(async move { + if let Err(e) = store_data_usage_in_backend(data_clone, store_clone).await { + error!("Failed to persist data usage to backend: {}", e); + } else { + info!("Successfully persisted data usage to backend"); + } + }); + + info!( + "Data usage collection completed: {} buckets, {} objects", + data_usage.buckets_count, data_usage.objects_total_count + ); + + Ok(()) + } + + /// Build data usage statistics directly from ECStore + async fn build_data_usage_from_ecstore(&self, ecstore: &Arc) -> Result { + let mut data_usage = DataUsageInfo::default(); + + // Get bucket list + match ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) + .await + { + Ok(buckets) => { + data_usage.buckets_count = buckets.len() as u64; + data_usage.last_update = Some(SystemTime::now()); + + let mut total_objects = 0u64; + let mut total_size = 0u64; + + for bucket_info in buckets { + if bucket_info.name.starts_with('.') { + continue; // Skip system buckets + } + + // Try to get actual object count for this bucket + let (object_count, bucket_size) = match ecstore + .clone() + .list_objects_v2( + &bucket_info.name, + "", // prefix + None, // continuation_token + None, // delimiter + 100, // max_keys - small limit for performance + false, // fetch_owner + None, // start_after + ) + .await + { + Ok(result) => { + let count = result.objects.len() as u64; + let size = result.objects.iter().map(|obj| obj.size as u64).sum(); + (count, size) + } + Err(_) => (0, 0), + }; + + total_objects += object_count; + total_size += bucket_size; + + let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { + size: bucket_size, + objects_count: object_count, + versions_count: object_count, // Simplified + delete_markers_count: 0, + ..Default::default() + }; + + data_usage.buckets_usage.insert(bucket_info.name.clone(), bucket_usage); + data_usage.bucket_sizes.insert(bucket_info.name, bucket_size); + } + + data_usage.objects_total_count = total_objects; + data_usage.objects_total_size = total_size; + data_usage.versions_total_count = total_objects; + } + Err(e) => { + warn!("Failed to list buckets for data usage collection: {}", e); + } + } + + Ok(data_usage) + } + /// Verify object integrity and trigger healing if necessary #[allow(dead_code)] async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<()> { diff --git a/crates/ahm/src/scanner/node_scanner.rs b/crates/ahm/src/scanner/node_scanner.rs index dbc8bcec..e5289773 100644 --- a/crates/ahm/src/scanner/node_scanner.rs +++ b/crates/ahm/src/scanner/node_scanner.rs @@ -251,6 +251,8 @@ pub struct ScanProgress { /// estimated completion time #[serde(with = "option_system_time_serde")] pub estimated_completion: Option, + /// data usage statistics + pub data_usage: Option, } impl Default for ScanProgress { @@ -265,6 +267,7 @@ impl Default for ScanProgress { last_scan_key: None, scan_start_time: SystemTime::now(), estimated_completion: None, + data_usage: None, } } } diff --git a/crates/ahm/tests/heal_integration_test.rs b/crates/ahm/tests/heal_integration_test.rs index 3dade30f..bb1a89e6 100644 --- a/crates/ahm/tests/heal_integration_test.rs +++ b/crates/ahm/tests/heal_integration_test.rs @@ -140,285 +140,289 @@ async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_heal_object_basic() { - let (disk_paths, ecstore, heal_storage) = setup_test_env().await; +mod serial_tests { + use super::*; - // Create test bucket and object - let bucket_name = "test-bucket"; - let object_name = "test-object.txt"; - let test_data = b"Hello, this is test data for healing!"; + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_heal_object_basic() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - create_test_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + // Create test bucket and object + let bucket_name = "test-heal-object-basic"; + let object_name = "test-object.txt"; + let test_data = b"Hello, this is test data for healing!"; - // ─── 1️⃣ delete single data shard file ───────────────────────────────────── - let obj_dir = disk_paths[0].join(bucket_name).join(object_name); - // find part file at depth 2, e.g. ...//part.1 - let target_part = WalkDir::new(&obj_dir) - .min_depth(2) - .max_depth(2) - .into_iter() - .filter_map(Result::ok) - .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) - .map(|e| e.into_path()) - .expect("Failed to locate part file to delete"); + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - std::fs::remove_file(&target_part).expect("failed to delete part file"); - assert!(!target_part.exists()); - println!("✅ Deleted shard part file: {target_part:?}"); + // ─── 1️⃣ delete single data shard file ───────────────────────────────────── + let obj_dir = disk_paths[0].join(bucket_name).join(object_name); + // find part file at depth 2, e.g. ...//part.1 + let target_part = WalkDir::new(&obj_dir) + .min_depth(2) + .max_depth(2) + .into_iter() + .filter_map(Result::ok) + .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) + .map(|e| e.into_path()) + .expect("Failed to locate part file to delete"); - // Create heal manager with faster interval - let cfg = HealConfig { - heal_interval: Duration::from_millis(1), - ..Default::default() - }; - let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); - heal_manager.start().await.unwrap(); + std::fs::remove_file(&target_part).expect("failed to delete part file"); + assert!(!target_part.exists()); + println!("✅ Deleted shard part file: {target_part:?}"); - // Submit heal request for the object - let heal_request = HealRequest::new( - HealType::Object { - bucket: bucket_name.to_string(), - object: object_name.to_string(), - version_id: None, - }, - HealOptions { - dry_run: false, - recursive: false, - remove_corrupted: false, - recreate_missing: true, - scan_mode: HealScanMode::Normal, - update_parity: true, - timeout: Some(Duration::from_secs(300)), - pool_index: None, - set_index: None, - }, - HealPriority::Normal, - ); + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_millis(1), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); - let task_id = heal_manager - .submit_heal_request(heal_request) - .await - .expect("Failed to submit heal request"); + // Submit heal request for the object + let heal_request = HealRequest::new( + HealType::Object { + bucket: bucket_name.to_string(), + object: object_name.to_string(), + version_id: None, + }, + HealOptions { + dry_run: false, + recursive: false, + remove_corrupted: false, + recreate_missing: true, + scan_mode: HealScanMode::Normal, + update_parity: true, + timeout: Some(Duration::from_secs(300)), + pool_index: None, + set_index: None, + }, + HealPriority::Normal, + ); - info!("Submitted heal request with task ID: {}", task_id); + let task_id = heal_manager + .submit_heal_request(heal_request) + .await + .expect("Failed to submit heal request"); - // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(8)).await; + info!("Submitted heal request with task ID: {}", task_id); - // Attempt to fetch task status (might be removed if finished) - match heal_manager.get_task_status(&task_id).await { - Ok(status) => info!("Task status: {:?}", status), - Err(e) => info!("Task status not found (likely completed): {}", e), + // Wait for task completion + tokio::time::sleep(tokio::time::Duration::from_secs(8)).await; + + // Attempt to fetch task status (might be removed if finished) + match heal_manager.get_task_status(&task_id).await { + Ok(status) => info!("Task status: {:?}", status), + Err(e) => info!("Task status not found (likely completed): {}", e), + } + + // ─── 2️⃣ verify each part file is restored ─────── + assert!(target_part.exists()); + + info!("Heal object basic test passed"); } - // ─── 2️⃣ verify each part file is restored ─────── - assert!(target_part.exists()); + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_heal_bucket_basic() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - info!("Heal object basic test passed"); -} + // Create test bucket + let bucket_name = "test-heal-bucket-basic"; + create_test_bucket(&ecstore, bucket_name).await; -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_heal_bucket_basic() { - let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + // ─── 1️⃣ delete bucket dir on disk ────────────── + let broken_bucket_path = disk_paths[0].join(bucket_name); + assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); + std::fs::remove_dir_all(&broken_bucket_path).expect("failed to delete bucket dir on disk"); + assert!(!broken_bucket_path.exists(), "bucket dir still exists after deletion"); + println!("✅ Deleted bucket directory on disk: {broken_bucket_path:?}"); - // Create test bucket - let bucket_name = "test-bucket-heal"; - create_test_bucket(&ecstore, bucket_name).await; + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_millis(1), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); - // ─── 1️⃣ delete bucket dir on disk ────────────── - let broken_bucket_path = disk_paths[0].join(bucket_name); - assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); - std::fs::remove_dir_all(&broken_bucket_path).expect("failed to delete bucket dir on disk"); - assert!(!broken_bucket_path.exists(), "bucket dir still exists after deletion"); - println!("✅ Deleted bucket directory on disk: {broken_bucket_path:?}"); + // Submit heal request for the bucket + let heal_request = HealRequest::new( + HealType::Bucket { + bucket: bucket_name.to_string(), + }, + HealOptions { + dry_run: false, + recursive: true, + remove_corrupted: false, + recreate_missing: false, + scan_mode: HealScanMode::Normal, + update_parity: false, + timeout: Some(Duration::from_secs(300)), + pool_index: None, + set_index: None, + }, + HealPriority::Normal, + ); - // Create heal manager with faster interval - let cfg = HealConfig { - heal_interval: Duration::from_millis(1), - ..Default::default() - }; - let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); - heal_manager.start().await.unwrap(); + let task_id = heal_manager + .submit_heal_request(heal_request) + .await + .expect("Failed to submit bucket heal request"); - // Submit heal request for the bucket - let heal_request = HealRequest::new( - HealType::Bucket { - bucket: bucket_name.to_string(), - }, - HealOptions { - dry_run: false, + info!("Submitted bucket heal request with task ID: {}", task_id); + + // Wait for task completion + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // Attempt to fetch task status (optional) + if let Ok(status) = heal_manager.get_task_status(&task_id).await { + if status == HealTaskStatus::Completed { + info!("Bucket heal task status: {:?}", status); + } else { + panic!("Bucket heal task status: {status:?}"); + } + } + + // ─── 3️⃣ Verify bucket directory is restored on every disk ─────── + assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); + + info!("Heal bucket basic test passed"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_heal_format_basic() { + let (disk_paths, _ecstore, heal_storage) = setup_test_env().await; + + // ─── 1️⃣ delete format.json on one disk ────────────── + let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); + assert!(format_path.exists(), "format.json does not exist on disk"); + std::fs::remove_file(&format_path).expect("failed to delete format.json on disk"); + assert!(!format_path.exists(), "format.json still exists after deletion"); + println!("✅ Deleted format.json on disk: {format_path:?}"); + + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_secs(2), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); + + // Wait for task completion + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // ─── 2️⃣ verify format.json is restored ─────── + assert!(format_path.exists(), "format.json does not exist on disk after heal"); + + info!("Heal format basic test passed"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_heal_format_with_data() { + let (disk_paths, ecstore, heal_storage) = setup_test_env().await; + + // Create test bucket and object + let bucket_name = "test-heal-format-with-data"; + let object_name = "test-object.txt"; + let test_data = b"Hello, this is test data for healing!"; + + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + + let obj_dir = disk_paths[0].join(bucket_name).join(object_name); + let target_part = WalkDir::new(&obj_dir) + .min_depth(2) + .max_depth(2) + .into_iter() + .filter_map(Result::ok) + .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) + .map(|e| e.into_path()) + .expect("Failed to locate part file to delete"); + + // ─── 1️⃣ delete format.json on one disk ────────────── + let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); + std::fs::remove_dir_all(&disk_paths[0]).expect("failed to delete all contents under disk_paths[0]"); + std::fs::create_dir_all(&disk_paths[0]).expect("failed to recreate disk_paths[0] directory"); + println!("✅ Deleted format.json on disk: {:?}", disk_paths[0]); + + // Create heal manager with faster interval + let cfg = HealConfig { + heal_interval: Duration::from_secs(2), + ..Default::default() + }; + let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); + heal_manager.start().await.unwrap(); + + // Wait for task completion + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // ─── 2️⃣ verify format.json is restored ─────── + assert!(format_path.exists(), "format.json does not exist on disk after heal"); + // ─── 3 verify each part file is restored ─────── + assert!(target_part.exists()); + + info!("Heal format basic test passed"); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_heal_storage_api_direct() { + let (_disk_paths, ecstore, heal_storage) = setup_test_env().await; + + // Test direct heal storage API calls + + // Test heal_format + let format_result = heal_storage.heal_format(true).await; // dry run + assert!(format_result.is_ok()); + info!("Direct heal_format test passed"); + + // Test heal_bucket + let bucket_name = "test-bucket-direct"; + create_test_bucket(&ecstore, bucket_name).await; + + let heal_opts = HealOpts { recursive: true, - remove_corrupted: false, - recreate_missing: false, + dry_run: true, + remove: false, + recreate: false, scan_mode: HealScanMode::Normal, update_parity: false, - timeout: Some(Duration::from_secs(300)), - pool_index: None, - set_index: None, - }, - HealPriority::Normal, - ); + no_lock: false, + pool: None, + set: None, + }; - let task_id = heal_manager - .submit_heal_request(heal_request) - .await - .expect("Failed to submit bucket heal request"); + let bucket_result = heal_storage.heal_bucket(bucket_name, &heal_opts).await; + assert!(bucket_result.is_ok()); + info!("Direct heal_bucket test passed"); - info!("Submitted bucket heal request with task ID: {}", task_id); + // Test heal_object + let object_name = "test-object-direct.txt"; + let test_data = b"Test data for direct heal API"; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + let object_heal_opts = HealOpts { + recursive: false, + dry_run: true, + remove: false, + recreate: false, + scan_mode: HealScanMode::Normal, + update_parity: false, + no_lock: false, + pool: None, + set: None, + }; - // Attempt to fetch task status (optional) - if let Ok(status) = heal_manager.get_task_status(&task_id).await { - if status == HealTaskStatus::Completed { - info!("Bucket heal task status: {:?}", status); - } else { - panic!("Bucket heal task status: {status:?}"); - } + let object_result = heal_storage + .heal_object(bucket_name, object_name, None, &object_heal_opts) + .await; + assert!(object_result.is_ok()); + info!("Direct heal_object test passed"); + + info!("Direct heal storage API test passed"); } - - // ─── 3️⃣ Verify bucket directory is restored on every disk ─────── - assert!(broken_bucket_path.exists(), "bucket dir does not exist on disk"); - - info!("Heal bucket basic test passed"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_heal_format_basic() { - let (disk_paths, _ecstore, heal_storage) = setup_test_env().await; - - // ─── 1️⃣ delete format.json on one disk ────────────── - let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); - assert!(format_path.exists(), "format.json does not exist on disk"); - std::fs::remove_file(&format_path).expect("failed to delete format.json on disk"); - assert!(!format_path.exists(), "format.json still exists after deletion"); - println!("✅ Deleted format.json on disk: {format_path:?}"); - - // Create heal manager with faster interval - let cfg = HealConfig { - heal_interval: Duration::from_secs(2), - ..Default::default() - }; - let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); - heal_manager.start().await.unwrap(); - - // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - // ─── 2️⃣ verify format.json is restored ─────── - assert!(format_path.exists(), "format.json does not exist on disk after heal"); - - info!("Heal format basic test passed"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_heal_format_with_data() { - let (disk_paths, ecstore, heal_storage) = setup_test_env().await; - - // Create test bucket and object - let bucket_name = "test-bucket"; - let object_name = "test-object.txt"; - let test_data = b"Hello, this is test data for healing!"; - - create_test_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - - let obj_dir = disk_paths[0].join(bucket_name).join(object_name); - let target_part = WalkDir::new(&obj_dir) - .min_depth(2) - .max_depth(2) - .into_iter() - .filter_map(Result::ok) - .find(|e| e.file_type().is_file() && e.file_name().to_str().map(|n| n.starts_with("part.")).unwrap_or(false)) - .map(|e| e.into_path()) - .expect("Failed to locate part file to delete"); - - // ─── 1️⃣ delete format.json on one disk ────────────── - let format_path = disk_paths[0].join(".rustfs.sys").join("format.json"); - std::fs::remove_dir_all(&disk_paths[0]).expect("failed to delete all contents under disk_paths[0]"); - std::fs::create_dir_all(&disk_paths[0]).expect("failed to recreate disk_paths[0] directory"); - println!("✅ Deleted format.json on disk: {:?}", disk_paths[0]); - - // Create heal manager with faster interval - let cfg = HealConfig { - heal_interval: Duration::from_secs(2), - ..Default::default() - }; - let heal_manager = HealManager::new(heal_storage.clone(), Some(cfg)); - heal_manager.start().await.unwrap(); - - // Wait for task completion - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - // ─── 2️⃣ verify format.json is restored ─────── - assert!(format_path.exists(), "format.json does not exist on disk after heal"); - // ─── 3 verify each part file is restored ─────── - assert!(target_part.exists()); - - info!("Heal format basic test passed"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_heal_storage_api_direct() { - let (_disk_paths, ecstore, heal_storage) = setup_test_env().await; - - // Test direct heal storage API calls - - // Test heal_format - let format_result = heal_storage.heal_format(true).await; // dry run - assert!(format_result.is_ok()); - info!("Direct heal_format test passed"); - - // Test heal_bucket - let bucket_name = "test-bucket-direct"; - create_test_bucket(&ecstore, bucket_name).await; - - let heal_opts = HealOpts { - recursive: true, - dry_run: true, - remove: false, - recreate: false, - scan_mode: HealScanMode::Normal, - update_parity: false, - no_lock: false, - pool: None, - set: None, - }; - - let bucket_result = heal_storage.heal_bucket(bucket_name, &heal_opts).await; - assert!(bucket_result.is_ok()); - info!("Direct heal_bucket test passed"); - - // Test heal_object - let object_name = "test-object-direct.txt"; - let test_data = b"Test data for direct heal API"; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - - let object_heal_opts = HealOpts { - recursive: false, - dry_run: true, - remove: false, - recreate: false, - scan_mode: HealScanMode::Normal, - update_parity: false, - no_lock: false, - pool: None, - set: None, - }; - - let object_result = heal_storage - .heal_object(bucket_name, object_name, None, &object_heal_opts) - .await; - assert!(object_result.is_ok()); - info!("Direct heal_object test passed"); - - info!("Direct heal storage API test passed"); } diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs index 8c74bb3d..0f3b20af 100644 --- a/crates/ahm/tests/lifecycle_integration_test.rs +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -296,286 +296,290 @@ async fn object_is_transitioned(ecstore: &Arc, bucket: &str, object: &s } } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_lifecycle_expiry_basic() { - let (_disk_paths, ecstore) = setup_test_env().await; +mod serial_tests { + use super::*; - // Create test bucket and object - let bucket_name = "test-lifecycle-bucket"; - let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" - let test_data = b"Hello, this is test data for lifecycle expiry!"; + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_lifecycle_expiry_basic() { + let (_disk_paths, ecstore) = setup_test_env().await; - create_test_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + // Create test bucket and object + let bucket_name = "test-lifecycle-expiry-basic-bucket"; + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; - // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); - println!("✅ Object exists before lifecycle processing"); + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - // Set lifecycle configuration with very short expiry (0 days = immediate expiry) - set_bucket_lifecycle(bucket_name) - .await - .expect("Failed to set lifecycle configuration"); - println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name, object_name).await); + println!("✅ Object exists before lifecycle processing"); - // Verify lifecycle configuration was set - match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { - Ok(bucket_meta) => { - assert!(bucket_meta.lifecycle_config.is_some()); - println!("✅ Bucket metadata retrieved successfully"); - } - Err(e) => { - println!("❌ Error retrieving bucket metadata: {e:?}"); - } - } - - // Create scanner with very short intervals for testing - let scanner_config = ScannerConfig { - scan_interval: Duration::from_millis(100), - deep_scan_interval: Duration::from_millis(500), - max_concurrent_scans: 1, - ..Default::default() - }; - - let scanner = Scanner::new(Some(scanner_config), None); - - // Start scanner - scanner.start().await.expect("Failed to start scanner"); - println!("✅ Scanner started"); - - // Wait for scanner to process lifecycle rules - tokio::time::sleep(Duration::from_secs(2)).await; - - // Manually trigger a scan cycle to ensure lifecycle processing - scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); - println!("✅ Manual scan cycle completed"); - - // Wait a bit more for background workers to process expiry tasks - tokio::time::sleep(Duration::from_secs(5)).await; - - // Check if object has been expired (delete_marker) - let check_result = object_exists(&ecstore, bucket_name, object_name).await; - println!("Object is_delete_marker after lifecycle processing: {check_result}"); - - if check_result { - println!("❌ Object was not deleted by lifecycle processing"); - } else { - println!("✅ Object was successfully deleted by lifecycle processing"); - // Let's try to get object info to see its details - match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + set_bucket_lifecycle(bucket_name) .await - { - Ok(obj_info) => { - println!( - "Object info: name={}, size={}, mod_time={:?}", - obj_info.name, obj_info.size, obj_info.mod_time - ); + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); } Err(e) => { - println!("Error getting object info: {e:?}"); + println!("❌ Error retrieving bucket metadata: {e:?}"); } } + + // Create scanner with very short intervals for testing + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + + let scanner = Scanner::new(Some(scanner_config), None); + + // Start scanner + scanner.start().await.expect("Failed to start scanner"); + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(2)).await; + + // Manually trigger a scan cycle to ensure lifecycle processing + scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); + println!("✅ Manual scan cycle completed"); + + // Wait a bit more for background workers to process expiry tasks + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check if object has been expired (delete_marker) + let check_result = object_exists(&ecstore, bucket_name, object_name).await; + println!("Object is_delete_marker after lifecycle processing: {check_result}"); + + if check_result { + println!("❌ Object was not deleted by lifecycle processing"); + } else { + println!("✅ Object was successfully deleted by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } + + assert!(!check_result); + println!("✅ Object successfully expired"); + + // Stop scanner + let _ = scanner.stop().await; + println!("✅ Scanner stopped"); + + println!("Lifecycle expiry basic test completed"); } - assert!(!check_result); - println!("✅ Object successfully expired"); + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_lifecycle_expiry_deletemarker() { + let (_disk_paths, ecstore) = setup_test_env().await; - // Stop scanner - let _ = scanner.stop().await; - println!("✅ Scanner stopped"); + // Create test bucket and object + let bucket_name = "test-lifecycle-expiry-deletemarker-bucket"; + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; - println!("Lifecycle expiry basic test completed"); -} + create_test_lock_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_lifecycle_expiry_deletemarker() { - let (_disk_paths, ecstore) = setup_test_env().await; + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name, object_name).await); + println!("✅ Object exists before lifecycle processing"); - // Create test bucket and object - let bucket_name = "test-lifecycle-bucket"; - let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" - let test_data = b"Hello, this is test data for lifecycle expiry!"; - - create_test_lock_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - - // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); - println!("✅ Object exists before lifecycle processing"); - - // Set lifecycle configuration with very short expiry (0 days = immediate expiry) - set_bucket_lifecycle_deletemarker(bucket_name) - .await - .expect("Failed to set lifecycle configuration"); - println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); - - // Verify lifecycle configuration was set - match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { - Ok(bucket_meta) => { - assert!(bucket_meta.lifecycle_config.is_some()); - println!("✅ Bucket metadata retrieved successfully"); - } - Err(e) => { - println!("❌ Error retrieving bucket metadata: {e:?}"); - } - } - - // Create scanner with very short intervals for testing - let scanner_config = ScannerConfig { - scan_interval: Duration::from_millis(100), - deep_scan_interval: Duration::from_millis(500), - max_concurrent_scans: 1, - ..Default::default() - }; - - let scanner = Scanner::new(Some(scanner_config), None); - - // Start scanner - scanner.start().await.expect("Failed to start scanner"); - println!("✅ Scanner started"); - - // Wait for scanner to process lifecycle rules - tokio::time::sleep(Duration::from_secs(2)).await; - - // Manually trigger a scan cycle to ensure lifecycle processing - scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); - println!("✅ Manual scan cycle completed"); - - // Wait a bit more for background workers to process expiry tasks - tokio::time::sleep(Duration::from_secs(5)).await; - - // Check if object has been expired (deleted) - //let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await; - let check_result = object_exists(&ecstore, bucket_name, object_name).await; - println!("Object exists after lifecycle processing: {check_result}"); - - if !check_result { - println!("❌ Object was not deleted by lifecycle processing"); - // Let's try to get object info to see its details - match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + set_bucket_lifecycle_deletemarker(bucket_name) .await - { - Ok(obj_info) => { - println!( - "Object info: name={}, size={}, mod_time={:?}", - obj_info.name, obj_info.size, obj_info.mod_time - ); + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); } Err(e) => { - println!("Error getting object info: {e:?}"); + println!("❌ Error retrieving bucket metadata: {e:?}"); } } - } else { - println!("✅ Object was successfully deleted by lifecycle processing"); + + // Create scanner with very short intervals for testing + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + + let scanner = Scanner::new(Some(scanner_config), None); + + // Start scanner + scanner.start().await.expect("Failed to start scanner"); + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(2)).await; + + // Manually trigger a scan cycle to ensure lifecycle processing + scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); + println!("✅ Manual scan cycle completed"); + + // Wait a bit more for background workers to process expiry tasks + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check if object has been expired (deleted) + //let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await; + let check_result = object_exists(&ecstore, bucket_name, object_name).await; + println!("Object exists after lifecycle processing: {check_result}"); + + if !check_result { + println!("❌ Object was not deleted by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } else { + println!("✅ Object was successfully deleted by lifecycle processing"); + } + + assert!(check_result); + println!("✅ Object successfully expired"); + + // Stop scanner + let _ = scanner.stop().await; + println!("✅ Scanner stopped"); + + println!("Lifecycle expiry basic test completed"); } - assert!(check_result); - println!("✅ Object successfully expired"); + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + #[serial] + async fn test_lifecycle_transition_basic() { + let (_disk_paths, ecstore) = setup_test_env().await; - // Stop scanner - let _ = scanner.stop().await; - println!("✅ Scanner stopped"); + //create_test_tier().await; - println!("Lifecycle expiry basic test completed"); -} + // Create test bucket and object + let bucket_name = "test-lifecycle-transition-basic-bucket"; + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_lifecycle_transition_basic() { - let (_disk_paths, ecstore) = setup_test_env().await; + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - //create_test_tier().await; + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name, object_name).await); + println!("✅ Object exists before lifecycle processing"); - // Create test bucket and object - let bucket_name = "test-lifecycle-bucket"; - let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" - let test_data = b"Hello, this is test data for lifecycle expiry!"; - - create_test_bucket(&ecstore, bucket_name).await; - upload_test_object(&ecstore, bucket_name, object_name, test_data).await; - - // Verify object exists initially - assert!(object_exists(&ecstore, bucket_name, object_name).await); - println!("✅ Object exists before lifecycle processing"); - - // Set lifecycle configuration with very short expiry (0 days = immediate expiry) - /*set_bucket_lifecycle_transition(bucket_name) - .await - .expect("Failed to set lifecycle configuration"); - println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); - - // Verify lifecycle configuration was set - match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { - Ok(bucket_meta) => { - assert!(bucket_meta.lifecycle_config.is_some()); - println!("✅ Bucket metadata retrieved successfully"); - } - Err(e) => { - println!("❌ Error retrieving bucket metadata: {e:?}"); - } - }*/ - - // Create scanner with very short intervals for testing - let scanner_config = ScannerConfig { - scan_interval: Duration::from_millis(100), - deep_scan_interval: Duration::from_millis(500), - max_concurrent_scans: 1, - ..Default::default() - }; - - let scanner = Scanner::new(Some(scanner_config), None); - - // Start scanner - scanner.start().await.expect("Failed to start scanner"); - println!("✅ Scanner started"); - - // Wait for scanner to process lifecycle rules - tokio::time::sleep(Duration::from_secs(2)).await; - - // Manually trigger a scan cycle to ensure lifecycle processing - scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); - println!("✅ Manual scan cycle completed"); - - // Wait a bit more for background workers to process expiry tasks - tokio::time::sleep(Duration::from_secs(5)).await; - - // Check if object has been expired (deleted) - //let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await; - let check_result = object_exists(&ecstore, bucket_name, object_name).await; - println!("Object exists after lifecycle processing: {check_result}"); - - if check_result { - println!("✅ Object was not deleted by lifecycle processing"); - // Let's try to get object info to see its details - match ecstore - .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + /*set_bucket_lifecycle_transition(bucket_name) .await - { - Ok(obj_info) => { - println!( - "Object info: name={}, size={}, mod_time={:?}", - obj_info.name, obj_info.size, obj_info.mod_time - ); - println!("Object info: transitioned_object={:?}", obj_info.transitioned_object); + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); } Err(e) => { - println!("Error getting object info: {e:?}"); + println!("❌ Error retrieving bucket metadata: {e:?}"); } + }*/ + + // Create scanner with very short intervals for testing + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + + let scanner = Scanner::new(Some(scanner_config), None); + + // Start scanner + scanner.start().await.expect("Failed to start scanner"); + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(2)).await; + + // Manually trigger a scan cycle to ensure lifecycle processing + scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); + println!("✅ Manual scan cycle completed"); + + // Wait a bit more for background workers to process expiry tasks + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check if object has been expired (deleted) + //let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await; + let check_result = object_exists(&ecstore, bucket_name, object_name).await; + println!("Object exists after lifecycle processing: {check_result}"); + + if check_result { + println!("✅ Object was not deleted by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + println!("Object info: transitioned_object={:?}", obj_info.transitioned_object); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } else { + println!("❌ Object was deleted by lifecycle processing"); } - } else { - println!("❌ Object was deleted by lifecycle processing"); + + assert!(check_result); + println!("✅ Object successfully transitioned"); + + // Stop scanner + let _ = scanner.stop().await; + println!("✅ Scanner stopped"); + + println!("Lifecycle transition basic test completed"); } - - assert!(check_result); - println!("✅ Object successfully transitioned"); - - // Stop scanner - let _ = scanner.stop().await; - println!("✅ Scanner stopped"); - - println!("Lifecycle transition basic test completed"); } diff --git a/crates/ecstore/src/data_usage.rs b/crates/ecstore/src/data_usage.rs index 425081ef..c9dbed85 100644 --- a/crates/ecstore/src/data_usage.rs +++ b/crates/ecstore/src/data_usage.rs @@ -14,10 +14,10 @@ use std::{collections::HashMap, sync::Arc}; -use crate::{bucket::metadata_sys::get_replication_config, config::com::read_config, store::ECStore}; +use crate::{bucket::metadata_sys::get_replication_config, config::com::read_config, store::ECStore, store_api::StorageAPI}; use rustfs_common::data_usage::{BucketTargetUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, SizeSummary}; use rustfs_utils::path::SLASH_SEPARATOR; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use crate::error::Error; @@ -61,12 +61,13 @@ pub async fn store_data_usage_in_backend(data_usage_info: DataUsageInfo, store: /// Load data usage info from backend storage pub async fn load_data_usage_from_backend(store: Arc) -> Result { - let buf: Vec = match read_config(store, &DATA_USAGE_OBJ_NAME_PATH).await { + let buf: Vec = match read_config(store.clone(), &DATA_USAGE_OBJ_NAME_PATH).await { Ok(data) => data, Err(e) => { error!("Failed to read data usage info from backend: {}", e); if e == crate::error::Error::ConfigNotFound { - return Ok(DataUsageInfo::default()); + warn!("Data usage config not found, building basic statistics"); + return build_basic_data_usage_info(store).await; } return Err(Error::other(e)); } @@ -75,9 +76,22 @@ pub async fn load_data_usage_from_backend(store: Arc) -> Result) -> Result 0 || bui.replication_failed_count_v1 > 0 @@ -129,6 +144,73 @@ pub async fn load_data_usage_from_backend(store: Arc) -> Result) -> Result { + let mut data_usage_info = DataUsageInfo::default(); + + // Get bucket list + match store.list_bucket(&crate::store_api::BucketOptions::default()).await { + Ok(buckets) => { + data_usage_info.buckets_count = buckets.len() as u64; + data_usage_info.last_update = Some(std::time::SystemTime::now()); + + let mut total_objects = 0u64; + let mut total_size = 0u64; + + for bucket_info in buckets { + if bucket_info.name.starts_with('.') { + continue; // Skip system buckets + } + + // Try to get actual object count for this bucket + let (object_count, bucket_size) = match store + .clone() + .list_objects_v2( + &bucket_info.name, + "", // prefix + None, // continuation_token + None, // delimiter + 100, // max_keys - small limit for performance + false, // fetch_owner + None, // start_after + ) + .await + { + Ok(result) => { + let count = result.objects.len() as u64; + let size = result.objects.iter().map(|obj| obj.size as u64).sum(); + (count, size) + } + Err(_) => (0, 0), + }; + + total_objects += object_count; + total_size += bucket_size; + + let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { + size: bucket_size, + objects_count: object_count, + versions_count: object_count, // Simplified + delete_markers_count: 0, + ..Default::default() + }; + + data_usage_info.buckets_usage.insert(bucket_info.name.clone(), bucket_usage); + data_usage_info.bucket_sizes.insert(bucket_info.name, bucket_size); + } + + data_usage_info.objects_total_count = total_objects; + data_usage_info.objects_total_size = total_size; + data_usage_info.versions_total_count = total_objects; + } + Err(e) => { + warn!("Failed to list buckets for basic data usage info: {}", e); + } + } + + Ok(data_usage_info) +} + /// Create a data usage cache entry from size summary pub fn create_cache_entry_from_summary(summary: &SizeSummary) -> DataUsageEntry { let mut entry = DataUsageEntry::default(); diff --git a/crates/protos/src/main.rs b/crates/protos/src/main.rs index 99d77c61..223bfaa9 100644 --- a/crates/protos/src/main.rs +++ b/crates/protos/src/main.rs @@ -29,8 +29,9 @@ fn main() -> Result<(), AnyError> { let need_compile = match version.compare_ext(&VERSION_PROTOBUF) { Ok(cmp::Ordering::Greater) => true, Ok(_) => { - let version_err = Version::build_error_message(&version, &VERSION_PROTOBUF).unwrap(); - println!("cargo:warning=Tool `protoc` {version_err}, skip compiling."); + if let Some(version_err) = Version::build_error_message(&version, &VERSION_PROTOBUF) { + println!("cargo:warning=Tool `protoc` {version_err}, skip compiling."); + } false } Err(version_err) => { @@ -144,8 +145,9 @@ fn compile_flatbuffers_models, S: AsRef>( let need_compile = match version.compare_ext(&VERSION_FLATBUFFERS) { Ok(cmp::Ordering::Greater) => true, Ok(_) => { - let version_err = Version::build_error_message(&version, &VERSION_FLATBUFFERS).unwrap(); - println!("cargo:warning=Tool `{flatc_path}` {version_err}, skip compiling."); + if let Some(version_err) = Version::build_error_message(&version, &VERSION_FLATBUFFERS) { + println!("cargo:warning=Tool `{flatc_path}` {version_err}, skip compiling."); + } false } Err(version_err) => { @@ -253,7 +255,13 @@ impl Version { } else { match self.compare_major_version(expected_version) { cmp::Ordering::Greater => Ok(cmp::Ordering::Greater), - _ => Err(Self::build_error_message(self, expected_version).unwrap()), + _ => { + if let Some(error_msg) = Self::build_error_message(self, expected_version) { + Err(error_msg) + } else { + Err("Unknown version comparison error".to_string()) + } + } } } } diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index c3daf36e..c00e1db0 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -365,6 +365,16 @@ impl Operation for DataUsageInfoHandler { s3_error!(InternalError, "load_data_usage_from_backend failed") })?; + // If no valid data exists, attempt real-time collection + if info.objects_total_count == 0 && info.buckets_count == 0 { + info!("No data usage statistics found, attempting real-time collection"); + + if let Err(e) = collect_realtime_data_usage(&mut info, store.clone()).await { + warn!("Failed to collect real-time data usage: {}", e); + } + } + + // Set capacity information let sinfo = store.storage_info().await; info.total_capacity = get_total_usable_capacity(&sinfo.disks, &sinfo) as u64; info.total_free_capacity = get_total_usable_capacity_free(&sinfo.disks, &sinfo) as u64; @@ -1093,6 +1103,86 @@ impl Operation for RemoveRemoteTargetHandler { } } +/// Real-time data collection function +async fn collect_realtime_data_usage( + info: &mut rustfs_common::data_usage::DataUsageInfo, + store: Arc, +) -> Result<(), Box> { + // Get bucket list and collect basic statistics + let buckets = store + .list_bucket(&rustfs_ecstore::store_api::BucketOptions::default()) + .await?; + + info.buckets_count = buckets.len() as u64; + info.last_update = Some(std::time::SystemTime::now()); + + let mut total_objects = 0u64; + let mut total_size = 0u64; + + // For each bucket, try to get object count + for bucket_info in buckets { + let bucket_name = &bucket_info.name; + + // Skip system buckets + if bucket_name.starts_with('.') { + continue; + } + + // Try to count objects in this bucket + let (object_count, bucket_size) = count_bucket_objects(&store, bucket_name).await.unwrap_or((0, 0)); + + total_objects += object_count; + total_size += bucket_size; + + let bucket_usage = rustfs_common::data_usage::BucketUsageInfo { + objects_count: object_count, + size: bucket_size, + versions_count: object_count, // Simplified: assume 1 version per object + ..Default::default() + }; + + info.buckets_usage.insert(bucket_name.clone(), bucket_usage); + info.bucket_sizes.insert(bucket_name.clone(), bucket_size); + } + + info.objects_total_count = total_objects; + info.objects_total_size = total_size; + info.versions_total_count = total_objects; // Simplified + + Ok(()) +} + +/// Helper function to count objects in a bucket +async fn count_bucket_objects( + store: &Arc, + bucket_name: &str, +) -> Result<(u64, u64), Box> { + // Use list_objects_v2 to get actual object count + match store + .clone() + .list_objects_v2( + bucket_name, + "", // prefix + None, // continuation_token + None, // delimiter + 1000, // max_keys - limit for performance + false, // fetch_owner + None, // start_after + ) + .await + { + Ok(result) => { + let object_count = result.objects.len() as u64; + let total_size = result.objects.iter().map(|obj| obj.size as u64).sum(); + Ok((object_count, total_size)) + } + Err(e) => { + warn!("Failed to list objects in bucket {}: {}", bucket_name, e); + Ok((0, 0)) + } + } +} + #[cfg(test)] mod tests { use super::*;