From 27480f76252df251d1a12df64fd24342347dccf7 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 14 Jan 2026 14:18:02 +0800 Subject: [PATCH] Refactor Event Admin Handlers and Parallelize Target Status Probes (#1501) --- crates/notify/src/integration.rs | 17 ++ crates/notify/src/notifier.rs | 5 + rustfs/src/admin/handlers/event.rs | 386 +++++++++++------------------ 3 files changed, 165 insertions(+), 243 deletions(-) diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index ddce7560..43da7649 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::notification_system_subscriber::NotificationSystemSubscriberView; +use crate::notifier::TargetList; use crate::{ Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, }; @@ -191,6 +192,22 @@ impl NotificationSystem { self.notifier.target_list().read().await.keys() } + /// Gets the complete Target list, including both active and inactive Targets. + /// + /// # Return + /// An `Arc>` containing all Targets. + pub async fn get_all_targets(&self) -> Arc> { + self.notifier.target_list() + } + + /// Gets all Target values, including both active and inactive Targets. + /// + /// # Return + /// A Vec containing all Targets. + pub async fn get_target_values(&self) -> Vec + Send + Sync>> { + self.notifier.target_list().read().await.values() + } + /// Checks if there are active subscribers for the given bucket and event name. pub async fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool { if !self.subscriber_view.has_subscriber(bucket, event) { diff --git a/crates/notify/src/notifier.rs b/crates/notify/src/notifier.rs index a5c8dd6e..859e64b9 100644 --- a/crates/notify/src/notifier.rs +++ b/crates/notify/src/notifier.rs @@ -370,6 +370,11 @@ impl TargetList { self.targets.keys().cloned().collect() } + /// Returns all targets in the list + pub fn values(&self) -> Vec + Send + Sync>> { + self.targets.values().cloned().collect() + } + /// Returns the number of targets pub fn len(&self) -> usize { self.targets.len() diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index a8b93227..eca0ad00 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -14,21 +14,24 @@ use crate::admin::router::Operation; use crate::auth::{check_key_valid, get_session_token}; +use futures::stream::{FuturesUnordered, StreamExt}; use http::{HeaderMap, StatusCode}; use matchit::Params; use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; use rustfs_config::{ENABLE_KEY, EnableState, MAX_ADMIN_REQUEST_BODY_SIZE}; use rustfs_targets::check_mqtt_broker_available; -use s3s::header::CONTENT_LENGTH; -use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error}; +use s3s::{Body, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error}; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::io::{Error, ErrorKind}; use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; use tokio::net::lookup_host; -use tokio::time::{Duration, sleep}; -use tracing::{Span, debug, error, info, warn}; +use tokio::sync::Semaphore; +use tokio::time::{Duration, sleep, timeout}; +use tracing::{Span, info, warn}; use url::Url; #[derive(Debug, Deserialize)] @@ -54,12 +57,34 @@ struct NotificationEndpointsResponse { notification_endpoints: Vec, } +// --- Helper Functions --- + +async fn check_permissions(req: &S3Request) -> S3Result<()> { + let Some(input_cred) = &req.credentials else { + return Err(s3_error!(InvalidRequest, "credentials not found")); + }; + check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + Ok(()) +} + +fn get_notification_system() -> S3Result> { + rustfs_notify::notification_system().ok_or_else(|| s3_error!(InternalError, "notification system not initialized")) +} + +fn build_response(status: StatusCode, body: Body, request_id: Option<&http::HeaderValue>) -> S3Response<(StatusCode, Body)> { + let mut header = HeaderMap::new(); + header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + if let Some(v) = request_id { + header.insert("x-request-id", v.clone()); + } + S3Response::with_headers((status, body), header) +} + async fn retry_with_backoff(mut operation: F, max_attempts: usize, base_delay: Duration) -> Result where F: FnMut() -> Fut, Fut: Future>, { - assert!(max_attempts > 0, "max_attempts must be greater than 0"); let mut attempts = 0; let mut delay = base_delay; let mut last_err = None; @@ -71,13 +96,6 @@ where last_err = Some(e); attempts += 1; if attempts < max_attempts { - warn!( - "Retry attempt {}/{} failed: {}. Retrying in {:?}", - attempts, - max_attempts, - last_err.as_ref().unwrap(), - delay - ); sleep(delay).await; delay = delay.saturating_mul(2); } @@ -87,130 +105,73 @@ where Err(last_err.unwrap_or_else(|| Error::other("retry_with_backoff: unknown error"))) } -async fn retry_metadata(path: &str) -> Result<(), Error> { - retry_with_backoff(|| async { tokio::fs::metadata(path).await.map(|_| ()) }, 3, Duration::from_millis(100)).await -} - async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> { if !queue_dir.is_empty() { if !Path::new(queue_dir).is_absolute() { return Err(s3_error!(InvalidArgument, "queue_dir must be absolute path")); } - - if let Err(e) = retry_metadata(queue_dir).await { - return match e.kind() { - ErrorKind::NotFound => Err(s3_error!(InvalidArgument, "queue_dir does not exist")), - ErrorKind::PermissionDenied => Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied")), - _ => Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e)), - }; - } - } - - Ok(()) -} - -fn validate_cert_key_pair(cert: &Option, key: &Option) -> S3Result<()> { - if cert.is_some() != key.is_some() { - return Err(s3_error!(InvalidArgument, "client_cert and client_key must be specified as a pair")); + retry_with_backoff( + || async { tokio::fs::metadata(queue_dir).await.map(|_| ()) }, + 3, + Duration::from_millis(100), + ) + .await + .map_err(|e| match e.kind() { + ErrorKind::NotFound => s3_error!(InvalidArgument, "queue_dir does not exist"), + ErrorKind::PermissionDenied => s3_error!(InvalidArgument, "queue_dir exists but permission denied"), + _ => s3_error!(InvalidArgument, "failed to access queue_dir: {}", e), + })?; } Ok(()) } -/// Set (create or update) a notification target +// --- Operations --- + pub struct NotificationTarget {} #[async_trait::async_trait] impl Operation for NotificationTarget { async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { let span = Span::current(); let _enter = span.enter(); - // 1. Analyze query parameters let (target_type, target_name) = extract_target_params(¶ms)?; - // 2. Permission verification - let Some(input_cred) = &req.credentials else { - return Err(s3_error!(InvalidRequest, "credentials not found")); - }; - let (_cred, _owner) = - check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + check_permissions(&req).await?; + let ns = get_notification_system()?; - // 3. Get notification system instance - let Some(ns) = rustfs_notify::notification_system() else { - return Err(s3_error!(InternalError, "notification system not initialized")); - }; - - // 4. The parsing request body is KVS (Key-Value Store) let mut input = req.input; - let body = input.store_all_limited(MAX_ADMIN_REQUEST_BODY_SIZE).await.map_err(|e| { + let body_bytes = input.store_all_limited(MAX_ADMIN_REQUEST_BODY_SIZE).await.map_err(|e| { warn!("failed to read request body: {:?}", e); s3_error!(InvalidRequest, "failed to read request body") })?; - // 1. Get the allowed key range - let allowed_keys: std::collections::HashSet<&str> = match target_type { + let notification_body: NotificationTargetBody = serde_json::from_slice(&body_bytes) + .map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?; + + let allowed_keys: HashSet<&str> = match target_type { NOTIFY_WEBHOOK_SUB_SYS => rustfs_config::notify::NOTIFY_WEBHOOK_KEYS.iter().cloned().collect(), NOTIFY_MQTT_SUB_SYS => rustfs_config::notify::NOTIFY_MQTT_KEYS.iter().cloned().collect(), _ => unreachable!(), }; - let notification_body: NotificationTargetBody = serde_json::from_slice(&body) - .map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?; + let kv_map: HashMap<&str, &str> = notification_body + .key_values + .iter() + .map(|kv| (kv.key.as_str(), kv.value.as_str())) + .collect(); - // 2. Filter and verify keys, and splice target_name - let mut kvs_vec = Vec::new(); - let mut endpoint_val = None; - let mut queue_dir_val = None; - let mut client_cert_val = None; - let mut client_key_val = None; - let mut qos_val = None; - let mut topic_val = String::new(); - - for kv in notification_body.key_values.iter() { - if !allowed_keys.contains(kv.key.as_str()) { - return Err(s3_error!( - InvalidArgument, - "key '{}' not allowed for target type '{}'", - kv.key, - target_type - )); + // Validate keys + for key in kv_map.keys() { + if !allowed_keys.contains(key) { + return Err(s3_error!(InvalidArgument, "key '{}' not allowed for target type '{}'", key, target_type)); } - if kv.key == "endpoint" { - endpoint_val = Some(kv.value.clone()); - } - - if target_type == NOTIFY_MQTT_SUB_SYS { - if kv.key == rustfs_config::MQTT_BROKER { - endpoint_val = Some(kv.value.clone()); - } - if kv.key == rustfs_config::MQTT_TOPIC { - topic_val = kv.value.clone(); - } - } - - if kv.key == "queue_dir" { - queue_dir_val = Some(kv.value.clone()); - } - if kv.key == "client_cert" { - client_cert_val = Some(kv.value.clone()); - } - if kv.key == "client_key" { - client_key_val = Some(kv.value.clone()); - } - if kv.key == "qos" { - qos_val = Some(kv.value.clone()); - } - - kvs_vec.push(rustfs_ecstore::config::KV { - key: kv.key.clone(), - value: kv.value.clone(), - hidden_if_empty: false, - }); } + // Type-specific validation if target_type == NOTIFY_WEBHOOK_SUB_SYS { - let endpoint = endpoint_val - .clone() + let endpoint = kv_map + .get("endpoint") .ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?; - let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?; + let url = Url::parse(endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?; let host = url .host_str() .ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing host"))?; @@ -218,207 +179,147 @@ impl Operation for NotificationTarget { .port_or_known_default() .ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing port"))?; let addr = format!("{host}:{port}"); - // First, try to parse as SocketAddr (IP:port) - if addr.parse::().is_err() { - // If not an IP:port, try DNS resolution - if lookup_host(&addr).await.is_err() { - return Err(s3_error!(InvalidArgument, "invalid or unresolvable endpoint address")); - } + if addr.parse::().is_err() && lookup_host(&addr).await.is_err() { + return Err(s3_error!(InvalidArgument, "invalid or unresolvable endpoint address")); } - if let Some(queue_dir) = queue_dir_val.clone() { - validate_queue_dir(&queue_dir).await?; + if let Some(queue_dir) = kv_map.get("queue_dir") { + validate_queue_dir(queue_dir).await?; } - validate_cert_key_pair(&client_cert_val, &client_key_val)?; - } + if kv_map.contains_key("client_cert") != kv_map.contains_key("client_key") { + return Err(s3_error!(InvalidArgument, "client_cert and client_key must be specified as a pair")); + } + } else if target_type == NOTIFY_MQTT_SUB_SYS { + let endpoint = kv_map + .get(rustfs_config::MQTT_BROKER) + .ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?; + let topic = kv_map + .get(rustfs_config::MQTT_TOPIC) + .ok_or_else(|| s3_error!(InvalidArgument, "topic is required"))?; + check_mqtt_broker_available(endpoint, topic) + .await + .map_err(|e| s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e))?; - if target_type == NOTIFY_MQTT_SUB_SYS { - let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?; - if topic_val.is_empty() { - return Err(s3_error!(InvalidArgument, "topic is required")); - } - // Check MQTT Broker availability - if let Err(e) = check_mqtt_broker_available(&endpoint, &topic_val).await { - return Err(s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e)); - } - - if let Some(queue_dir) = queue_dir_val { - validate_queue_dir(&queue_dir).await?; - if let Some(qos) = qos_val { + if let Some(queue_dir) = kv_map.get("queue_dir") { + validate_queue_dir(queue_dir).await?; + if let Some(qos) = kv_map.get("qos") { match qos.parse::() { - Ok(qos_int) if qos_int == 1 || qos_int == 2 => {} - Ok(0) => { - return Err(s3_error!(InvalidArgument, "qos should be 1 or 2 if queue_dir is set")); - } - _ => { - return Err(s3_error!(InvalidArgument, "qos must be an integer 0, 1, or 2")); - } + Ok(1) | Ok(2) => {} + Ok(0) => return Err(s3_error!(InvalidArgument, "qos should be 1 or 2 if queue_dir is set")), + _ => return Err(s3_error!(InvalidArgument, "qos must be an integer 0, 1, or 2")), } } } } - // 3. Add ENABLE_KEY + let mut kvs_vec: Vec<_> = notification_body + .key_values + .into_iter() + .map(|kv| rustfs_ecstore::config::KV { + key: kv.key, + value: kv.value, + hidden_if_empty: false, + }) + .collect(); + kvs_vec.push(rustfs_ecstore::config::KV { key: ENABLE_KEY.to_string(), value: EnableState::On.to_string(), hidden_if_empty: false, }); - let kvs = rustfs_ecstore::config::KVS(kvs_vec); - - // 5. Call notification system to set target configuration info!("Setting target config for type '{}', name '{}'", target_type, target_name); - ns.set_target_config(target_type, target_name, kvs).await.map_err(|e| { - error!("failed to set target config: {}", e); - S3Error::with_message(S3ErrorCode::InternalError, format!("failed to set target config: {e}")) - })?; + ns.set_target_config(target_type, target_name, rustfs_ecstore::config::KVS(kvs_vec)) + .await + .map_err(|e| s3_error!(InternalError, "failed to set target config: {}", e))?; - let mut header = HeaderMap::new(); - header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - header.insert(CONTENT_LENGTH, "0".parse().unwrap()); - if let Some(v) = req.headers.get("x-request-id") { - header.insert("x-request-id", v.clone()); - } - Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + Ok(build_response(StatusCode::OK, Body::empty(), req.headers.get("x-request-id"))) } } -/// Get a list of notification targets for all activities pub struct ListNotificationTargets {} #[async_trait::async_trait] impl Operation for ListNotificationTargets { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { let span = Span::current(); let _enter = span.enter(); - debug!("ListNotificationTargets call start request params: {:?}", req.uri.query()); + check_permissions(&req).await?; + let ns = get_notification_system()?; - // 1. Permission verification - let Some(input_cred) = &req.credentials else { - return Err(s3_error!(InvalidRequest, "credentials not found")); - }; - let (_cred, _owner) = - check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + let targets = ns.get_target_values().await; + let target_count = targets.len(); - // 2. Get notification system instance - let Some(ns) = rustfs_notify::notification_system() else { - return Err(s3_error!(InternalError, "notification system not initialized")); - }; + let semaphore = Arc::new(Semaphore::new(10)); + let mut futures = FuturesUnordered::new(); - // 3. Get the list of activity targets - let active_targets = ns.get_active_targets().await; - - debug!("ListNotificationTargets call found {} active targets", active_targets.len()); - let mut notification_endpoints = Vec::new(); - for target_id in active_targets.iter() { - notification_endpoints.push(NotificationEndpoint { - account_id: target_id.id.clone(), - service: target_id.name.to_string(), - status: "online".to_string(), + for target in targets { + let sem = Arc::clone(&semaphore); + futures.push(async move { + let _permit = sem.acquire().await; + let status = match timeout(Duration::from_secs(3), target.is_active()).await { + Ok(Ok(true)) => "online", + _ => "offline", + }; + NotificationEndpoint { + account_id: target.id().to_string(), + service: target.name().to_string(), + status: status.to_string(), + } }); } - let response = NotificationEndpointsResponse { notification_endpoints }; - - // 4. Serialize and return the result - let data = serde_json::to_vec(&response).map_err(|e| { - error!("Failed to serialize notification targets response: {:?}", response); - S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}")) - })?; - debug!("ListNotificationTargets call end, response data length: {}", data.len(),); - let mut header = HeaderMap::new(); - header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - if let Some(v) = req.headers.get("x-request-id") { - header.insert("x-request-id", v.clone()); + let mut notification_endpoints = Vec::with_capacity(target_count); + while let Some(endpoint) = futures.next().await { + notification_endpoints.push(endpoint); } - Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header)) + + let data = serde_json::to_vec(&NotificationEndpointsResponse { notification_endpoints }) + .map_err(|e| s3_error!(InternalError, "failed to serialize targets: {}", e))?; + + Ok(build_response(StatusCode::OK, Body::from(data), req.headers.get("x-request-id"))) } } -/// Get a list of notification targets for all activities pub struct ListTargetsArns {} #[async_trait::async_trait] impl Operation for ListTargetsArns { async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { let span = Span::current(); let _enter = span.enter(); - debug!("ListTargetsArns call start request params: {:?}", req.uri.query()); + check_permissions(&req).await?; + let ns = get_notification_system()?; - // 1. Permission verification - let Some(input_cred) = &req.credentials else { - return Err(s3_error!(InvalidRequest, "credentials not found")); - }; - let (_cred, _owner) = - check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; - - // 2. Get notification system instance - let Some(ns) = rustfs_notify::notification_system() else { - return Err(s3_error!(InternalError, "notification system not initialized")); - }; - - // 3. Get the list of activity targets let active_targets = ns.get_active_targets().await; + let region = req + .region + .clone() + .ok_or_else(|| s3_error!(InvalidRequest, "region not found"))?; - debug!("ListTargetsArns call found {} active targets", active_targets.len()); + let data_target_arn_list: Vec<_> = active_targets.iter().map(|id| id.to_arn(®ion).to_string()).collect(); - let region = match req.region.clone() { - Some(region) => region, - None => return Err(s3_error!(InvalidRequest, "region not found")), - }; - let mut data_target_arn_list = Vec::new(); - - for target_id in active_targets.iter() { - data_target_arn_list.push(target_id.to_arn(®ion).to_string()); - } - - // 4. Serialize and return the result let data = serde_json::to_vec(&data_target_arn_list) - .map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}")))?; - debug!("ListTargetsArns call end, response data length: {}", data.len(),); - let mut header = HeaderMap::new(); - header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - if let Some(v) = req.headers.get("x-request-id") { - header.insert("x-request-id", v.clone()); - } - Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header)) + .map_err(|e| s3_error!(InternalError, "failed to serialize targets: {}", e))?; + + Ok(build_response(StatusCode::OK, Body::from(data), req.headers.get("x-request-id"))) } } -/// Delete a specified notification target pub struct RemoveNotificationTarget {} #[async_trait::async_trait] impl Operation for RemoveNotificationTarget { async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { let span = Span::current(); let _enter = span.enter(); - // 1. Analyze query parameters let (target_type, target_name) = extract_target_params(¶ms)?; - // 2. Permission verification - let Some(input_cred) = &req.credentials else { - return Err(s3_error!(InvalidRequest, "credentials not found")); - }; - let (_cred, _owner) = - check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; + check_permissions(&req).await?; + let ns = get_notification_system()?; - // 3. Get notification system instance - let Some(ns) = rustfs_notify::notification_system() else { - return Err(s3_error!(InternalError, "notification system not initialized")); - }; - - // 4. Call notification system to remove target configuration info!("Removing target config for type '{}', name '{}'", target_type, target_name); - ns.remove_target_config(target_type, target_name).await.map_err(|e| { - error!("failed to remove target config: {}", e); - S3Error::with_message(S3ErrorCode::InternalError, format!("failed to remove target config: {e}")) - })?; + ns.remove_target_config(target_type, target_name) + .await + .map_err(|e| s3_error!(InternalError, "failed to remove target config: {}", e))?; - let mut header = HeaderMap::new(); - header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - header.insert(CONTENT_LENGTH, "0".parse().unwrap()); - if let Some(v) = req.headers.get("x-request-id") { - header.insert("x-request-id", v.clone()); - } - Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) + Ok(build_response(StatusCode::OK, Body::empty(), req.headers.get("x-request-id"))) } } @@ -433,7 +334,6 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, & if target_type != NOTIFY_WEBHOOK_SUB_SYS && target_type != NOTIFY_MQTT_SUB_SYS { return Err(s3_error!(InvalidArgument, "unsupported target type: '{}'", target_type)); } - let target_name = extract_param(params, "target_name")?; Ok((target_type, target_name)) }