From 04bf4b0f984bd0e50a11ddd10ec666575a9c57f7 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 2 Sep 2025 00:14:10 +0800 Subject: [PATCH] feat: add S3 object legal hold and retention management APIs (#476) * add bucket rule * translation * improve code for event notice add rule --- Cargo.lock | 17 ++- Cargo.toml | 2 +- crates/config/src/constants/app.rs | 2 +- crates/notify/src/global.rs | 25 ++++- crates/obs/src/config.rs | 11 +- rustfs/src/storage/ecfs.rs | 172 ++++++++++++++++++++++++----- 6 files changed, 182 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7b334c8..bffc6599 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2445,9 +2445,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "75d7cc94194b4dd0fa12845ef8c911101b7f37633cda14997a6e82099aa0b693" dependencies = [ "powerfmt", "serde", @@ -7362,12 +7362,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "8ca967379f9d8eb8058d86ed467d81d03e81acd45757e4ca341c24affbe8e8e3" dependencies = [ "deranged", - "itoa", "libc", "num-conv", "num_threads", @@ -7379,15 +7378,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "a9108bb380861b07264b950ded55a44a14a4adc68b9f5efd85aafc3aa4d40a68" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = 
"7182799245a7264ce590b349d90338f1c1affad93d2639aed5f8f69c090b334c" dependencies = [ "num-conv", "time-core", diff --git a/Cargo.toml b/Cargo.toml index 5513ef57..87b10933 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -224,7 +224,7 @@ tempfile = "3.21.0" temp-env = "0.3.6" test-case = "3.3.1" thiserror = "2.0.16" -time = { version = "0.3.41", features = [ +time = { version = "0.3.42", features = [ "std", "parsing", "formatting", diff --git a/crates/config/src/constants/app.rs b/crates/config/src/constants/app.rs index b890defd..3c32276e 100644 --- a/crates/config/src/constants/app.rs +++ b/crates/config/src/constants/app.rs @@ -124,7 +124,7 @@ pub const DEFAULT_LOG_FILENAME: &str = "rustfs"; /// This is the default log filename for OBS. /// It is used to store the logs of the application. /// Default value: rustfs.log -pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".log"); +pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, "."); /// Default sink file log file for rustfs /// This is the default sink file log file for rustfs. 
diff --git a/crates/notify/src/global.rs b/crates/notify/src/global.rs index e8a70b06..99e759db 100644 --- a/crates/notify/src/global.rs +++ b/crates/notify/src/global.rs @@ -162,13 +162,13 @@ impl Notifier { &self, bucket_name: &str, region: &str, - event_rules: &[(Vec, &str, &str, Vec)], + event_rules: &[(Vec, String, String, Vec)], ) -> Result<(), NotificationError> { let mut bucket_config = BucketNotificationConfig::new(region); for (event_names, prefix, suffix, target_ids) in event_rules { // Use `new_pattern` to construct a matching pattern - let pattern = crate::rules::pattern::new_pattern(Some(prefix), Some(suffix)); + let pattern = crate::rules::pattern::new_pattern(Some(prefix.as_str()), Some(suffix.as_str())); for target_id in target_ids { bucket_config.add_rule(event_names, pattern.clone(), target_id.clone()); @@ -186,4 +186,25 @@ impl Notifier { .load_bucket_notification_config(bucket_name, &bucket_config) .await } + + /// Clear all notification rules for the specified bucket. + /// # Parameter + /// - `bucket_name`: The name of the target bucket. + /// # Return value + /// Returns `Result<(), NotificationError>`, Ok on success, and an error on failure. + /// # Using + /// This function allows you to clear all notification rules for a specific bucket. + /// This is useful when you want to reset the notification configuration for a bucket. 
+ /// + pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> { + // Get global NotificationSystem instance + let notification_sys = match notification_system() { + Some(sys) => sys, + None => return Err(NotificationError::ServerNotInitialized), + }; + + // Clear configuration + notification_sys.remove_bucket_notification_config(bucket_name).await; + Ok(()) + } } diff --git a/crates/obs/src/config.rs b/crates/obs/src/config.rs index 8031cc84..999a9da1 100644 --- a/crates/obs/src/config.rs +++ b/crates/obs/src/config.rs @@ -17,14 +17,13 @@ use rustfs_config::observability::{ DEFAULT_SINKS_FILE_FLUSH_THRESHOLD, DEFAULT_SINKS_KAFKA_BATCH_SIZE, DEFAULT_SINKS_KAFKA_BATCH_TIMEOUT_MS, DEFAULT_SINKS_KAFKA_BROKERS, DEFAULT_SINKS_KAFKA_TOPIC, DEFAULT_SINKS_WEBHOOK_AUTH_TOKEN, DEFAULT_SINKS_WEBHOOK_ENDPOINT, DEFAULT_SINKS_WEBHOOK_MAX_RETRIES, DEFAULT_SINKS_WEBHOOK_RETRY_DELAY_MS, ENV_AUDIT_LOGGER_QUEUE_CAPACITY, ENV_OBS_ENDPOINT, - ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES, + ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES, ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO, - ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_SINKS_FILE_BUFFER_SIZE, ENV_SINKS_FILE_FLUSH_INTERVAL_MS, - ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE, ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS, - ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN, ENV_SINKS_WEBHOOK_ENDPOINT, - ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS, + ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT, ENV_SINKS_FILE_BUFFER_SIZE, + ENV_SINKS_FILE_FLUSH_INTERVAL_MS, ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE, + ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS, 
ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN, + ENV_SINKS_WEBHOOK_ENDPOINT, ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS, }; -use rustfs_config::observability::{ENV_OBS_LOG_DIRECTORY, ENV_OBS_USE_STDOUT}; use rustfs_config::{ APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME, DEFAULT_OBS_LOG_FILENAME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT, diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 15779cb6..599d454f 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -29,12 +29,6 @@ use chrono::Utc; use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder; use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder; use datafusion::arrow::json::writer::JsonArray; -use rustfs_ecstore::set_disk::MAX_PARTS_COUNT; -use rustfs_s3select_api::object_store::bytes_stream; -use rustfs_s3select_api::query::Context; -use rustfs_s3select_api::query::Query; -use rustfs_s3select_query::get_global_db; - // use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX; use futures::StreamExt; use http::HeaderMap; @@ -64,6 +58,7 @@ use rustfs_ecstore::compress::is_compressible; use rustfs_ecstore::error::StorageError; use rustfs_ecstore::new_object_layer_fn; use rustfs_ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE; +use rustfs_ecstore::set_disk::MAX_PARTS_COUNT; use rustfs_ecstore::store_api::BucketOptions; use rustfs_ecstore::store_api::CompletePart; use rustfs_ecstore::store_api::DeleteBucketOptions; @@ -77,6 +72,7 @@ use rustfs_ecstore::store_api::PutObjReader; use rustfs_ecstore::store_api::StorageAPI; use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER; use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING}; +use rustfs_notify::global::notifier_instance; use rustfs_policy::auth; use rustfs_policy::policy::action::Action; use rustfs_policy::policy::action::S3Action; 
@@ -86,7 +82,12 @@ use rustfs_rio::EtagReader; use rustfs_rio::HashReader; use rustfs_rio::Reader; use rustfs_rio::WarpReader; +use rustfs_s3select_api::object_store::bytes_stream; +use rustfs_s3select_api::query::Context; +use rustfs_s3select_api::query::Query; +use rustfs_s3select_query::get_global_db; use rustfs_targets::EventName; +use rustfs_targets::arn::{TargetID, TargetIDError}; use rustfs_utils::CompressionAlgorithm; use rustfs_utils::path::path_join_buf; use rustfs_zip::CompressionFormat; @@ -262,7 +263,7 @@ impl FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); } } @@ -290,6 +291,7 @@ impl FS { Ok(S3Response::new(output)) } } + #[async_trait::async_trait] impl S3 for FS { #[tracing::instrument( @@ -335,7 +337,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -481,7 +483,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -681,7 +683,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(DeleteBucketOutput {})) @@ -756,7 +758,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + 
notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -841,7 +843,7 @@ impl S3 for FS { host: rustfs_utils::get_request_host(&req.headers), user_agent: rustfs_utils::get_request_user_agent(&req.headers), }; - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; } }); @@ -961,11 +963,11 @@ impl S3 for FS { } } - let mut content_length = info.size as i64; + let mut content_length = info.size; let content_range = if let Some(rs) = rs { let total_size = info.get_actual_size().map_err(ApiError::from)?; - let (start, length) = rs.get_offset_length(total_size as i64).map_err(ApiError::from)?; + let (start, length) = rs.get_offset_length(total_size).map_err(ApiError::from)?; content_length = length; Some(format!("bytes {}-{}/{}", start, start as i64 + length - 1, total_size)) } else { @@ -1006,7 +1008,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -1128,7 +1130,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -1512,7 +1514,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -1590,7 +1592,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; 
}); Ok(S3Response::new(output)) @@ -2147,7 +2149,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(PutObjectTaggingOutput { version_id: None })) @@ -2214,7 +2216,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None })) @@ -2791,20 +2793,56 @@ impl S3 for FS { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; + // Verify that the bucket exists store .get_bucket_info(&bucket, &BucketOptions::default()) .await .map_err(ApiError::from)?; + // Persist the new notification configuration let data = try_!(serialize(&notification_configuration)); - metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data) .await .map_err(ApiError::from)?; - // TODO: event notice add rule + // Determine region (BucketInfo has no region field) -> use global region or default + let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default()); - Ok(S3Response::new(PutBucketNotificationConfigurationOutput::default())) + // Purge old rules and resolve new rules in parallel + let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket); + let parse_rules = async { + let mut event_rules = Vec::new(); + + process_queue_configurations( + &mut event_rules, + notification_configuration.queue_configurations.clone(), + TargetID::from_str, + ); + process_topic_configurations( + &mut event_rules, + notification_configuration.topic_configurations.clone(), + TargetID::from_str, + ); + process_lambda_configurations( + &mut event_rules, + 
notification_configuration.lambda_function_configurations.clone(), + TargetID::from_str, + ); + + event_rules + }; + + let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules); + + clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?; + + // Add a new notification rule + notifier_instance() + .add_event_specific_rules(&bucket, &region, &event_rules) + .await + .map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?; + + Ok(S3Response::new(PutBucketNotificationConfigurationOutput {})) } async fn get_bucket_acl(&self, req: S3Request) -> S3Result> { @@ -2951,7 +2989,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -3129,7 +3167,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -3208,7 +3246,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -3269,7 +3307,7 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) @@ -3344,13 +3382,91 @@ impl S3 for FS { // Asynchronous call will not block the response of the current request tokio::spawn(async move { - rustfs_notify::global::notifier_instance().notify(event_args).await; + 
notifier_instance().notify(event_args).await; }); Ok(S3Response::new(output)) } } +/// Auxiliary functions: extract prefixes and suffixes +fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (String, String) { + if let Some(filter) = filter { + if let Some(filter_rules) = &filter.key { + let mut prefix = String::new(); + let mut suffix = String::new(); + if let Some(rules) = &filter_rules.filter_rules { + for rule in rules { + if let (Some(name), Some(value)) = (rule.name.as_ref(), rule.value.as_ref()) { + match name.as_str() { + "prefix" => prefix = value.clone(), + "suffix" => suffix = value.clone(), + _ => {} + } + } + } + } + return (prefix, suffix); + } + } + (String::new(), String::new()) +} + +/// Auxiliary functions: Handle configuration +fn process_queue_configurations( + event_rules: &mut Vec<(Vec, String, String, Vec)>, + configurations: Option>, + target_id_parser: F, +) where + F: Fn(&str) -> Result, +{ + if let Some(configs) = configurations { + for cfg in configs { + let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); + let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); + let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect(); + event_rules.push((events, prefix, suffix, target_ids)); + } + } +} + +fn process_topic_configurations( + event_rules: &mut Vec<(Vec, String, String, Vec)>, + configurations: Option>, + target_id_parser: F, +) where + F: Fn(&str) -> Result, +{ + if let Some(configs) = configurations { + for cfg in configs { + let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); + let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); + let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect(); + event_rules.push((events, prefix, suffix, target_ids)); + } + } +} + +fn process_lambda_configurations( + event_rules: &mut Vec<(Vec, String, 
String, Vec)>, + configurations: Option>, + target_id_parser: F, +) where + F: Fn(&str) -> Result, +{ + if let Some(configs) = configurations { + for cfg in configs { + let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); + let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); + let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()] + .into_iter() + .flatten() + .collect(); + event_rules.push((events, prefix, suffix, target_ids)); + } + } +} + #[cfg(test)] mod tests { use super::*;