This commit is contained in:
houseme
2025-06-23 04:15:05 +08:00
parent 928453db62
commit 5155a3d544
8 changed files with 16 additions and 20 deletions

View File

@@ -4,12 +4,11 @@ use rustfs_notify::factory::{
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::global::notification_system;
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{initialize, notification_system};
use std::time::Duration;
use tracing::info;
use tracing_subscriber::util::SubscriberInitExt;
#[tokio::main]
async fn main() -> Result<(), NotificationError> {
@@ -19,7 +18,7 @@ async fn main() -> Result<(), NotificationError> {
Some(sys) => sys,
None => {
let config = Config::new();
notification_system::initialize(config).await?;
initialize(config).await?;
notification_system().expect("Failed to initialize notification system")
}
};

View File

@@ -5,12 +5,11 @@ use rustfs_notify::factory::{
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_notify::global::notification_system;
use rustfs_notify::store::DEFAULT_LIMIT;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use rustfs_notify::{initialize, notification_system};
use std::time::Duration;
use tracing::info;
use tracing_subscriber::util::SubscriberInitExt;
#[tokio::main]
async fn main() -> Result<(), NotificationError> {
@@ -21,7 +20,7 @@ async fn main() -> Result<(), NotificationError> {
Some(sys) => sys,
None => {
let config = Config::new();
notification_system::initialize(config).await?;
initialize(config).await?;
notification_system().expect("Failed to initialize notification system")
}
};

View File

@@ -488,13 +488,12 @@ impl Event {
s3_metadata.object.etag = args.object.etag.clone();
s3_metadata.object.content_type = args.object.content_type.clone();
// Filter out internal reserved metadata
let user_metadata = args
.object
.user_defined
.iter()
.filter(|&(k, v)| !k.to_lowercase().starts_with("x-amz-meta-internal-"))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<String, String>>();
let mut user_metadata = HashMap::new();
for (k, v) in &args.object.user_defined.unwrap_or_default() {
if !k.to_lowercase().starts_with("x-amz-meta-internal-") {
user_metadata.insert(k.clone(), v.clone());
}
}
s3_metadata.object.user_metadata = Some(user_metadata);
}

View File

@@ -32,7 +32,7 @@ pub struct Notifier {
// Rely on getting an instance of NotificationSystem from the outside.
}
impl crate::notifier::Notifier {
impl Notifier {
/// Notify an event asynchronously.
/// This is the only entry point for all event notifications in the system.
pub async fn notify(&self, args: EventArgs) {

View File

@@ -73,10 +73,6 @@ pub struct Opt {
#[arg(long, env = "RUSTFS_LICENSE")]
pub license: Option<String>,
/// event notifier config file
#[arg(long, env = "RUSTFS_EVENT_CONFIG")]
pub event_config: Option<String>,
}
// lazy_static::lazy_static! {

View File

@@ -516,7 +516,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// GLOBAL_EVENT_SYS.init(store.clone()).await?;
// Initialize event notifier
event::init_event_notifier(opt.event_config).await;
event::init_event_notifier().await;
let buckets_list = store
.list_bucket(&BucketOptions {

View File

@@ -66,7 +66,6 @@ use policy::policy::Validator;
use query::instance::make_rustfsms;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_notify::EventName;
use rustfs_rio::CompressReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;

View File

@@ -3,6 +3,7 @@ use s3s::{S3Request, S3Response};
use std::collections::HashMap;
/// Extract request parameters from S3Request, mainly header information.
#[allow(dead_code)]
pub fn extract_req_params<T>(req: &S3Request<T>) -> HashMap<String, String> {
let mut params = HashMap::new();
for (key, value) in req.headers.iter() {
@@ -14,6 +15,7 @@ pub fn extract_req_params<T>(req: &S3Request<T>) -> HashMap<String, String> {
}
/// Extract response elements from S3Response, mainly header information.
#[allow(dead_code)]
pub fn extract_resp_elements<T>(resp: &S3Response<T>) -> HashMap<String, String> {
let mut params = HashMap::new();
for (key, value) in resp.headers.iter() {
@@ -25,6 +27,7 @@ pub fn extract_resp_elements<T>(resp: &S3Response<T>) -> HashMap<String, String>
}
/// Get host from header information.
#[allow(dead_code)]
pub fn get_request_host(headers: &HeaderMap) -> String {
headers
.get("host")
@@ -34,6 +37,7 @@ pub fn get_request_host(headers: &HeaderMap) -> String {
}
/// Get user-agent from header information.
#[allow(dead_code)]
pub fn get_request_user_agent(headers: &HeaderMap) -> String {
headers
.get("user-agent")