feat: implement bucket quota system (#1461)

Signed-off-by: yxrxy <1532529704@qq.com>
Co-authored-by: loverustfs <hello@rustfs.com>
This commit is contained in:
yxrxy
2026-01-12 11:42:07 +08:00
committed by GitHub
parent 78b13f3ff2
commit 29d86036b1
17 changed files with 1964 additions and 42 deletions

View File

@@ -355,7 +355,7 @@ impl BucketMetadata {
self.tagging_config = Some(deserialize::<Tagging>(&self.tagging_config_xml)?);
}
if !self.quota_config_json.is_empty() {
self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?);
self.quota_config = Some(serde_json::from_slice(&self.quota_config_json)?);
}
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(deserialize::<ReplicationConfiguration>(&self.replication_config_xml)?);
@@ -487,7 +487,8 @@ mod test {
bm.tagging_config_updated_at = OffsetDateTime::now_utc();
// Add quota configuration
let quota_json = r#"{"quota":1073741824,"quotaType":"hard"}"#; // 1GB quota
let quota_json =
r#"{"quota":1073741824,"quota_type":"Hard","created_at":"2024-01-01T00:00:00Z","updated_at":"2024-01-01T00:00:00Z"}"#; // 1GB quota
bm.quota_config_json = quota_json.as_bytes().to_vec();
bm.quota_config_updated_at = OffsetDateTime::now_utc();

View File

@@ -0,0 +1,195 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{BucketQuota, QuotaCheckResult, QuotaError, QuotaOperation};
use crate::bucket::metadata_sys::{BucketMetadataSys, update};
use crate::data_usage::get_bucket_usage_memory;
use rustfs_common::metrics::Metric;
use rustfs_config::QUOTA_CONFIG_FILE;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, warn};
pub struct QuotaChecker {
metadata_sys: Arc<RwLock<BucketMetadataSys>>,
}
impl QuotaChecker {
pub fn new(metadata_sys: Arc<RwLock<BucketMetadataSys>>) -> Self {
Self { metadata_sys }
}
pub async fn check_quota(
&self,
bucket: &str,
operation: QuotaOperation,
operation_size: u64,
) -> Result<QuotaCheckResult, QuotaError> {
let start_time = Instant::now();
let quota_config = self.get_quota_config(bucket).await?;
// If no quota limit is set, allow operation
let quota_limit = match quota_config.quota {
None => {
let current_usage = self.get_real_time_usage(bucket).await?;
return Ok(QuotaCheckResult {
allowed: true,
current_usage,
quota_limit: None,
operation_size,
remaining: None,
});
}
Some(q) => q,
};
let current_usage = self.get_real_time_usage(bucket).await?;
let expected_usage = match operation {
QuotaOperation::PutObject | QuotaOperation::CopyObject => current_usage + operation_size,
QuotaOperation::DeleteObject => current_usage.saturating_sub(operation_size),
};
let allowed = match operation {
QuotaOperation::PutObject | QuotaOperation::CopyObject => {
quota_config.check_operation_allowed(current_usage, operation_size)
}
QuotaOperation::DeleteObject => true,
};
let remaining = if quota_limit >= expected_usage {
Some(quota_limit - expected_usage)
} else {
Some(0)
};
if !allowed {
warn!(
"Quota exceeded for bucket: {}, current: {}, limit: {}, attempted: {}",
bucket, current_usage, quota_limit, operation_size
);
}
let result = QuotaCheckResult {
allowed,
current_usage,
quota_limit: Some(quota_limit),
operation_size,
remaining,
};
let duration = start_time.elapsed();
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaCheck, duration).await;
if !allowed {
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaViolation, duration).await;
}
Ok(result)
}
pub async fn get_quota_config(&self, bucket: &str) -> Result<BucketQuota, QuotaError> {
let meta = self
.metadata_sys
.read()
.await
.get(bucket)
.await
.map_err(QuotaError::StorageError)?;
if meta.quota_config_json.is_empty() {
debug!("No quota config found for bucket: {}, using default", bucket);
return Ok(BucketQuota::new(None));
}
let quota: BucketQuota = serde_json::from_slice(&meta.quota_config_json).map_err(|e| QuotaError::InvalidConfig {
reason: format!("Failed to parse quota config: {}", e),
})?;
Ok(quota)
}
pub async fn set_quota_config(&mut self, bucket: &str, quota: BucketQuota) -> Result<(), QuotaError> {
let json_data = serde_json::to_vec(&quota).map_err(|e| QuotaError::InvalidConfig {
reason: format!("Failed to serialize quota config: {}", e),
})?;
let start_time = Instant::now();
update(bucket, QUOTA_CONFIG_FILE, json_data)
.await
.map_err(QuotaError::StorageError)?;
rustfs_common::metrics::Metrics::inc_time(Metric::QuotaSync, start_time.elapsed()).await;
Ok(())
}
pub async fn get_quota_stats(&self, bucket: &str) -> Result<(BucketQuota, Option<u64>), QuotaError> {
// If bucket doesn't exist, return ConfigNotFound error
if !self.bucket_exists(bucket).await {
return Err(QuotaError::ConfigNotFound {
bucket: bucket.to_string(),
});
}
let quota = self.get_quota_config(bucket).await?;
let current_usage = self.get_real_time_usage(bucket).await.unwrap_or(0);
Ok((quota, Some(current_usage)))
}
pub async fn bucket_exists(&self, bucket: &str) -> bool {
self.metadata_sys.read().await.get(bucket).await.is_ok()
}
pub async fn get_real_time_usage(&self, bucket: &str) -> Result<u64, QuotaError> {
Ok(get_bucket_usage_memory(bucket).await.unwrap_or(0))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_quota_check_no_limit() {
let result = QuotaCheckResult {
allowed: true,
current_usage: 0,
quota_limit: None,
operation_size: 1024,
remaining: None,
};
assert!(result.allowed);
assert_eq!(result.quota_limit, None);
}
#[tokio::test]
async fn test_quota_check_within_limit() {
let quota = BucketQuota::new(Some(2048)); // 2KB
// Current usage 512, trying to add 1024
let allowed = quota.check_operation_allowed(512, 1024);
assert!(allowed);
}
#[tokio::test]
async fn test_quota_check_exceeds_limit() {
let quota = BucketQuota::new(Some(1024)); // 1KB
// Current usage 512, trying to add 1024
let allowed = quota.check_operation_allowed(512, 1024);
assert!(!allowed);
}
}

