diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs index f0fc031a..196e16c2 100644 --- a/crates/common/src/metrics.rs +++ b/crates/common/src/metrics.rs @@ -96,6 +96,11 @@ pub enum Metric { ApplyNonCurrent, HealAbandonedVersion, + // Quota metrics: + QuotaCheck, + QuotaViolation, + QuotaSync, + // START Trace metrics: StartTrace, ScanObject, // Scan object. All operations included. @@ -131,6 +136,9 @@ impl Metric { Self::CleanAbandoned => "clean_abandoned", Self::ApplyNonCurrent => "apply_non_current", Self::HealAbandonedVersion => "heal_abandoned_version", + Self::QuotaCheck => "quota_check", + Self::QuotaViolation => "quota_violation", + Self::QuotaSync => "quota_sync", Self::StartTrace => "start_trace", Self::ScanObject => "scan_object", Self::HealAbandonedObject => "heal_abandoned_object", @@ -163,15 +171,18 @@ impl Metric { 10 => Some(Self::CleanAbandoned), 11 => Some(Self::ApplyNonCurrent), 12 => Some(Self::HealAbandonedVersion), - 13 => Some(Self::StartTrace), - 14 => Some(Self::ScanObject), - 15 => Some(Self::HealAbandonedObject), - 16 => Some(Self::LastRealtime), - 17 => Some(Self::ScanFolder), - 18 => Some(Self::ScanCycle), - 19 => Some(Self::ScanBucketDrive), - 20 => Some(Self::CompactFolder), - 21 => Some(Self::Last), + 13 => Some(Self::QuotaCheck), + 14 => Some(Self::QuotaViolation), + 15 => Some(Self::QuotaSync), + 16 => Some(Self::StartTrace), + 17 => Some(Self::ScanObject), + 18 => Some(Self::HealAbandonedObject), + 19 => Some(Self::LastRealtime), + 20 => Some(Self::ScanFolder), + 21 => Some(Self::ScanCycle), + 22 => Some(Self::ScanBucketDrive), + 23 => Some(Self::CompactFolder), + 24 => Some(Self::Last), _ => None, } } diff --git a/crates/config/src/constants/mod.rs b/crates/config/src/constants/mod.rs index 49fa0080..a526c5ac 100644 --- a/crates/config/src/constants/mod.rs +++ b/crates/config/src/constants/mod.rs @@ -21,6 +21,7 @@ pub(crate) mod heal; pub(crate) mod object; pub(crate) mod profiler; pub(crate) mod protocols; +pub(crate) mod quota; pub(crate) mod runtime; pub(crate) mod targets; pub(crate) mod tls; diff --git a/crates/config/src/constants/quota.rs b/crates/config/src/constants/quota.rs new file mode 100644 index 00000000..90d98d60 --- /dev/null +++ b/crates/config/src/constants/quota.rs @@ -0,0 +1,26 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub const QUOTA_CONFIG_FILE: &str = "quota.json"; +pub const QUOTA_TYPE_HARD: &str = "HARD"; + +pub const QUOTA_EXCEEDED_ERROR_CODE: &str = "XRustfsQuotaExceeded"; +pub const QUOTA_INVALID_CONFIG_ERROR_CODE: &str = "InvalidArgument"; +pub const QUOTA_NOT_FOUND_ERROR_CODE: &str = "NoSuchBucket"; +pub const QUOTA_INTERNAL_ERROR_CODE: &str = "InternalError"; + +pub const QUOTA_API_PATH: &str = "/rustfs/admin/v3/quota/{bucket}"; + +pub const QUOTA_INVALID_TYPE_ERROR_MSG: &str = "Only HARD quota type is supported"; +pub const QUOTA_METADATA_SYSTEM_ERROR_MSG: &str = "Bucket metadata system not initialized"; diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 4b6d42fb..5a8b6800 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -33,6 +33,8 @@ pub use constants::profiler::*; #[cfg(feature = "constants")] pub use constants::protocols::*; #[cfg(feature = "constants")] +pub use constants::quota::*; +#[cfg(feature = "constants")] pub use constants::runtime::*; #[cfg(feature = "constants")] pub use constants::targets::*; diff --git a/crates/e2e_test/src/common.rs b/crates/e2e_test/src/common.rs index 9b94005e..da70db83 100644 --- a/crates/e2e_test/src/common.rs +++ b/crates/e2e_test/src/common.rs @@ -176,12 +176,14 @@ impl RustFSTestEnvironment { /// Kill any existing RustFS processes pub async fn cleanup_existing_processes(&self) -> Result<(), Box> { info!("Cleaning up any existing RustFS processes"); - let output = Command::new("pkill").args(["-f", "rustfs"]).output(); + let binary_path = rustfs_binary_path(); + let binary_name = binary_path.to_string_lossy(); + let output = Command::new("pkill").args(["-f", &binary_name]).output(); if let Ok(output) = output && output.status.success() { - info!("Killed existing RustFS processes"); + info!("Killed existing RustFS processes: {}", binary_name); sleep(Duration::from_millis(1000)).await; } Ok(()) @@ -363,3 +365,12 @@ pub async fn awscurl_put( ) -> Result> { execute_awscurl(url, "PUT", Some(body), access_key, secret_key).await } + +/// Helper function for DELETE requests +pub async fn awscurl_delete( + url: &str, + access_key: &str, + secret_key: &str, +) -> Result> { + execute_awscurl(url, "DELETE", None, access_key, secret_key).await +} diff --git a/crates/e2e_test/src/lib.rs b/crates/e2e_test/src/lib.rs index b635afb7..0d5a8bbb 100644 --- a/crates/e2e_test/src/lib.rs +++ b/crates/e2e_test/src/lib.rs @@ -29,6 +29,10 @@ mod data_usage_test; #[cfg(test)] mod kms; +// Quota tests +#[cfg(test)] +mod quota_test; + #[cfg(test)] mod bucket_policy_check_test; diff --git a/crates/e2e_test/src/quota_test.rs b/crates/e2e_test/src/quota_test.rs new file mode 100644 index 00000000..f3c53622 --- /dev/null +++ b/crates/e2e_test/src/quota_test.rs @@ -0,0 +1,798 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::common::{RustFSTestEnvironment, awscurl_delete, awscurl_get, awscurl_post, awscurl_put, init_logging}; +use aws_sdk_s3::Client; +use serial_test::serial; +use tracing::{debug, info}; + +/// Test environment setup for quota tests +pub struct QuotaTestEnv { + pub env: RustFSTestEnvironment, + pub client: Client, + pub bucket_name: String, +} + +impl QuotaTestEnv { + pub async fn new() -> Result> { + let bucket_name = format!("quota-test-{}", uuid::Uuid::new_v4()); + let mut env = RustFSTestEnvironment::new().await?; + env.start_rustfs_server(vec![]).await?; + let client = env.create_s3_client(); + + Ok(Self { + env, + client, + bucket_name, + }) + } + + pub async fn create_bucket(&self) -> Result<(), Box> { + self.env.create_test_bucket(&self.bucket_name).await?; + Ok(()) + } + + pub async fn cleanup_bucket(&self) -> Result<(), Box> { + let objects = self.client.list_objects_v2().bucket(&self.bucket_name).send().await?; + for object in objects.contents() { + self.client + .delete_object() + .bucket(&self.bucket_name) + .key(object.key().unwrap_or_default()) + .send() + .await?; + } + self.env.delete_test_bucket(&self.bucket_name).await?; + Ok(()) + } + + pub async fn set_bucket_quota(&self, quota_bytes: u64) -> Result<(), Box> { + let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name); + let quota_config = serde_json::json!({ + "quota": quota_bytes, + "quota_type": "HARD" + }); + + let response = awscurl_put(&url, "a_config.to_string(), &self.env.access_key, &self.env.secret_key).await?; + if response.contains("error") { + Err(format!("Failed to set quota: {}", response).into()) + } else { + Ok(()) + } + } + + pub async fn get_bucket_quota(&self) -> Result, Box> { + let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name); + let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?; + + if response.contains("error") { + Err(format!("Failed to get quota: {}", response).into()) + } else { + let quota_info: serde_json::Value = serde_json::from_str(&response)?; + Ok(quota_info.get("quota").and_then(|v| v.as_u64())) + } + } + + pub async fn clear_bucket_quota(&self) -> Result<(), Box> { + let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, self.bucket_name); + let response = awscurl_delete(&url, &self.env.access_key, &self.env.secret_key).await?; + + if response.contains("error") { + Err(format!("Failed to clear quota: {}", response).into()) + } else { + Ok(()) + } + } + + pub async fn get_bucket_quota_stats(&self) -> Result> { + let url = format!("{}/rustfs/admin/v3/quota-stats/{}", self.env.url, self.bucket_name); + let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?; + + if response.contains("error") { + Err(format!("Failed to get quota stats: {}", response).into()) + } else { + Ok(serde_json::from_str(&response)?) + } + } + + pub async fn check_bucket_quota( + &self, + operation_type: &str, + operation_size: u64, + ) -> Result> { + let url = format!("{}/rustfs/admin/v3/quota-check/{}", self.env.url, self.bucket_name); + let check_request = serde_json::json!({ + "operation_type": operation_type, + "operation_size": operation_size + }); + + let response = awscurl_post(&url, &check_request.to_string(), &self.env.access_key, &self.env.secret_key).await?; + + if response.contains("error") { + Err(format!("Failed to check quota: {}", response).into()) + } else { + Ok(serde_json::from_str(&response)?) + } + } + + pub async fn upload_object(&self, key: &str, size_bytes: usize) -> Result<(), Box> { + let data = vec![0u8; size_bytes]; + self.client + .put_object() + .bucket(&self.bucket_name) + .key(key) + .body(aws_sdk_s3::primitives::ByteStream::from(data)) + .send() + .await?; + Ok(()) + } + + pub async fn object_exists(&self, key: &str) -> Result> { + match self.client.head_object().bucket(&self.bucket_name).key(key).send().await { + Ok(_) => Ok(true), + Err(e) => { + // Check for any 404-related errors and return false instead of propagating + let error_str = e.to_string(); + if error_str.contains("404") || error_str.contains("Not Found") || error_str.contains("NotFound") { + Ok(false) + } else { + // Also check the error code directly + if let Some(service_err) = e.as_service_error() + && service_err.is_not_found() + { + return Ok(false); + } + Err(e.into()) + } + } + } + } + + pub async fn get_bucket_usage(&self) -> Result> { + let stats = self.get_bucket_quota_stats().await?; + Ok(stats.get("current_usage").and_then(|v| v.as_u64()).unwrap_or(0)) + } + + pub async fn set_bucket_quota_for( + &self, + bucket: &str, + quota_bytes: u64, + ) -> Result<(), Box> { + let url = format!("{}/rustfs/admin/v3/quota/{}", self.env.url, bucket); + let quota_config = serde_json::json!({ + "quota": quota_bytes, + "quota_type": "HARD" + }); + + let response = awscurl_put(&url, "a_config.to_string(), &self.env.access_key, &self.env.secret_key).await?; + if response.contains("error") { + Err(format!("Failed to set quota: {}", response).into()) + } else { + Ok(()) + } + } + + /// Get bucket quota statistics for specific bucket + pub async fn get_bucket_quota_stats_for( + &self, + bucket: &str, + ) -> Result> { + debug!("Getting quota stats for bucket: {}", bucket); + + let url = format!("{}/rustfs/admin/v3/quota-stats/{}", self.env.url, bucket); + let response = awscurl_get(&url, &self.env.access_key, &self.env.secret_key).await?; + + if response.contains("error") { + Err(format!("Failed to get quota stats: {}", response).into()) + } else { + let stats: serde_json::Value = serde_json::from_str(&response)?; + Ok(stats) + } + } + + /// Upload an object to specific bucket + pub async fn upload_object_to_bucket( + &self, + bucket: &str, + key: &str, + size_bytes: usize, + ) -> Result<(), Box> { + debug!("Uploading object {} with size {} bytes to bucket {}", key, size_bytes, bucket); + + let data = vec![0u8; size_bytes]; + + self.client + .put_object() + .bucket(bucket) + .key(key) + .body(aws_sdk_s3::primitives::ByteStream::from(data)) + .send() + .await?; + + info!("Successfully uploaded object: {} ({} bytes) to bucket: {}", key, size_bytes, bucket); + Ok(()) + } +} + +#[cfg(test)] +mod integration_tests { + use super::*; + + #[tokio::test] + #[serial] + async fn test_quota_basic_operations() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + // Create test bucket + env.create_bucket().await?; + + // Set quota of 1MB + env.set_bucket_quota(1024 * 1024).await?; + + // Verify quota is set + let quota = env.get_bucket_quota().await?; + assert_eq!(quota, Some(1024 * 1024)); + + // Upload a 512KB object (should succeed) + env.upload_object("test1.txt", 512 * 1024).await?; + assert!(env.object_exists("test1.txt").await?); + + // Upload another 512KB object (should succeed, total 1MB) + env.upload_object("test2.txt", 512 * 1024).await?; + assert!(env.object_exists("test2.txt").await?); + + // Try to upload 1KB more (should fail due to quota) + let upload_result = env.upload_object("test3.txt", 1024).await; + assert!(upload_result.is_err()); + assert!(!env.object_exists("test3.txt").await?); + + // Clean up + env.clear_bucket_quota().await?; + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_update_and_clear() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set initial quota + env.set_bucket_quota(512 * 1024).await?; + assert_eq!(env.get_bucket_quota().await?, Some(512 * 1024)); + + // Update quota to larger size + env.set_bucket_quota(2 * 1024 * 1024).await?; + assert_eq!(env.get_bucket_quota().await?, Some(2 * 1024 * 1024)); + + // Upload 1MB object (should succeed with new quota) + env.upload_object("large_file.txt", 1024 * 1024).await?; + assert!(env.object_exists("large_file.txt").await?); + + // Clear quota + env.clear_bucket_quota().await?; + assert_eq!(env.get_bucket_quota().await?, None); + + // Upload another large object (should succeed with no quota) + env.upload_object("unlimited_file.txt", 5 * 1024 * 1024).await?; + assert!(env.object_exists("unlimited_file.txt").await?); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_delete_operations() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 1MB + env.set_bucket_quota(1024 * 1024).await?; + + // Fill up to quota limit + env.upload_object("file1.txt", 512 * 1024).await?; + env.upload_object("file2.txt", 512 * 1024).await?; + + // Delete one file + env.client + .delete_object() + .bucket(&env.bucket_name) + .key("file1.txt") + .send() + .await?; + + assert!(!env.object_exists("file1.txt").await?); + + // Now we should be able to upload again (quota freed up) + env.upload_object("file3.txt", 256 * 1024).await?; + assert!(env.object_exists("file3.txt").await?); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_usage_tracking() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota + env.set_bucket_quota(2 * 1024 * 1024).await?; + + // Upload some files + env.upload_object("file1.txt", 512 * 1024).await?; + env.upload_object("file2.txt", 256 * 1024).await?; + + // Check usage + let usage = env.get_bucket_usage().await?; + assert_eq!(usage, (512 + 256) * 1024); + + // Delete a file + env.client + .delete_object() + .bucket(&env.bucket_name) + .key("file1.txt") + .send() + .await?; + + // Check updated usage + let updated_usage = env.get_bucket_usage().await?; + assert_eq!(updated_usage, 256 * 1024); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_statistics() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 2MB + env.set_bucket_quota(2 * 1024 * 1024).await?; + + // Upload files to use 1.5MB + env.upload_object("file1.txt", 1024 * 1024).await?; + env.upload_object("file2.txt", 512 * 1024).await?; + + // Get detailed quota statistics + let stats = env.get_bucket_quota_stats().await?; + + assert_eq!(stats.get("bucket").unwrap().as_str().unwrap(), env.bucket_name); + assert_eq!(stats.get("quota_limit").unwrap().as_u64().unwrap(), 2 * 1024 * 1024); + assert_eq!(stats.get("current_usage").unwrap().as_u64().unwrap(), (1024 + 512) * 1024); + assert_eq!(stats.get("remaining_quota").unwrap().as_u64().unwrap(), 512 * 1024); + + let usage_percentage = stats.get("usage_percentage").unwrap().as_f64().unwrap(); + assert!((usage_percentage - 75.0).abs() < 0.1); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_check_api() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 1MB + env.set_bucket_quota(1024 * 1024).await?; + + // Upload 512KB file + env.upload_object("existing_file.txt", 512 * 1024).await?; + + // Check if we can upload another 512KB (should succeed, exactly fill quota) + let check_result = env.check_bucket_quota("PUT", 512 * 1024).await?; + assert!(check_result.get("allowed").unwrap().as_bool().unwrap()); + assert_eq!(check_result.get("remaining_quota").unwrap().as_u64().unwrap(), 0); + + // Note: we haven't actually uploaded the second file yet, so current_usage is still 512KB + // Check if we can upload 1KB (should succeed - we haven't used the full quota yet) + let check_result = env.check_bucket_quota("PUT", 1024).await?; + assert!(check_result.get("allowed").unwrap().as_bool().unwrap()); + assert_eq!(check_result.get("remaining_quota").unwrap().as_u64().unwrap(), 512 * 1024 - 1024); + + // Check if we can upload 600KB (should fail - would exceed quota) + let check_result = env.check_bucket_quota("PUT", 600 * 1024).await?; + assert!(!check_result.get("allowed").unwrap().as_bool().unwrap()); + + // Check delete operation (should always be allowed) + let check_result = env.check_bucket_quota("DELETE", 512 * 1024).await?; + assert!(check_result.get("allowed").unwrap().as_bool().unwrap()); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_multiple_buckets() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + // Create two buckets in the same environment + let bucket1 = format!("quota-test-{}-1", uuid::Uuid::new_v4()); + let bucket2 = format!("quota-test-{}-2", uuid::Uuid::new_v4()); + + env.env.create_test_bucket(&bucket1).await?; + env.env.create_test_bucket(&bucket2).await?; + + // Set different quotas for each bucket + env.set_bucket_quota_for(&bucket1, 1024 * 1024).await?; // 1MB + env.set_bucket_quota_for(&bucket2, 2 * 1024 * 1024).await?; // 2MB + + // Fill first bucket to quota + env.upload_object_to_bucket(&bucket1, "big_file.txt", 1024 * 1024).await?; + + // Should still be able to upload to second bucket + env.upload_object_to_bucket(&bucket2, "big_file.txt", 1024 * 1024).await?; + env.upload_object_to_bucket(&bucket2, "another_file.txt", 512 * 1024).await?; + + // Verify statistics are independent + let stats1 = env.get_bucket_quota_stats_for(&bucket1).await?; + let stats2 = env.get_bucket_quota_stats_for(&bucket2).await?; + + assert_eq!(stats1.get("current_usage").unwrap().as_u64().unwrap(), 1024 * 1024); + assert_eq!(stats2.get("current_usage").unwrap().as_u64().unwrap(), (1024 + 512) * 1024); + + // Clean up + env.env.delete_test_bucket(&bucket1).await?; + env.env.delete_test_bucket(&bucket2).await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_error_handling() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Test invalid quota type + let url = format!("{}/rustfs/admin/v3/quota/{}", env.env.url, env.bucket_name); + + let invalid_config = serde_json::json!({ + "quota": 1024, + "quota_type": "SOFT" // Invalid type + }); + + let response = awscurl_put(&url, &invalid_config.to_string(), &env.env.access_key, &env.env.secret_key).await; + assert!(response.is_err()); + let error_msg = response.unwrap_err().to_string(); + assert!(error_msg.contains("InvalidArgument")); + + // Test operations on non-existent bucket + let url = format!("{}/rustfs/admin/v3/quota/non-existent-bucket", env.env.url); + let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await; + assert!(response.is_err()); + let error_msg = response.unwrap_err().to_string(); + assert!(error_msg.contains("NoSuchBucket")); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_http_endpoints() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Test 1: GET quota for bucket without quota config + let url = format!("{}/rustfs/admin/v3/quota/{}", env.env.url, env.bucket_name); + let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("quota") && response.contains("null")); + + // Test 2: PUT quota - valid config + let quota_config = serde_json::json!({ + "quota": 1048576, + "quota_type": "HARD" + }); + let response = awscurl_put(&url, "a_config.to_string(), &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("success") || !response.contains("error")); + + // Test 3: GET quota after setting + let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("1048576")); + + // Test 4: GET quota stats + let stats_url = format!("{}/rustfs/admin/v3/quota-stats/{}", env.env.url, env.bucket_name); + let response = awscurl_get(&stats_url, &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("quota_limit") && response.contains("current_usage")); + + // Test 5: POST quota check + let check_url = format!("{}/rustfs/admin/v3/quota-check/{}", env.env.url, env.bucket_name); + let check_request = serde_json::json!({ + "operation_type": "PUT", + "operation_size": 1024 + }); + let response = awscurl_post(&check_url, &check_request.to_string(), &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("allowed")); + + // Test 6: DELETE quota + let response = awscurl_delete(&url, &env.env.access_key, &env.env.secret_key).await?; + assert!(!response.contains("error")); + + // Test 7: GET quota after deletion + let response = awscurl_get(&url, &env.env.access_key, &env.env.secret_key).await?; + assert!(response.contains("quota") && response.contains("null")); + + // Test 8: Invalid quota type + let invalid_config = serde_json::json!({ + "quota": 1024, + "quota_type": "SOFT" + }); + let response = awscurl_put(&url, &invalid_config.to_string(), &env.env.access_key, &env.env.secret_key).await; + assert!(response.is_err()); + let error_msg = response.unwrap_err().to_string(); + assert!(error_msg.contains("InvalidArgument")); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_copy_operations() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 2MB + env.set_bucket_quota(2 * 1024 * 1024).await?; + + // Upload initial file + env.upload_object("original.txt", 1024 * 1024).await?; + + // Copy file - should succeed (1MB each, total 2MB) + env.client + .copy_object() + .bucket(&env.bucket_name) + .key("copy1.txt") + .copy_source(format!("{}/{}", env.bucket_name, "original.txt")) + .send() + .await?; + + assert!(env.object_exists("copy1.txt").await?); + + // Try to copy again - should fail (1.5MB each, total 3MB > 2MB quota) + let copy_result = env + .client + .copy_object() + .bucket(&env.bucket_name) + .key("copy2.txt") + .copy_source(format!("{}/{}", env.bucket_name, "original.txt")) + .send() + .await; + + assert!(copy_result.is_err()); + assert!(!env.object_exists("copy2.txt").await?); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_batch_delete() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 2MB + env.set_bucket_quota(2 * 1024 * 1024).await?; + + // Upload files to fill quota + env.upload_object("file1.txt", 1024 * 1024).await?; + env.upload_object("file2.txt", 1024 * 1024).await?; + + // Verify quota is full + let upload_result = env.upload_object("file3.txt", 1024).await; + assert!(upload_result.is_err()); + + // Delete multiple objects using batch delete + let objects = vec![ + aws_sdk_s3::types::ObjectIdentifier::builder() + .key("file1.txt") + .build() + .unwrap(), + aws_sdk_s3::types::ObjectIdentifier::builder() + .key("file2.txt") + .build() + .unwrap(), + ]; + + let delete_result = env + .client + .delete_objects() + .bucket(&env.bucket_name) + .delete( + aws_sdk_s3::types::Delete::builder() + .set_objects(Some(objects)) + .quiet(true) + .build() + .unwrap(), + ) + .send() + .await?; + + assert_eq!(delete_result.deleted().len(), 2); + + // Now should be able to upload again (quota freed up) + env.upload_object("file3.txt", 256 * 1024).await?; + assert!(env.object_exists("file3.txt").await?); + + env.cleanup_bucket().await?; + + Ok(()) + } + + #[tokio::test] + #[serial] + async fn test_quota_multipart_upload() -> Result<(), Box> { + init_logging(); + let env = QuotaTestEnv::new().await?; + + env.create_bucket().await?; + + // Set quota of 10MB + env.set_bucket_quota(10 * 1024 * 1024).await?; + + let key = "multipart_test.txt"; + let part_size = 5 * 1024 * 1024; // 5MB minimum per part (S3 requirement) + + // Test 1: Multipart upload within quota (single 5MB part) + let create_result = env + .client + .create_multipart_upload() + .bucket(&env.bucket_name) + .key(key) + .send() + .await?; + + let upload_id = create_result.upload_id().unwrap(); + + // Upload single 5MB part (S3 allows single part with any size ≥ 5MB for the only part) + let part_data = vec![1u8; part_size]; + let part_result = env + .client + .upload_part() + .bucket(&env.bucket_name) + .key(key) + .upload_id(upload_id) + .part_number(1) + .body(aws_sdk_s3::primitives::ByteStream::from(part_data)) + .send() + .await?; + + let uploaded_parts = vec![ + aws_sdk_s3::types::CompletedPart::builder() + .part_number(1) + .e_tag(part_result.e_tag().unwrap()) + .build(), + ]; + + env.client + .complete_multipart_upload() + .bucket(&env.bucket_name) + .key(key) + .upload_id(upload_id) + .multipart_upload( + aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(uploaded_parts)) + .build(), + ) + .send() + .await?; + + assert!(env.object_exists(key).await?); + + // Test 2: Multipart upload exceeds quota (should fail) + // Upload 6MB filler (total now: 5MB + 6MB = 11MB > 10MB quota) + let upload_filler = env.upload_object("filler.txt", 6 * 1024 * 1024).await; + // This should fail due to quota + assert!(upload_filler.is_err()); + + // Verify filler doesn't exist + assert!(!env.object_exists("filler.txt").await?); + + // Now try a multipart upload that exceeds quota + // Current usage: 5MB (from Test 1), quota: 10MB + // Trying to upload 6MB via multipart → should fail + + let create_result2 = env + .client + .create_multipart_upload() + .bucket(&env.bucket_name) + .key("over_quota.txt") + .send() + .await?; + + let upload_id2 = create_result2.upload_id().unwrap(); + + let mut uploaded_parts2 = vec![]; + for part_num in 1..=2 { + let part_data = vec![part_num as u8; part_size]; + let part_result = env + .client + .upload_part() + .bucket(&env.bucket_name) + .key("over_quota.txt") + .upload_id(upload_id2) + .part_number(part_num) + .body(aws_sdk_s3::primitives::ByteStream::from(part_data)) + .send() + .await?; + + uploaded_parts2.push( + aws_sdk_s3::types::CompletedPart::builder() + .part_number(part_num) + .e_tag(part_result.e_tag().unwrap()) + .build(), + ); + } + + let complete_result = env + .client + .complete_multipart_upload() + .bucket(&env.bucket_name) + .key("over_quota.txt") + .upload_id(upload_id2) + .multipart_upload( + aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(uploaded_parts2)) + .build(), + ) + .send() + .await; + + assert!(complete_result.is_err()); + assert!(!env.object_exists("over_quota.txt").await?); + + env.cleanup_bucket().await?; + + Ok(()) + } +} diff --git a/crates/ecstore/src/bucket/metadata.rs b/crates/ecstore/src/bucket/metadata.rs index 87884300..5c75571e 100644 --- a/crates/ecstore/src/bucket/metadata.rs +++ b/crates/ecstore/src/bucket/metadata.rs @@ -355,7 +355,7 @@ impl BucketMetadata { self.tagging_config = Some(deserialize::(&self.tagging_config_xml)?); } if !self.quota_config_json.is_empty() { - self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?); + self.quota_config = Some(serde_json::from_slice(&self.quota_config_json)?); } if !self.replication_config_xml.is_empty() { self.replication_config = Some(deserialize::(&self.replication_config_xml)?); @@ -487,7 +487,8 @@ mod test { bm.tagging_config_updated_at = OffsetDateTime::now_utc(); // Add quota configuration - let quota_json = r#"{"quota":1073741824,"quotaType":"hard"}"#; // 1GB quota + let quota_json = + r#"{"quota":1073741824,"quota_type":"Hard","created_at":"2024-01-01T00:00:00Z","updated_at":"2024-01-01T00:00:00Z"}"#; // 1GB quota bm.quota_config_json = quota_json.as_bytes().to_vec(); bm.quota_config_updated_at = OffsetDateTime::now_utc(); diff --git a/crates/ecstore/src/bucket/quota/checker.rs b/crates/ecstore/src/bucket/quota/checker.rs new file mode 100644 index 00000000..2a7f2858 --- /dev/null +++ b/crates/ecstore/src/bucket/quota/checker.rs @@ -0,0 +1,195 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{BucketQuota, QuotaCheckResult, QuotaError, QuotaOperation}; +use crate::bucket::metadata_sys::{BucketMetadataSys, update}; +use crate::data_usage::get_bucket_usage_memory; +use rustfs_common::metrics::Metric; +use rustfs_config::QUOTA_CONFIG_FILE; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::RwLock; +use tracing::{debug, warn}; + +pub struct QuotaChecker { + metadata_sys: Arc>, +} + +impl QuotaChecker { + pub fn new(metadata_sys: Arc>) -> Self { + Self { metadata_sys } + } + + pub async fn check_quota( + &self, + bucket: &str, + operation: QuotaOperation, + operation_size: u64, + ) -> Result { + let start_time = Instant::now(); + let quota_config = self.get_quota_config(bucket).await?; + + // If no quota limit is set, allow operation + let quota_limit = match quota_config.quota { + None => { + let current_usage = self.get_real_time_usage(bucket).await?; + return Ok(QuotaCheckResult { + allowed: true, + current_usage, + quota_limit: None, + operation_size, + remaining: None, + }); + } + Some(q) => q, + }; + + let current_usage = self.get_real_time_usage(bucket).await?; + + let expected_usage = match operation { + QuotaOperation::PutObject | QuotaOperation::CopyObject => current_usage + operation_size, + QuotaOperation::DeleteObject => current_usage.saturating_sub(operation_size), + }; + + let allowed = match operation { + QuotaOperation::PutObject | QuotaOperation::CopyObject => { + quota_config.check_operation_allowed(current_usage, operation_size) + } + QuotaOperation::DeleteObject => true, + }; + + let remaining = if quota_limit >= expected_usage { + Some(quota_limit - expected_usage) + } else { + Some(0) + }; + + if !allowed { + warn!( + "Quota exceeded for bucket: {}, current: {}, limit: {}, attempted: {}", + bucket, current_usage, quota_limit, operation_size + ); + } + + let result = QuotaCheckResult { + allowed, + current_usage, + quota_limit: Some(quota_limit), + operation_size, + remaining, + }; + + let duration = start_time.elapsed(); + rustfs_common::metrics::Metrics::inc_time(Metric::QuotaCheck, duration).await; + if !allowed { + rustfs_common::metrics::Metrics::inc_time(Metric::QuotaViolation, duration).await; + } + + Ok(result) + } + + pub async fn get_quota_config(&self, bucket: &str) -> Result { + let meta = self + .metadata_sys + .read() + .await + .get(bucket) + .await + .map_err(QuotaError::StorageError)?; + + if meta.quota_config_json.is_empty() { + debug!("No quota config found for bucket: {}, using default", bucket); + return Ok(BucketQuota::new(None)); + } + + let quota: BucketQuota = serde_json::from_slice(&meta.quota_config_json).map_err(|e| QuotaError::InvalidConfig { + reason: format!("Failed to parse quota config: {}", e), + })?; + + Ok(quota) + } + + pub async fn set_quota_config(&mut self, bucket: &str, quota: BucketQuota) -> Result<(), QuotaError> { + let json_data = serde_json::to_vec("a).map_err(|e| QuotaError::InvalidConfig { + reason: format!("Failed to serialize quota config: {}", e), + })?; + let start_time = Instant::now(); + + update(bucket, QUOTA_CONFIG_FILE, json_data) + .await + .map_err(QuotaError::StorageError)?; + + rustfs_common::metrics::Metrics::inc_time(Metric::QuotaSync, start_time.elapsed()).await; + Ok(()) + } + + pub async fn get_quota_stats(&self, bucket: &str) -> Result<(BucketQuota, Option), QuotaError> { + // If bucket doesn't exist, return ConfigNotFound error + if !self.bucket_exists(bucket).await { + return Err(QuotaError::ConfigNotFound { + bucket: bucket.to_string(), + }); + } + + let quota = self.get_quota_config(bucket).await?; + let current_usage = self.get_real_time_usage(bucket).await.unwrap_or(0); + + Ok((quota, Some(current_usage))) + } + + pub async fn bucket_exists(&self, bucket: &str) -> bool { + self.metadata_sys.read().await.get(bucket).await.is_ok() + } + + pub async fn get_real_time_usage(&self, bucket: &str) -> Result { + Ok(get_bucket_usage_memory(bucket).await.unwrap_or(0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_quota_check_no_limit() { + let result = QuotaCheckResult { + allowed: true, + current_usage: 0, + quota_limit: None, + operation_size: 1024, + remaining: None, + }; + + assert!(result.allowed); + assert_eq!(result.quota_limit, None); + } + + #[tokio::test] + async fn test_quota_check_within_limit() { + let quota = BucketQuota::new(Some(2048)); // 2KB + + // Current usage 512, trying to add 1024 + let allowed = quota.check_operation_allowed(512, 1024); + assert!(allowed); + } + + #[tokio::test] + async fn test_quota_check_exceeds_limit() { + let quota = BucketQuota::new(Some(1024)); // 1KB + + // Current usage 512, trying to add 1024 + let allowed = quota.check_operation_allowed(512, 1024); + assert!(!allowed); + } +} diff --git a/crates/ecstore/src/bucket/quota/mod.rs b/crates/ecstore/src/bucket/quota/mod.rs index b9e778fd..3bf00a05 100644 --- a/crates/ecstore/src/bucket/quota/mod.rs +++ b/crates/ecstore/src/bucket/quota/mod.rs @@ -12,36 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod checker; + use crate::error::Result; use rmp_serde::Serializer as rmpSerializer; +use rustfs_config::{ + QUOTA_API_PATH, QUOTA_EXCEEDED_ERROR_CODE, QUOTA_INTERNAL_ERROR_CODE, QUOTA_INVALID_CONFIG_ERROR_CODE, + QUOTA_NOT_FOUND_ERROR_CODE, +}; use serde::{Deserialize, Serialize}; +use thiserror::Error; +use time::OffsetDateTime; -// Define the QuotaType enum -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub enum QuotaType { + /// Hard quota: reject immediately when exceeded + #[default] Hard, } -// Define the BucketQuota structure -#[derive(Debug, Deserialize, Serialize, Default, Clone)] +#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq)] pub struct BucketQuota { - quota: Option, // Use Option to represent optional fields - - size: u64, - - rate: u64, - - requests: u64, - - quota_type: Option, + pub quota: Option, + pub quota_type: QuotaType, + /// Timestamp when this quota configuration was set (for audit purposes) + pub created_at: Option, } impl BucketQuota { pub fn marshal_msg(&self) -> Result> { let mut buf = Vec::new(); - self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?; - Ok(buf) } @@ -49,4 +50,107 @@ impl BucketQuota { let t: BucketQuota = rmp_serde::from_slice(buf)?; Ok(t) } + + pub fn new(quota: Option) -> Self { + let now = OffsetDateTime::now_utc(); + Self { + quota, + quota_type: QuotaType::Hard, + created_at: Some(now), + } + } + + pub fn get_quota_limit(&self) -> Option { + self.quota + } + + pub fn check_operation_allowed(&self, current_usage: u64, operation_size: u64) -> bool { + if let Some(quota_limit) = self.quota { + current_usage.saturating_add(operation_size) <= quota_limit + } else { + true // No quota limit + } + } + + pub fn get_remaining_quota(&self, current_usage: u64) -> Option { + self.quota.map(|limit| limit.saturating_sub(current_usage)) + } +} + +#[derive(Debug)] +pub struct QuotaCheckResult { + pub allowed: bool, + pub current_usage: u64, + /// quota_limit: None means unlimited + pub quota_limit: Option, + pub operation_size: u64, + pub remaining: Option, +} + +#[derive(Debug)] +pub enum QuotaOperation { + PutObject, + CopyObject, + DeleteObject, +} + +#[derive(Debug, Error)] +pub enum QuotaError { + #[error("Bucket quota exceeded: current={current}, limit={limit}, operation={operation}")] + QuotaExceeded { current: u64, limit: u64, operation: u64 }, + #[error("Quota configuration not found for bucket: {bucket}")] + ConfigNotFound { bucket: String }, + #[error("Invalid quota configuration: {reason}")] + InvalidConfig { reason: String }, + #[error("Storage error: {0}")] + StorageError(#[from] crate::error::StorageError), +} + +#[derive(Debug, Serialize)] +pub struct QuotaErrorResponse { + #[serde(rename = "Code")] + pub code: String, + #[serde(rename = "Message")] + pub message: String, + #[serde(rename = "Resource")] + pub resource: String, + #[serde(rename = "RequestId")] + pub request_id: String, + #[serde(rename = "HostId")] + pub host_id: String, +} + +impl QuotaErrorResponse { + pub fn new(quota_error: &QuotaError, request_id: &str, host_id: &str) -> Self { + match quota_error { + QuotaError::QuotaExceeded { .. } => Self { + code: QUOTA_EXCEEDED_ERROR_CODE.to_string(), + message: quota_error.to_string(), + resource: QUOTA_API_PATH.to_string(), + request_id: request_id.to_string(), + host_id: host_id.to_string(), + }, + QuotaError::ConfigNotFound { .. } => Self { + code: QUOTA_NOT_FOUND_ERROR_CODE.to_string(), + message: quota_error.to_string(), + resource: QUOTA_API_PATH.to_string(), + request_id: request_id.to_string(), + host_id: host_id.to_string(), + }, + QuotaError::InvalidConfig { .. } => Self { + code: QUOTA_INVALID_CONFIG_ERROR_CODE.to_string(), + message: quota_error.to_string(), + resource: QUOTA_API_PATH.to_string(), + request_id: request_id.to_string(), + host_id: host_id.to_string(), + }, + QuotaError::StorageError(_) => Self { + code: QUOTA_INTERNAL_ERROR_CODE.to_string(), + message: quota_error.to_string(), + resource: QUOTA_API_PATH.to_string(), + request_id: request_id.to_string(), + host_id: host_id.to_string(), + }, + } + } } diff --git a/crates/ecstore/src/data_usage.rs b/crates/ecstore/src/data_usage.rs index df3ffede..bd434855 100644 --- a/crates/ecstore/src/data_usage.rs +++ b/crates/ecstore/src/data_usage.rs @@ -15,8 +15,10 @@ use std::{ collections::{HashMap, hash_map::Entry}, sync::Arc, - time::SystemTime, + time::{Duration, SystemTime}, }; +use tokio::sync::RwLock; +use tracing::debug; pub mod local_snapshot; pub use local_snapshot::{ @@ -32,6 +34,7 @@ use rustfs_common::data_usage::{ BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary, }; use rustfs_utils::path::SLASH_SEPARATOR_STR; +use std::sync::OnceLock; use tokio::fs; use tracing::{error, info, warn}; @@ -42,6 +45,21 @@ pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR_STR; const DATA_USAGE_OBJ_NAME: &str = ".usage.json"; const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin"; pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin"; +const DATA_USAGE_CACHE_TTL_SECS: u64 = 30; + +type UsageMemoryCache = Arc>>; +type CacheUpdating = Arc>; + +static USAGE_MEMORY_CACHE: OnceLock = OnceLock::new(); +static USAGE_CACHE_UPDATING: OnceLock = OnceLock::new(); + +fn memory_cache() -> &'static UsageMemoryCache { + USAGE_MEMORY_CACHE.get_or_init(|| Arc::new(RwLock::new(HashMap::new()))) +} + +fn cache_updating() -> &'static CacheUpdating { + USAGE_CACHE_UPDATING.get_or_init(|| Arc::new(RwLock::new(false))) +} // Data usage storage paths lazy_static::lazy_static! { @@ -364,8 +382,120 @@ pub async fn compute_bucket_usage(store: Arc, bucket_name: &str) -> Res Ok(usage) } +/// Fast in-memory increment for immediate quota consistency +pub async fn increment_bucket_usage_memory(bucket: &str, size_increment: u64) { + let mut cache = memory_cache().write().await; + let current = cache.entry(bucket.to_string()).or_insert_with(|| (0, SystemTime::now())); + current.0 += size_increment; + current.1 = SystemTime::now(); +} + +/// Fast in-memory decrement for immediate quota consistency +pub async fn decrement_bucket_usage_memory(bucket: &str, size_decrement: u64) { + let mut cache = memory_cache().write().await; + if let Some(current) = cache.get_mut(bucket) { + current.0 = current.0.saturating_sub(size_decrement); + current.1 = SystemTime::now(); + } +} + +/// Get bucket usage from in-memory cache +pub async fn get_bucket_usage_memory(bucket: &str) -> Option { + update_usage_cache_if_needed().await; + + let cache = memory_cache().read().await; + cache.get(bucket).map(|(usage, _)| *usage) +} + +async fn update_usage_cache_if_needed() { + let ttl = Duration::from_secs(DATA_USAGE_CACHE_TTL_SECS); + let double_ttl = ttl * 2; + let now = SystemTime::now(); + + let cache = memory_cache().read().await; + let earliest_timestamp = cache.values().map(|(_, ts)| *ts).min(); + drop(cache); + + let age = match earliest_timestamp { + Some(ts) => now.duration_since(ts).unwrap_or_default(), + None => double_ttl, + }; + + if age < ttl { + return; + } + + let mut updating = cache_updating().write().await; + if age < double_ttl { + if *updating { + return; + } + *updating = true; + drop(updating); + + let cache_clone = (*memory_cache()).clone(); + let updating_clone = (*cache_updating()).clone(); + tokio::spawn(async move { + if let Some(store) = crate::global::GLOBAL_OBJECT_API.get() + && let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await + { + let mut cache = cache_clone.write().await; + for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() { + cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now())); + } + } + let mut updating = updating_clone.write().await; + *updating = false; + }); + return; + } + + for retry in 0..10 { + if !*updating { + break; + } + drop(updating); + let delay = Duration::from_millis(1 << retry); + tokio::time::sleep(delay).await; + updating = cache_updating().write().await; + } + + *updating = true; + drop(updating); + + if let Some(store) = crate::global::GLOBAL_OBJECT_API.get() + && let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await + { + let mut cache = memory_cache().write().await; + for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() { + cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now())); + } + } + + let mut updating = cache_updating().write().await; + *updating = false; +} + +/// Sync memory cache with backend data (called by scanner) +pub async fn sync_memory_cache_with_backend() -> Result<(), Error> { + if let Some(store) = crate::global::GLOBAL_OBJECT_API.get() { + match load_data_usage_from_backend(store.clone()).await { + Ok(data_usage_info) => { + let mut cache = memory_cache().write().await; + for (bucket, bucket_usage) in data_usage_info.buckets_usage.iter() { + cache.insert(bucket.clone(), (bucket_usage.size, SystemTime::now())); + } + } + Err(e) => { + debug!("Failed to sync memory cache with backend: {}", e); + } + } + } + Ok(()) +} + /// Build basic data usage info with real object counts -async fn build_basic_data_usage_info(store: Arc) -> Result { +pub async fn build_basic_data_usage_info(store: Arc) -> Result { let mut data_usage_info = DataUsageInfo::default(); // Get bucket list diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 678d2dab..52d33c3f 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -4614,7 +4614,9 @@ impl StorageAPI for SetDisks { .await .map_err(|e| to_object_err(e, vec![bucket, object]))?; - Ok(ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended)) + let mut obj_info = ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended); + obj_info.size = goi.size; + Ok(obj_info) } #[tracing::instrument(skip(self))] diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 3f1a5614..e55c55ed 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -83,6 +83,7 @@ pub mod kms_keys; pub mod policies; pub mod pools; pub mod profile; +pub mod quota; pub mod rebalance; pub mod service_account; pub mod sts; diff --git a/rustfs/src/admin/handlers/quota.rs b/rustfs/src/admin/handlers/quota.rs new file mode 100644 index 00000000..84944fa8 --- /dev/null +++ b/rustfs/src/admin/handlers/quota.rs @@ -0,0 +1,485 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Quota admin handlers for HTTP API + +use super::Operation; +use crate::admin::auth::validate_admin_request; +use crate::auth::{check_key_valid, get_session_token}; +use hyper::StatusCode; +use matchit::Params; +use rustfs_ecstore::bucket::quota::checker::QuotaChecker; +use rustfs_ecstore::bucket::quota::{BucketQuota, QuotaError, QuotaOperation}; +use rustfs_policy::policy::action::{Action, AdminAction}; +use s3s::{Body, S3Request, S3Response, S3Result, s3_error}; +use serde::{Deserialize, Serialize}; +use serde_json; +use tracing::{debug, info, warn}; + +#[derive(Debug, Deserialize)] +pub struct SetBucketQuotaRequest { + pub quota: Option, + #[serde(default = "default_quota_type")] + pub quota_type: String, +} + +fn default_quota_type() -> String { + rustfs_config::QUOTA_TYPE_HARD.to_string() +} + +#[derive(Debug, Serialize)] +pub struct BucketQuotaResponse { + pub bucket: String, + pub quota: Option, + pub size: u64, + /// Current usage size in bytes + pub quota_type: String, +} + +#[derive(Debug, Serialize)] +pub struct BucketQuotaStats { + pub bucket: String, + pub quota_limit: Option, + pub current_usage: u64, + pub remaining_quota: Option, + pub usage_percentage: Option, +} + +#[derive(Debug, Deserialize)] +pub struct CheckQuotaRequest { + pub operation_type: String, + pub operation_size: u64, +} + +#[derive(Debug, Serialize)] +pub struct CheckQuotaResponse { + pub bucket: String, + pub operation_type: String, + pub operation_size: u64, + pub allowed: bool, + pub current_usage: u64, + pub quota_limit: Option, + pub remaining_quota: Option, +} + +/// Quota management handlers +pub struct SetBucketQuotaHandler; +pub struct GetBucketQuotaHandler; +pub struct ClearBucketQuotaHandler; +pub struct GetBucketQuotaStatsHandler; +pub struct CheckBucketQuotaHandler; + +#[async_trait::async_trait] +impl Operation for SetBucketQuotaHandler { + #[tracing::instrument(skip_all)] + async fn call(&self, mut req: S3Request, params: Params<'_, '_>) -> S3Result> { + warn!("handle SetBucketQuota"); + + let Some(ref cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "authentication required")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?; + + validate_admin_request( + &req.headers, + &cred, + owner, + false, + vec![Action::AdminAction(AdminAction::SetBucketQuotaAdminAction)], + None, + ) + .await?; + + let bucket = params.get("bucket").unwrap_or("").to_string(); + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket name is required")); + } + + let body = req + .input + .store_all_limited(rustfs_config::MAX_ADMIN_REQUEST_BODY_SIZE) + .await + .map_err(|e| s3_error!(InvalidRequest, "failed to read request body: {}", e))?; + + let request: SetBucketQuotaRequest = if body.is_empty() { + SetBucketQuotaRequest { + quota: None, + quota_type: default_quota_type(), + } + } else { + serde_json::from_slice(&body).map_err(|e| s3_error!(InvalidRequest, "invalid JSON: {}", e))? + }; + + if request.quota_type.to_uppercase() != rustfs_config::QUOTA_TYPE_HARD { + return Err(s3_error!(InvalidArgument, "{}", rustfs_config::QUOTA_INVALID_TYPE_ERROR_MSG)); + } + + let quota = BucketQuota::new(request.quota); + + let metadata_sys_lock = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys + .get() + .ok_or_else(|| s3_error!(InternalError, "{}", rustfs_config::QUOTA_METADATA_SYSTEM_ERROR_MSG))?; + let mut quota_checker = QuotaChecker::new(metadata_sys_lock.clone()); + + quota_checker + .set_quota_config(&bucket, quota.clone()) + .await + .map_err(|e| s3_error!(InternalError, "Failed to set quota: {}", e))?; + + // Get real-time usage from data usage system + let current_usage = if let Some(store) = rustfs_ecstore::global::GLOBAL_OBJECT_API.get() { + match rustfs_ecstore::data_usage::load_data_usage_from_backend(store.clone()).await { + Ok(data_usage_info) => data_usage_info + .buckets_usage + .get(&bucket) + .map(|bucket_usage| bucket_usage.size) + .unwrap_or(0), + Err(_) => 0, + } + } else { + 0 + }; + + let response = BucketQuotaResponse { + bucket, + quota: quota.quota, + size: current_usage, + quota_type: rustfs_config::QUOTA_TYPE_HARD.to_string(), + }; + + let json = + serde_json::to_string(&response).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(json)))) + } +} + +#[async_trait::async_trait] +impl Operation for GetBucketQuotaHandler { + #[tracing::instrument(skip_all)] + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + warn!("handle GetBucketQuota"); + + let Some(ref cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "authentication required")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?; + + validate_admin_request( + &req.headers, + &cred, + owner, + false, + vec![Action::AdminAction(AdminAction::GetBucketQuotaAdminAction)], + None, + ) + .await?; + + let bucket = params.get("bucket").unwrap_or("").to_string(); + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket name is required")); + } + + let metadata_sys_lock = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys + .get() + .ok_or_else(|| s3_error!(InternalError, "Bucket metadata system not initialized"))?; + + let quota_checker = QuotaChecker::new(metadata_sys_lock.clone()); + + let (quota, current_usage) = quota_checker.get_quota_stats(&bucket).await.map_err(|e| match e { + QuotaError::ConfigNotFound { .. } => { + s3_error!(NoSuchBucket, "Bucket not found: {}", bucket) + } + _ => s3_error!(InternalError, "Failed to get quota: {}", e), + })?; + + let response = BucketQuotaResponse { + bucket, + quota: quota.quota, + size: current_usage.unwrap_or(0), + quota_type: rustfs_config::QUOTA_TYPE_HARD.to_string(), + }; + + let json = + serde_json::to_string(&response).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(json)))) + } +} + +#[async_trait::async_trait] +impl Operation for ClearBucketQuotaHandler { + #[tracing::instrument(skip_all)] + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + warn!("handle ClearBucketQuota"); + + let Some(ref cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "authentication required")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?; + + validate_admin_request( + &req.headers, + &cred, + owner, + false, + vec![Action::AdminAction(AdminAction::SetBucketQuotaAdminAction)], + None, + ) + .await?; + + let bucket = params.get("bucket").unwrap_or("").to_string(); + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket name is required")); + } + + info!("Clearing quota for bucket: {}", bucket); + + let metadata_sys_lock = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys + .get() + .ok_or_else(|| s3_error!(InternalError, "Bucket metadata system not initialized"))?; + + let mut quota_checker = QuotaChecker::new(metadata_sys_lock.clone()); + + // Clear quota (set to None) + let quota = BucketQuota::new(None); + quota_checker + .set_quota_config(&bucket, quota.clone()) + .await + .map_err(|e| s3_error!(InternalError, "Failed to clear quota: {}", e))?; + + info!("Successfully cleared quota for bucket: {}", bucket); + + // Get real-time usage from data usage system + let current_usage = if let Some(store) = rustfs_ecstore::global::GLOBAL_OBJECT_API.get() { + match rustfs_ecstore::data_usage::load_data_usage_from_backend(store.clone()).await { + Ok(data_usage_info) => data_usage_info + .buckets_usage + .get(&bucket) + .map(|bucket_usage| bucket_usage.size) + .unwrap_or(0), + Err(_) => 0, + } + } else { + 0 + }; + + let response = BucketQuotaResponse { + bucket, + quota: None, + size: current_usage, + quota_type: rustfs_config::QUOTA_TYPE_HARD.to_string(), + }; + + let json = + serde_json::to_string(&response).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(json)))) + } +} + +#[async_trait::async_trait] +impl Operation for GetBucketQuotaStatsHandler { + #[tracing::instrument(skip_all)] + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + warn!("handle GetBucketQuotaStats"); + + let Some(ref cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "authentication required")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?; + + validate_admin_request( + &req.headers, + &cred, + owner, + false, + vec![Action::AdminAction(AdminAction::GetBucketQuotaAdminAction)], + None, + ) + .await?; + + let bucket = params.get("bucket").unwrap_or("").to_string(); + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket name is required")); + } + + let metadata_sys_lock = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys + .get() + .ok_or_else(|| s3_error!(InternalError, "Bucket metadata system not initialized"))?; + + let quota_checker = QuotaChecker::new(metadata_sys_lock.clone()); + + let (quota, current_usage_opt) = quota_checker.get_quota_stats(&bucket).await.map_err(|e| match e { + QuotaError::ConfigNotFound { .. } => { + s3_error!(NoSuchBucket, "Bucket not found: {}", bucket) + } + _ => s3_error!(InternalError, "Failed to get quota stats: {}", e), + })?; + + let current_usage = current_usage_opt.unwrap_or(0); + let usage_percentage = quota.quota.and_then(|limit| { + if limit == 0 { + None + } else { + Some((current_usage as f64 / limit as f64) * 100.0) + } + }); + + let remaining_quota = quota.get_remaining_quota(current_usage); + + let response = BucketQuotaStats { + bucket, + quota_limit: quota.quota, + current_usage, + remaining_quota, + usage_percentage, + }; + + let json = + serde_json::to_string(&response).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(json)))) + } +} + +#[async_trait::async_trait] +impl Operation for CheckBucketQuotaHandler { + #[tracing::instrument(skip_all)] + async fn call(&self, mut req: S3Request, params: Params<'_, '_>) -> S3Result> { + warn!("handle CheckBucketQuota"); + + let Some(ref cred) = req.credentials else { + return Err(s3_error!(InvalidRequest, "authentication required")); + }; + + let (cred, owner) = + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &cred.access_key).await?; + + validate_admin_request( + &req.headers, + &cred, + owner, + false, + vec![Action::AdminAction(AdminAction::GetBucketQuotaAdminAction)], + None, + ) + .await?; + + let bucket = params.get("bucket").unwrap_or("").to_string(); + if bucket.is_empty() { + return Err(s3_error!(InvalidRequest, "bucket name is required")); + } + + let body = req + .input + .store_all_limited(rustfs_config::MAX_ADMIN_REQUEST_BODY_SIZE) + .await + .map_err(|e| s3_error!(InvalidRequest, "failed to read request body: {}", e))?; + + let request: CheckQuotaRequest = if body.is_empty() { + return Err(s3_error!(InvalidRequest, "request body cannot be empty")); + } else { + serde_json::from_slice(&body).map_err(|e| s3_error!(InvalidRequest, "invalid JSON: {}", e))? + }; + + debug!( + "Checking quota for bucket: {}, operation: {}, size: {}", + bucket, request.operation_type, request.operation_size + ); + + let metadata_sys_lock = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys + .get() + .ok_or_else(|| s3_error!(InternalError, "Bucket metadata system not initialized"))?; + + let quota_checker = QuotaChecker::new(metadata_sys_lock.clone()); + + let operation: QuotaOperation = match request.operation_type.to_uppercase().as_str() { + "PUT" | "PUTOBJECT" => QuotaOperation::PutObject, + "COPY" | "COPYOBJECT" => QuotaOperation::CopyObject, + "DELETE" | "DELETEOBJECT" => QuotaOperation::DeleteObject, + _ => QuotaOperation::PutObject, // Default to PUT operation + }; + + let result = quota_checker + .check_quota(&bucket, operation, request.operation_size) + .await + .map_err(|e| s3_error!(InternalError, "Failed to check quota: {}", e))?; + + let response = CheckQuotaResponse { + bucket, + operation_type: request.operation_type, + operation_size: request.operation_size, + allowed: result.allowed, + current_usage: result.current_usage, + quota_limit: result.quota_limit, + remaining_quota: result.remaining, + }; + + let json = + serde_json::to_string(&response).map_err(|e| s3_error!(InternalError, "Failed to serialize response: {}", e))?; + + Ok(S3Response::new((StatusCode::OK, Body::from(json)))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_quota_type() { + assert_eq!(default_quota_type(), "HARD"); + } + + #[test] + fn test_quota_operation_parsing() { + let parse_operation = |operation: &str| match operation.to_uppercase().as_str() { + "PUT" | "PUTOBJECT" => QuotaOperation::PutObject, + "COPY" | "COPYOBJECT" => QuotaOperation::CopyObject, + "DELETE" | "DELETEOBJECT" => QuotaOperation::DeleteObject, + _ => QuotaOperation::PutObject, + }; + + assert!(matches!(parse_operation("put"), QuotaOperation::PutObject)); + assert!(matches!(parse_operation("PUT"), QuotaOperation::PutObject)); + assert!(matches!(parse_operation("PutObject"), QuotaOperation::PutObject)); + assert!(matches!(parse_operation("copy"), QuotaOperation::CopyObject)); + assert!(matches!(parse_operation("DELETE"), QuotaOperation::DeleteObject)); + assert!(matches!(parse_operation("unknown"), QuotaOperation::PutObject)); + } + + #[tokio::test] + async fn test_quota_response_serialization() { + let response = BucketQuotaResponse { + bucket: "test-bucket".to_string(), + quota: Some(2147483648), + size: 1073741824, + quota_type: rustfs_config::QUOTA_TYPE_HARD.to_string(), + }; + + let json = serde_json::to_string(&response).unwrap(); + assert!(json.contains("test-bucket")); + assert!(json.contains("2147483648")); + assert!(json.contains("HARD")); + } +} diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index e554ace5..4f23261f 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -29,7 +29,7 @@ use handlers::{ event::{ListNotificationTargets, ListTargetsArns, NotificationTarget, RemoveNotificationTarget}, group, kms, kms_dynamic, kms_keys, policies, pools, profile::{TriggerProfileCPU, TriggerProfileMemory}, - rebalance, + quota, rebalance, service_account::{AddServiceAccount, DeleteServiceAccount, InfoServiceAccount, ListServiceAccount, UpdateServiceAccount}, sts, tier, user, }; @@ -202,6 +202,32 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result AdminOperation(&tier::ClearTier {}), )?; + r.insert( + Method::PUT, + format!("{}{}", ADMIN_PREFIX, "/v3/quota/{bucket}").as_str(), + AdminOperation("a::SetBucketQuotaHandler {}), + )?; + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/quota/{bucket}").as_str(), + AdminOperation("a::GetBucketQuotaHandler {}), + )?; + r.insert( + Method::DELETE, + format!("{}{}", ADMIN_PREFIX, "/v3/quota/{bucket}").as_str(), + AdminOperation("a::ClearBucketQuotaHandler {}), + )?; + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/v3/quota-stats/{bucket}").as_str(), + AdminOperation("a::GetBucketQuotaStatsHandler {}), + )?; + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/quota-check/{bucket}").as_str(), + AdminOperation("a::CheckBucketQuotaHandler {}), + )?; + r.insert( Method::GET, format!("{}{}", ADMIN_PREFIX, "/export-bucket-metadata").as_str(), diff --git a/rustfs/src/error.rs b/rustfs/src/error.rs index 4ca5fd6c..dabf7d7a 100644 --- a/rustfs/src/error.rs +++ b/rustfs/src/error.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use rustfs_ecstore::bucket::quota::QuotaError; use rustfs_ecstore::error::StorageError; use s3s::{S3Error, S3ErrorCode}; @@ -284,6 +285,29 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(err: QuotaError) -> Self { + let code = match &err { + QuotaError::QuotaExceeded { .. } => S3ErrorCode::InvalidRequest, + QuotaError::ConfigNotFound { .. } => S3ErrorCode::NoSuchBucket, + QuotaError::InvalidConfig { .. } => S3ErrorCode::InvalidArgument, + QuotaError::StorageError(_) => S3ErrorCode::InternalError, + }; + + let message = if code == S3ErrorCode::InternalError { + err.to_string() + } else { + ApiError::error_code_to_message(&code) + }; + + ApiError { + code, + message, + source: Some(Box::new(err)), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index fe6c779d..1fec35ec 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -40,6 +40,7 @@ use datafusion::arrow::{ use futures::StreamExt; use http::{HeaderMap, StatusCode}; use metrics::counter; +use rustfs_ecstore::bucket::quota::checker::QuotaChecker; use rustfs_ecstore::{ bucket::{ lifecycle::{ @@ -54,6 +55,7 @@ use rustfs_ecstore::{ metadata_sys::get_replication_config, object_lock::objectlock_sys::BucketObjectLockSys, policy_sys::PolicySys, + quota::QuotaOperation, replication::{ DeletedObjectReplicationInfo, ReplicationConfigurationExt, check_replicate_delete, get_must_replicate_options, must_replicate, schedule_replication, schedule_replication_delete, @@ -1067,11 +1069,42 @@ impl S3 for FS { } } + // check quota for copy operation + if let Some(metadata_sys) = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys.get() { + let quota_checker = QuotaChecker::new(metadata_sys.clone()); + + match quota_checker + .check_quota(&bucket, QuotaOperation::CopyObject, src_info.size as u64) + .await + { + Ok(check_result) => { + if !check_result.allowed { + return Err(S3Error::with_message( + S3ErrorCode::InvalidRequest, + format!( + "Bucket quota exceeded. Current usage: {} bytes, limit: {} bytes", + check_result.current_usage, + check_result.quota_limit.unwrap_or(0) + ), + )); + } + } + Err(e) => { + warn!("Quota check failed for bucket {}: {}, allowing operation", bucket, e); + } + } + } + let oi = store .copy_object(&src_bucket, &src_key, &bucket, &key, &mut src_info, &src_opts, &dst_opts) .await .map_err(ApiError::from)?; + // Update quota tracking after successful copy + if rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys.get().is_some() { + rustfs_ecstore::data_usage::increment_bucket_usage_memory(&bucket, oi.size as u64).await; + } + // Invalidate cache for the destination object to prevent stale data let manager = get_concurrency_manager(); let dest_bucket = bucket.clone(); @@ -1440,6 +1473,9 @@ impl S3 for FS { } }; + // Fast in-memory update for immediate quota consistency + rustfs_ecstore::data_usage::decrement_bucket_usage_memory(&bucket, obj_info.size as u64).await; + // Invalidate cache for the deleted object let manager = get_concurrency_manager(); let del_bucket = bucket.clone(); @@ -1534,8 +1570,6 @@ impl S3 for FS { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; - let has_lock_enable = BucketObjectLockSys::get(&bucket).await.is_some(); - let version_cfg = BucketVersioningSys::get(&bucket).await.unwrap_or_default(); #[derive(Default, Clone)] @@ -1548,6 +1582,7 @@ impl S3 for FS { let mut object_to_delete = Vec::new(); let mut object_to_delete_index = HashMap::new(); + let mut object_sizes = HashMap::new(); for (idx, obj_id) in delete.objects.iter().enumerate() { // Per S3 API spec, "null" string means non-versioned object // Filter out "null" version_id to treat as unversioned @@ -1606,15 +1641,14 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let mut goi = ObjectInfo::default(); - let mut gerr = None; + // Get object info to collect size for quota tracking + let (goi, gerr) = match store.get_object_info(&bucket, &object.object_name, &opts).await { + Ok(res) => (res, None), + Err(e) => (ObjectInfo::default(), Some(e.to_string())), + }; - if replicate_deletes || object.version_id.is_some() && has_lock_enable { - (goi, gerr) = match store.get_object_info(&bucket, &object.object_name, &opts).await { - Ok(res) => (res, None), - Err(e) => (ObjectInfo::default(), Some(e.to_string())), - }; - } + // Store object size for quota tracking + object_sizes.insert(object.object_name.clone(), goi.size); if is_dir_object(&object.object_name) && object.version_id.is_none() { object.version_id = Some(Uuid::nil()); @@ -1716,6 +1750,10 @@ impl S3 for FS { dobjs[i].replication_state = Some(object_to_delete[i].replication_state()); } delete_results[*didx].delete_object = Some(dobjs[i].clone()); + // Update quota tracking for successfully deleted objects + if let Some(&size) = object_sizes.get(&obj.object_name) { + rustfs_ecstore::data_usage::decrement_bucket_usage_memory(&bucket, size as u64).await; + } continue; } @@ -3151,6 +3189,34 @@ impl S3 for FS { // Validate object key validate_object_key(&key, "PUT")?; + // check quota for put operation + if let Some(size) = content_length + && let Some(metadata_sys) = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys.get() + { + let quota_checker = QuotaChecker::new(metadata_sys.clone()); + + match quota_checker + .check_quota(&bucket, QuotaOperation::PutObject, size as u64) + .await + { + Ok(check_result) => { + if !check_result.allowed { + return Err(S3Error::with_message( + S3ErrorCode::InvalidRequest, + format!( + "Bucket quota exceeded. Current usage: {} bytes, limit: {} bytes", + check_result.current_usage, + check_result.quota_limit.unwrap_or(0) + ), + )); + } + } + Err(e) => { + warn!("Quota check failed for bucket {}: {}, allowing operation", bucket, e); + } + } + } + if if_match.is_some() || if_none_match.is_some() { let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); @@ -3429,6 +3495,9 @@ impl S3 for FS { .await .map_err(ApiError::from)?; + // Fast in-memory update for immediate quota consistency + rustfs_ecstore::data_usage::increment_bucket_usage_memory(&bucket, obj_info.size as u64).await; + // Invalidate cache for the written object to prevent stale data let manager = get_concurrency_manager(); let put_bucket = bucket.clone(); @@ -4356,6 +4425,38 @@ impl S3 for FS { .await .map_err(ApiError::from)?; + // check quota after completing multipart upload + if let Some(metadata_sys) = rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys.get() { + let quota_checker = QuotaChecker::new(metadata_sys.clone()); + + match quota_checker + .check_quota(&bucket, QuotaOperation::PutObject, obj_info.size as u64) + .await + { + Ok(check_result) => { + if !check_result.allowed { + // Quota exceeded, delete the completed object + let _ = store.delete_object(&bucket, &key, ObjectOptions::default()).await; + return Err(S3Error::with_message( + S3ErrorCode::InvalidRequest, + format!( + "Bucket quota exceeded. Current usage: {} bytes, limit: {} bytes", + check_result.current_usage, + check_result.quota_limit.unwrap_or(0) + ), + )); + } + // Update quota tracking after successful multipart upload + if rustfs_ecstore::bucket::metadata_sys::GLOBAL_BucketMetadataSys.get().is_some() { + rustfs_ecstore::data_usage::increment_bucket_usage_memory(&bucket, obj_info.size as u64).await; + } + } + Err(e) => { + warn!("Quota check failed for bucket {}: {}, allowing operation", bucket, e); + } + } + } + // Invalidate cache for the completed multipart object let manager = get_concurrency_manager(); let mpu_bucket = bucket.clone();