feat: add S3 object legal hold and retention management APIs (#476)

* add bucket rule

* translation

* improve the code for adding event notification rules
Author: houseme
Date: 2025-09-02 00:14:10 +08:00
Committed by: GitHub
Parent: 7462be983a
Commit: 04bf4b0f98
6 changed files with 182 additions and 47 deletions

Cargo.lock (generated): 17 changed lines

@@ -2445,9 +2445,9 @@ dependencies = [
[[package]]
name = "deranged"
version = "0.4.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
checksum = "75d7cc94194b4dd0fa12845ef8c911101b7f37633cda14997a6e82099aa0b693"
dependencies = [
"powerfmt",
"serde",
@@ -7362,12 +7362,11 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.41"
version = "0.3.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
checksum = "8ca967379f9d8eb8058d86ed467d81d03e81acd45757e4ca341c24affbe8e8e3"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
@@ -7379,15 +7378,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
checksum = "a9108bb380861b07264b950ded55a44a14a4adc68b9f5efd85aafc3aa4d40a68"
[[package]]
name = "time-macros"
version = "0.2.22"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
checksum = "7182799245a7264ce590b349d90338f1c1affad93d2639aed5f8f69c090b334c"
dependencies = [
"num-conv",
"time-core",


@@ -224,7 +224,7 @@ tempfile = "3.21.0"
temp-env = "0.3.6"
test-case = "3.3.1"
thiserror = "2.0.16"
time = { version = "0.3.41", features = [
time = { version = "0.3.42", features = [
"std",
"parsing",
"formatting",


@@ -124,7 +124,7 @@ pub const DEFAULT_LOG_FILENAME: &str = "rustfs";
/// This is the default log filename for OBS.
/// It is used to store the logs of the application.
/// Default value: rustfs.log
-pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".log");
+pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".");
/// Default sink file log file for rustfs
/// This is the default sink file log file for rustfs.
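Note: the constant now ends in a bare "." (resolving to "rustfs.", given DEFAULT_LOG_FILENAME = "rustfs" above), while the doc comment still says "rustfs.log", so the comment lags the change. Presumably a rotation layer appends a date or similar suffix to the stem; a minimal sketch of that assumption (the helper below is illustrative, not the crate's actual API):

```rust
const DEFAULT_OBS_LOG_FILENAME: &str = "rustfs."; // stem only; rotation appends the rest

// Hypothetical rotation naming: stem + date. The helper name is an assumption.
fn rotated_name(date: &str) -> String {
    format!("{DEFAULT_OBS_LOG_FILENAME}{date}")
}

fn main() {
    assert_eq!(rotated_name("2025-09-02"), "rustfs.2025-09-02");
}
```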


@@ -162,13 +162,13 @@ impl Notifier {
&self,
bucket_name: &str,
region: &str,
-event_rules: &[(Vec<EventName>, &str, &str, Vec<TargetID>)],
+event_rules: &[(Vec<EventName>, String, String, Vec<TargetID>)],
) -> Result<(), NotificationError> {
let mut bucket_config = BucketNotificationConfig::new(region);
for (event_names, prefix, suffix, target_ids) in event_rules {
// Use `new_pattern` to construct a matching pattern
-let pattern = crate::rules::pattern::new_pattern(Some(prefix), Some(suffix));
+let pattern = crate::rules::pattern::new_pattern(Some(prefix.as_str()), Some(suffix.as_str()));
for target_id in target_ids {
bucket_config.add_rule(event_names, pattern.clone(), target_id.clone());
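`new_pattern(Some(prefix), Some(suffix))` presumably composes the two filters into a single glob, MinIO-style ("images/" plus ".jpg" becoming "images/*.jpg"); the sketch below states that assumption explicitly rather than quoting the crate's implementation:

```rust
// Assumed semantics of rules::pattern::new_pattern, modeled on MinIO's NewPattern.
fn new_pattern(prefix: Option<&str>, suffix: Option<&str>) -> String {
    let mut pattern = String::new();
    if let Some(p) = prefix.filter(|p| !p.is_empty()) {
        pattern.push_str(p);
        if !p.ends_with('*') {
            pattern.push('*'); // "images/" -> "images/*"
        }
    }
    if let Some(s) = suffix.filter(|s| !s.is_empty()) {
        if !s.starts_with('*') {
            pattern.push('*'); // ".jpg" -> "*.jpg"
        }
        pattern.push_str(s);
    }
    pattern.replace("**", "*") // "images/*" + "*.jpg" -> "images/*.jpg"
}

fn main() {
    assert_eq!(new_pattern(Some("images/"), Some(".jpg")), "images/*.jpg");
}
```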
@@ -186,4 +186,25 @@ impl Notifier {
.load_bucket_notification_config(bucket_name, &bucket_config)
.await
}
+/// Clear all notification rules for the specified bucket.
+/// # Arguments
+/// - `bucket_name`: The name of the target bucket.
+/// # Returns
+/// `Ok(())` on success, or a `NotificationError` on failure.
+/// # Usage
+/// Call this to reset a bucket's notification configuration, e.g. before loading a new rule set.
+pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> {
+// Get the global NotificationSystem instance
+let notification_sys = match notification_system() {
+Some(sys) => sys,
+None => return Err(NotificationError::ServerNotInitialized),
+};
+// Clear the bucket's configuration
+notification_sys.remove_bucket_notification_config(bucket_name).await;
+Ok(())
+}
}
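The switch from borrowed `&str` to owned `String` in the rule tuples lets callers, such as the new PutBucketNotificationConfiguration handler further down, build rules from parsed request data without tying the slice to a temporary's lifetime. A minimal usage sketch, assuming the crate context and an initialized notification system; the event variant and the "id:name" target format are illustrative guesses:

```rust
use std::str::FromStr;

use rustfs_notify::Notifier; // assumed import path for the type above
use rustfs_targets::{EventName, arn::TargetID};

async fn reload_rules(notifier: &Notifier) -> Result<(), Box<dyn std::error::Error>> {
    let rules: Vec<(Vec<EventName>, String, String, Vec<TargetID>)> = vec![(
        vec![EventName::ObjectCreatedPut],      // assumed variant name
        "images/".to_string(),                  // key prefix (owned, not borrowed)
        ".jpg".to_string(),                     // key suffix
        vec![TargetID::from_str("1:webhook")?], // assumed "id:name" format
    )];
    notifier.clear_bucket_notification_rules("my-bucket").await?;
    notifier.add_event_specific_rules("my-bucket", "us-east-1", &rules).await?;
    Ok(())
}
```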


@@ -17,14 +17,13 @@ use rustfs_config::observability::{
DEFAULT_SINKS_FILE_FLUSH_THRESHOLD, DEFAULT_SINKS_KAFKA_BATCH_SIZE, DEFAULT_SINKS_KAFKA_BATCH_TIMEOUT_MS,
DEFAULT_SINKS_KAFKA_BROKERS, DEFAULT_SINKS_KAFKA_TOPIC, DEFAULT_SINKS_WEBHOOK_AUTH_TOKEN, DEFAULT_SINKS_WEBHOOK_ENDPOINT,
DEFAULT_SINKS_WEBHOOK_MAX_RETRIES, DEFAULT_SINKS_WEBHOOK_RETRY_DELAY_MS, ENV_AUDIT_LOGGER_QUEUE_CAPACITY, ENV_OBS_ENDPOINT,
-ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
+ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO,
-ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_SINKS_FILE_BUFFER_SIZE, ENV_SINKS_FILE_FLUSH_INTERVAL_MS,
-ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE, ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS,
-ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN, ENV_SINKS_WEBHOOK_ENDPOINT,
-ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
+ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT, ENV_SINKS_FILE_BUFFER_SIZE,
+ENV_SINKS_FILE_FLUSH_INTERVAL_MS, ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE,
+ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS, ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN,
+ENV_SINKS_WEBHOOK_ENDPOINT, ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
};
-use rustfs_config::observability::{ENV_OBS_LOG_DIRECTORY, ENV_OBS_USE_STDOUT};
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
DEFAULT_OBS_LOG_FILENAME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,


@@ -29,12 +29,6 @@ use chrono::Utc;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
-use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
-use rustfs_s3select_api::object_store::bytes_stream;
-use rustfs_s3select_api::query::Context;
-use rustfs_s3select_api::query::Query;
-use rustfs_s3select_query::get_global_db;
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
use futures::StreamExt;
use http::HeaderMap;
@@ -64,6 +58,7 @@ use rustfs_ecstore::compress::is_compressible;
use rustfs_ecstore::error::StorageError;
use rustfs_ecstore::new_object_layer_fn;
use rustfs_ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
+use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::store_api::CompletePart;
use rustfs_ecstore::store_api::DeleteBucketOptions;
@@ -77,6 +72,7 @@ use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
+use rustfs_notify::global::notifier_instance;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
@@ -86,7 +82,12 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
+use rustfs_s3select_api::object_store::bytes_stream;
+use rustfs_s3select_api::query::Context;
+use rustfs_s3select_api::query::Query;
+use rustfs_s3select_query::get_global_db;
use rustfs_targets::EventName;
+use rustfs_targets::arn::{TargetID, TargetIDError};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;
@@ -262,7 +263,7 @@ impl FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
}
}
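With `notifier_instance` imported at the top of the file, every handler call site below drops the fully qualified `rustfs_notify::global::` path. The surrounding pattern is the same throughout: move the event into a background task so the S3 response never waits on notification delivery. A self-contained sketch of that fire-and-forget shape (the string event is a stand-in for `EventArgs`):

```rust
use tokio::task;

async fn handler() -> &'static str {
    let event = String::from("s3:ObjectCreated:Put"); // stand-in for EventArgs
    // The asynchronous call does not block the response of the current request.
    task::spawn(async move {
        println!("notify: {event}"); // stand-in for notifier_instance().notify(event).await
    });
    "200 OK"
}

#[tokio::main]
async fn main() {
    println!("{}", handler().await);
}
```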
@@ -290,6 +291,7 @@ impl FS {
Ok(S3Response::new(output))
}
}
#[async_trait::async_trait]
impl S3 for FS {
#[tracing::instrument(
@@ -335,7 +337,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -481,7 +483,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -681,7 +683,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteBucketOutput {}))
@@ -756,7 +758,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -841,7 +843,7 @@ impl S3 for FS {
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
}
});
@@ -961,11 +963,11 @@ impl S3 for FS {
}
}
-let mut content_length = info.size as i64;
+let mut content_length = info.size;
let content_range = if let Some(rs) = rs {
let total_size = info.get_actual_size().map_err(ApiError::from)?;
-let (start, length) = rs.get_offset_length(total_size as i64).map_err(ApiError::from)?;
+let (start, length) = rs.get_offset_length(total_size).map_err(ApiError::from)?;
content_length = length;
Some(format!("bytes {}-{}/{}", start, start as i64 + length - 1, total_size))
} else {
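Dropping the `as i64` casts means `info.size`, `get_actual_size()`, and `get_offset_length()` now evidently agree on `i64` end to end. For reference, the Content-Range arithmetic the code above formats, as a standalone check (not the crate's API):

```rust
// HTTP Content-Range uses an inclusive end index: "bytes start-end/total".
fn content_range(start: i64, length: i64, total_size: i64) -> String {
    format!("bytes {}-{}/{}", start, start + length - 1, total_size)
}

fn main() {
    assert_eq!(content_range(0, 100, 1000), "bytes 0-99/1000");     // first 100 bytes
    assert_eq!(content_range(950, 50, 1000), "bytes 950-999/1000"); // last 50 bytes
}
```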
@@ -1006,7 +1008,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1128,7 +1130,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1512,7 +1514,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -1590,7 +1592,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -2147,7 +2149,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
@@ -2214,7 +2216,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
@@ -2791,20 +2793,56 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// Verify that the bucket exists
store
.get_bucket_info(&bucket, &BucketOptions::default())
.await
.map_err(ApiError::from)?;
// Persist the new notification configuration
let data = try_!(serialize(&notification_configuration));
metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data)
.await
.map_err(ApiError::from)?;
-// TODO: event notice add rule
+// Determine region (BucketInfo has no region field) -> use global region or default
+let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default());
-Ok(S3Response::new(PutBucketNotificationConfigurationOutput::default()))
+// Purge old rules and resolve new rules in parallel
+let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket);
+let parse_rules = async {
+let mut event_rules = Vec::new();
+process_queue_configurations(
+&mut event_rules,
+notification_configuration.queue_configurations.clone(),
+TargetID::from_str,
+);
+process_topic_configurations(
+&mut event_rules,
+notification_configuration.topic_configurations.clone(),
+TargetID::from_str,
+);
+process_lambda_configurations(
+&mut event_rules,
+notification_configuration.lambda_function_configurations.clone(),
+TargetID::from_str,
+);
+event_rules
+};
+let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules);
+clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
+// Register the new notification rules
+notifier_instance()
+.add_event_specific_rules(&bucket, &region, &event_rules)
+.await
+.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?;
+Ok(S3Response::new(PutBucketNotificationConfigurationOutput {}))
}
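The new handler body thus runs in four steps: verify the bucket, persist the raw configuration, purge stale rules while parsing the new ones, then register the parsed rules. Since the parsing arm is synchronous code wrapped in a future, `tokio::join!` mainly overlaps the purge await with that CPU work. A standalone sketch of the join shape (stand-in types, not the crate's API):

```rust
async fn clear_rules() -> Result<(), String> {
    Ok(()) // stand-in for notifier_instance().clear_bucket_notification_rules(...)
}

#[tokio::main]
async fn main() {
    let parse_rules = async {
        // Synchronous parsing wrapped in a future, as in the handler above.
        vec![("s3:ObjectCreated:*", "images/", ".jpg")]
    };
    let (cleared, rules) = tokio::join!(clear_rules(), parse_rules);
    cleared.expect("failed to clear rules");
    println!("parsed {} rule(s)", rules.len());
}
```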
async fn get_bucket_acl(&self, req: S3Request<GetBucketAclInput>) -> S3Result<S3Response<GetBucketAclOutput>> {
@@ -2951,7 +2989,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3129,7 +3167,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3208,7 +3246,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3269,7 +3307,7 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
@@ -3344,13 +3382,91 @@ impl S3 for FS {
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
-rustfs_notify::global::notifier_instance().notify(event_args).await;
+notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
}
}
+/// Helper function: extract the prefix and suffix from a notification filter
+fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (String, String) {
+if let Some(filter) = filter {
+if let Some(filter_rules) = &filter.key {
+let mut prefix = String::new();
+let mut suffix = String::new();
+if let Some(rules) = &filter_rules.filter_rules {
+for rule in rules {
+if let (Some(name), Some(value)) = (rule.name.as_ref(), rule.value.as_ref()) {
+match name.as_str() {
+"prefix" => prefix = value.clone(),
+"suffix" => suffix = value.clone(),
+_ => {}
+}
+}
+}
+}
+return (prefix, suffix);
+}
+}
+(String::new(), String::new())
+}
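`extract_prefix_suffix` scans the filter's rule list for the names "prefix" and "suffix" and defaults both to empty strings. A behavior check with plain tuples standing in for the s3s filter DTOs:

```rust
fn pick(rules: &[(&str, &str)]) -> (String, String) {
    let (mut prefix, mut suffix) = (String::new(), String::new());
    for (name, value) in rules {
        match *name {
            "prefix" => prefix = value.to_string(),
            "suffix" => suffix = value.to_string(),
            _ => {} // unknown rule names are ignored, as above
        }
    }
    (prefix, suffix)
}

fn main() {
    assert_eq!(
        pick(&[("prefix", "images/"), ("suffix", ".jpg")]),
        ("images/".to_string(), ".jpg".to_string())
    );
    assert_eq!(pick(&[]), (String::new(), String::new()));
}
```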
+/// Helper functions: convert each notification configuration type into event rules
+fn process_queue_configurations<F>(
+event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
+configurations: Option<Vec<QueueConfiguration>>,
+target_id_parser: F,
+) where
+F: Fn(&str) -> Result<TargetID, TargetIDError>,
+{
+if let Some(configs) = configurations {
+for cfg in configs {
+let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
+let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
+let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect();
+event_rules.push((events, prefix, suffix, target_ids));
+}
+}
+}
+fn process_topic_configurations<F>(
+event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
+configurations: Option<Vec<TopicConfiguration>>,
+target_id_parser: F,
+) where
+F: Fn(&str) -> Result<TargetID, TargetIDError>,
+{
+if let Some(configs) = configurations {
+for cfg in configs {
+let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
+let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
+let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect();
+event_rules.push((events, prefix, suffix, target_ids));
+}
+}
+}
+fn process_lambda_configurations<F>(
+event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
+configurations: Option<Vec<LambdaFunctionConfiguration>>,
+target_id_parser: F,
+) where
+F: Fn(&str) -> Result<TargetID, TargetIDError>,
+{
+if let Some(configs) = configurations {
+for cfg in configs {
+let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
+let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
+let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()]
+.into_iter()
+.flatten()
+.collect();
+event_rules.push((events, prefix, suffix, target_ids));
+}
+}
+}
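The three process_* helpers differ only in the configuration type and the ARN field they read, and `vec![parser(..).ok()].into_iter().flatten().collect()` is a roundabout `parser(..).ok().into_iter().collect()`. A possible consolidation, sketched against the crate's types (untested; the accessor-closure design is illustrative, not the author's):

```rust
use std::str::FromStr;

use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;

// One generic helper could replace the three copies; callers supply accessors
// for the events, filter, and ARN of each configuration type.
fn process_configurations<C>(
    event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
    configurations: Option<Vec<C>>,
    events_of: impl Fn(&C) -> Vec<EventName>,
    filter_of: impl Fn(&C) -> (String, String),
    arn_of: impl Fn(&C) -> String,
) {
    for cfg in configurations.into_iter().flatten() {
        let (prefix, suffix) = filter_of(&cfg);
        // Ok -> one TargetID; Err -> silently dropped, matching the code above.
        let target_ids: Vec<TargetID> = TargetID::from_str(&arn_of(&cfg)).ok().into_iter().collect();
        event_rules.push((events_of(&cfg), prefix, suffix, target_ids));
    }
}
```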
#[cfg(test)]
mod tests {
use super::*;