mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
add bucket notification configuration (#502)
This commit is contained in:
@@ -69,7 +69,7 @@ impl EventNotifier {
|
||||
target_list_guard
|
||||
.keys()
|
||||
.iter()
|
||||
.map(|target_id| target_id.to_arn(region).to_arn_string())
|
||||
.map(|target_id| target_id.to_arn(region).to_string())
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -101,8 +101,8 @@ impl BucketNotificationConfig {
|
||||
for target_id in target_id_set {
|
||||
// Construct the ARN string for this target_id and self.region
|
||||
let arn_to_check = target_id.to_arn(&self.region); // Assuming TargetID has to_arn
|
||||
if !arn_list.contains(&arn_to_check.to_arn_string()) {
|
||||
return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_arn_string()));
|
||||
if !arn_list.contains(&arn_to_check.to_string()) {
|
||||
return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,7 +168,7 @@ impl QueueConfig {
|
||||
// Validate ARN (similar to Go's Queue.Validate)
|
||||
// The Go code checks targetList.Exists(q.ARN.TargetID)
|
||||
// Here we check against a provided arn_list
|
||||
let _config_arn_str = self.arn.to_arn_string();
|
||||
let _config_arn_str = self.arn.to_string();
|
||||
if !self.arn.region.is_empty() && self.arn.region != region {
|
||||
return Err(ParseConfigError::UnknownRegion(self.arn.region.clone()));
|
||||
}
|
||||
@@ -187,8 +187,8 @@ impl QueueConfig {
|
||||
partition: self.arn.partition.clone(), // or default "rustfs"
|
||||
};
|
||||
|
||||
if !arn_list.contains(&effective_arn.to_arn_string()) {
|
||||
return Err(ParseConfigError::ArnNotFound(effective_arn.to_arn_string()));
|
||||
if !arn_list.contains(&effective_arn.to_string()) {
|
||||
return Err(ParseConfigError::ArnNotFound(effective_arn.to_string()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -266,7 +266,7 @@ impl NotificationConfiguration {
|
||||
queue_config.validate(current_region, arn_list)?;
|
||||
let queue_key = (
|
||||
queue_config.id.clone(),
|
||||
queue_config.arn.to_arn_string(), // Assuming that the ARN structure implements Display or ToString
|
||||
queue_config.arn.to_string(), // Assuming that the ARN structure implements Display or ToString
|
||||
);
|
||||
if !unique_queues.insert(queue_key.clone()) {
|
||||
return Err(ParseConfigError::DuplicateQueueConfiguration(queue_key.0, queue_key.1));
|
||||
|
||||
@@ -212,7 +212,7 @@ impl Serialize for ARN {
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(&self.to_arn_string())
|
||||
serializer.serialize_str(&self.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ mod version;
|
||||
|
||||
// Ensure the correct path for parse_license is imported
|
||||
use crate::server::{SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, start_http_server, wait_for_shutdown};
|
||||
use crate::storage::ecfs::{process_lambda_configurations, process_queue_configurations, process_topic_configurations};
|
||||
use chrono::Datelike;
|
||||
use clap::Parser;
|
||||
use license::init_license;
|
||||
@@ -34,6 +35,7 @@ use rustfs_ahm::{
|
||||
};
|
||||
use rustfs_common::globals::set_global_addr;
|
||||
use rustfs_config::DEFAULT_DELIMITER;
|
||||
use rustfs_ecstore::bucket::metadata_sys;
|
||||
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
|
||||
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
|
||||
use rustfs_ecstore::config as ecconfig;
|
||||
@@ -51,9 +53,13 @@ use rustfs_ecstore::{
|
||||
update_erasure_type,
|
||||
};
|
||||
use rustfs_iam::init_iam_sys;
|
||||
use rustfs_notify::global::notifier_instance;
|
||||
use rustfs_obs::{init_obs, set_global_guard};
|
||||
use rustfs_targets::arn::TargetID;
|
||||
use rustfs_utils::net::parse_and_resolve_address;
|
||||
use s3s::s3_error;
|
||||
use std::io::{Error, Result};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
@@ -94,7 +100,6 @@ async fn main() -> Result<()> {
|
||||
#[instrument(skip(opt))]
|
||||
async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("opt: {:?}", &opt);
|
||||
|
||||
if let Some(region) = &opt.region {
|
||||
rustfs_ecstore::global::set_global_region(region.clone());
|
||||
}
|
||||
@@ -172,26 +177,36 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
.await
|
||||
.map_err(Error::other)?;
|
||||
|
||||
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
|
||||
// Collect bucket names into a vector
|
||||
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
|
||||
|
||||
init_bucket_metadata_sys(store.clone(), buckets).await;
|
||||
// Initialize the bucket metadata system
|
||||
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
|
||||
|
||||
// Initialize the IAM system
|
||||
init_iam_sys(store.clone()).await?;
|
||||
|
||||
// add bucket notification configuration
|
||||
add_bucket_notification_configuration(buckets).await;
|
||||
|
||||
// Initialize the global notification system
|
||||
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
|
||||
error!("new_global_notification_sys failed {:?}", &err);
|
||||
Error::other(err)
|
||||
})?;
|
||||
|
||||
// Create a cancellation token for AHM services
|
||||
let _ = create_ahm_services_cancel_token();
|
||||
|
||||
// Initialize heal manager with channel processor
|
||||
let heal_storage = Arc::new(ECStoreHealStorage::new(store.clone()));
|
||||
let heal_manager = init_heal_manager(heal_storage, None).await?;
|
||||
|
||||
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
|
||||
scanner.start().await?;
|
||||
|
||||
// print server info
|
||||
print_server_info();
|
||||
// initialize bucket replication pool
|
||||
init_bucket_replication_pool().await;
|
||||
|
||||
// Async update check (optional)
|
||||
@@ -273,7 +288,7 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub(crate) async fn init_event_notifier() {
|
||||
async fn init_event_notifier() {
|
||||
info!("Initializing event notifier...");
|
||||
|
||||
// 1. Get the global configuration loaded by ecstore
|
||||
@@ -311,8 +326,48 @@ pub(crate) async fn init_event_notifier() {
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn add_bucket_notification_configuration(buckets: Vec<String>) {
|
||||
let region_opt = rustfs_ecstore::global::get_global_region();
|
||||
let region = match region_opt {
|
||||
Some(ref r) if !r.is_empty() => r,
|
||||
_ => {
|
||||
warn!("Global region is not set; attempting notification configuration for all buckets with an empty region.");
|
||||
""
|
||||
}
|
||||
};
|
||||
for bucket in buckets.iter() {
|
||||
let has_notification_config = metadata_sys::get_notification_config(bucket).await.unwrap_or_else(|err| {
|
||||
warn!("get_notification_config err {:?}", err);
|
||||
None
|
||||
});
|
||||
|
||||
match has_notification_config {
|
||||
Some(cfg) => {
|
||||
info!("Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
|
||||
|
||||
let mut event_rules = Vec::new();
|
||||
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str);
|
||||
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str);
|
||||
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str);
|
||||
|
||||
if let Err(e) = notifier_instance()
|
||||
.add_event_specific_rules(bucket, region, &event_rules)
|
||||
.await
|
||||
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))
|
||||
{
|
||||
error!("Failed to add rules for bucket '{}': {:?}", bucket, e);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("Bucket '{}' has no existing notification configuration.", bucket);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shuts down the event notifier system gracefully
|
||||
pub async fn shutdown_event_notifier() {
|
||||
async fn shutdown_event_notifier() {
|
||||
info!("Shutting down event notifier system...");
|
||||
|
||||
if !rustfs_notify::is_notification_system_initialized() {
|
||||
|
||||
@@ -2772,13 +2772,10 @@ impl S3 for FS {
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let has_notification_config = match metadata_sys::get_notification_config(&bucket).await {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
warn!("get_notification_config err {:?}", err);
|
||||
None
|
||||
}
|
||||
};
|
||||
let has_notification_config = metadata_sys::get_notification_config(&bucket).await.unwrap_or_else(|err| {
|
||||
warn!("get_notification_config err {:?}", err);
|
||||
None
|
||||
});
|
||||
|
||||
// TODO: valid target list
|
||||
|
||||
@@ -3434,7 +3431,7 @@ fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (S
|
||||
}
|
||||
|
||||
/// Auxiliary functions: Handle configuration
|
||||
fn process_queue_configurations<F>(
|
||||
pub(crate) fn process_queue_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<QueueConfiguration>>,
|
||||
target_id_parser: F,
|
||||
@@ -3451,7 +3448,7 @@ fn process_queue_configurations<F>(
|
||||
}
|
||||
}
|
||||
|
||||
fn process_topic_configurations<F>(
|
||||
pub(crate) fn process_topic_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<TopicConfiguration>>,
|
||||
target_id_parser: F,
|
||||
@@ -3468,7 +3465,7 @@ fn process_topic_configurations<F>(
|
||||
}
|
||||
}
|
||||
|
||||
fn process_lambda_configurations<F>(
|
||||
pub(crate) fn process_lambda_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<LambdaFunctionConfiguration>>,
|
||||
target_id_parser: F,
|
||||
|
||||
Reference in New Issue
Block a user