feat(event-notifier): improve notification system initialization safety

- Add READY atomic flag to track full initialization status
- Implement initialize_safe and start_safe methods with mutex protection
- Add wait_until_ready function with configurable timeout
- Create initialize_and_start_with_ready_check helper method
- Replace sleep-based waiting with proper readiness checks
- Add safety checks before sending events
- Replace chrono with std::time for time handling
- Update error handling to provide clear initialization status

This change reduces race conditions in multi-threaded environments
and ensures events are only processed when the system is fully ready.
This commit is contained in:
houseme
2025-04-21 13:28:01 +08:00
parent bfc165abe0
commit 3b6397012b
12 changed files with 266 additions and 140 deletions

View File

@@ -16,8 +16,7 @@ http-producer = ["dep:axum"]
[dependencies]
async-trait = { workspace = true }
axum = { workspace = true, optional = true }
chrono = { workspace = true, features = ["serde"] }
dotenv = { workspace = true }
dotenvy = { workspace = true }
figment = { workspace = true, features = ["toml", "yaml", "env"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true }

View File

@@ -14,6 +14,7 @@ async fn main() -> Result<(), Box<dyn error::Error>> {
let mut config = NotificationConfig {
store_path: "./events".to_string(),
channel_capacity: 100,
timeout: 50,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: Some("secret-token".to_string()),

View File

@@ -2,8 +2,8 @@ use crate::ChannelAdapter;
use crate::Error;
use crate::EventStore;
use crate::{Event, Log};
use chrono::Utc;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
@@ -20,7 +20,7 @@ pub async fn event_bus(
let mut pending_logs = Vec::new();
let mut current_log = Log {
event_name: crate::event::Name::Everything,
key: Utc::now().timestamp().to_string(),
key: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(),
records: Vec::new(),
};

View File

@@ -89,6 +89,8 @@ pub struct NotificationConfig {
pub store_path: String,
#[serde(default = "default_channel_capacity")]
pub channel_capacity: usize,
#[serde(default = "default_timeout")]
pub timeout: u64,
pub adapters: Vec<AdapterConfig>,
#[serde(default)]
pub http: HttpProducerConfig,
@@ -99,6 +101,7 @@ impl Default for NotificationConfig {
Self {
store_path: default_store_path(),
channel_capacity: default_channel_capacity(),
timeout: default_timeout(),
adapters: Vec::new(),
http: HttpProducerConfig::default(),
}
@@ -136,12 +139,10 @@ impl NotificationConfig {
/// loading configuration from env file
pub fn from_env_file(path: &str) -> Result<Self, Error> {
// loading env files
dotenv::from_path(path)
.map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?;
dotenvy::from_path(path).map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?;
// Extract configuration from environment variables using figurement
let figment =
figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_"));
let figment = figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_"));
Ok(figment.extract()?)
}
@@ -149,13 +150,15 @@ impl NotificationConfig {
/// Provide temporary directories as default storage paths
fn default_store_path() -> String {
std::env::temp_dir()
.join("event-notification")
.to_string_lossy()
.to_string()
std::env::temp_dir().join("event-notification").to_string_lossy().to_string()
}
/// Provides the recommended default channel capacity for high concurrency systems
fn default_channel_capacity() -> usize {
10000 // Reasonable default values for high concurrency systems
}
/// Provides the recommended default timeout for high concurrency systems
fn default_timeout() -> u64 {
50 // Reasonable default values for high concurrency systems
}

View File

@@ -1,19 +1,21 @@
use crate::Error;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use smallvec::{smallvec, SmallVec};
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use strum::{Display, EnumString};
use uuid::Uuid;
/// A struct representing the identity of the user
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Identity {
#[serde(rename = "principalId")]
pub principal_id: String,
}
/// A struct representing the bucket information
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Bucket {
pub name: String,
@@ -22,6 +24,7 @@ pub struct Bucket {
pub arn: String,
}
/// A struct representing the object information
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Object {
pub key: String,
@@ -38,6 +41,7 @@ pub struct Object {
pub sequencer: String,
}
/// A struct representing the metadata of the event
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Metadata {
#[serde(rename = "s3SchemaVersion")]
@@ -48,6 +52,7 @@ pub struct Metadata {
pub object: Object,
}
/// A struct representing the source of the event
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Source {
pub host: String,
@@ -82,7 +87,7 @@ impl EventBuilder {
event_version: Some(Cow::Borrowed("2.0").to_string()),
event_source: Some(Cow::Borrowed("aws:s3").to_string()),
aws_region: Some("us-east-1".to_string()),
event_time: Some(Utc::now().to_rfc3339()),
event_time: Some(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string()),
event_name: None,
user_identity: Some(Identity {
principal_id: "anonymous".to_string(),
@@ -214,7 +219,7 @@ impl EventBuilder {
s3,
source,
id: Uuid::new_v4(),
timestamp: Utc::now(),
timestamp: SystemTime::now(),
channels,
})
}
@@ -241,7 +246,7 @@ pub struct Event {
pub s3: Metadata,
pub source: Source,
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub timestamp: SystemTime,
pub channels: SmallVec<[String; 2]>,
}

View File

@@ -1,23 +1,90 @@
use crate::{ChannelAdapter, Error, Event, NotificationConfig, NotificationSystem};
use std::sync::Arc;
use std::sync::{atomic, Arc};
use std::time;
use std::time::Duration;
use tokio::sync::{Mutex, OnceCell};
static GLOBAL_SYSTEM: OnceCell<Arc<Mutex<NotificationSystem>>> = OnceCell::const_new();
static INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false);
static READY: atomic::AtomicBool = atomic::AtomicBool::new(false);
static INIT_LOCK: Mutex<()> = Mutex::const_new(());
/// initialize the global notification system
pub async fn initialize(config: NotificationConfig) -> Result<(), Error> {
if INITIALIZED.swap(true, std::sync::atomic::Ordering::SeqCst) {
// use lock to protect the initialization process
let _lock = INIT_LOCK.lock().await;
if INITIALIZED.swap(true, atomic::Ordering::SeqCst) {
return Err(Error::custom("notify the system has been initialized"));
}
let system = Arc::new(Mutex::new(NotificationSystem::new(config).await?));
GLOBAL_SYSTEM
.set(system)
.map_err(|_| Error::custom("unable to set up global notification system"))?;
match Arc::new(Mutex::new(NotificationSystem::new(config).await?)) {
system => {
if let Err(_) = GLOBAL_SYSTEM.set(system.clone()) {
INITIALIZED.store(false, atomic::Ordering::SeqCst);
return Err(Error::custom("unable to set up global notification system"));
}
system
}
};
Ok(())
}
/// securely initialize the global notification system
pub async fn initialize_safe(config: NotificationConfig) -> Result<(), Error> {
// use-lock-to-protect-the-initialization-process
let _lock = INIT_LOCK.lock().await;
if INITIALIZED.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("notify the system has been initialized"));
}
// set-initialization-flag
INITIALIZED.store(true, atomic::Ordering::SeqCst);
match Arc::new(Mutex::new(NotificationSystem::new(config).await?)) {
system => {
if let Err(_) = GLOBAL_SYSTEM.set(system.clone()) {
INITIALIZED.store(false, atomic::Ordering::SeqCst);
return Err(Error::custom("unable to set up global notification system"));
}
system
}
};
Ok(())
}
/// securely start the global notification system
pub async fn start_safe(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
// start process with lock protection
let _lock = INIT_LOCK.lock().await;
if !INITIALIZED.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("notification system not initialized"));
}
if READY.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("notification system already started"));
}
let system = get_system().await?;
// Execute startup operations directly on the current thread, rather than generating new tasks
let mut system_guard = system.lock().await;
match system_guard.start(adapters).await {
Ok(_) => {
READY.store(true, atomic::Ordering::SeqCst);
tracing::info!("Notification system is ready to process events");
Ok(())
}
Err(e) => {
tracing::error!("Notify system start failed: {}", e);
INITIALIZED.store(false, atomic::Ordering::SeqCst);
Err(e)
}
}
}
/// start the global notification system
pub async fn start(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
let system = get_system().await?;
@@ -26,11 +93,36 @@ pub async fn start(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error>
let system_clone = Arc::clone(&system);
tokio::spawn(async move {
let mut system_guard = system_clone.lock().await;
if let Err(e) = system_guard.start(adapters).await {
tracing::error!("notify the system to start failed: {}", e);
match system_guard.start(adapters).await {
Ok(_) => {
// The system is started and runs normally, set the ready flag
READY.store(true, atomic::Ordering::SeqCst);
tracing::info!("Notification system is ready to process events");
}
Err(e) => {
tracing::error!("Notify system start failed: {}", e);
INITIALIZED.store(false, atomic::Ordering::SeqCst);
}
}
});
// Wait for a while to ensure the system has a chance to start
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
/// waiting for notification system to be fully ready
async fn wait_until_ready(timeout: Duration) -> Result<(), Error> {
let start = time::Instant::now();
while !READY.load(atomic::Ordering::SeqCst) {
if start.elapsed() > timeout {
return Err(Error::custom("timeout waiting for notification system to become ready"));
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
Ok(())
}
@@ -51,6 +143,7 @@ pub async fn start(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error>
/// let config = NotificationConfig {
/// store_path: "./events".to_string(),
/// channel_capacity: 100,
/// timeout: 0,
/// adapters: vec![/* 适配器配置 */],
/// http: Default::default(),
/// };
@@ -73,8 +166,29 @@ pub async fn initialize_and_start(config: NotificationConfig) -> Result<(), Erro
Ok(())
}
/// Initialize and start the global notification system and wait until it's ready
pub async fn initialize_and_start_with_ready_check(config: NotificationConfig, timeout: Duration) -> Result<(), Error> {
// initialize the system
initialize(config.clone()).await?;
// create an adapter
let adapters = crate::create_adapters(&config.adapters).expect("failed to create adapters");
// start the system
start(adapters).await?;
// wait for the system to be ready
wait_until_ready(timeout).await?;
Ok(())
}
/// send events to notification system
pub async fn send_event(event: Event) -> Result<(), Error> {
// check if the system is ready to receive events
if !READY.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("notification system not ready, please wait for initialization to complete"));
}
let system = get_system().await?;
let system_guard = system.lock().await;
system_guard.send_event(event).await

View File

@@ -4,6 +4,7 @@ mod config;
mod error;
mod event;
mod global;
mod notifier;
mod producer;
mod store;
@@ -28,104 +29,9 @@ pub use config::{AdapterConfig, NotificationConfig};
pub use error::Error;
pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
pub use global::{
initialize, initialize_and_start, initialize_and_start_with_ready_check, initialize_safe, send_event, shutdown, start,
start_safe,
};
pub use notifier::NotificationSystem;
pub use store::EventStore;
#[cfg(feature = "http-producer")]
pub use producer::http::HttpProducer;
#[cfg(feature = "http-producer")]
pub use producer::EventProducer;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
/// It is responsible for sending and receiving events.
/// It also handles the shutdown process.
pub struct NotificationSystem {
tx: mpsc::Sender<Event>,
rx: Option<mpsc::Receiver<Event>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
#[cfg(feature = "http-producer")]
http_config: HttpProducerConfig,
}
impl NotificationSystem {
/// Creates a new `NotificationSystem` instance.
pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
let store = Arc::new(EventStore::new(&config.store_path).await?);
let shutdown = CancellationToken::new();
let restored_logs = store.load_logs().await?;
for log in restored_logs {
for event in log.records {
// For example, where the send method may return a SendError when calling it
tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
}
}
Ok(Self {
tx,
rx: Some(rx),
store,
shutdown,
#[cfg(feature = "http-producer")]
http_config: config.http,
})
}
/// Starts the notification system.
/// It initializes the event bus and the producer.
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
let shutdown_clone = self.shutdown.clone();
let store_clone = self.store.clone();
let bus_handle = tokio::spawn(async move {
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
tracing::error!("Event bus failed: {}", e);
}
});
#[cfg(feature = "http-producer")]
{
let producer = HttpProducer::new(self.tx.clone(), self.http_config.port);
producer.start().await?;
}
tokio::select! {
result = bus_handle => {
result.map_err(Error::JoinError)?;
Ok(())
},
_ = self.shutdown.cancelled() => {
tracing::info!("System shutdown triggered");
Ok(())
}
}
}
/// Sends an event to the notification system.
/// This method is used to send events to the event bus.
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
Ok(())
}
/// Shuts down the notification system.
/// This method is used to cancel the event bus and producer tasks.
pub fn shutdown(&self) {
self.shutdown.cancel();
}
/// Sets the HTTP port for the notification system.
/// This method is used to change the port for the HTTP producer.
#[cfg(feature = "http-producer")]
pub fn set_http_port(&mut self, port: u16) {
self.http_config.port = port;
}
}

View File

@@ -0,0 +1,101 @@
#[cfg(feature = "http-producer")]
pub use crate::producer::http::HttpProducer;
#[cfg(feature = "http-producer")]
pub use crate::producer::EventProducer;
#[cfg(feature = "http-producer")]
pub use crate::config::HttpProducerConfig;
use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotificationConfig};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
/// It is responsible for sending and receiving events.
/// It also handles the shutdown process.
pub struct NotificationSystem {
tx: mpsc::Sender<Event>,
rx: Option<mpsc::Receiver<Event>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
#[cfg(feature = "http-producer")]
http_config: HttpProducerConfig,
}
impl NotificationSystem {
/// Creates a new `NotificationSystem` instance.
pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
let store = Arc::new(EventStore::new(&config.store_path).await?);
let shutdown = CancellationToken::new();
let restored_logs = store.load_logs().await?;
for log in restored_logs {
for event in log.records {
// For example, where the send method may return a SendError when calling it
tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
}
}
Ok(Self {
tx,
rx: Some(rx),
store,
shutdown,
#[cfg(feature = "http-producer")]
http_config: config.http,
})
}
/// Starts the notification system.
/// It initializes the event bus and the producer.
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
let shutdown_clone = self.shutdown.clone();
let store_clone = self.store.clone();
let bus_handle = tokio::spawn(async move {
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
tracing::error!("Event bus failed: {}", e);
}
});
#[cfg(feature = "http-producer")]
{
let producer = crate::producer::http::HttpProducer::new(self.tx.clone(), self.http_config.port);
producer.start().await?;
}
tokio::select! {
result = bus_handle => {
result.map_err(Error::JoinError)?;
Ok(())
},
_ = self.shutdown.cancelled() => {
tracing::info!("System shutdown triggered");
Ok(())
}
}
}
/// Sends an event to the notification system.
/// This method is used to send events to the event bus.
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
Ok(())
}
/// Shuts down the notification system.
/// This method is used to cancel the event bus and producer tasks.
pub fn shutdown(&self) {
self.shutdown.cancel();
}
/// Sets the HTTP port for the notification system.
/// This method is used to change the port for the HTTP producer.
#[cfg(feature = "http-producer")]
pub fn set_http_port(&mut self, port: u16) {
self.http_config.port = port;
}
}

View File

@@ -1,7 +1,7 @@
use crate::Error;
use crate::Log;
use chrono::Utc;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::fs::{create_dir_all, File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::RwLock;
@@ -23,7 +23,11 @@ impl EventStore {
pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> {
let _guard = self.lock.write().await;
let file_path = format!("{}/events_{}.jsonl", self.path, Utc::now().timestamp());
let file_path = format!(
"{}/events_{}.jsonl",
self.path,
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
);
let file = OpenOptions::new().create(true).append(true).open(&file_path).await?;
let mut writer = BufWriter::new(file);
for log in logs {

View File

@@ -70,6 +70,7 @@ async fn test_notification_system() {
let config = rustfs_event_notifier::NotificationConfig {
store_path: "./test_events".to_string(),
channel_capacity: 100,
timeout: 50,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,