View File

@@ -12,36 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod checker;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use rustfs_config::{
QUOTA_API_PATH, QUOTA_EXCEEDED_ERROR_CODE, QUOTA_INTERNAL_ERROR_CODE, QUOTA_INVALID_CONFIG_ERROR_CODE,
QUOTA_NOT_FOUND_ERROR_CODE,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use time::OffsetDateTime;
// Define the QuotaType enum
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum QuotaType {
/// Hard quota: reject immediately when exceeded
#[default]
Hard,
}
// Define the BucketQuota structure
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq)]
pub struct BucketQuota {
quota: Option<u64>, // Use Option to represent optional fields
size: u64,
rate: u64,
requests: u64,
quota_type: Option<QuotaType>,
pub quota: Option<u64>,
pub quota_type: QuotaType,
/// Timestamp when this quota configuration was set (for audit purposes)
pub created_at: Option<OffsetDateTime>,
}
impl BucketQuota {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
@@ -49,4 +50,107 @@ impl BucketQuota {
let t: BucketQuota = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn new(quota: Option<u64>) -> Self {
let now = OffsetDateTime::now_utc();
Self {
quota,
quota_type: QuotaType::Hard,
created_at: Some(now),
}
}
pub fn get_quota_limit(&self) -> Option<u64> {
self.quota
}
pub fn check_operation_allowed(&self, current_usage: u64, operation_size: u64) -> bool {
if let Some(quota_limit) = self.quota {
current_usage.saturating_add(operation_size) <= quota_limit
} else {
true // No quota limit
}
}
pub fn get_remaining_quota(&self, current_usage: u64) -> Option<u64> {
self.quota.map(|limit| limit.saturating_sub(current_usage))
}
}
#[derive(Debug)]
pub struct QuotaCheckResult {
pub allowed: bool,
pub current_usage: u64,
/// quota_limit: None means unlimited
pub quota_limit: Option<u64>,
pub operation_size: u64,
pub remaining: Option<u64>,
}
#[derive(Debug)]
pub enum QuotaOperation {
PutObject,
CopyObject,
DeleteObject,
}
#[derive(Debug, Error)]
pub enum QuotaError {
#[error("Bucket quota exceeded: current={current}, limit={limit}, operation={operation}")]
QuotaExceeded { current: u64, limit: u64, operation: u64 },
#[error("Quota configuration not found for bucket: {bucket}")]
ConfigNotFound { bucket: String },
#[error("Invalid quota configuration: {reason}")]
InvalidConfig { reason: String },
#[error("Storage error: {0}")]
StorageError(#[from] crate::error::StorageError),
}
#[derive(Debug, Serialize)]
pub struct QuotaErrorResponse {
#[serde(rename = "Code")]
pub code: String,
#[serde(rename = "Message")]
pub message: String,
#[serde(rename = "Resource")]
pub resource: String,
#[serde(rename = "RequestId")]
pub request_id: String,
#[serde(rename = "HostId")]
pub host_id: String,
}
impl QuotaErrorResponse {
pub fn new(quota_error: &QuotaError, request_id: &str, host_id: &str) -> Self {
match quota_error {
QuotaError::QuotaExceeded { .. } => Self {
code: QUOTA_EXCEEDED_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::ConfigNotFound { .. } => Self {
code: QUOTA_NOT_FOUND_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::InvalidConfig { .. } => Self {
code: QUOTA_INVALID_CONFIG_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
QuotaError::StorageError(_) => Self {
code: QUOTA_INTERNAL_ERROR_CODE.to_string(),
message: quota_error.to_string(),
resource: QUOTA_API_PATH.to_string(),
request_id: request_id.to_string(),
host_id: host_id.to_string(),
},
}
}
}

View File

@@ -15,8 +15,10 @@
use std::{
collections::{HashMap, hash_map::Entry},
sync::Arc,
time::SystemTime,
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
use tracing::debug;
pub mod local_snapshot;
pub use local_snapshot::{
@@ -32,6 +34,7 @@ use rustfs_common::data_usage::{
BucketTargetUsageInfo, BucketUsageInfo, DataUsageCache, DataUsageEntry, DataUsageInfo, DiskUsageStatus, SizeSummary,
};
use rustfs_utils::path::SLASH_SEPARATOR_STR;
use std::sync::OnceLock;
use tokio::fs;
use tracing::{error, info, warn};
@@ -42,6 +45,21 @@ pub const DATA_USAGE_ROOT: &str = SLASH_SEPARATOR_STR;
const DATA_USAGE_OBJ_NAME: &str = ".usage.json";
const DATA_USAGE_BLOOM_NAME: &str = ".bloomcycle.bin";
pub const DATA_USAGE_CACHE_NAME: &str = ".usage-cache.bin";
const DATA_USAGE_CACHE_TTL_SECS: u64 = 30;
type UsageMemoryCache = Arc<RwLock<HashMap<String, (u64, SystemTime)>>>;
type CacheUpdating = Arc<RwLock<bool>>;
static USAGE_MEMORY_CACHE: OnceLock<UsageMemoryCache> = OnceLock::new();
static USAGE_CACHE_UPDATING: OnceLock<CacheUpdating> = OnceLock::new();
fn memory_cache() -> &'static UsageMemoryCache {
USAGE_MEMORY_CACHE.get_or_init(|| Arc::new(RwLock::new(HashMap::new())))
}
fn cache_updating() -> &'static CacheUpdating {
USAGE_CACHE_UPDATING.get_or_init(|| Arc::new(RwLock::new(false)))
}
// Data usage storage paths
lazy_static::lazy_static! {
@@ -364,8 +382,120 @@ pub async fn compute_bucket_usage(store: Arc<ECStore>, bucket_name: &str) -> Res
Ok(usage)
}
/// Fast in-memory increment for immediate quota consistency
pub async fn increment_bucket_usage_memory(bucket: &str, size_increment: u64) {
let mut cache = memory_cache().write().await;
let current = cache.entry(bucket.to_string()).or_insert_with(|| (0, SystemTime::now()));
current.0 += size_increment;
current.1 = SystemTime::now();
}
/// Fast in-memory decrement for immediate quota consistency
pub async fn decrement_bucket_usage_memory(bucket: &str, size_decrement: u64) {
let mut cache = memory_cache().write().await;
if let Some(current) = cache.get_mut(bucket) {
current.0 = current.0.saturating_sub(size_decrement);
current.1 = SystemTime::now();
}
}
/// Get bucket usage from in-memory cache
pub async fn get_bucket_usage_memory(bucket: &str) -> Option<u64> {
update_usage_cache_if_needed().await;
let cache = memory_cache().read().await;
cache.get(bucket).map(|(usage, _)| *usage)
}
async fn update_usage_cache_if_needed() {
let ttl = Duration::from_secs(DATA_USAGE_CACHE_TTL_SECS);
let double_ttl = ttl * 2;
let now = SystemTime::now();
let cache = memory_cache().read().await;
let earliest_timestamp = cache.values().map(|(_, ts)| *ts).min();
drop(cache);
let age = match earliest_timestamp {
Some(ts) => now.duration_since(ts).unwrap_or_default(),
None => double_ttl,
};
if age < ttl {
return;
}
let mut updating = cache_updating().write().await;
if age < double_ttl {
if *updating {
return;
}
*updating = true;
drop(updating);
let cache_clone = (*memory_cache()).clone();
let updating_clone = (*cache_updating()).clone();
tokio::spawn(async move {
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get()
&& let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await
{
let mut cache = cache_clone.write().await;
for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now()));
}
}
let mut updating = updating_clone.write().await;
*updating = false;
});
return;
}
for retry in 0..10 {
if !*updating {
break;
}
drop(updating);
let delay = Duration::from_millis(1 << retry);
tokio::time::sleep(delay).await;
updating = cache_updating().write().await;
}
*updating = true;
drop(updating);
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get()
&& let Ok(data_usage_info) = load_data_usage_from_backend(store.clone()).await
{
let mut cache = memory_cache().write().await;
for (bucket_name, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket_name.clone(), (bucket_usage.size, SystemTime::now()));
}
}
let mut updating = cache_updating().write().await;
*updating = false;
}
/// Sync memory cache with backend data (called by scanner)
pub async fn sync_memory_cache_with_backend() -> Result<(), Error> {
if let Some(store) = crate::global::GLOBAL_OBJECT_API.get() {
match load_data_usage_from_backend(store.clone()).await {
Ok(data_usage_info) => {
let mut cache = memory_cache().write().await;
for (bucket, bucket_usage) in data_usage_info.buckets_usage.iter() {
cache.insert(bucket.clone(), (bucket_usage.size, SystemTime::now()));
}
}
Err(e) => {
debug!("Failed to sync memory cache with backend: {}", e);
}
}
}
Ok(())
}
/// Build basic data usage info with real object counts
async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
pub async fn build_basic_data_usage_info(store: Arc<ECStore>) -> Result<DataUsageInfo, Error> {
let mut data_usage_info = DataUsageInfo::default();
// Get bucket list

View File

@@ -4614,7 +4614,9 @@ impl StorageAPI for SetDisks {
.await
.map_err(|e| to_object_err(e, vec![bucket, object]))?;
Ok(ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended))
let mut obj_info = ObjectInfo::from_file_info(&dfi, bucket, object, opts.versioned || opts.version_suspended);
obj_info.size = goi.size;
Ok(obj_info)
}
#[tracing::instrument(skip(self))]