fix: return error instead of silently ignoring invalid ARNs in notification config (#1528)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Audric
2026-01-16 16:12:55 +08:00
committed by GitHub
parent ed4329d50c
commit 548a39ffe7
3 changed files with 147 additions and 26 deletions

View File

@@ -129,6 +129,8 @@ impl ARN {
}
/// Parsing ARN from string
/// Only accepts ARNs with the RustFS prefix: "arn:rustfs:sqs:"
/// Format: arn:rustfs:sqs:{region}:{id}:{name}
pub fn parse(s: &str) -> Result<Self, TargetError> {
if !s.starts_with(ARN_PREFIX) {
return Err(TargetError::InvalidARN(s.to_string()));

View File

@@ -116,21 +116,29 @@ pub(crate) async fn add_bucket_notification_configuration(buckets: Vec<String>)
"Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
let mut event_rules = Vec::new();
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| {
if let Err(e) = process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| {
}) {
error!("Failed to parse queue notification config for bucket '{}': {:?}", bucket, e);
}
if let Err(e) = process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
}) {
error!("Failed to parse topic notification config for bucket '{}': {:?}", bucket, e);
}
if let Err(e) =
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
})
{
error!("Failed to parse lambda notification config for bucket '{}': {:?}", bucket, e);
}
if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules)
.await

View File

@@ -5820,12 +5820,12 @@ impl S3 for FS {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
})?;
process_topic_configurations(&mut event_rules, notification_configuration.topic_configurations.clone(), |arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
});
})?;
process_lambda_configurations(
&mut event_rules,
notification_configuration.lambda_function_configurations.clone(),
@@ -5834,14 +5834,16 @@ impl S3 for FS {
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
)?;
event_rules
Ok::<_, TargetIDError>(event_rules)
};
let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules);
let (clear_result, event_rules_result) = tokio::join!(clear_rules, parse_rules);
clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
let event_rules =
event_rules_result.map_err(|e| s3_error!(InvalidArgument, "Invalid ARN in notification configuration: {e}"))?;
warn!("notify event rules: {:?}", &event_rules);
// Add a new notification rule
@@ -6357,54 +6359,57 @@ pub(crate) fn process_queue_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<QueueConfiguration>>,
target_id_parser: F,
) where
) -> Result<(), TargetIDError>
where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect();
event_rules.push((events, prefix, suffix, target_ids));
let target_id = target_id_parser(&cfg.queue_arn)?;
event_rules.push((events, prefix, suffix, vec![target_id]));
}
}
Ok(())
}
pub(crate) fn process_topic_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<TopicConfiguration>>,
target_id_parser: F,
) where
) -> Result<(), TargetIDError>
where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect();
event_rules.push((events, prefix, suffix, target_ids));
let target_id = target_id_parser(&cfg.topic_arn)?;
event_rules.push((events, prefix, suffix, vec![target_id]));
}
}
Ok(())
}
pub(crate) fn process_lambda_configurations<F>(
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
configurations: Option<Vec<LambdaFunctionConfiguration>>,
target_id_parser: F,
) where
) -> Result<(), TargetIDError>
where
F: Fn(&str) -> Result<TargetID, TargetIDError>,
{
if let Some(configs) = configurations {
for cfg in configs {
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()]
.into_iter()
.flatten()
.collect();
event_rules.push((events, prefix, suffix, target_ids));
let target_id = target_id_parser(&cfg.lambda_function_arn)?;
event_rules.push((events, prefix, suffix, vec![target_id]));
}
}
Ok(())
}
pub(crate) async fn has_replication_rules(bucket: &str, objects: &[ObjectToDelete]) -> bool {
@@ -7064,4 +7069,110 @@ mod tests {
// and "https://example.sub.com" matches because it starts with "https://example." and ends with ".com"
// This is acceptable for our use case as S3 CORS typically uses "https://*.example.com" format
}
// === Notification Configuration Error Propagation Tests ===
#[test]
fn test_process_queue_configurations_propagates_error_on_invalid_arn() {
use rustfs_targets::arn::{ARN, TargetIDError};
let mut event_rules = Vec::new();
let invalid_arn = "arn:minio:sqs::1:webhook"; // Wrong prefix, should fail
let result = process_queue_configurations(
&mut event_rules,
Some(vec![s3s::dto::QueueConfiguration {
events: vec!["s3:ObjectCreated:*".to_string().into()],
queue_arn: invalid_arn.to_string(),
filter: None,
id: None,
}]),
|arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
assert!(result.is_err(), "Should return error for invalid ARN prefix");
assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid");
}
#[test]
fn test_process_topic_configurations_propagates_error_on_invalid_arn() {
use rustfs_targets::arn::{ARN, TargetIDError};
let mut event_rules = Vec::new();
let invalid_arn = "arn:aws:sns:us-east-1:123:topic"; // Wrong prefix, should fail
let result = process_topic_configurations(
&mut event_rules,
Some(vec![s3s::dto::TopicConfiguration {
events: vec!["s3:ObjectCreated:*".to_string().into()],
topic_arn: invalid_arn.to_string(),
filter: None,
id: None,
}]),
|arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
assert!(result.is_err(), "Should return error for invalid ARN prefix");
assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid");
}
#[test]
fn test_process_lambda_configurations_propagates_error_on_invalid_arn() {
use rustfs_targets::arn::{ARN, TargetIDError};
let mut event_rules = Vec::new();
let invalid_arn = "arn:aws:lambda:us-east-1:123:function"; // Wrong prefix, should fail
let result = process_lambda_configurations(
&mut event_rules,
Some(vec![s3s::dto::LambdaFunctionConfiguration {
events: vec!["s3:ObjectCreated:*".to_string().into()],
lambda_function_arn: invalid_arn.to_string(),
filter: None,
id: None,
}]),
|arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
assert!(result.is_err(), "Should return error for invalid ARN prefix");
assert!(event_rules.is_empty(), "Should not add rules when ARN is invalid");
}
#[test]
fn test_process_queue_configurations_succeeds_with_valid_arn() {
use rustfs_targets::arn::{ARN, TargetIDError};
let mut event_rules = Vec::new();
let valid_arn = "arn:rustfs:sqs:us-east-1:1:webhook"; // Correct prefix
let result = process_queue_configurations(
&mut event_rules,
Some(vec![s3s::dto::QueueConfiguration {
events: vec!["s3:ObjectCreated:*".to_string().into()],
queue_arn: valid_arn.to_string(),
filter: None,
id: None,
}]),
|arn_str| {
ARN::parse(arn_str)
.map(|arn| arn.target_id)
.map_err(|e| TargetIDError::InvalidFormat(e.to_string()))
},
);
assert!(result.is_ok(), "Should succeed with valid ARN");
assert_eq!(event_rules.len(), 1, "Should add one rule");
}
}