diff --git a/Cargo.lock b/Cargo.lock index 799d7b6e..3892d3af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3046,10 +3046,10 @@ dependencies = [ ] [[package]] -name = "dotenv" -version = "0.15.0" +name = "dotenvy" +version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "dpi" @@ -7372,8 +7372,7 @@ version = "0.0.1" dependencies = [ "async-trait", "axum", - "chrono", - "dotenv", + "dotenvy", "figment", "rdkafka", "reqwest", @@ -7389,7 +7388,6 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", - "wildmatch", ] [[package]] @@ -9639,12 +9637,6 @@ dependencies = [ "rustix 0.38.44", ] -[[package]] -name = "wildmatch" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68ce1ab1f8c62655ebe1350f589c61e505cf94d385bc6a12899442d9081e71fd" - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 52bd76da..d7eabe8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ datafusion = "46.0.0" derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } dirs = "6.0.0" -dotenv = "0.15.0" +dotenvy = "0.15.7" ecstore = { path = "./ecstore" } figment = { version = "0.10.19", features = ["toml", "yaml", "env"] } flatbuffers = "25.2.10" diff --git a/crates/event-notifier/Cargo.toml b/crates/event-notifier/Cargo.toml index 065cc698..0c3bc394 100644 --- a/crates/event-notifier/Cargo.toml +++ b/crates/event-notifier/Cargo.toml @@ -16,8 +16,7 @@ http-producer = ["dep:axum"] [dependencies] async-trait = { workspace = true } axum = { workspace = true, optional = true } -chrono = { workspace = true, features = ["serde"] } -dotenv = { workspace = true } +dotenvy = { workspace = true } figment = { workspace = true, features = ["toml", "yaml", "env"] } rdkafka = { workspace = true, features = ["tokio"], optional = true } reqwest = { workspace = true, optional = true } diff --git a/crates/event-notifier/examples/simple.rs b/crates/event-notifier/examples/simple.rs index 5324586a..77e3b56a 100644 --- a/crates/event-notifier/examples/simple.rs +++ b/crates/event-notifier/examples/simple.rs @@ -14,6 +14,7 @@ async fn main() -> Result<(), Box> { let mut config = NotificationConfig { store_path: "./events".to_string(), channel_capacity: 100, + timeout: 50, adapters: vec![AdapterConfig::Webhook(WebhookConfig { endpoint: "http://localhost:8080/webhook".to_string(), auth_token: Some("secret-token".to_string()), diff --git a/crates/event-notifier/src/bus.rs b/crates/event-notifier/src/bus.rs index 0ef341ce..55e5cd8b 100644 --- a/crates/event-notifier/src/bus.rs +++ b/crates/event-notifier/src/bus.rs @@ -2,8 +2,8 @@ use crate::ChannelAdapter; use crate::Error; use crate::EventStore; use crate::{Event, Log}; -use chrono::Utc; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -20,7 +20,7 @@ pub async fn event_bus( let mut pending_logs = Vec::new(); let mut current_log = Log { event_name: crate::event::Name::Everything, - key: Utc::now().timestamp().to_string(), + key: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(), records: Vec::new(), }; diff --git a/crates/event-notifier/src/config.rs b/crates/event-notifier/src/config.rs index 823bf83d..af5f8899 100644 --- a/crates/event-notifier/src/config.rs +++ b/crates/event-notifier/src/config.rs @@ -89,6 +89,8 @@ pub struct NotificationConfig { pub store_path: String, #[serde(default = "default_channel_capacity")] pub channel_capacity: usize, + #[serde(default = "default_timeout")] + pub timeout: u64, pub adapters: Vec, #[serde(default)] pub http: HttpProducerConfig, @@ -99,6 +101,7 @@ impl Default for NotificationConfig { Self { store_path: default_store_path(), channel_capacity: default_channel_capacity(), + timeout: default_timeout(), adapters: Vec::new(), http: HttpProducerConfig::default(), } @@ -136,12 +139,10 @@ impl NotificationConfig { /// loading configuration from env file pub fn from_env_file(path: &str) -> Result { // loading env files - dotenv::from_path(path) - .map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?; + dotenvy::from_path(path).map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?; // Extract configuration from environment variables using figurement - let figment = - figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_")); + let figment = figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_")); Ok(figment.extract()?) } @@ -149,13 +150,15 @@ impl NotificationConfig { /// Provide temporary directories as default storage paths fn default_store_path() -> String { - std::env::temp_dir() - .join("event-notification") - .to_string_lossy() - .to_string() + std::env::temp_dir().join("event-notification").to_string_lossy().to_string() } /// Provides the recommended default channel capacity for high concurrency systems fn default_channel_capacity() -> usize { 10000 // Reasonable default values for high concurrency systems } + +/// Provides the recommended default timeout for high concurrency systems +fn default_timeout() -> u64 { + 50 // Reasonable default values for high concurrency systems +} diff --git a/crates/event-notifier/src/event.rs b/crates/event-notifier/src/event.rs index 00bc5983..ce32aecc 100644 --- a/crates/event-notifier/src/event.rs +++ b/crates/event-notifier/src/event.rs @@ -1,19 +1,21 @@ use crate::Error; -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; use strum::{Display, EnumString}; use uuid::Uuid; +/// A struct representing the identity of the user #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Identity { #[serde(rename = "principalId")] pub principal_id: String, } +/// A struct representing the bucket information #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Bucket { pub name: String, @@ -22,6 +24,7 @@ pub struct Bucket { pub arn: String, } +/// A struct representing the object information #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Object { pub key: String, @@ -38,6 +41,7 @@ pub struct Object { pub sequencer: String, } +/// A struct representing the metadata of the event #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metadata { #[serde(rename = "s3SchemaVersion")] @@ -48,6 +52,7 @@ pub struct Metadata { pub object: Object, } +/// A struct representing the source of the event #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Source { pub host: String, @@ -82,7 +87,7 @@ impl EventBuilder { event_version: Some(Cow::Borrowed("2.0").to_string()), event_source: Some(Cow::Borrowed("aws:s3").to_string()), aws_region: Some("us-east-1".to_string()), - event_time: Some(Utc::now().to_rfc3339()), + event_time: Some(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string()), event_name: None, user_identity: Some(Identity { principal_id: "anonymous".to_string(), @@ -214,7 +219,7 @@ impl EventBuilder { s3, source, id: Uuid::new_v4(), - timestamp: Utc::now(), + timestamp: SystemTime::now(), channels, }) } @@ -241,7 +246,7 @@ pub struct Event { pub s3: Metadata, pub source: Source, pub id: Uuid, - pub timestamp: DateTime, + pub timestamp: SystemTime, pub channels: SmallVec<[String; 2]>, } diff --git a/crates/event-notifier/src/global.rs b/crates/event-notifier/src/global.rs index a14380b3..ed666a8d 100644 --- a/crates/event-notifier/src/global.rs +++ b/crates/event-notifier/src/global.rs @@ -1,23 +1,90 @@ use crate::{ChannelAdapter, Error, Event, NotificationConfig, NotificationSystem}; -use std::sync::Arc; +use std::sync::{atomic, Arc}; +use std::time; +use std::time::Duration; use tokio::sync::{Mutex, OnceCell}; static GLOBAL_SYSTEM: OnceCell>> = OnceCell::const_new(); -static INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); +static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false); +static READY: atomic::AtomicBool = atomic::AtomicBool::new(false); +static INIT_LOCK: Mutex<()> = Mutex::const_new(()); /// initialize the global notification system pub async fn initialize(config: NotificationConfig) -> Result<(), Error> { - if INITIALIZED.swap(true, std::sync::atomic::Ordering::SeqCst) { + // use lock to protect the initialization process + let _lock = INIT_LOCK.lock().await; + + if INITIALIZED.swap(true, atomic::Ordering::SeqCst) { return Err(Error::custom("notify the system has been initialized")); } - let system = Arc::new(Mutex::new(NotificationSystem::new(config).await?)); - GLOBAL_SYSTEM - .set(system) - .map_err(|_| Error::custom("unable to set up global notification system"))?; + match Arc::new(Mutex::new(NotificationSystem::new(config).await?)) { + system => { + if let Err(_) = GLOBAL_SYSTEM.set(system.clone()) { + INITIALIZED.store(false, atomic::Ordering::SeqCst); + return Err(Error::custom("unable to set up global notification system")); + } + system + } + }; Ok(()) } +/// securely initialize the global notification system +pub async fn initialize_safe(config: NotificationConfig) -> Result<(), Error> { + // use-lock-to-protect-the-initialization-process + let _lock = INIT_LOCK.lock().await; + + if INITIALIZED.load(atomic::Ordering::SeqCst) { + return Err(Error::custom("notify the system has been initialized")); + } + + // set-initialization-flag + INITIALIZED.store(true, atomic::Ordering::SeqCst); + + match Arc::new(Mutex::new(NotificationSystem::new(config).await?)) { + system => { + if let Err(_) = GLOBAL_SYSTEM.set(system.clone()) { + INITIALIZED.store(false, atomic::Ordering::SeqCst); + return Err(Error::custom("unable to set up global notification system")); + } + system + } + }; + Ok(()) +} + +/// securely start the global notification system +pub async fn start_safe(adapters: Vec>) -> Result<(), Error> { + // start process with lock protection + let _lock = INIT_LOCK.lock().await; + + if !INITIALIZED.load(atomic::Ordering::SeqCst) { + return Err(Error::custom("notification system not initialized")); + } + + if READY.load(atomic::Ordering::SeqCst) { + return Err(Error::custom("notification system already started")); + } + + let system = get_system().await?; + + // Execute startup operations directly on the current thread, rather than generating new tasks + let mut system_guard = system.lock().await; + match system_guard.start(adapters).await { + Ok(_) => { + READY.store(true, atomic::Ordering::SeqCst); + tracing::info!("Notification system is ready to process events"); + Ok(()) + } + Err(e) => { + tracing::error!("Notify system start failed: {}", e); + INITIALIZED.store(false, atomic::Ordering::SeqCst); + Err(e) + } + } +} + /// start the global notification system pub async fn start(adapters: Vec>) -> Result<(), Error> { let system = get_system().await?; @@ -26,11 +93,36 @@ pub async fn start(adapters: Vec>) -> Result<(), Error> let system_clone = Arc::clone(&system); tokio::spawn(async move { let mut system_guard = system_clone.lock().await; - if let Err(e) = system_guard.start(adapters).await { - tracing::error!("notify the system to start failed: {}", e); + match system_guard.start(adapters).await { + Ok(_) => { + // The system is started and runs normally, set the ready flag + READY.store(true, atomic::Ordering::SeqCst); + tracing::info!("Notification system is ready to process events"); + } + Err(e) => { + tracing::error!("Notify system start failed: {}", e); + INITIALIZED.store(false, atomic::Ordering::SeqCst); + } } }); + // Wait for a while to ensure the system has a chance to start + tokio::time::sleep(Duration::from_millis(100)).await; + + Ok(()) +} + +/// waiting for notification system to be fully ready +async fn wait_until_ready(timeout: Duration) -> Result<(), Error> { + let start = time::Instant::now(); + + while !READY.load(atomic::Ordering::SeqCst) { + if start.elapsed() > timeout { + return Err(Error::custom("timeout waiting for notification system to become ready")); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + Ok(()) } @@ -51,6 +143,7 @@ pub async fn start(adapters: Vec>) -> Result<(), Error> /// let config = NotificationConfig { /// store_path: "./events".to_string(), /// channel_capacity: 100, +/// timeout: 0, /// adapters: vec![/* 适配器配置 */], /// http: Default::default(), /// }; @@ -73,8 +166,29 @@ pub async fn initialize_and_start(config: NotificationConfig) -> Result<(), Erro Ok(()) } +/// Initialize and start the global notification system and wait until it's ready +pub async fn initialize_and_start_with_ready_check(config: NotificationConfig, timeout: Duration) -> Result<(), Error> { + // initialize the system + initialize(config.clone()).await?; + + // create an adapter + let adapters = crate::create_adapters(&config.adapters).expect("failed to create adapters"); + + // start the system + start(adapters).await?; + + // wait for the system to be ready + wait_until_ready(timeout).await?; + + Ok(()) +} + /// send events to notification system pub async fn send_event(event: Event) -> Result<(), Error> { + // check if the system is ready to receive events + if !READY.load(atomic::Ordering::SeqCst) { + return Err(Error::custom("notification system not ready, please wait for initialization to complete")); + } let system = get_system().await?; let system_guard = system.lock().await; system_guard.send_event(event).await diff --git a/crates/event-notifier/src/lib.rs b/crates/event-notifier/src/lib.rs index 9ba226c6..2fa9d895 100644 --- a/crates/event-notifier/src/lib.rs +++ b/crates/event-notifier/src/lib.rs @@ -4,6 +4,7 @@ mod config; mod error; mod event; mod global; +mod notifier; mod producer; mod store; @@ -28,104 +29,9 @@ pub use config::{AdapterConfig, NotificationConfig}; pub use error::Error; pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source}; -pub use global::{initialize, initialize_and_start, send_event, shutdown, start}; +pub use global::{ + initialize, initialize_and_start, initialize_and_start_with_ready_check, initialize_safe, send_event, shutdown, start, + start_safe, +}; +pub use notifier::NotificationSystem; pub use store::EventStore; - -#[cfg(feature = "http-producer")] -pub use producer::http::HttpProducer; -#[cfg(feature = "http-producer")] -pub use producer::EventProducer; - -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; - -/// The `NotificationSystem` struct represents the notification system. -/// It manages the event bus and the adapters. -/// It is responsible for sending and receiving events. -/// It also handles the shutdown process. -pub struct NotificationSystem { - tx: mpsc::Sender, - rx: Option>, - store: Arc, - shutdown: CancellationToken, - #[cfg(feature = "http-producer")] - http_config: HttpProducerConfig, -} - -impl NotificationSystem { - /// Creates a new `NotificationSystem` instance. - pub async fn new(config: NotificationConfig) -> Result { - let (tx, rx) = mpsc::channel::(config.channel_capacity); - let store = Arc::new(EventStore::new(&config.store_path).await?); - let shutdown = CancellationToken::new(); - - let restored_logs = store.load_logs().await?; - for log in restored_logs { - for event in log.records { - // For example, where the send method may return a SendError when calling it - tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; - } - } - - Ok(Self { - tx, - rx: Some(rx), - store, - shutdown, - #[cfg(feature = "http-producer")] - http_config: config.http, - }) - } - - /// Starts the notification system. - /// It initializes the event bus and the producer. - pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { - let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; - - let shutdown_clone = self.shutdown.clone(); - let store_clone = self.store.clone(); - let bus_handle = tokio::spawn(async move { - if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await { - tracing::error!("Event bus failed: {}", e); - } - }); - - #[cfg(feature = "http-producer")] - { - let producer = HttpProducer::new(self.tx.clone(), self.http_config.port); - producer.start().await?; - } - - tokio::select! { - result = bus_handle => { - result.map_err(Error::JoinError)?; - Ok(()) - }, - _ = self.shutdown.cancelled() => { - tracing::info!("System shutdown triggered"); - Ok(()) - } - } - } - - /// Sends an event to the notification system. - /// This method is used to send events to the event bus. - pub async fn send_event(&self, event: Event) -> Result<(), Error> { - self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; - Ok(()) - } - - /// Shuts down the notification system. - /// This method is used to cancel the event bus and producer tasks. - pub fn shutdown(&self) { - self.shutdown.cancel(); - } - - /// Sets the HTTP port for the notification system. - /// This method is used to change the port for the HTTP producer. - #[cfg(feature = "http-producer")] - pub fn set_http_port(&mut self, port: u16) { - self.http_config.port = port; - } -} diff --git a/crates/event-notifier/src/notifier.rs b/crates/event-notifier/src/notifier.rs new file mode 100644 index 00000000..e75fe25d --- /dev/null +++ b/crates/event-notifier/src/notifier.rs @@ -0,0 +1,101 @@ +#[cfg(feature = "http-producer")] +pub use crate::producer::http::HttpProducer; +#[cfg(feature = "http-producer")] +pub use crate::producer::EventProducer; + +#[cfg(feature = "http-producer")] +pub use crate::config::HttpProducerConfig; +use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotificationConfig}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +/// The `NotificationSystem` struct represents the notification system. +/// It manages the event bus and the adapters. +/// It is responsible for sending and receiving events. +/// It also handles the shutdown process. +pub struct NotificationSystem { + tx: mpsc::Sender, + rx: Option>, + store: Arc, + shutdown: CancellationToken, + #[cfg(feature = "http-producer")] + http_config: HttpProducerConfig, +} + +impl NotificationSystem { + /// Creates a new `NotificationSystem` instance. + pub async fn new(config: NotificationConfig) -> Result { + let (tx, rx) = mpsc::channel::(config.channel_capacity); + let store = Arc::new(EventStore::new(&config.store_path).await?); + let shutdown = CancellationToken::new(); + + let restored_logs = store.load_logs().await?; + for log in restored_logs { + for event in log.records { + // For example, where the send method may return a SendError when calling it + tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; + } + } + + Ok(Self { + tx, + rx: Some(rx), + store, + shutdown, + #[cfg(feature = "http-producer")] + http_config: config.http, + }) + } + + /// Starts the notification system. + /// It initializes the event bus and the producer. + pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { + let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; + + let shutdown_clone = self.shutdown.clone(); + let store_clone = self.store.clone(); + let bus_handle = tokio::spawn(async move { + if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await { + tracing::error!("Event bus failed: {}", e); + } + }); + + #[cfg(feature = "http-producer")] + { + let producer = crate::producer::http::HttpProducer::new(self.tx.clone(), self.http_config.port); + producer.start().await?; + } + + tokio::select! { + result = bus_handle => { + result.map_err(Error::JoinError)?; + Ok(()) + }, + _ = self.shutdown.cancelled() => { + tracing::info!("System shutdown triggered"); + Ok(()) + } + } + } + + /// Sends an event to the notification system. + /// This method is used to send events to the event bus. + pub async fn send_event(&self, event: Event) -> Result<(), Error> { + self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; + Ok(()) + } + + /// Shuts down the notification system. + /// This method is used to cancel the event bus and producer tasks. + pub fn shutdown(&self) { + self.shutdown.cancel(); + } + + /// Sets the HTTP port for the notification system. + /// This method is used to change the port for the HTTP producer. + #[cfg(feature = "http-producer")] + pub fn set_http_port(&mut self, port: u16) { + self.http_config.port = port; + } +} diff --git a/crates/event-notifier/src/store.rs b/crates/event-notifier/src/store.rs index 995cdc83..c7ded205 100644 --- a/crates/event-notifier/src/store.rs +++ b/crates/event-notifier/src/store.rs @@ -1,7 +1,7 @@ use crate::Error; use crate::Log; -use chrono::Utc; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::fs::{create_dir_all, File, OpenOptions}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::sync::RwLock; @@ -23,7 +23,11 @@ impl EventStore { pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> { let _guard = self.lock.write().await; - let file_path = format!("{}/events_{}.jsonl", self.path, Utc::now().timestamp()); + let file_path = format!( + "{}/events_{}.jsonl", + self.path, + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + ); let file = OpenOptions::new().create(true).append(true).open(&file_path).await?; let mut writer = BufWriter::new(file); for log in logs { diff --git a/crates/event-notifier/tests/integration.rs b/crates/event-notifier/tests/integration.rs index 2829b8b9..f1b07080 100644 --- a/crates/event-notifier/tests/integration.rs +++ b/crates/event-notifier/tests/integration.rs @@ -70,6 +70,7 @@ async fn test_notification_system() { let config = rustfs_event_notifier::NotificationConfig { store_path: "./test_events".to_string(), channel_capacity: 100, + timeout: 50, adapters: vec![AdapterConfig::Webhook(WebhookConfig { endpoint: "http://localhost:8080/webhook".to_string(), auth_token: None,