Refactor Event Admin Handlers and Parallelize Target Status Probes (#1501)

This commit is contained in:
houseme
2026-01-14 14:18:02 +08:00
committed by GitHub
parent f795299d53
commit 27480f7625
3 changed files with 165 additions and 243 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use crate::notification_system_subscriber::NotificationSystemSubscriberView;
use crate::notifier::TargetList;
use crate::{
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
@@ -191,6 +192,22 @@ impl NotificationSystem {
self.notifier.target_list().read().await.keys()
}
/// Gets the complete Target list, including both active and inactive Targets.
///
/// # Return
/// An `Arc<RwLock<TargetList>>` containing all Targets.
pub async fn get_all_targets(&self) -> Arc<RwLock<TargetList>> {
self.notifier.target_list()
}
/// Gets all Target values, including both active and inactive Targets.
///
/// # Return
/// A Vec containing all Targets.
pub async fn get_target_values(&self) -> Vec<Arc<dyn Target<Event> + Send + Sync>> {
self.notifier.target_list().read().await.values()
}
/// Checks if there are active subscribers for the given bucket and event name.
pub async fn has_subscriber(&self, bucket: &str, event: &EventName) -> bool {
if !self.subscriber_view.has_subscriber(bucket, event) {

View File

@@ -370,6 +370,11 @@ impl TargetList {
self.targets.keys().cloned().collect()
}
/// Returns all targets in the list
pub fn values(&self) -> Vec<Arc<dyn Target<Event> + Send + Sync>> {
self.targets.values().cloned().collect()
}
/// Returns the number of targets
pub fn len(&self) -> usize {
self.targets.len()

View File

@@ -14,21 +14,24 @@
use crate::admin::router::Operation;
use crate::auth::{check_key_valid, get_session_token};
use futures::stream::{FuturesUnordered, StreamExt};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::{ENABLE_KEY, EnableState, MAX_ADMIN_REQUEST_BODY_SIZE};
use rustfs_targets::check_mqtt_broker_available;
use s3s::header::CONTENT_LENGTH;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use s3s::{Body, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use tokio::net::lookup_host;
use tokio::time::{Duration, sleep};
use tracing::{Span, debug, error, info, warn};
use tokio::sync::Semaphore;
use tokio::time::{Duration, sleep, timeout};
use tracing::{Span, info, warn};
use url::Url;
#[derive(Debug, Deserialize)]
@@ -54,12 +57,34 @@ struct NotificationEndpointsResponse {
notification_endpoints: Vec<NotificationEndpoint>,
}
// --- Helper Functions ---
async fn check_permissions(req: &S3Request<Body>) -> S3Result<()> {
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
Ok(())
}
fn get_notification_system() -> S3Result<Arc<rustfs_notify::NotificationSystem>> {
rustfs_notify::notification_system().ok_or_else(|| s3_error!(InternalError, "notification system not initialized"))
}
fn build_response(status: StatusCode, body: Body, request_id: Option<&http::HeaderValue>) -> S3Response<(StatusCode, Body)> {
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let Some(v) = request_id {
header.insert("x-request-id", v.clone());
}
S3Response::with_headers((status, body), header)
}
async fn retry_with_backoff<F, Fut, T>(mut operation: F, max_attempts: usize, base_delay: Duration) -> Result<T, Error>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, Error>>,
{
assert!(max_attempts > 0, "max_attempts must be greater than 0");
let mut attempts = 0;
let mut delay = base_delay;
let mut last_err = None;
@@ -71,13 +96,6 @@ where
last_err = Some(e);
attempts += 1;
if attempts < max_attempts {
warn!(
"Retry attempt {}/{} failed: {}. Retrying in {:?}",
attempts,
max_attempts,
last_err.as_ref().unwrap(),
delay
);
sleep(delay).await;
delay = delay.saturating_mul(2);
}
@@ -87,130 +105,73 @@ where
Err(last_err.unwrap_or_else(|| Error::other("retry_with_backoff: unknown error")))
}
async fn retry_metadata(path: &str) -> Result<(), Error> {
retry_with_backoff(|| async { tokio::fs::metadata(path).await.map(|_| ()) }, 3, Duration::from_millis(100)).await
}
async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> {
if !queue_dir.is_empty() {
if !Path::new(queue_dir).is_absolute() {
return Err(s3_error!(InvalidArgument, "queue_dir must be absolute path"));
}
if let Err(e) = retry_metadata(queue_dir).await {
return match e.kind() {
ErrorKind::NotFound => Err(s3_error!(InvalidArgument, "queue_dir does not exist")),
ErrorKind::PermissionDenied => Err(s3_error!(InvalidArgument, "queue_dir exists but permission denied")),
_ => Err(s3_error!(InvalidArgument, "failed to access queue_dir: {}", e)),
};
}
}
Ok(())
}
fn validate_cert_key_pair(cert: &Option<String>, key: &Option<String>) -> S3Result<()> {
if cert.is_some() != key.is_some() {
return Err(s3_error!(InvalidArgument, "client_cert and client_key must be specified as a pair"));
retry_with_backoff(
|| async { tokio::fs::metadata(queue_dir).await.map(|_| ()) },
3,
Duration::from_millis(100),
)
.await
.map_err(|e| match e.kind() {
ErrorKind::NotFound => s3_error!(InvalidArgument, "queue_dir does not exist"),
ErrorKind::PermissionDenied => s3_error!(InvalidArgument, "queue_dir exists but permission denied"),
_ => s3_error!(InvalidArgument, "failed to access queue_dir: {}", e),
})?;
}
Ok(())
}
/// Set (create or update) a notification target
// --- Operations ---
pub struct NotificationTarget {}
#[async_trait::async_trait]
impl Operation for NotificationTarget {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
// 1. Analyze query parameters
let (target_type, target_name) = extract_target_params(&params)?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
check_permissions(&req).await?;
let ns = get_notification_system()?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 4. The parsing request body is KVS (Key-Value Store)
let mut input = req.input;
let body = input.store_all_limited(MAX_ADMIN_REQUEST_BODY_SIZE).await.map_err(|e| {
let body_bytes = input.store_all_limited(MAX_ADMIN_REQUEST_BODY_SIZE).await.map_err(|e| {
warn!("failed to read request body: {:?}", e);
s3_error!(InvalidRequest, "failed to read request body")
})?;
// 1. Get the allowed key range
let allowed_keys: std::collections::HashSet<&str> = match target_type {
let notification_body: NotificationTargetBody = serde_json::from_slice(&body_bytes)
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?;
let allowed_keys: HashSet<&str> = match target_type {
NOTIFY_WEBHOOK_SUB_SYS => rustfs_config::notify::NOTIFY_WEBHOOK_KEYS.iter().cloned().collect(),
NOTIFY_MQTT_SUB_SYS => rustfs_config::notify::NOTIFY_MQTT_KEYS.iter().cloned().collect(),
_ => unreachable!(),
};
let notification_body: NotificationTargetBody = serde_json::from_slice(&body)
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?;
let kv_map: HashMap<&str, &str> = notification_body
.key_values
.iter()
.map(|kv| (kv.key.as_str(), kv.value.as_str()))
.collect();
// 2. Filter and verify keys, and splice target_name
let mut kvs_vec = Vec::new();
let mut endpoint_val = None;
let mut queue_dir_val = None;
let mut client_cert_val = None;
let mut client_key_val = None;
let mut qos_val = None;
let mut topic_val = String::new();
for kv in notification_body.key_values.iter() {
if !allowed_keys.contains(kv.key.as_str()) {
return Err(s3_error!(
InvalidArgument,
"key '{}' not allowed for target type '{}'",
kv.key,
target_type
));
// Validate keys
for key in kv_map.keys() {
if !allowed_keys.contains(key) {
return Err(s3_error!(InvalidArgument, "key '{}' not allowed for target type '{}'", key, target_type));
}
if kv.key == "endpoint" {
endpoint_val = Some(kv.value.clone());
}
if target_type == NOTIFY_MQTT_SUB_SYS {
if kv.key == rustfs_config::MQTT_BROKER {
endpoint_val = Some(kv.value.clone());
}
if kv.key == rustfs_config::MQTT_TOPIC {
topic_val = kv.value.clone();
}
}
if kv.key == "queue_dir" {
queue_dir_val = Some(kv.value.clone());
}
if kv.key == "client_cert" {
client_cert_val = Some(kv.value.clone());
}
if kv.key == "client_key" {
client_key_val = Some(kv.value.clone());
}
if kv.key == "qos" {
qos_val = Some(kv.value.clone());
}
kvs_vec.push(rustfs_ecstore::config::KV {
key: kv.key.clone(),
value: kv.value.clone(),
hidden_if_empty: false,
});
}
// Type-specific validation
if target_type == NOTIFY_WEBHOOK_SUB_SYS {
let endpoint = endpoint_val
.clone()
let endpoint = kv_map
.get("endpoint")
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint is required"))?;
let url = Url::parse(&endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
let url = Url::parse(endpoint).map_err(|e| s3_error!(InvalidArgument, "invalid endpoint url: {}", e))?;
let host = url
.host_str()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing host"))?;
@@ -218,207 +179,147 @@ impl Operation for NotificationTarget {
.port_or_known_default()
.ok_or_else(|| s3_error!(InvalidArgument, "endpoint missing port"))?;
let addr = format!("{host}:{port}");
// First, try to parse as SocketAddr (IP:port)
if addr.parse::<SocketAddr>().is_err() {
// If not an IP:port, try DNS resolution
if lookup_host(&addr).await.is_err() {
return Err(s3_error!(InvalidArgument, "invalid or unresolvable endpoint address"));
}
if addr.parse::<SocketAddr>().is_err() && lookup_host(&addr).await.is_err() {
return Err(s3_error!(InvalidArgument, "invalid or unresolvable endpoint address"));
}
if let Some(queue_dir) = queue_dir_val.clone() {
validate_queue_dir(&queue_dir).await?;
if let Some(queue_dir) = kv_map.get("queue_dir") {
validate_queue_dir(queue_dir).await?;
}
validate_cert_key_pair(&client_cert_val, &client_key_val)?;
}
if kv_map.contains_key("client_cert") != kv_map.contains_key("client_key") {
return Err(s3_error!(InvalidArgument, "client_cert and client_key must be specified as a pair"));
}
} else if target_type == NOTIFY_MQTT_SUB_SYS {
let endpoint = kv_map
.get(rustfs_config::MQTT_BROKER)
.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?;
let topic = kv_map
.get(rustfs_config::MQTT_TOPIC)
.ok_or_else(|| s3_error!(InvalidArgument, "topic is required"))?;
check_mqtt_broker_available(endpoint, topic)
.await
.map_err(|e| s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e))?;
if target_type == NOTIFY_MQTT_SUB_SYS {
let endpoint = endpoint_val.ok_or_else(|| s3_error!(InvalidArgument, "broker endpoint is required"))?;
if topic_val.is_empty() {
return Err(s3_error!(InvalidArgument, "topic is required"));
}
// Check MQTT Broker availability
if let Err(e) = check_mqtt_broker_available(&endpoint, &topic_val).await {
return Err(s3_error!(InvalidArgument, "MQTT Broker unavailable: {}", e));
}
if let Some(queue_dir) = queue_dir_val {
validate_queue_dir(&queue_dir).await?;
if let Some(qos) = qos_val {
if let Some(queue_dir) = kv_map.get("queue_dir") {
validate_queue_dir(queue_dir).await?;
if let Some(qos) = kv_map.get("qos") {
match qos.parse::<u8>() {
Ok(qos_int) if qos_int == 1 || qos_int == 2 => {}
Ok(0) => {
return Err(s3_error!(InvalidArgument, "qos should be 1 or 2 if queue_dir is set"));
}
_ => {
return Err(s3_error!(InvalidArgument, "qos must be an integer 0, 1, or 2"));
}
Ok(1) | Ok(2) => {}
Ok(0) => return Err(s3_error!(InvalidArgument, "qos should be 1 or 2 if queue_dir is set")),
_ => return Err(s3_error!(InvalidArgument, "qos must be an integer 0, 1, or 2")),
}
}
}
}
// 3. Add ENABLE_KEY
let mut kvs_vec: Vec<_> = notification_body
.key_values
.into_iter()
.map(|kv| rustfs_ecstore::config::KV {
key: kv.key,
value: kv.value,
hidden_if_empty: false,
})
.collect();
kvs_vec.push(rustfs_ecstore::config::KV {
key: ENABLE_KEY.to_string(),
value: EnableState::On.to_string(),
hidden_if_empty: false,
});
let kvs = rustfs_ecstore::config::KVS(kvs_vec);
// 5. Call notification system to set target configuration
info!("Setting target config for type '{}', name '{}'", target_type, target_name);
ns.set_target_config(target_type, target_name, kvs).await.map_err(|e| {
error!("failed to set target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to set target config: {e}"))
})?;
ns.set_target_config(target_type, target_name, rustfs_ecstore::config::KVS(kvs_vec))
.await
.map_err(|e| s3_error!(InternalError, "failed to set target config: {}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
Ok(build_response(StatusCode::OK, Body::empty(), req.headers.get("x-request-id")))
}
}
/// Get a list of notification targets for all activities
pub struct ListNotificationTargets {}
#[async_trait::async_trait]
impl Operation for ListNotificationTargets {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
debug!("ListNotificationTargets call start request params: {:?}", req.uri.query());
check_permissions(&req).await?;
let ns = get_notification_system()?;
// 1. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
let targets = ns.get_target_values().await;
let target_count = targets.len();
// 2. Get notification system instance
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
let semaphore = Arc::new(Semaphore::new(10));
let mut futures = FuturesUnordered::new();
// 3. Get the list of activity targets
let active_targets = ns.get_active_targets().await;
debug!("ListNotificationTargets call found {} active targets", active_targets.len());
let mut notification_endpoints = Vec::new();
for target_id in active_targets.iter() {
notification_endpoints.push(NotificationEndpoint {
account_id: target_id.id.clone(),
service: target_id.name.to_string(),
status: "online".to_string(),
for target in targets {
let sem = Arc::clone(&semaphore);
futures.push(async move {
let _permit = sem.acquire().await;
let status = match timeout(Duration::from_secs(3), target.is_active()).await {
Ok(Ok(true)) => "online",
_ => "offline",
};
NotificationEndpoint {
account_id: target.id().to_string(),
service: target.name().to_string(),
status: status.to_string(),
}
});
}
let response = NotificationEndpointsResponse { notification_endpoints };
// 4. Serialize and return the result
let data = serde_json::to_vec(&response).map_err(|e| {
error!("Failed to serialize notification targets response: {:?}", response);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}"))
})?;
debug!("ListNotificationTargets call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
let mut notification_endpoints = Vec::with_capacity(target_count);
while let Some(endpoint) = futures.next().await {
notification_endpoints.push(endpoint);
}
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
let data = serde_json::to_vec(&NotificationEndpointsResponse { notification_endpoints })
.map_err(|e| s3_error!(InternalError, "failed to serialize targets: {}", e))?;
Ok(build_response(StatusCode::OK, Body::from(data), req.headers.get("x-request-id")))
}
}
/// Get a list of notification targets for all activities
pub struct ListTargetsArns {}
#[async_trait::async_trait]
impl Operation for ListTargetsArns {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
debug!("ListTargetsArns call start request params: {:?}", req.uri.query());
check_permissions(&req).await?;
let ns = get_notification_system()?;
// 1. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 3. Get the list of activity targets
let active_targets = ns.get_active_targets().await;
let region = req
.region
.clone()
.ok_or_else(|| s3_error!(InvalidRequest, "region not found"))?;
debug!("ListTargetsArns call found {} active targets", active_targets.len());
let data_target_arn_list: Vec<_> = active_targets.iter().map(|id| id.to_arn(&region).to_string()).collect();
let region = match req.region.clone() {
Some(region) => region,
None => return Err(s3_error!(InvalidRequest, "region not found")),
};
let mut data_target_arn_list = Vec::new();
for target_id in active_targets.iter() {
data_target_arn_list.push(target_id.to_arn(&region).to_string());
}
// 4. Serialize and return the result
let data = serde_json::to_vec(&data_target_arn_list)
.map_err(|e| S3Error::with_message(S3ErrorCode::InternalError, format!("failed to serialize targets: {e}")))?;
debug!("ListTargetsArns call end, response data length: {}", data.len(),);
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::from(data)), header))
.map_err(|e| s3_error!(InternalError, "failed to serialize targets: {}", e))?;
Ok(build_response(StatusCode::OK, Body::from(data), req.headers.get("x-request-id")))
}
}
/// Delete a specified notification target
pub struct RemoveNotificationTarget {}
#[async_trait::async_trait]
impl Operation for RemoveNotificationTarget {
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
let span = Span::current();
let _enter = span.enter();
// 1. Analyze query parameters
let (target_type, target_name) = extract_target_params(&params)?;
// 2. Permission verification
let Some(input_cred) = &req.credentials else {
return Err(s3_error!(InvalidRequest, "credentials not found"));
};
let (_cred, _owner) =
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
check_permissions(&req).await?;
let ns = get_notification_system()?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
// 4. Call notification system to remove target configuration
info!("Removing target config for type '{}', name '{}'", target_type, target_name);
ns.remove_target_config(target_type, target_name).await.map_err(|e| {
error!("failed to remove target config: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, format!("failed to remove target config: {e}"))
})?;
ns.remove_target_config(target_type, target_name)
.await
.map_err(|e| s3_error!(InternalError, "failed to remove target config: {}", e))?;
let mut header = HeaderMap::new();
header.insert(CONTENT_TYPE, "application/json".parse().unwrap());
header.insert(CONTENT_LENGTH, "0".parse().unwrap());
if let Some(v) = req.headers.get("x-request-id") {
header.insert("x-request-id", v.clone());
}
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
Ok(build_response(StatusCode::OK, Body::empty(), req.headers.get("x-request-id")))
}
}
@@ -433,7 +334,6 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &
if target_type != NOTIFY_WEBHOOK_SUB_SYS && target_type != NOTIFY_MQTT_SUB_SYS {
return Err(s3_error!(InvalidArgument, "unsupported target type: '{}'", target_type));
}
let target_name = extract_param(params, "target_name")?;
Ok((target_type, target_name))
}