From 4e00110bfec6c80f8dcd3af626d69925626b7756 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 10 Sep 2025 00:56:27 +0800 Subject: [PATCH] add bucket notification configuration (#502) --- crates/notify/src/notifier.rs | 2 +- crates/notify/src/rules/config.rs | 4 +- crates/notify/src/rules/xml_config.rs | 8 ++-- crates/targets/src/arn.rs | 2 +- rustfs/src/main.rs | 67 ++++++++++++++++++++++++--- rustfs/src/storage/ecfs.rs | 17 +++---- 6 files changed, 76 insertions(+), 24 deletions(-) diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index c4586c69..8a266fa3 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -69,7 +69,7 @@ impl EventNotifier { target_list_guard .keys() .iter() - .map(|target_id| target_id.to_arn(region).to_arn_string()) + .map(|target_id| target_id.to_arn(region).to_string()) .collect() } diff --git a/crates/notify/src/rules/config.rs b/crates/notify/src/rules/config.rs index 81fbc4eb..eda91472 100644 --- a/crates/notify/src/rules/config.rs +++ b/crates/notify/src/rules/config.rs @@ -101,8 +101,8 @@ impl BucketNotificationConfig { for target_id in target_id_set { // Construct the ARN string for this target_id and self.region let arn_to_check = target_id.to_arn(&self.region); // Assuming TargetID has to_arn - if !arn_list.contains(&arn_to_check.to_arn_string()) { - return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_arn_string())); + if !arn_list.contains(&arn_to_check.to_string()) { + return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_string())); } } } diff --git a/crates/notify/src/rules/xml_config.rs b/crates/notify/src/rules/xml_config.rs index e8401dfd..518218ff 100644 --- a/crates/notify/src/rules/xml_config.rs +++ b/crates/notify/src/rules/xml_config.rs @@ -168,7 +168,7 @@ impl QueueConfig { // Validate ARN (similar to Go's Queue.Validate) // The Go code checks targetList.Exists(q.ARN.TargetID) // Here we check against a provided arn_list - let _config_arn_str = self.arn.to_arn_string(); + let _config_arn_str = self.arn.to_string(); if !self.arn.region.is_empty() && self.arn.region != region { return Err(ParseConfigError::UnknownRegion(self.arn.region.clone())); } @@ -187,8 +187,8 @@ impl QueueConfig { partition: self.arn.partition.clone(), // or default "rustfs" }; - if !arn_list.contains(&effective_arn.to_arn_string()) { - return Err(ParseConfigError::ArnNotFound(effective_arn.to_arn_string())); + if !arn_list.contains(&effective_arn.to_string()) { + return Err(ParseConfigError::ArnNotFound(effective_arn.to_string())); } Ok(()) } @@ -266,7 +266,7 @@ impl NotificationConfiguration { queue_config.validate(current_region, arn_list)?; let queue_key = ( queue_config.id.clone(), - queue_config.arn.to_arn_string(), // Assuming that the ARN structure implements Display or ToString + queue_config.arn.to_string(), // Assuming that the ARN structure implements Display or ToString ); if !unique_queues.insert(queue_key.clone()) { return Err(ParseConfigError::DuplicateQueueConfiguration(queue_key.0, queue_key.1)); diff --git a/crates/targets/src/arn.rs b/crates/targets/src/arn.rs index 4b029782..a2ef3528 100644 --- a/crates/targets/src/arn.rs +++ b/crates/targets/src/arn.rs @@ -212,7 +212,7 @@ impl Serialize for ARN { where S: Serializer, { - serializer.serialize_str(&self.to_arn_string()) + serializer.serialize_str(&self.to_string()) } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 1a95efe7..c75c4083 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -25,6 +25,7 @@ mod version; // Ensure the correct path for parse_license is imported use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, start_http_server, wait_for_shutdown}; +use crate::storage::ecfs::{process_lambda_configurations, process_queue_configurations, process_topic_configurations}; use chrono::Datelike; use clap::Parser; use license::init_license; @@ -34,6 +35,7 @@ use rustfs_ahm::{ }; use rustfs_common::globals::set_global_addr; use rustfs_config::DEFAULT_DELIMITER; +use rustfs_ecstore::bucket::metadata_sys; use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool; use rustfs_ecstore::config as ecconfig; @@ -51,9 +53,13 @@ use rustfs_ecstore::{ update_erasure_type, }; use rustfs_iam::init_iam_sys; +use rustfs_notify::global::notifier_instance; use rustfs_obs::{init_obs, set_global_guard}; +use rustfs_targets::arn::TargetID; use rustfs_utils::net::parse_and_resolve_address; +use s3s::s3_error; use std::io::{Error, Result}; +use std::str::FromStr; use std::sync::Arc; use tracing::{debug, error, info, instrument, warn}; @@ -94,7 +100,6 @@ async fn main() -> Result<()> { #[instrument(skip(opt))] async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); - if let Some(region) = &opt.region { rustfs_ecstore::global::set_global_region(region.clone()); } @@ -172,26 +177,36 @@ async fn run(opt: config::Opt) -> Result<()> { .await .map_err(Error::other)?; - let buckets = buckets_list.into_iter().map(|v| v.name).collect(); + // Collect bucket names into a vector + let buckets: Vec = buckets_list.into_iter().map(|v| v.name).collect(); - init_bucket_metadata_sys(store.clone(), buckets).await; + // Initialize the bucket metadata system + init_bucket_metadata_sys(store.clone(), buckets.clone()).await; + // Initialize the IAM system init_iam_sys(store.clone()).await?; + // add bucket notification configuration + add_bucket_notification_configuration(buckets).await; + + // Initialize the global notification system new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { error!("new_global_notification_sys failed {:?}", &err); Error::other(err) })?; + // Create a cancellation token for AHM services let _ = create_ahm_services_cancel_token(); // Initialize heal manager with channel processor let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone())); let heal_manager = init_heal_manager(heal_storage, None).await?; - let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager)); scanner.start().await?; + + // print server info print_server_info(); + // initialize bucket replication pool init_bucket_replication_pool().await; // Async update check (optional) @@ -273,7 +288,7 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki } #[instrument] -pub(crate) async fn init_event_notifier() { +async fn init_event_notifier() { info!("Initializing event notifier..."); // 1. Get the global configuration loaded by ecstore @@ -311,8 +326,48 @@ pub(crate) async fn init_event_notifier() { }); } +#[instrument(skip_all)] +async fn add_bucket_notification_configuration(buckets: Vec) { + let region_opt = rustfs_ecstore::global::get_global_region(); + let region = match region_opt { + Some(ref r) if !r.is_empty() => r, + _ => { + warn!("Global region is not set; attempting notification configuration for all buckets with an empty region."); + "" + } + }; + for bucket in buckets.iter() { + let has_notification_config = metadata_sys::get_notification_config(bucket).await.unwrap_or_else(|err| { + warn!("get_notification_config err {:?}", err); + None + }); + + match has_notification_config { + Some(cfg) => { + info!("Bucket '{}' has existing notification configuration: {:?}", bucket, cfg); + + let mut event_rules = Vec::new(); + process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str); + process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str); + process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str); + + if let Err(e) = notifier_instance() + .add_event_specific_rules(bucket, region, &event_rules) + .await + .map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}")) + { + error!("Failed to add rules for bucket '{}': {:?}", bucket, e); + } + } + None => { + info!("Bucket '{}' has no existing notification configuration.", bucket); + } + } + } +} + /// Shuts down the event notifier system gracefully -pub async fn shutdown_event_notifier() { +async fn shutdown_event_notifier() { info!("Shutting down event notifier system..."); if !rustfs_notify::is_notification_system_initialized() { diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 8575e0ef..1e416f4a 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -2772,13 +2772,10 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let has_notification_config = match metadata_sys::get_notification_config(&bucket).await { - Ok(cfg) => cfg, - Err(err) => { - warn!("get_notification_config err {:?}", err); - None - } - }; + let has_notification_config = metadata_sys::get_notification_config(&bucket).await.unwrap_or_else(|err| { + warn!("get_notification_config err {:?}", err); + None + }); // TODO: valid target list @@ -3434,7 +3431,7 @@ fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (S } /// Auxiliary functions: Handle configuration -fn process_queue_configurations( +pub(crate) fn process_queue_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F, @@ -3451,7 +3448,7 @@ fn process_queue_configurations( } } -fn process_topic_configurations( +pub(crate) fn process_topic_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F, @@ -3468,7 +3465,7 @@ fn process_topic_configurations( } } -fn process_lambda_configurations( +pub(crate) fn process_lambda_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F,