diff --git a/crates/targets/src/arn.rs b/crates/targets/src/arn.rs index 1853ad9b..b695891e 100644 --- a/crates/targets/src/arn.rs +++ b/crates/targets/src/arn.rs @@ -129,6 +129,8 @@ impl ARN { } /// Parsing ARN from string + /// Only accepts ARNs with the RustFS prefix: "arn:rustfs:sqs:" + /// Format: arn:rustfs:sqs:{region}:{id}:{name} pub fn parse(s: &str) -> Result { if !s.starts_with(ARN_PREFIX) { return Err(TargetError::InvalidARN(s.to_string())); diff --git a/rustfs/src/init.rs b/rustfs/src/init.rs index 66a016b9..91aba513 100644 --- a/rustfs/src/init.rs +++ b/rustfs/src/init.rs @@ -116,21 +116,29 @@ pub(crate) async fn add_bucket_notification_configuration(buckets: Vec) "Bucket '{}' has existing notification configuration: {:?}", bucket, cfg); let mut event_rules = Vec::new(); - process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| { + if let Err(e) = process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| { ARN::parse(arn_str) .map(|arn| arn.target_id) .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) - }); - process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| { + }) { + error!("Failed to parse queue notification config for bucket '{}': {:?}", bucket, e); + } + if let Err(e) = process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| { ARN::parse(arn_str) .map(|arn| arn.target_id) .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) - }); - process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| { - ARN::parse(arn_str) - .map(|arn| arn.target_id) - .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) - }); + }) { + error!("Failed to parse topic notification config for bucket '{}': {:?}", bucket, e); + } + if let Err(e) = + process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }) + { + error!("Failed to parse lambda notification config for bucket '{}': {:?}", bucket, e); + } if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules) .await diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 9f584d60..d73de5e2 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -5820,12 +5820,12 @@ impl S3 for FS { ARN::parse(arn_str) .map(|arn| arn.target_id) .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) - }); + })?; process_topic_configurations(&mut event_rules, notification_configuration.topic_configurations.clone(), |arn_str| { ARN::parse(arn_str) .map(|arn| arn.target_id) .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) - }); + })?; process_lambda_configurations( &mut event_rules, notification_configuration.lambda_function_configurations.clone(), @@ -5834,14 +5834,16 @@ impl S3 for FS { .map(|arn| arn.target_id) .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) }, - ); + )?; - event_rules + Ok::<_, TargetIDError>(event_rules) }; - let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules); + let (clear_result, event_rules_result) = tokio::join!(clear_rules, parse_rules); clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?; + let event_rules = + event_rules_result.map_err(|e| s3_error!(InvalidArgument, "Invalid ARN in notification configuration: {e}"))?; warn!("notify event rules: {:?}", &event_rules); // Add a new notification rule @@ -6357,54 +6359,57 @@ pub(crate) fn process_queue_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F, -) where +) -> Result<(), TargetIDError> +where F: Fn(&str) -> Result, { if let Some(configs) = configurations { for cfg in configs { let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); - let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect(); - event_rules.push((events, prefix, suffix, target_ids)); + let target_id = target_id_parser(&cfg.queue_arn)?; + event_rules.push((events, prefix, suffix, vec![target_id])); } } + Ok(()) } pub(crate) fn process_topic_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F, -) where +) -> Result<(), TargetIDError> +where F: Fn(&str) -> Result, { if let Some(configs) = configurations { for cfg in configs { let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); - let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect(); - event_rules.push((events, prefix, suffix, target_ids)); + let target_id = target_id_parser(&cfg.topic_arn)?; + event_rules.push((events, prefix, suffix, vec![target_id])); } } + Ok(()) } pub(crate) fn process_lambda_configurations( event_rules: &mut Vec<(Vec, String, String, Vec)>, configurations: Option>, target_id_parser: F, -) where +) -> Result<(), TargetIDError> +where F: Fn(&str) -> Result, { if let Some(configs) = configurations { for cfg in configs { let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect(); let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref()); - let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()] - .into_iter() - .flatten() - .collect(); - event_rules.push((events, prefix, suffix, target_ids)); + let target_id = target_id_parser(&cfg.lambda_function_arn)?; + event_rules.push((events, prefix, suffix, vec![target_id])); } } + Ok(()) } pub(crate) async fn has_replication_rules(bucket: &str, objects: &[ObjectToDelete]) -> bool { @@ -7064,4 +7069,110 @@ mod tests { // and "https://example.sub.com" matches because it starts with "https://example." and ends with ".com" // This is acceptable for our use case as S3 CORS typically uses "https://*.example.com" format } + + // === Notification Configuration Error Propagation Tests === + + #[test] + fn test_process_queue_configurations_propagates_error_on_invalid_arn() { + use rustfs_targets::arn::{ARN, TargetIDError}; + + let mut event_rules = Vec::new(); + let invalid_arn = "arn:minio:sqs::1:webhook"; // Wrong prefix, should fail + + let result = process_queue_configurations( + &mut event_rules, + Some(vec![s3s::dto::QueueConfiguration { + events: vec!["s3:ObjectCreated:*".to_string().into()], + queue_arn: invalid_arn.to_string(), + filter: None, + id: None, + }]), + |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }, + ); + + assert!(result.is_err(), "Should return error for invalid ARN prefix"); + assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid"); + } + + #[test] + fn test_process_topic_configurations_propagates_error_on_invalid_arn() { + use rustfs_targets::arn::{ARN, TargetIDError}; + + let mut event_rules = Vec::new(); + let invalid_arn = "arn:aws:sns:us-east-1:123:topic"; // Wrong prefix, should fail + + let result = process_topic_configurations( + &mut event_rules, + Some(vec![s3s::dto::TopicConfiguration { + events: vec!["s3:ObjectCreated:*".to_string().into()], + topic_arn: invalid_arn.to_string(), + filter: None, + id: None, + }]), + |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }, + ); + + assert!(result.is_err(), "Should return error for invalid ARN prefix"); + assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid"); + } + + #[test] + fn test_process_lambda_configurations_propagates_error_on_invalid_arn() { + use rustfs_targets::arn::{ARN, TargetIDError}; + + let mut event_rules = Vec::new(); + let invalid_arn = "arn:aws:lambda:us-east-1:123:function"; // Wrong prefix, should fail + + let result = process_lambda_configurations( + &mut event_rules, + Some(vec![s3s::dto::LambdaFunctionConfiguration { + events: vec!["s3:ObjectCreated:*".to_string().into()], + lambda_function_arn: invalid_arn.to_string(), + filter: None, + id: None, + }]), + |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }, + ); + + assert!(result.is_err(), "Should return error for invalid ARN prefix"); + assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid"); + } + + #[test] + fn test_process_queue_configurations_succeeds_with_valid_arn() { + use rustfs_targets::arn::{ARN, TargetIDError}; + + let mut event_rules = Vec::new(); + let valid_arn = "arn:rustfs:sqs:us-east-1:1:webhook"; // Correct prefix + + let result = process_queue_configurations( + &mut event_rules, + Some(vec![s3s::dto::QueueConfiguration { + events: vec!["s3:ObjectCreated:*".to_string().into()], + queue_arn: valid_arn.to_string(), + filter: None, + id: None, + }]), + |arn_str| { + ARN::parse(arn_str) + .map(|arn| arn.target_id) + .map_err(|e| TargetIDError::InvalidFormat(e.to_string())) + }, + ); + + assert!(result.is_ok(), "Should succeed with valid ARN"); + assert_eq!(event_rules.len(), 1, "Should add one rule"); + } }