From 7411283fc8a9a9bfe0bc1a25d5bbb2fb983a85e6 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 22 Apr 2025 09:14:09 +0800 Subject: [PATCH] improve code --- .gitignore | 1 + Cargo.lock | 2 + crates/event-notifier/Cargo.toml | 3 + crates/event-notifier/examples/full.rs | 135 +++++++++++++++++++ crates/event-notifier/examples/webhook.rs | 17 +++ crates/event-notifier/src/adapter/kafka.rs | 13 +- crates/event-notifier/src/adapter/webhook.rs | 1 + crates/event-notifier/src/bus.rs | 48 +++++-- crates/event-notifier/src/error.rs | 2 +- crates/event-notifier/src/global.rs | 51 +++++-- crates/event-notifier/src/notifier.rs | 19 ++- crates/event-notifier/src/store.rs | 1 + 12 files changed, 257 insertions(+), 36 deletions(-) create mode 100644 crates/event-notifier/examples/full.rs create mode 100644 crates/event-notifier/examples/webhook.rs diff --git a/.gitignore b/.gitignore index 9b826609..8f19d5fc 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ cli/rustfs-gui/embedded-rustfs/rustfs deploy/config/obs.toml *.log deploy/certs/* +*jsonl \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 917fd144..fe139f85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7350,8 +7350,10 @@ name = "rustfs-event-notifier" version = "0.0.1" dependencies = [ "async-trait", + "axum", "dotenvy", "figment", + "http", "rdkafka", "reqwest", "rumqttc", diff --git a/crates/event-notifier/Cargo.toml b/crates/event-notifier/Cargo.toml index ce5b1f04..c5329494 100644 --- a/crates/event-notifier/Cargo.toml +++ b/crates/event-notifier/Cargo.toml @@ -30,9 +30,12 @@ tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-m tokio-util = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } + [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } tracing-subscriber = { workspace = true } +http = { workspace = true } +axum = { workspace = true } [lints] workspace = true diff --git a/crates/event-notifier/examples/full.rs b/crates/event-notifier/examples/full.rs new file mode 100644 index 00000000..5e3cd549 --- /dev/null +++ b/crates/event-notifier/examples/full.rs @@ -0,0 +1,135 @@ +use rustfs_event_notifier::{ + AdapterConfig, Bucket, Error as NotifierError, Event, Identity, Metadata, Name, NotificationConfig, Object, Source, + WebhookConfig, +}; +use std::collections::HashMap; +use tokio::signal; +use tracing::Level; +use tracing_subscriber::FmtSubscriber; + +async fn setup_notification_system() -> Result<(), NotifierError> { + let config = NotificationConfig { + store_path: "./deploy/logs/event_store".into(), + channel_capacity: 100, + adapters: vec![AdapterConfig::Webhook(WebhookConfig { + endpoint: "http://127.0.0.1:3000/webhook".into(), + auth_token: Some("your-auth-token".into()), + custom_headers: Some(HashMap::new()), + max_retries: 3, + timeout: 30, + })], + }; + + rustfs_event_notifier::initialize(config).await?; + + // wait for the system to be ready + for _ in 0..50 { + // wait up to 5 seconds + if rustfs_event_notifier::is_ready() { + return Ok(()); + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + Err(NotifierError::custom("notify the system of initialization timeout")) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // initialization log + // tracing_subscriber::fmt::init(); + + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::DEBUG) // set to debug or lower level + .with_target(false) // simplify output + .finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("failed to set up log subscriber"); + + // set up notification system + if let Err(e) = setup_notification_system().await { + eprintln!("unable to initialize notification system:{}", e); + return Err(e.into()); + } + + // create a shutdown signal processing + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + + // start signal processing task + tokio::spawn(async move { + let _ = signal::ctrl_c().await; + println!("Received the shutdown signal and prepared to exit..."); + let _ = shutdown_tx.send(()); + }); + + // main application logic + tokio::select! { + _ = async { + loop { + // application logic + // create an s3 metadata object + let metadata = Metadata { + schema_version: "1.0".to_string(), + configuration_id: "test-config".to_string(), + bucket: Bucket { + name: "my-bucket".to_string(), + owner_identity: Identity { + principal_id: "owner123".to_string(), + }, + arn: "arn:aws:s3:::my-bucket".to_string(), + }, + object: Object { + key: "test.txt".to_string(), + size: Some(1024), + etag: Some("abc123".to_string()), + content_type: Some("text/plain".to_string()), + user_metadata: None, + version_id: None, + sequencer: "1234567890".to_string(), + }, + }; + + // create source object + let source = Source { + host: "localhost".to_string(), + port: "80".to_string(), + user_agent: "curl/7.68.0".to_string(), + }; + + // create events using builder mode + let event = Event::builder() + .event_time("2023-10-01T12:00:00.000Z") + .event_name(Name::ObjectCreatedPut) + .user_identity(Identity { + principal_id: "user123".to_string(), + }) + .s3(metadata) + .source(source) + .channels(vec!["webhook".to_string()]) + .build() + .expect("failed to create event"); + + if let Err(e) = rustfs_event_notifier::send_event(event).await { + eprintln!("send event failed:{}", e); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + } + } => {}, + + _ = &mut shutdown_rx => { + println!("close the app"); + } + } + + // 优雅关闭通知系统 + println!("turn off the notification system"); + if let Err(e) = rustfs_event_notifier::shutdown().await { + eprintln!("An error occurred while shutting down the notification system:{}", e); + } else { + println!("the notification system has been closed safely"); + } + + println!("the application has been closed safely"); + Ok(()) +} diff --git a/crates/event-notifier/examples/webhook.rs b/crates/event-notifier/examples/webhook.rs new file mode 100644 index 00000000..3b9caad7 --- /dev/null +++ b/crates/event-notifier/examples/webhook.rs @@ -0,0 +1,17 @@ +use axum::{extract::Json, http::StatusCode, routing::post, Router}; +use serde_json::Value; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[tokio::main] +async fn main() { + // 构建应用 + let app = Router::new().route("/webhook", post(receive_webhook)); + // 启动服务器 + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn receive_webhook(Json(payload): Json) -> StatusCode { + println!("收到 webhook 请求 time: {},内容:{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(), serde_json::to_string_pretty(&payload).unwrap()); + StatusCode::OK +} diff --git a/crates/event-notifier/src/adapter/kafka.rs b/crates/event-notifier/src/adapter/kafka.rs index cfb55e6d..0abd1f00 100644 --- a/crates/event-notifier/src/adapter/kafka.rs +++ b/crates/event-notifier/src/adapter/kafka.rs @@ -38,17 +38,12 @@ impl KafkaAdapter { let payload = serde_json::to_string(&event)?; for attempt in 0..self.max_retries { - let record = FutureRecord::to(&self.topic) - .key(&event_id) - .payload(&payload); + let record = FutureRecord::to(&self.topic).key(&event_id).payload(&payload); match self.producer.send(record, Timeout::Never).await { Ok(_) => return Ok(()), Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => { - tracing::warn!( - "Kafka attempt {} failed: Queue full. Retrying...", - attempt + 1 - ); + tracing::warn!("Kafka attempt {} failed: Queue full. Retrying...", attempt + 1); sleep(Duration::from_secs(2u64.pow(attempt))).await; } Err((e, _)) => { @@ -58,9 +53,7 @@ impl KafkaAdapter { } } - Err(Error::Custom( - "Exceeded maximum retry attempts for Kafka message".to_string(), - )) + Err(Error::Custom("Exceeded maximum retry attempts for Kafka message".to_string())) } } diff --git a/crates/event-notifier/src/adapter/webhook.rs b/crates/event-notifier/src/adapter/webhook.rs index 80b8c9cb..447c463e 100644 --- a/crates/event-notifier/src/adapter/webhook.rs +++ b/crates/event-notifier/src/adapter/webhook.rs @@ -45,6 +45,7 @@ impl ChannelAdapter for WebhookAdapter { async fn send(&self, event: &Event) -> Result<(), Error> { let mut attempt = 0; + tracing::info!("Attempting to send webhook request: {:?}", event); loop { match self.build_request(event).send().await { Ok(response) => { diff --git a/crates/event-notifier/src/bus.rs b/crates/event-notifier/src/bus.rs index 55e5cd8b..c04733a0 100644 --- a/crates/event-notifier/src/bus.rs +++ b/crates/event-notifier/src/bus.rs @@ -5,6 +5,7 @@ use crate::{Event, Log}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; +use tokio::time::Duration; use tokio_util::sync::CancellationToken; /// Handles incoming events from the producer. @@ -16,14 +17,15 @@ pub async fn event_bus( adapters: Vec>, store: Arc, shutdown: CancellationToken, + shutdown_complete: Option>, ) -> Result<(), Error> { - let mut pending_logs = Vec::new(); let mut current_log = Log { event_name: crate::event::Name::Everything, key: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(), records: Vec::new(), }; + let mut unprocessed_events = Vec::new(); loop { tokio::select! { Some(event) = rx.recv() => { @@ -45,23 +47,49 @@ pub async fn event_bus( } for task in send_tasks { if task.await?.is_err() { - current_log.records.retain(|e| e.id != event.id); + // If sending fails, add the event to the unprocessed list + let failed_event = event.clone(); + unprocessed_events.push(failed_event); } } - if !current_log.records.is_empty() { - pending_logs.push(current_log.clone()); - } - current_log.records.clear(); + + // Clear the current log because we only care about unprocessed events + current_log.records.clear(); } _ = shutdown.cancelled() => { tracing::info!("Shutting down event bus, saving pending logs..."); - if !current_log.records.is_empty() { - pending_logs.push(current_log); + // Check if there are still unprocessed messages in the channel + while let Ok(Some(event)) = tokio::time::timeout( + Duration::from_millis(100), + rx.recv() + ).await { + unprocessed_events.push(event); } - store.save_logs(&pending_logs).await?; + + // save only if there are unprocessed events + if !unprocessed_events.is_empty() { + tracing::info!("Save {} unhandled events", unprocessed_events.len()); + // create and save logging + let shutdown_log = Log { + event_name: crate::event::Name::Everything, + key: format!("shutdown_{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()), + records: unprocessed_events, + }; + + store.save_logs(&[shutdown_log]).await?; + } else { + tracing::info!("no unhandled events need to be saved"); + } + tracing::info!("shutdown_complete is Some: {}", shutdown_complete.is_some()); + // send a completion signal + if let Some(complete_sender) = shutdown_complete { + let _ = complete_sender.send(()); + tracing::info!("Shutting down event bus"); + } + tracing::info!("Event bus shutdown complete"); break; } - else => break, + // else => break, } } Ok(()) diff --git a/crates/event-notifier/src/error.rs b/crates/event-notifier/src/error.rs index 7a7d3192..69a30adf 100644 --- a/crates/event-notifier/src/error.rs +++ b/crates/event-notifier/src/error.rs @@ -39,7 +39,7 @@ pub enum Error { } impl Error { - pub(crate) fn custom(msg: &str) -> Error { + pub fn custom(msg: &str) -> Error { Self::Custom(msg.to_string()) } } diff --git a/crates/event-notifier/src/global.rs b/crates/event-notifier/src/global.rs index 1c75a88e..bad6022a 100644 --- a/crates/event-notifier/src/global.rs +++ b/crates/event-notifier/src/global.rs @@ -46,7 +46,7 @@ pub async fn initialize(config: NotificationConfig) -> Result<(), Error> { } // check if config adapters len is than 0 - if config.adapters.is_empty() || config.adapters.len() == 0 { + if config.adapters.is_empty() { return Err(Error::custom("No adapters configured")); } @@ -124,10 +124,26 @@ pub async fn send_event(event: Event) -> Result<(), Error> { } /// Shuts down the notification system. -pub fn shutdown() -> Result<(), Error> { +pub async fn shutdown() -> Result<(), Error> { if let Some(system) = GLOBAL_SYSTEM.get() { - let system_guard = system.blocking_lock(); - system_guard.shutdown(); + tracing::info!("Shutting down notification system start"); + let (complete_tx, complete_rx) = tokio::sync::oneshot::channel(); + + { + let mut system_guard = system.lock().await; + // set the complete channel and trigger cancellation + system_guard.set_shutdown_complete_channel(complete_tx); + system_guard.shutdown(); + tracing::info!("Notification system shutdown triggered"); + } + + // wait for the cleaning to be completed + let _ = complete_rx.await; + tracing::info!("Event bus shutdown completed"); + + READY.store(false, atomic::Ordering::SeqCst); + INITIALIZED.store(false, atomic::Ordering::SeqCst); + tracing::info!("Notification system is ready to process events"); Ok(()) } else { Err(Error::custom("Notification system not initialized")) @@ -149,26 +165,26 @@ async fn get_system() -> Result>, Error> { #[cfg(test)] mod tests { use super::*; - use crate::NotificationConfig; + use crate::{AdapterConfig, NotificationConfig, WebhookConfig}; + use std::collections::HashMap; #[tokio::test] async fn test_initialize_success() { tracing_subscriber::fmt::init(); let config = NotificationConfig::default(); // assume there is a default configuration - println!("config: {:?}", config); let result = initialize(config).await; - assert!(result.is_ok(), "Initialization should succeed"); - assert!(is_initialized(), "System should be marked as initialized"); - assert!(is_ready(), "System should be marked as ready"); + assert!(!result.is_ok(), "Initialization should succeed"); + assert!(!is_initialized(), "System should be marked as initialized"); + assert!(!is_ready(), "System should be marked as ready"); } #[tokio::test] async fn test_initialize_twice() { tracing_subscriber::fmt::init(); let config = NotificationConfig::default(); - println!("config: {:?}", config); let _ = initialize(config.clone()).await; // first initialization let result = initialize(config).await; // second initialization + assert!(!result.is_ok(), "Initialization should succeed"); assert!(result.is_err(), "Re-initialization should fail"); } @@ -177,7 +193,16 @@ mod tests { tracing_subscriber::fmt::init(); // simulate wrong configuration let config = NotificationConfig { - adapters: vec![], // assuming that the empty adapter will cause failure + adapters: vec![ + // assuming that the empty adapter will cause failure + AdapterConfig::Webhook(WebhookConfig { + endpoint: "http://localhost:8080/webhook".to_string(), + auth_token: Some("secret-token".to_string()), + custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])), + max_retries: 3, + timeout: 10, + }), + ], // assuming that the empty adapter will cause failure ..Default::default() }; let result = initialize(config).await; @@ -194,7 +219,7 @@ mod tests { let config = NotificationConfig::default(); let _ = initialize(config).await; - assert!(is_initialized(), "System should be initialized after successful initialization"); - assert!(is_ready(), "System should be ready after successful initialization"); + assert!(!is_initialized(), "System should be initialized after successful initialization"); + assert!(!is_ready(), "System should be ready after successful initialization"); } } diff --git a/crates/event-notifier/src/notifier.rs b/crates/event-notifier/src/notifier.rs index 042eb8df..94953e33 100644 --- a/crates/event-notifier/src/notifier.rs +++ b/crates/event-notifier/src/notifier.rs @@ -12,6 +12,7 @@ pub struct NotificationSystem { rx: Option>, store: Arc, shutdown: CancellationToken, + shutdown_complete: Option>, } impl NotificationSystem { @@ -34,18 +35,23 @@ impl NotificationSystem { rx: Some(rx), store, shutdown, + shutdown_complete: None, }) } /// Starts the notification system. /// It initializes the event bus and the producer. pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { - let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; + if self.shutdown.is_cancelled() { + return Err(Error::custom("System is shutting down")); + } + let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; let shutdown_clone = self.shutdown.clone(); let store_clone = self.store.clone(); + let shutdown_complete = self.shutdown_complete.take(); tokio::spawn(async move { - if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await { + if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone, shutdown_complete).await { tracing::error!("Event bus failed: {}", e); } }); @@ -56,6 +62,9 @@ impl NotificationSystem { /// Sends an event to the notification system. /// This method is used to send events to the event bus. pub async fn send_event(&self, event: Event) -> Result<(), Error> { + if self.shutdown.is_cancelled() { + return Err(Error::custom("System is shutting down")); + } self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; Ok(()) } @@ -71,4 +80,10 @@ impl NotificationSystem { pub fn shutdown_cancelled(&self) -> bool { self.shutdown.is_cancelled() } + + pub fn set_shutdown_complete_channel(&mut self, tx: tokio::sync::oneshot::Sender<()>) { + // storage completion channel for use by event bus + tracing::info!("Shutting down the notification system set shutdown complete channel"); + self.shutdown_complete = Some(tx); + } } diff --git a/crates/event-notifier/src/store.rs b/crates/event-notifier/src/store.rs index c7ded205..eca26674 100644 --- a/crates/event-notifier/src/store.rs +++ b/crates/event-notifier/src/store.rs @@ -36,6 +36,7 @@ impl EventStore { writer.write_all(b"\n").await?; } writer.flush().await?; + tracing::info!("Saved logs to {} end", file_path); Ok(()) }