mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-16 17:20:33 +00:00
Feature/event: Optimize and Enhance Notify Crate Functionality (#512)
* improve code for notify * fix * cargo fmt * improve code and create `DEFAULT_DELIMITER` * fix * fix * improve code for notify * fmt * Update crates/notify/src/registry.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/notify/src/factory.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix cllipy --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8341,6 +8341,7 @@ dependencies = [
|
||||
"quick-xml",
|
||||
"reqwest",
|
||||
"rumqttc",
|
||||
"rustfs-config",
|
||||
"rustfs-utils",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -201,7 +201,7 @@ impl RustFSConfig {
|
||||
/// Clear the stored configuration from the system keyring
|
||||
///
|
||||
/// # Returns
|
||||
/// Returns `Ok(())` if the configuration was successfully cleared, or an error if the operation failed.
|
||||
/// `Ok(())` if the configuration was successfully cleared, or an error if the operation failed.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
@@ -525,14 +525,14 @@ mod tests {
|
||||
host: "127.0.0.1".to_string(),
|
||||
port: "9000".to_string(),
|
||||
access_key: "用户名".to_string(),
|
||||
secret_key: "密码123".to_string(),
|
||||
secret_key: "密码 123".to_string(),
|
||||
domain_name: "测试.com".to_string(),
|
||||
volume_name: "/数据/存储".to_string(),
|
||||
console_address: "127.0.0.1:9001".to_string(),
|
||||
};
|
||||
|
||||
assert_eq!(config.access_key, "用户名");
|
||||
assert_eq!(config.secret_key, "密码123");
|
||||
assert_eq!(config.secret_key, "密码 123");
|
||||
assert_eq!(config.domain_name, "测试.com");
|
||||
assert_eq!(config.volume_name, "/数据/存储");
|
||||
}
|
||||
|
||||
@@ -18,5 +18,6 @@ workspace = true
|
||||
[features]
|
||||
default = []
|
||||
constants = ["dep:const-str"]
|
||||
notify = []
|
||||
observability = []
|
||||
|
||||
|
||||
7
crates/config/src/constants/env.rs
Normal file
7
crates/config/src/constants/env.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub const DEFAULT_DELIMITER: &str = "_";
|
||||
pub const ENV_PREFIX: &str = "RUSTFS_";
|
||||
pub const ENV_WORD_DELIMITER: &str = "_";
|
||||
|
||||
/// Medium-drawn lines separator
|
||||
/// This is used to separate words in environment variable names.
|
||||
pub const ENV_WORD_DELIMITER_DASH: &str = "-";
|
||||
@@ -1 +1,2 @@
|
||||
pub(crate) mod app;
|
||||
pub(crate) mod env;
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
pub mod constants;
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::app::*;
|
||||
|
||||
#[cfg(feature = "constants")]
|
||||
pub use constants::env::*;
|
||||
#[cfg(feature = "notify")]
|
||||
pub mod notify;
|
||||
#[cfg(feature = "observability")]
|
||||
pub mod observability;
|
||||
|
||||
7
crates/config/src/notify/arn.rs
Normal file
7
crates/config/src/notify/arn.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub const DEFAULT_ARN_PARTITION: &str = "rustfs";
|
||||
|
||||
pub const DEFAULT_ARN_SERVICE: &str = "sqs";
|
||||
|
||||
/// Default ARN prefix for SQS
|
||||
/// "arn:rustfs:sqs:"
|
||||
pub const ARN_PREFIX: &str = const_str::concat!("arn:", DEFAULT_ARN_PARTITION, ":", DEFAULT_ARN_SERVICE, ":");
|
||||
38
crates/config/src/notify/mod.rs
Normal file
38
crates/config/src/notify/mod.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
mod arn;
|
||||
mod mqtt;
|
||||
mod store;
|
||||
mod webhook;
|
||||
|
||||
pub use arn::*;
|
||||
pub use mqtt::*;
|
||||
pub use store::*;
|
||||
pub use webhook::*;
|
||||
|
||||
// --- Configuration Constants ---
|
||||
pub const DEFAULT_TARGET: &str = "1";
|
||||
|
||||
pub const NOTIFY_PREFIX: &str = "notify";
|
||||
|
||||
pub const NOTIFY_ROUTE_PREFIX: &str = "notify_";
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS];
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
|
||||
pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
|
||||
pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
|
||||
22
crates/config/src/notify/mqtt.rs
Normal file
22
crates/config/src/notify/mqtt.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
// MQTT Keys
|
||||
pub const MQTT_BROKER: &str = "broker";
|
||||
pub const MQTT_TOPIC: &str = "topic";
|
||||
pub const MQTT_QOS: &str = "qos";
|
||||
pub const MQTT_USERNAME: &str = "username";
|
||||
pub const MQTT_PASSWORD: &str = "password";
|
||||
pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval";
|
||||
pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval";
|
||||
pub const MQTT_QUEUE_DIR: &str = "queue_dir";
|
||||
pub const MQTT_QUEUE_LIMIT: &str = "queue_limit";
|
||||
|
||||
// MQTT Environment Variables
|
||||
pub const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE";
|
||||
pub const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER";
|
||||
pub const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC";
|
||||
pub const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS";
|
||||
pub const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME";
|
||||
pub const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD";
|
||||
pub const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL";
|
||||
pub const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL";
|
||||
pub const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR";
|
||||
pub const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT";
|
||||
7
crates/config/src/notify/store.rs
Normal file
7
crates/config/src/notify/store.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store
|
||||
pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit
|
||||
pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension
|
||||
pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files
|
||||
|
||||
/// STORE_EXTENSION - file extension of an event file in store
|
||||
pub const STORE_EXTENSION: &str = ".event";
|
||||
16
crates/config/src/notify/webhook.rs
Normal file
16
crates/config/src/notify/webhook.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
// Webhook Keys
|
||||
pub const WEBHOOK_ENDPOINT: &str = "endpoint";
|
||||
pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token";
|
||||
pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit";
|
||||
pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir";
|
||||
pub const WEBHOOK_CLIENT_CERT: &str = "client_cert";
|
||||
pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
|
||||
|
||||
// Webhook Environment Variables
|
||||
pub const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE";
|
||||
pub const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT";
|
||||
pub const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN";
|
||||
pub const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT";
|
||||
pub const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR";
|
||||
pub const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT";
|
||||
pub const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY";
|
||||
@@ -7,6 +7,7 @@ rust-version.workspace = true
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
rustfs-config = { workspace = true, features = ["notify"] }
|
||||
rustfs-utils = { workspace = true, features = ["path", "sys"] }
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
|
||||
use rustfs_notify::arn::TargetID;
|
||||
use rustfs_notify::factory::{
|
||||
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
|
||||
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
use rustfs_config::notify::{
|
||||
DEFAULT_LIMIT, DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC,
|
||||
MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR,
|
||||
WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
use rustfs_notify::store::DEFAULT_LIMIT;
|
||||
use rustfs_notify::arn::TargetID;
|
||||
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
|
||||
use rustfs_notify::{initialize, notification_system};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
|
||||
use std::sync::Arc;
|
||||
// Using Global Accessories
|
||||
use rustfs_notify::arn::TargetID;
|
||||
use rustfs_notify::factory::{
|
||||
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
|
||||
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
use rustfs_config::notify::{
|
||||
DEFAULT_LIMIT, DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC,
|
||||
MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR,
|
||||
WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
use rustfs_notify::store::DEFAULT_LIMIT;
|
||||
use rustfs_notify::arn::TargetID;
|
||||
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
|
||||
use rustfs_notify::{initialize, notification_system};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use axum::{
|
||||
routing::post,
|
||||
};
|
||||
use serde_json::Value;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use axum::extract::Query;
|
||||
@@ -17,7 +18,9 @@ struct ResetParams {
|
||||
}
|
||||
|
||||
// Define a global variable and count the number of data received
|
||||
use rustfs_utils::parse_and_resolve_address;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
static WEBHOOK_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
@@ -30,16 +33,23 @@ async fn main() {
|
||||
.route("/webhook/reset", get(reset_webhook_count))
|
||||
.route("/webhook", get(receive_webhook));
|
||||
// Start the server
|
||||
let addr = "0.0.0.0:3020";
|
||||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
println!("Server running on {}", addr);
|
||||
// let addr = "[0.0.0.0.0.0.0.0]:3020";
|
||||
let server_addr = match parse_and_resolve_address(":3020") {
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to parse address: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let listener = TcpListener::bind(server_addr).await.unwrap();
|
||||
println!("Server running on {}", server_addr);
|
||||
|
||||
// Self-checking after the service is started
|
||||
tokio::spawn(async move {
|
||||
// Give the server some time to start
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
match is_service_active(addr).await {
|
||||
match is_service_active(server_addr).await {
|
||||
Ok(true) => println!("Service health check: Successful - Service is running normally"),
|
||||
Ok(false) => eprintln!("Service Health Check: Failed - Service Not Responded"),
|
||||
Err(e) => eprintln!("Service health check errors:{}", e),
|
||||
@@ -106,7 +116,7 @@ async fn reset_webhook_count(Query(params): Query<ResetParams>, headers: HeaderM
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn is_service_active(addr: &str) -> Result<bool, String> {
|
||||
async fn is_service_active(addr: SocketAddr) -> Result<bool, String> {
|
||||
let socket_addr = tokio::net::lookup_host(addr)
|
||||
.await
|
||||
.map_err(|e| format!("Unable to resolve host:{}", e))?
|
||||
|
||||
@@ -1,18 +1,10 @@
|
||||
use crate::TargetError;
|
||||
use const_str::concat;
|
||||
use rustfs_config::notify::{ARN_PREFIX, DEFAULT_ARN_PARTITION, DEFAULT_ARN_SERVICE};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
use thiserror::Error;
|
||||
|
||||
pub(crate) const DEFAULT_ARN_PARTITION: &str = "rustfs";
|
||||
|
||||
pub(crate) const DEFAULT_ARN_SERVICE: &str = "sqs";
|
||||
|
||||
/// Default ARN prefix for SQS
|
||||
/// "arn:rustfs:sqs:"
|
||||
const ARN_PREFIX: &str = concat!("arn:", DEFAULT_ARN_PARTITION, ":", DEFAULT_ARN_SERVICE, ":");
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum TargetIDError {
|
||||
#[error("Invalid TargetID format '{0}', expect 'ID:Name'")]
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::store::DEFAULT_LIMIT;
|
||||
use crate::{
|
||||
error::TargetError,
|
||||
target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs},
|
||||
@@ -6,85 +5,30 @@ use crate::{
|
||||
use async_trait::async_trait;
|
||||
use ecstore::config::{ENABLE_KEY, ENABLE_ON, KVS};
|
||||
use rumqttc::QoS;
|
||||
use rustfs_config::notify::{
|
||||
DEFAULT_DIR, DEFAULT_LIMIT, ENV_MQTT_BROKER, ENV_MQTT_ENABLE, ENV_MQTT_KEEP_ALIVE_INTERVAL, ENV_MQTT_PASSWORD, ENV_MQTT_QOS,
|
||||
ENV_MQTT_QUEUE_DIR, ENV_MQTT_QUEUE_LIMIT, ENV_MQTT_RECONNECT_INTERVAL, ENV_MQTT_TOPIC, ENV_MQTT_USERNAME,
|
||||
ENV_WEBHOOK_AUTH_TOKEN, ENV_WEBHOOK_CLIENT_CERT, ENV_WEBHOOK_CLIENT_KEY, ENV_WEBHOOK_ENABLE, ENV_WEBHOOK_ENDPOINT,
|
||||
ENV_WEBHOOK_QUEUE_DIR, ENV_WEBHOOK_QUEUE_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS,
|
||||
MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN,
|
||||
WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
use rustfs_config::{DEFAULT_DELIMITER, ENV_WORD_DELIMITER_DASH};
|
||||
use std::time::Duration;
|
||||
use tracing::warn;
|
||||
use tracing::{debug, warn};
|
||||
use url::Url;
|
||||
|
||||
// --- Configuration Constants ---
|
||||
|
||||
// General
|
||||
|
||||
pub const DEFAULT_TARGET: &str = "1";
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
|
||||
pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS];
|
||||
|
||||
// Webhook Keys
|
||||
pub const WEBHOOK_ENDPOINT: &str = "endpoint";
|
||||
pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token";
|
||||
pub const WEBHOOK_QUEUE_LIMIT: &str = "queue_limit";
|
||||
pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir";
|
||||
pub const WEBHOOK_CLIENT_CERT: &str = "client_cert";
|
||||
pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
|
||||
|
||||
// Webhook Environment Variables
|
||||
const ENV_WEBHOOK_ENABLE: &str = "RUSTFS_NOTIFY_WEBHOOK_ENABLE";
|
||||
const ENV_WEBHOOK_ENDPOINT: &str = "RUSTFS_NOTIFY_WEBHOOK_ENDPOINT";
|
||||
const ENV_WEBHOOK_AUTH_TOKEN: &str = "RUSTFS_NOTIFY_WEBHOOK_AUTH_TOKEN";
|
||||
const ENV_WEBHOOK_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_LIMIT";
|
||||
const ENV_WEBHOOK_QUEUE_DIR: &str = "RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR";
|
||||
const ENV_WEBHOOK_CLIENT_CERT: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_CERT";
|
||||
const ENV_WEBHOOK_CLIENT_KEY: &str = "RUSTFS_NOTIFY_WEBHOOK_CLIENT_KEY";
|
||||
|
||||
// MQTT Keys
|
||||
pub const MQTT_BROKER: &str = "broker";
|
||||
pub const MQTT_TOPIC: &str = "topic";
|
||||
pub const MQTT_QOS: &str = "qos";
|
||||
pub const MQTT_USERNAME: &str = "username";
|
||||
pub const MQTT_PASSWORD: &str = "password";
|
||||
pub const MQTT_RECONNECT_INTERVAL: &str = "reconnect_interval";
|
||||
pub const MQTT_KEEP_ALIVE_INTERVAL: &str = "keep_alive_interval";
|
||||
pub const MQTT_QUEUE_DIR: &str = "queue_dir";
|
||||
pub const MQTT_QUEUE_LIMIT: &str = "queue_limit";
|
||||
|
||||
// MQTT Environment Variables
|
||||
const ENV_MQTT_ENABLE: &str = "RUSTFS_NOTIFY_MQTT_ENABLE";
|
||||
const ENV_MQTT_BROKER: &str = "RUSTFS_NOTIFY_MQTT_BROKER";
|
||||
const ENV_MQTT_TOPIC: &str = "RUSTFS_NOTIFY_MQTT_TOPIC";
|
||||
const ENV_MQTT_QOS: &str = "RUSTFS_NOTIFY_MQTT_QOS";
|
||||
const ENV_MQTT_USERNAME: &str = "RUSTFS_NOTIFY_MQTT_USERNAME";
|
||||
const ENV_MQTT_PASSWORD: &str = "RUSTFS_NOTIFY_MQTT_PASSWORD";
|
||||
const ENV_MQTT_RECONNECT_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_RECONNECT_INTERVAL";
|
||||
const ENV_MQTT_KEEP_ALIVE_INTERVAL: &str = "RUSTFS_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL";
|
||||
const ENV_MQTT_QUEUE_DIR: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_DIR";
|
||||
const ENV_MQTT_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_MQTT_QUEUE_LIMIT";
|
||||
|
||||
/// Helper function to get values from environment variables or KVS configurations.
|
||||
///
|
||||
/// It will give priority to reading from environment variables such as `BASE_ENV_KEY_ID` and fall back to the KVS configuration if it fails.
|
||||
fn get_config_value(id: &str, base_env_key: &str, config_key: &str, config: &KVS) -> Option<String> {
|
||||
let env_key = if id != DEFAULT_TARGET {
|
||||
format!("{}_{}", base_env_key, id.to_uppercase().replace('-', "_"))
|
||||
let env_key = if id != DEFAULT_DELIMITER {
|
||||
format!(
|
||||
"{}{}{}",
|
||||
base_env_key,
|
||||
DEFAULT_DELIMITER,
|
||||
id.to_uppercase().replace(ENV_WORD_DELIMITER_DASH, DEFAULT_DELIMITER)
|
||||
)
|
||||
} else {
|
||||
base_env_key.to_string()
|
||||
};
|
||||
@@ -123,11 +67,12 @@ impl TargetFactory for WebhookTargetFactory {
|
||||
|
||||
let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT)
|
||||
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
|
||||
let endpoint_url =
|
||||
Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
|
||||
let endpoint_url = Url::parse(&endpoint)
|
||||
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {} (value: '{}')", e, endpoint)))?;
|
||||
|
||||
let auth_token = get(ENV_WEBHOOK_AUTH_TOKEN, WEBHOOK_AUTH_TOKEN).unwrap_or_default();
|
||||
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default();
|
||||
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR)
|
||||
.unwrap_or(DEFAULT_DIR.to_string());
|
||||
|
||||
let queue_limit = get(ENV_WEBHOOK_QUEUE_LIMIT, WEBHOOK_QUEUE_LIMIT)
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
@@ -163,7 +108,10 @@ impl TargetFactory for WebhookTargetFactory {
|
||||
|
||||
let endpoint = get(ENV_WEBHOOK_ENDPOINT, WEBHOOK_ENDPOINT)
|
||||
.ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?;
|
||||
Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {}", e)))?;
|
||||
debug!("endpoint: {}", endpoint);
|
||||
let parsed_endpoint = endpoint.trim();
|
||||
Url::parse(parsed_endpoint)
|
||||
.map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {} (value: '{}')", e, parsed_endpoint)))?;
|
||||
|
||||
let client_cert = get(ENV_WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_CERT).unwrap_or_default();
|
||||
let client_key = get(ENV_WEBHOOK_CLIENT_KEY, WEBHOOK_CLIENT_KEY).unwrap_or_default();
|
||||
@@ -174,7 +122,9 @@ impl TargetFactory for WebhookTargetFactory {
|
||||
));
|
||||
}
|
||||
|
||||
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR).unwrap_or_default();
|
||||
let queue_dir = get(ENV_WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_DIR)
|
||||
.and_then(|v| v.parse::<String>().ok())
|
||||
.unwrap_or(DEFAULT_DIR.to_string());
|
||||
if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() {
|
||||
return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string()));
|
||||
}
|
||||
@@ -201,7 +151,8 @@ impl TargetFactory for MQTTTargetFactory {
|
||||
|
||||
let broker =
|
||||
get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?;
|
||||
let broker_url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
|
||||
let broker_url = Url::parse(&broker)
|
||||
.map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {} (value: '{}')", e, broker)))?;
|
||||
|
||||
let topic =
|
||||
get(ENV_MQTT_TOPIC, MQTT_TOPIC).ok_or_else(|| TargetError::Configuration("Missing MQTT topic".to_string()))?;
|
||||
@@ -229,7 +180,9 @@ impl TargetFactory for MQTTTargetFactory {
|
||||
.map(Duration::from_secs)
|
||||
.unwrap_or_else(|| Duration::from_secs(30));
|
||||
|
||||
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default();
|
||||
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR)
|
||||
.and_then(|v| v.parse::<String>().ok())
|
||||
.unwrap_or(DEFAULT_DIR.to_string());
|
||||
let queue_limit = get(ENV_MQTT_QUEUE_LIMIT, MQTT_QUEUE_LIMIT)
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.unwrap_or(DEFAULT_LIMIT);
|
||||
@@ -264,7 +217,8 @@ impl TargetFactory for MQTTTargetFactory {
|
||||
|
||||
let broker =
|
||||
get(ENV_MQTT_BROKER, MQTT_BROKER).ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?;
|
||||
let url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {}", e)))?;
|
||||
let url = Url::parse(&broker)
|
||||
.map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {} (value: '{}')", e, broker)))?;
|
||||
|
||||
match url.scheme() {
|
||||
"tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {}
|
||||
@@ -286,7 +240,9 @@ impl TargetFactory for MQTTTargetFactory {
|
||||
}
|
||||
}
|
||||
|
||||
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR).unwrap_or_default();
|
||||
let queue_dir = get(ENV_MQTT_QUEUE_DIR, MQTT_QUEUE_DIR)
|
||||
.and_then(|v| v.parse::<String>().ok())
|
||||
.unwrap_or(DEFAULT_DIR.to_string());
|
||||
if !queue_dir.is_empty() {
|
||||
if !std::path::Path::new(&queue_dir).is_absolute() {
|
||||
return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string()));
|
||||
|
||||
@@ -2,10 +2,11 @@ use crate::{Event, EventArgs, NotificationError, NotificationSystem};
|
||||
use ecstore::config::Config;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use tracing::instrument;
|
||||
|
||||
static NOTIFICATION_SYSTEM: OnceLock<Arc<NotificationSystem>> = OnceLock::new();
|
||||
// Create a globally unique Notifier instance
|
||||
pub static GLOBAL_NOTIFIER: Lazy<Notifier> = Lazy::new(|| Notifier {});
|
||||
static GLOBAL_NOTIFIER: Lazy<Notifier> = Lazy::new(|| Notifier {});
|
||||
|
||||
/// Initialize the global notification system with the given configuration.
|
||||
/// This function should only be called once throughout the application life cycle.
|
||||
@@ -27,14 +28,22 @@ pub fn notification_system() -> Option<Arc<NotificationSystem>> {
|
||||
NOTIFICATION_SYSTEM.get().cloned()
|
||||
}
|
||||
|
||||
pub struct Notifier {
|
||||
// Notifier can hold state, but in this design we make it stateless,
|
||||
// Rely on getting an instance of NotificationSystem from the outside.
|
||||
/// Check if the notification system has been initialized.
|
||||
pub fn is_notification_system_initialized() -> bool {
|
||||
NOTIFICATION_SYSTEM.get().is_some()
|
||||
}
|
||||
|
||||
/// Returns a reference to the global Notifier instance.
|
||||
pub fn notifier_instance() -> &'static Notifier {
|
||||
&GLOBAL_NOTIFIER
|
||||
}
|
||||
|
||||
pub struct Notifier {}
|
||||
|
||||
impl Notifier {
|
||||
/// Notify an event asynchronously.
|
||||
/// This is the only entry point for all event notifications in the system.
|
||||
#[instrument(skip(self, args))]
|
||||
pub async fn notify(&self, args: EventArgs) {
|
||||
// Dependency injection or service positioning mode obtain NotificationSystem instance
|
||||
let notification_sys = match notification_system() {
|
||||
@@ -51,6 +60,11 @@ impl Notifier {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if any subscribers are interested in the event
|
||||
if !notification_sys.has_subscriber(&args.bucket_name, &args.event_name).await {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create an event and send it
|
||||
let event = Arc::new(Event::new(args));
|
||||
notification_sys.send_event(event).await;
|
||||
|
||||
@@ -5,8 +5,10 @@ use crate::{
|
||||
target::Target,
|
||||
};
|
||||
use ecstore::config::{Config, ENABLE_KEY, ENABLE_OFF, ENABLE_ON, KVS};
|
||||
use rustfs_config::notify::NOTIFY_ROUTE_PREFIX;
|
||||
use rustfs_config::{DEFAULT_DELIMITER, ENV_PREFIX};
|
||||
use std::collections::HashMap;
|
||||
use tracing::{error, info};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
/// Registry for managing target factories
|
||||
pub struct TargetRegistry {
|
||||
@@ -64,20 +66,58 @@ impl TargetRegistry {
|
||||
// Iterate through configuration sections
|
||||
for (section, subsections) in &config.0 {
|
||||
// Only process notification sections
|
||||
if !section.starts_with("notify_") {
|
||||
if !section.starts_with(NOTIFY_ROUTE_PREFIX) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Extract target type from section name
|
||||
let target_type = section.trim_start_matches("notify_");
|
||||
let target_type = section.trim_start_matches(NOTIFY_ROUTE_PREFIX);
|
||||
|
||||
// Iterate through subsections (each representing a target instance)
|
||||
for (target_id, target_config) in subsections {
|
||||
// Skip disabled targets
|
||||
if target_config.lookup(ENABLE_KEY).unwrap_or_else(|| ENABLE_OFF.to_string()) != ENABLE_ON {
|
||||
|
||||
let enable_from_config = target_config.lookup(ENABLE_KEY).unwrap_or_else(|| ENABLE_OFF.to_string());
|
||||
debug!("Target enablement from config: {}/{}: {}", target_type, target_id, enable_from_config);
|
||||
// Check environment variable for target enablement example: RUSTFS_NOTIFY_WEBHOOK_ENABLE|RUSTFS_NOTIFY_WEBHOOK_ENABLE_[TARGET_ID]
|
||||
let env_key = if target_id == DEFAULT_DELIMITER {
|
||||
// If no specific target ID, use the base target type, example: RUSTFS_NOTIFY_WEBHOOK_ENABLE
|
||||
format!(
|
||||
"{}{}{}{}{}",
|
||||
ENV_PREFIX,
|
||||
NOTIFY_ROUTE_PREFIX,
|
||||
target_type.to_uppercase(),
|
||||
DEFAULT_DELIMITER,
|
||||
ENABLE_KEY
|
||||
)
|
||||
} else {
|
||||
// If specific target ID, append it to the key, example: RUSTFS_NOTIFY_WEBHOOK_ENABLE_[TARGET_ID]
|
||||
format!(
|
||||
"{}{}{}{}{}{}{}",
|
||||
ENV_PREFIX,
|
||||
NOTIFY_ROUTE_PREFIX,
|
||||
target_type.to_uppercase(),
|
||||
DEFAULT_DELIMITER,
|
||||
ENABLE_KEY,
|
||||
DEFAULT_DELIMITER,
|
||||
target_id.to_uppercase()
|
||||
)
|
||||
}
|
||||
.to_uppercase();
|
||||
debug!("Target env key: {},Target id: {}", env_key, target_id);
|
||||
let enable_from_env = std::env::var(&env_key)
|
||||
.map(|v| v.eq_ignore_ascii_case(ENABLE_ON) || v.eq_ignore_ascii_case("true"))
|
||||
.unwrap_or(false);
|
||||
debug!("Target env value: {},key: {},Target id: {}", enable_from_env, env_key, target_id);
|
||||
debug!(
|
||||
"Target enablement from env: {}/{}: result: {}",
|
||||
target_type, target_id, enable_from_config
|
||||
);
|
||||
if enable_from_config != ENABLE_ON && !enable_from_env {
|
||||
info!("Skipping disabled target: {}/{}", target_type, target_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
debug!("create target: {}/{} start", target_type, target_id);
|
||||
// Create target
|
||||
match self.create_target(target_type, target_id.clone(), target_config).await {
|
||||
Ok(target) => {
|
||||
@@ -85,7 +125,7 @@ impl TargetRegistry {
|
||||
targets.push(target);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to create target {}/{}: {}", target_type, target_id, e);
|
||||
error!("Failed to create target {}/{}: reason: {}", target_type, target_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::error::StoreError;
|
||||
use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT, DEFAULT_LIMIT};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use snap::raw::{Decoder, Encoder};
|
||||
use std::sync::{Arc, RwLock};
|
||||
@@ -11,13 +12,6 @@ use std::{
|
||||
use tracing::{debug, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit
|
||||
pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension
|
||||
pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files
|
||||
|
||||
/// STORE_EXTENSION - file extension of an event file in store
|
||||
pub const STORE_EXTENSION: &str = ".event";
|
||||
|
||||
/// Represents a key for an entry in the store
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Key {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::store::{Key, STORE_EXTENSION};
|
||||
use crate::store::Key;
|
||||
use crate::target::ChannelTargetType;
|
||||
use crate::{
|
||||
StoreError, Target,
|
||||
@@ -10,6 +10,7 @@ use crate::{
|
||||
use async_trait::async_trait;
|
||||
use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS};
|
||||
use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError};
|
||||
use rustfs_config::notify::STORE_EXTENSION;
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::store::STORE_EXTENSION;
|
||||
use crate::target::ChannelTargetType;
|
||||
use crate::{
|
||||
StoreError, Target,
|
||||
@@ -9,6 +8,7 @@ use crate::{
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use reqwest::{Client, StatusCode, Url};
|
||||
use rustfs_config::notify::STORE_EXTENSION;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
@@ -41,8 +41,8 @@ pub struct WebhookArgs {
|
||||
pub client_key: String,
|
||||
}
|
||||
|
||||
// WebhookArgs 的验证方法
|
||||
impl WebhookArgs {
|
||||
/// WebhookArgs verification method
|
||||
pub fn validate(&self) -> Result<(), TargetError> {
|
||||
if !self.enable {
|
||||
return Ok(());
|
||||
@@ -134,7 +134,7 @@ impl WebhookTarget {
|
||||
target_id.name,
|
||||
target_id.id
|
||||
));
|
||||
let store = super::super::store::QueueStore::<Event>::new(queue_dir, args.queue_limit, STORE_EXTENSION);
|
||||
let store = crate::store::QueueStore::<Event>::new(queue_dir, args.queue_limit, STORE_EXTENSION);
|
||||
|
||||
if let Err(e) = store.open() {
|
||||
error!("Failed to open store for Webhook target {}: {}", target_id.id, e);
|
||||
@@ -172,9 +172,9 @@ impl WebhookTarget {
|
||||
}
|
||||
|
||||
async fn init(&self) -> Result<(), TargetError> {
|
||||
// 使用 CAS 操作确保线程安全初始化
|
||||
// Use CAS operations to ensure thread-safe initialization
|
||||
if !self.initialized.load(Ordering::SeqCst) {
|
||||
// 检查连接
|
||||
// Check the connection
|
||||
match self.is_active().await {
|
||||
Ok(true) => {
|
||||
info!("Webhook target {} is active", self.id);
|
||||
@@ -209,12 +209,12 @@ impl WebhookTarget {
|
||||
let data =
|
||||
serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
|
||||
|
||||
// Vec<u8> 转换为 String
|
||||
// Vec<u8> Convert to String
|
||||
let data_string = String::from_utf8(data.clone())
|
||||
.map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?;
|
||||
debug!("Sending event to webhook target: {}, event log: {}", self.id, data_string);
|
||||
|
||||
// 构建请求
|
||||
// build request
|
||||
let mut req_builder = self
|
||||
.http_client
|
||||
.post(self.args.endpoint.as_str())
|
||||
|
||||
@@ -14,7 +14,7 @@ workspace = true
|
||||
default = []
|
||||
|
||||
[dependencies]
|
||||
rustfs-config = { workspace = true, features = ["constants"] }
|
||||
rustfs-config = { workspace = true, features = ["constants", "notify"] }
|
||||
async-trait.workspace = true
|
||||
backon.workspace = true
|
||||
blake2 = { workspace = true }
|
||||
@@ -92,7 +92,7 @@ urlencoding = { workspace = true }
|
||||
smallvec = { workspace = true }
|
||||
shadow-rs.workspace = true
|
||||
rustfs-filemeta.workspace = true
|
||||
rustfs-utils ={workspace = true, features=["full"]}
|
||||
rustfs-utils = { workspace = true, features = ["full"] }
|
||||
rustfs-rio.workspace = true
|
||||
futures-util.workspace = true
|
||||
reader = { workspace = true }
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::error::{Error, Result};
|
||||
use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI};
|
||||
use http::HeaderMap;
|
||||
use lazy_static::lazy_static;
|
||||
use rustfs_config::DEFAULT_DELIMITER;
|
||||
use rustfs_utils::path::SLASH_SEPARATOR;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
@@ -13,7 +14,6 @@ pub const CONFIG_PREFIX: &str = "config";
|
||||
const CONFIG_FILE: &str = "config.json";
|
||||
|
||||
pub const STORAGE_CLASS_SUB_SYS: &str = "storage_class";
|
||||
pub const DEFAULT_KV_KEY: &str = "_";
|
||||
|
||||
lazy_static! {
|
||||
static ref CONFIG_BUCKET: String = format!("{}{}{}", RUSTFS_META_BUCKET, SLASH_SEPARATOR, CONFIG_PREFIX);
|
||||
@@ -193,7 +193,7 @@ async fn apply_dynamic_config<S: StorageAPI>(cfg: &mut Config, api: Arc<S>) -> R
|
||||
async fn apply_dynamic_config_for_sub_sys<S: StorageAPI>(cfg: &mut Config, api: Arc<S>, subsys: &str) -> Result<()> {
|
||||
let set_drive_counts = api.set_drive_counts();
|
||||
if subsys == STORAGE_CLASS_SUB_SYS {
|
||||
let kvs = cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_KV_KEY).unwrap_or_default();
|
||||
let kvs = cfg.get_value(STORAGE_CLASS_SUB_SYS, DEFAULT_DELIMITER).unwrap_or_default();
|
||||
|
||||
for (i, count) in set_drive_counts.iter().enumerate() {
|
||||
match storageclass::lookup_config(&kvs, *count) {
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
pub mod com;
|
||||
#[allow(dead_code)]
|
||||
pub mod heal;
|
||||
mod notify;
|
||||
pub mod storageclass;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::store::ECStore;
|
||||
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
|
||||
use lazy_static::lazy_static;
|
||||
use rustfs_config::DEFAULT_DELIMITER;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
@@ -120,11 +122,11 @@ impl Config {
|
||||
for (k, v) in defaults.iter() {
|
||||
if !self.0.contains_key(k) {
|
||||
let mut default = HashMap::new();
|
||||
default.insert("_".to_owned(), v.clone());
|
||||
default.insert(DEFAULT_DELIMITER.to_owned(), v.clone());
|
||||
self.0.insert(k.clone(), default);
|
||||
} else if !self.0[k].contains_key("_") {
|
||||
} else if !self.0[k].contains_key(DEFAULT_DELIMITER) {
|
||||
if let Some(m) = self.0.get_mut(k) {
|
||||
m.insert("_".to_owned(), v.clone());
|
||||
m.insert(DEFAULT_DELIMITER.to_owned(), v.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -160,7 +162,16 @@ pub fn register_default_kvs(kvs: HashMap<String, KVS>) {
|
||||
|
||||
pub fn init() {
|
||||
let mut kvs = HashMap::new();
|
||||
// Load storageclass default configuration
|
||||
kvs.insert(STORAGE_CLASS_SUB_SYS.to_owned(), storageclass::DefaultKVS.clone());
|
||||
// TODO: other default
|
||||
// New: Loading default configurations for notify_webhook and notify_mqtt
|
||||
// Referring subsystem names through constants to improve the readability and maintainability of the code
|
||||
kvs.insert(
|
||||
rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS.to_owned(),
|
||||
notify::DefaultWebhookKVS.clone(),
|
||||
);
|
||||
kvs.insert(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS.to_owned(), notify::DefaultMqttKVS.clone());
|
||||
|
||||
// Register all default configurations
|
||||
register_default_kvs(kvs)
|
||||
}
|
||||
|
||||
37
ecstore/src/config/notify.rs
Normal file
37
ecstore/src/config/notify.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use crate::config::{ENABLE_KEY, ENABLE_OFF, KV, KVS};
|
||||
use lazy_static::lazy_static;
|
||||
use rustfs_config::notify::{
|
||||
DEFAULT_DIR, DEFAULT_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT,
|
||||
MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY,
|
||||
WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
|
||||
};
|
||||
|
||||
lazy_static! {
|
||||
/// The default configuration collection of webhooks,
|
||||
/// Use lazy_static! to ensure that these configurations are initialized only once during the program life cycle, enabling high-performance lazy loading.
|
||||
pub static ref DefaultWebhookKVS: KVS = KVS(vec![
|
||||
KV { key: ENABLE_KEY.to_owned(), value: ENABLE_OFF.to_owned(), hidden_if_empty: false },
|
||||
KV { key: WEBHOOK_ENDPOINT.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
// Sensitive information such as authentication tokens is hidden when the value is empty, enhancing security
|
||||
KV { key: WEBHOOK_AUTH_TOKEN.to_owned(), value: "".to_owned(), hidden_if_empty: true },
|
||||
KV { key: WEBHOOK_QUEUE_LIMIT.to_owned(), value: DEFAULT_LIMIT.to_string().to_owned(), hidden_if_empty: false },
|
||||
KV { key: WEBHOOK_QUEUE_DIR.to_owned(), value: DEFAULT_DIR.to_owned(), hidden_if_empty: false },
|
||||
KV { key: WEBHOOK_CLIENT_CERT.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
KV { key: WEBHOOK_CLIENT_KEY.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
]);
|
||||
|
||||
/// MQTT's default configuration collection
|
||||
pub static ref DefaultMqttKVS: KVS = KVS(vec![
|
||||
KV { key: ENABLE_KEY.to_owned(), value: ENABLE_OFF.to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_BROKER.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_TOPIC.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
// Sensitive information such as passwords are hidden when the value is empty
|
||||
KV { key: MQTT_PASSWORD.to_owned(), value: "".to_owned(), hidden_if_empty: true },
|
||||
KV { key: MQTT_USERNAME.to_owned(), value: "".to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_QOS.to_owned(), value: "0".to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_KEEP_ALIVE_INTERVAL.to_owned(), value: "0s".to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_RECONNECT_INTERVAL.to_owned(), value: "0s".to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_QUEUE_DIR.to_owned(), value: DEFAULT_DIR.to_owned(), hidden_if_empty: false },
|
||||
KV { key: MQTT_QUEUE_LIMIT.to_owned(), value: DEFAULT_LIMIT.to_string().to_owned(), hidden_if_empty: false },
|
||||
]);
|
||||
}
|
||||
@@ -57,7 +57,7 @@ protos.workspace = true
|
||||
query = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
rmp-serde.workspace = true
|
||||
rustfs-config = { workspace = true, features = ["constants"] }
|
||||
rustfs-config = { workspace = true, features = ["constants", "notify"] }
|
||||
rustfs-notify = { workspace = true }
|
||||
rustfs-obs = { workspace = true }
|
||||
rustfs-utils = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use ecstore::config::GLOBAL_ServerConfig;
|
||||
use rustfs_config::DEFAULT_DELIMITER;
|
||||
use tracing::{error, info, instrument};
|
||||
|
||||
#[instrument]
|
||||
@@ -14,8 +15,15 @@ pub(crate) async fn init_event_notifier() {
|
||||
}
|
||||
};
|
||||
|
||||
info!("Global server configuration loaded successfully. config: {:?}", server_config);
|
||||
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
|
||||
if server_config.get_value("notify", "_").is_none() {
|
||||
if server_config
|
||||
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
|
||||
.is_none()
|
||||
|| server_config
|
||||
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
|
||||
.is_none()
|
||||
{
|
||||
info!("'notify' subsystem not configured, skipping event notifier initialization.");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -634,7 +634,7 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
|
||||
} else {
|
||||
error!("Unknown hyper error:{:?}", hyper_err);
|
||||
}
|
||||
} else if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
|
||||
} else if let Some(io_err) = err.downcast_ref::<Error>() {
|
||||
error!("Unknown connection IO error:{}", io_err);
|
||||
} else {
|
||||
error!("Unknown connection error type:{:?}", err);
|
||||
|
||||
17
scripts/notify.sh
Executable file
17
scripts/notify.sh
Executable file
@@ -0,0 +1,17 @@
|
||||
#!/bin/bash -e
|
||||
|
||||
current_dir=$(pwd)
|
||||
echo "Current directory: $current_dir"
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
export RUST_BACKTRACE=1
|
||||
export RUST_LOG="rustfs=info,ecstore=info,s3s=debug,iam=info"
|
||||
fi
|
||||
|
||||
# deploy/logs/notify directory
|
||||
echo "Creating log directory if it does not exist..."
|
||||
mkdir -p "$current_dir/deploy/logs/notify"
|
||||
|
||||
# 启动 webhook 服务器
|
||||
echo "Starting webhook server..."
|
||||
cargo run --example webhook -p rustfs-notify &
|
||||
@@ -12,6 +12,7 @@ if [ -z "$SKIP_BUILD" ]; then
|
||||
fi
|
||||
|
||||
current_dir=$(pwd)
|
||||
echo "Current directory: $current_dir"
|
||||
|
||||
# mkdir -p ./target/volume/test
|
||||
mkdir -p ./target/volume/test{0..4}
|
||||
@@ -19,7 +20,7 @@ mkdir -p ./target/volume/test{0..4}
|
||||
|
||||
if [ -z "$RUST_LOG" ]; then
|
||||
export RUST_BACKTRACE=1
|
||||
export RUST_LOG="rustfs=info,ecstore=info,s3s=debug,iam=info"
|
||||
export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug,iam=info"
|
||||
fi
|
||||
|
||||
# export RUSTFS_ERASURE_SET_DRIVE_COUNT=5
|
||||
@@ -45,12 +46,12 @@ export RUSTFS_OBS_ENDPOINT=http://localhost:4317 # OpenTelemetry Collector 的
|
||||
#export RUSTFS_OBS_ENVIRONMENT=develop # 环境名称
|
||||
export RUSTFS_OBS_LOGGER_LEVEL=debug # 日志级别,支持 trace, debug, info, warn, error
|
||||
export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # 是否启用本地日志记录
|
||||
export RUSTFS_OBS_LOG_DIRECTORY="./deploy/logs" # Log directory
|
||||
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
|
||||
export RUSTFS_OBS_LOG_ROTATION_TIME="minute" # Log rotation time unit, can be "second", "minute", "hour", "day"
|
||||
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=1 # Log rotation size in MB
|
||||
|
||||
#
|
||||
export RUSTFS_SINKS_FILE_PATH=./deploy/logs/rustfs.log
|
||||
export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs/rustfs.log"
|
||||
export RUSTFS_SINKS_FILE_BUFFER_SIZE=12
|
||||
export RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS=1000
|
||||
export RUSTFS_SINKS_FILE_FLUSH_THRESHOLD=100
|
||||
@@ -74,6 +75,12 @@ export OTEL_INSTRUMENTATION_VERSION="0.1.1"
|
||||
export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0"
|
||||
export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production"
|
||||
|
||||
# notify
|
||||
export RUSTFS_NOTIFY_WEBHOOK_ENABLE="true" # 是否启用 webhook 通知
|
||||
export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # webhook 通知地址
|
||||
export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify"
|
||||
|
||||
|
||||
export RUSTFS_NS_SCANNER_INTERVAL=60 # 对象扫描间隔时间,单位为秒
|
||||
# exportRUSTFS_SKIP_BACKGROUND_TASK=true
|
||||
|
||||
@@ -87,6 +94,6 @@ if [ -n "$1" ]; then
|
||||
fi
|
||||
|
||||
# 启动 webhook 服务器
|
||||
#cargo run --example webhook -p rustfs-event &
|
||||
#cargo run --example webhook -p rustfs-notify &
|
||||
# 启动主服务
|
||||
cargo run --bin rustfs
|
||||
Reference in New Issue
Block a user