From 4199bf2ba454c7e4d1b68087ee55615a480b2cd2 Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 6 Jun 2025 11:14:36 +0800 Subject: [PATCH] modify crates name --- Cargo.lock | 46 ++-- Cargo.toml | 4 +- crates/event/examples/full.rs | 144 ----------- crates/event/examples/simple.rs | 120 --------- crates/event/src/global.rs | 234 ------------------ crates/event/src/notifier.rs | 136 ---------- crates/event/tests/integration.rs | 177 ------------- crates/{event => notify}/Cargo.toml | 2 +- .../{event => notify}/examples/.env.example | 0 .../examples/.env.zh.example | 0 crates/{event => notify}/examples/event.toml | 0 crates/{event => notify}/examples/webhook.rs | 0 crates/{event => notify}/src/adapter/kafka.rs | 0 crates/{event => notify}/src/adapter/mod.rs | 0 crates/{event => notify}/src/adapter/mqtt.rs | 0 .../{event => notify}/src/adapter/webhook.rs | 4 +- .../{event => notify}/src/config/adapter.rs | 0 crates/{event => notify}/src/config/kafka.rs | 0 crates/{event => notify}/src/config/mod.rs | 0 crates/{event => notify}/src/config/mqtt.rs | 0 .../{event => notify}/src/config/notifier.rs | 0 .../{event => notify}/src/config/webhook.rs | 0 crates/{event => notify}/src/error.rs | 0 crates/{event => notify}/src/event.rs | 0 crates/{event => notify}/src/lib.rs | 20 +- .../src/notifier.rs} | 71 +++--- crates/{event => notify}/src/store/manager.rs | 0 crates/{event => notify}/src/store/mod.rs | 0 crates/{event => notify}/src/store/queue.rs | 98 ++++---- .../event_system.rs => notify/src/system.rs} | 38 +-- ecstore/src/config/mod.rs | 6 +- rustfs/Cargo.toml | 2 +- rustfs/src/event.rs | 31 +-- rustfs/src/main.rs | 16 +- rustfs/src/storage/event.rs | 3 +- 35 files changed, 172 insertions(+), 980 deletions(-) delete mode 100644 crates/event/examples/full.rs delete mode 100644 crates/event/examples/simple.rs delete mode 100644 crates/event/src/global.rs delete mode 100644 crates/event/src/notifier.rs delete mode 100644 crates/event/tests/integration.rs rename crates/{event => notify}/Cargo.toml (98%) rename crates/{event => notify}/examples/.env.example (100%) rename crates/{event => notify}/examples/.env.zh.example (100%) rename crates/{event => notify}/examples/event.toml (100%) rename crates/{event => notify}/examples/webhook.rs (100%) rename crates/{event => notify}/src/adapter/kafka.rs (100%) rename crates/{event => notify}/src/adapter/mod.rs (100%) rename crates/{event => notify}/src/adapter/mqtt.rs (100%) rename crates/{event => notify}/src/adapter/webhook.rs (98%) rename crates/{event => notify}/src/config/adapter.rs (100%) rename crates/{event => notify}/src/config/kafka.rs (100%) rename crates/{event => notify}/src/config/mod.rs (100%) rename crates/{event => notify}/src/config/mqtt.rs (100%) rename crates/{event => notify}/src/config/notifier.rs (100%) rename crates/{event => notify}/src/config/webhook.rs (100%) rename crates/{event => notify}/src/error.rs (100%) rename crates/{event => notify}/src/event.rs (100%) rename crates/{event => notify}/src/lib.rs (85%) rename crates/{event/src/event_notifier.rs => notify/src/notifier.rs} (61%) rename crates/{event => notify}/src/store/manager.rs (100%) rename crates/{event => notify}/src/store/mod.rs (100%) rename crates/{event => notify}/src/store/queue.rs (93%) rename crates/{event/src/event_system.rs => notify/src/system.rs} (60%) diff --git a/Cargo.lock b/Cargo.lock index 24c65708..f56dc7c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7655,7 +7655,7 @@ dependencies = [ "rmp-serde", "rust-embed", "rustfs-config", - "rustfs-event", + "rustfs-notify", "rustfs-obs", "rustfs-utils", "rustfs-zip", @@ -7693,7 +7693,28 @@ dependencies = [ ] [[package]] -name = "rustfs-event" +name = "rustfs-gui" +version = "0.0.1" +dependencies = [ + "chrono", + "dioxus", + "dirs", + "hex", + "keyring", + "lazy_static", + "rfd 0.15.3", + "rust-embed", + "rust-i18n", + "serde", + "serde_json", + "sha2 0.10.9", + "tokio", + "tracing-appender", + "tracing-subscriber", +] + +[[package]] +name = "rustfs-notify" version = "0.0.1" dependencies = [ "async-trait", @@ -7721,27 +7742,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "rustfs-gui" -version = "0.0.1" -dependencies = [ - "chrono", - "dioxus", - "dirs", - "hex", - "keyring", - "lazy_static", - "rfd 0.15.3", - "rust-embed", - "rust-i18n", - "serde", - "serde_json", - "sha2 0.10.9", - "tokio", - "tracing-appender", - "tracing-subscriber", -] - [[package]] name = "rustfs-obs" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 800a56c0..fe3ac351 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "common/protos", # Protocol buffer definitions "common/workers", # Worker thread pools and task scheduling "crates/config", # Configuration management - "crates/event", # Event notification system + "crates/notify", # Event notification system "crates/obs", # Observability utilities "crates/utils", # Utility functions and helpers "crypto", # Cryptography and security features @@ -51,7 +51,7 @@ rustfs = { path = "./rustfs", version = "0.0.1" } rustfs-zip = { path = "./crates/zip", version = "0.0.1" } rustfs-config = { path = "./crates/config", version = "0.0.1" } rustfs-obs = { path = "crates/obs", version = "0.0.1" } -rustfs-event = { path = "crates/event", version = "0.0.1" } +rustfs-notify = { path = "crates/notify", version = "0.0.1" } rustfs-utils = { path = "crates/utils", version = "0.0.1" } workers = { path = "./common/workers", version = "0.0.1" } tokio-tar = "0.3.1" diff --git a/crates/event/examples/full.rs b/crates/event/examples/full.rs deleted file mode 100644 index 9d03771d..00000000 --- a/crates/event/examples/full.rs +++ /dev/null @@ -1,144 +0,0 @@ -use rustfs_event::{ - AdapterConfig, Bucket, ChannelAdapterType, Error as NotifierError, Event, Identity, Metadata, Name, NotifierConfig, Object, - Source, WebhookConfig, -}; -use std::collections::HashMap; -use tokio::signal; -use tracing::Level; -use tracing_subscriber::FmtSubscriber; - -async fn setup_notification_system() -> Result<(), NotifierError> { - let config = NotifierConfig { - store_path: "./deploy/logs/event_store".into(), - channel_capacity: 100, - adapters: vec![AdapterConfig::Webhook(WebhookConfig { - endpoint: "http://127.0.0.1:3020/webhook".into(), - auth_token: Some("your-auth-token".into()), - custom_headers: Some(HashMap::new()), - max_retries: 3, - timeout: Some(30), - retry_interval: Some(5), - client_cert: None, - client_key: None, - common: rustfs_event::AdapterCommon { - identifier: "webhook".into(), - comment: "webhook".into(), - enable: true, - queue_dir: "./deploy/logs/event_queue".into(), - queue_limit: 100, - }, - })], - }; - - rustfs_event::initialize(&config).await?; - - // wait for the system to be ready - for _ in 0..50 { - // wait up to 5 seconds - if rustfs_event::is_ready() { - return Ok(()); - } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - - Err(NotifierError::custom("notify the system of initialization timeout")) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - // initialization log - // tracing_subscriber::fmt::init(); - - let subscriber = FmtSubscriber::builder() - .with_max_level(Level::DEBUG) // set to debug or lower level - .with_target(false) // simplify output - .finish(); - tracing::subscriber::set_global_default(subscriber).expect("failed to set up log subscriber"); - - // set up notification system - if let Err(e) = setup_notification_system().await { - eprintln!("unable to initialize notification system:{}", e); - return Err(e.into()); - } - - // create a shutdown signal processing - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); - - // start signal processing task - tokio::spawn(async move { - let _ = signal::ctrl_c().await; - println!("Received the shutdown signal and prepared to exit..."); - let _ = shutdown_tx.send(()); - }); - - // main application logic - tokio::select! { - _ = async { - loop { - // application logic - // create an s3 metadata object - let metadata = Metadata { - schema_version: "1.0".to_string(), - configuration_id: "test-config".to_string(), - bucket: Bucket { - name: "my-bucket".to_string(), - owner_identity: Identity { - principal_id: "owner123".to_string(), - }, - arn: "arn:aws:s3:::my-bucket".to_string(), - }, - object: Object { - key: "test.txt".to_string(), - size: Some(1024), - etag: Some("abc123".to_string()), - content_type: Some("text/plain".to_string()), - user_metadata: None, - version_id: None, - sequencer: "1234567890".to_string(), - }, - }; - - // create source object - let source = Source { - host: "localhost".to_string(), - port: "80".to_string(), - user_agent: "curl/7.68.0".to_string(), - }; - - // create events using builder mode - let event = Event::builder() - .event_time("2023-10-01T12:00:00.000Z") - .event_name(Name::ObjectCreatedPut) - .user_identity(Identity { - principal_id: "user123".to_string(), - }) - .s3(metadata) - .source(source) - .channels(vec![ChannelAdapterType::Webhook.to_string()]) - .build() - .expect("failed to create event"); - - if let Err(e) = rustfs_event::send_event(event).await { - eprintln!("send event failed:{}", e); - } - - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - } - } => {}, - - _ = &mut shutdown_rx => { - println!("close the app"); - } - } - - // 优雅关闭通知系统 - println!("turn off the notification system"); - if let Err(e) = rustfs_event::shutdown().await { - eprintln!("An error occurred while shutting down the notification system:{}", e); - } else { - println!("the notification system has been closed safely"); - } - - println!("the application has been closed safely"); - Ok(()) -} diff --git a/crates/event/examples/simple.rs b/crates/event/examples/simple.rs deleted file mode 100644 index f72713fd..00000000 --- a/crates/event/examples/simple.rs +++ /dev/null @@ -1,120 +0,0 @@ -use rustfs_event::NotifierSystem; -use rustfs_event::{create_adapters, ChannelAdapterType}; -use rustfs_event::{AdapterConfig, NotifierConfig, WebhookConfig}; -use rustfs_event::{Bucket, Event, Identity, Metadata, Name, Object, Source}; -use std::collections::HashMap; -use std::error; -use std::sync::Arc; -use tokio::signal; -use tracing::Level; -use tracing_subscriber::FmtSubscriber; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let subscriber = FmtSubscriber::builder() - .with_max_level(Level::DEBUG) // set to debug or lower level - .with_target(false) // simplify output - .finish(); - tracing::subscriber::set_global_default(subscriber).expect("failed to set up log subscriber"); - - let config = NotifierConfig { - store_path: "./events".to_string(), - channel_capacity: 100, - adapters: vec![AdapterConfig::Webhook(WebhookConfig { - endpoint: "http://127.0.0.1:3020/webhook".to_string(), - auth_token: Some("secret-token".to_string()), - custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])), - max_retries: 3, - timeout: Some(30), - retry_interval: Some(5), - client_cert: None, - client_key: None, - common: rustfs_event::AdapterCommon { - identifier: "webhook".to_string(), - comment: "webhook".to_string(), - enable: true, - queue_dir: "./deploy/logs/event_queue".to_string(), - queue_limit: 100, - }, - })], - }; - - // event_load_config - // loading configuration from environment variables - let _config = NotifierConfig::event_load_config(Some("./crates/event/examples/event.toml".to_string())); - tracing::info!("event_load_config config: {:?} \n", _config); - dotenvy::dotenv()?; - let _config = NotifierConfig::event_load_config(None); - tracing::info!("event_load_config config: {:?} \n", _config); - let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await?)); - let adapters = create_adapters(&config.adapters)?; - - // create an s3 metadata object - let metadata = Metadata { - schema_version: "1.0".to_string(), - configuration_id: "test-config".to_string(), - bucket: Bucket { - name: "my-bucket".to_string(), - owner_identity: Identity { - principal_id: "owner123".to_string(), - }, - arn: "arn:aws:s3:::my-bucket".to_string(), - }, - object: Object { - key: "test.txt".to_string(), - size: Some(1024), - etag: Some("abc123".to_string()), - content_type: Some("text/plain".to_string()), - user_metadata: None, - version_id: None, - sequencer: "1234567890".to_string(), - }, - }; - - // create source object - let source = Source { - host: "localhost".to_string(), - port: "80".to_string(), - user_agent: "curl/7.68.0".to_string(), - }; - - // create events using builder mode - let event = Event::builder() - .event_time("2023-10-01T12:00:00.000Z") - .event_name(Name::ObjectCreatedPut) - .user_identity(Identity { - principal_id: "user123".to_string(), - }) - .s3(metadata) - .source(source) - .channels(vec![ChannelAdapterType::Webhook.to_string()]) - .build() - .expect("failed to create event"); - - { - let system = system.lock().await; - system.send_event(event).await?; - } - - let system_clone = Arc::clone(&system); - let system_handle = tokio::spawn(async move { - let mut system = system_clone.lock().await; - system.start(adapters).await - }); - - signal::ctrl_c().await?; - tracing::info!("Received shutdown signal"); - let result = { - let mut system = system.lock().await; - system.shutdown().await - }; - - if let Err(e) = result { - tracing::error!("Failed to shut down the notification system: {}", e); - } else { - tracing::info!("Notification system shut down successfully"); - } - - system_handle.await??; - Ok(()) -} diff --git a/crates/event/src/global.rs b/crates/event/src/global.rs deleted file mode 100644 index 51021e2a..00000000 --- a/crates/event/src/global.rs +++ /dev/null @@ -1,234 +0,0 @@ -use crate::{create_adapters, Error, Event, NotifierConfig, NotifierSystem}; -use std::sync::{atomic, Arc}; -use tokio::sync::{Mutex, OnceCell}; -use tracing::instrument; - -static GLOBAL_SYSTEM: OnceCell>> = OnceCell::const_new(); -static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false); -static READY: atomic::AtomicBool = atomic::AtomicBool::new(false); -static INIT_LOCK: Mutex<()> = Mutex::const_new(()); - -/// Initializes the global notification system. -/// -/// This function performs the following steps: -/// 1. Checks if the system is already initialized. -/// 2. Creates a new `NotificationSystem` instance. -/// 3. Creates adapters based on the provided configuration. -/// 4. Starts the notification system with the created adapters. -/// 5. Sets the global system instance. -/// -/// # Errors -/// -/// Returns an error if: -/// - The system is already initialized. -/// - Creating the `NotificationSystem` fails. -/// - Creating adapters fails. -/// - Starting the notification system fails. -/// - Setting the global system instance fails. -#[instrument] -pub async fn initialize(config: &NotifierConfig) -> Result<(), Error> { - let _lock = INIT_LOCK.lock().await; - - // Check if the system is already initialized. - if INITIALIZED.load(atomic::Ordering::SeqCst) { - return Err(Error::custom("Notification system has already been initialized")); - } - - // Check if the system is already ready. - if READY.load(atomic::Ordering::SeqCst) { - return Err(Error::custom("Notification system is already ready")); - } - - // Check if the system is shutting down. - if let Some(system) = GLOBAL_SYSTEM.get() { - let system_guard = system.lock().await; - if system_guard.shutdown_cancelled() { - return Err(Error::custom("Notification system is shutting down")); - } - } - - // check if config adapters len is than 0 - if config.adapters.is_empty() { - return Err(Error::custom("No adapters configured")); - } - - // Attempt to initialize, and reset the INITIALIZED flag if it fails. - let result: Result<(), Error> = async { - let system = NotifierSystem::new(config.clone()).await.map_err(|e| { - tracing::error!("Failed to create NotificationSystem: {:?}", e); - e - })?; - let adapters = create_adapters(&config.adapters).map_err(|e| { - tracing::error!("Failed to create adapters: {:?}", e); - e - })?; - tracing::info!("adapters len:{:?}", adapters.len()); - let system_clone = Arc::new(Mutex::new(system)); - let adapters_clone = adapters.clone(); - - GLOBAL_SYSTEM.set(system_clone.clone()).map_err(|_| { - let err = Error::custom("Unable to set up global notification system"); - tracing::error!("{:?}", err); - err - })?; - - tokio::spawn(async move { - if let Err(e) = system_clone.lock().await.start(adapters_clone).await { - tracing::error!("Notification system failed to start: {}", e); - } - tracing::info!("Notification system started in background"); - }); - tracing::info!("system start success,start set READY value"); - - READY.store(true, atomic::Ordering::SeqCst); - tracing::info!("Notification system is ready to process events"); - - Ok(()) - } - .await; - - if result.is_err() { - INITIALIZED.store(false, atomic::Ordering::SeqCst); - READY.store(false, atomic::Ordering::SeqCst); - return result; - } - - INITIALIZED.store(true, atomic::Ordering::SeqCst); - Ok(()) -} - -/// Checks if the notification system is initialized. -pub fn is_initialized() -> bool { - INITIALIZED.load(atomic::Ordering::SeqCst) -} - -/// Checks if the notification system is ready. -pub fn is_ready() -> bool { - READY.load(atomic::Ordering::SeqCst) -} - -/// Sends an event to the notification system. -/// -/// # Errors -/// -/// Returns an error if: -/// - The system is not initialized. -/// - The system is not ready. -/// - Sending the event fails. -#[instrument(fields(event))] -pub async fn send_event(event: Event) -> Result<(), Error> { - if !READY.load(atomic::Ordering::SeqCst) { - return Err(Error::custom("Notification system not ready, please wait for initialization to complete")); - } - - let system = get_system().await?; - let system_guard = system.lock().await; - system_guard.send_event(event).await -} - -/// Shuts down the notification system. -#[instrument] -pub async fn shutdown() -> Result<(), Error> { - if let Some(system) = GLOBAL_SYSTEM.get() { - tracing::info!("Shutting down notification system start"); - let result = { - let mut system_guard = system.lock().await; - system_guard.shutdown().await - }; - if let Err(e) = &result { - tracing::error!("Notification system shutdown failed: {}", e); - } else { - tracing::info!("Event bus shutdown completed"); - } - - tracing::info!( - "Shutdown method called set static value start, READY: {}, INITIALIZED: {}", - READY.load(atomic::Ordering::SeqCst), - INITIALIZED.load(atomic::Ordering::SeqCst) - ); - READY.store(false, atomic::Ordering::SeqCst); - INITIALIZED.store(false, atomic::Ordering::SeqCst); - tracing::info!( - "Shutdown method called set static value end, READY: {}, INITIALIZED: {}", - READY.load(atomic::Ordering::SeqCst), - INITIALIZED.load(atomic::Ordering::SeqCst) - ); - result - } else { - Err(Error::custom("Notification system not initialized")) - } -} - -/// Retrieves the global notification system instance. -/// -/// # Errors -/// -/// Returns an error if the system is not initialized. -async fn get_system() -> Result>, Error> { - GLOBAL_SYSTEM - .get() - .cloned() - .ok_or_else(|| Error::custom("Notification system not initialized")) -} - -#[cfg(test)] -mod tests { - use crate::{initialize, is_initialized, is_ready, NotifierConfig}; - - fn init_tracing() { - // Use try_init to avoid panic if already initialized - let _ = tracing_subscriber::fmt::try_init(); - } - - #[tokio::test] - async fn test_initialize_success() { - init_tracing(); - let config = NotifierConfig::default(); // assume there is a default configuration - let result = initialize(&config).await; - assert!(result.is_err(), "Initialization should not succeed"); - assert!(!is_initialized(), "System should not be marked as initialized"); - assert!(!is_ready(), "System should not be marked as ready"); - } - - #[tokio::test] - async fn test_initialize_twice() { - init_tracing(); - let config = NotifierConfig::default(); - let _ = initialize(&config.clone()).await; // first initialization - let result = initialize(&config).await; // second initialization - assert!(result.is_err(), "Initialization should succeed"); - assert!(result.is_err(), "Re-initialization should fail"); - } - - #[tokio::test] - async fn test_initialize_failure_resets_state() { - init_tracing(); - // Test with empty adapters to force failure - let config = NotifierConfig { - adapters: Vec::new(), - ..Default::default() - }; - let result = initialize(&config).await; - assert!(result.is_err(), "Initialization should fail with empty adapters"); - assert!(!is_initialized(), "System should not be marked as initialized after failure"); - assert!(!is_ready(), "System should not be marked as ready after failure"); - } - - #[tokio::test] - async fn test_is_initialized_and_is_ready() { - init_tracing(); - // Initially, the system should not be initialized or ready - assert!(!is_initialized(), "System should not be initialized initially"); - assert!(!is_ready(), "System should not be ready initially"); - - // Test with empty adapters to ensure failure - let config = NotifierConfig { - adapters: Vec::new(), - ..Default::default() - }; - let result = initialize(&config).await; - assert!(result.is_err(), "Initialization should fail with empty adapters"); - assert!(!is_initialized(), "System should not be initialized after failed init"); - assert!(!is_ready(), "System should not be ready after failed init"); - } -} diff --git a/crates/event/src/notifier.rs b/crates/event/src/notifier.rs deleted file mode 100644 index db3bbc9c..00000000 --- a/crates/event/src/notifier.rs +++ /dev/null @@ -1,136 +0,0 @@ -use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotifierConfig}; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; -use tracing::instrument; - -/// The `NotificationSystem` struct represents the notification system. -/// It manages the event bus and the adapters. -/// It is responsible for sending and receiving events. -/// It also handles the shutdown process. -pub struct NotifierSystem { - tx: mpsc::Sender, - rx: Option>, - store: Arc, - shutdown: CancellationToken, - shutdown_complete: Option>, - shutdown_receiver: Option>, -} - -impl NotifierSystem { - /// Creates a new `NotificationSystem` instance. - #[instrument(skip(config))] - pub async fn new(config: NotifierConfig) -> Result { - let (tx, rx) = mpsc::channel::(config.channel_capacity.try_into().unwrap()); - let store = Arc::new(EventStore::new(&config.store_path).await?); - let shutdown = CancellationToken::new(); - - let restored_logs = store.load_logs().await?; - for log in restored_logs { - for event in log.records { - // For example, where the send method may return a SendError when calling it - tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; - } - } - // Initialize shutdown_complete to Some(tx) - let (complete_tx, complete_rx) = tokio::sync::oneshot::channel(); - Ok(Self { - tx, - rx: Some(rx), - store, - shutdown, - shutdown_complete: Some(complete_tx), - shutdown_receiver: Some(complete_rx), - }) - } - - /// Starts the notification system. - /// It initializes the event bus and the producer. - #[instrument(skip_all)] - pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { - if self.shutdown.is_cancelled() { - let error = Error::custom("System is shutting down"); - self.handle_error("start", &error); - return Err(error); - } - self.log(tracing::Level::INFO, "start", "Starting the notification system"); - let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; - let shutdown_clone = self.shutdown.clone(); - let store_clone = self.store.clone(); - let shutdown_complete = self.shutdown_complete.take(); - - tokio::spawn(async move { - if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone, shutdown_complete).await { - tracing::error!("Event bus failed: {}", e); - } - }); - self.log(tracing::Level::INFO, "start", "Notification system started successfully"); - Ok(()) - } - - /// Sends an event to the notification system. - /// This method is used to send events to the event bus. - #[instrument(skip(self))] - pub async fn send_event(&self, event: Event) -> Result<(), Error> { - self.log(tracing::Level::DEBUG, "send_event", &format!("Sending event: {:?}", event)); - if self.shutdown.is_cancelled() { - let error = Error::custom("System is shutting down"); - self.handle_error("send_event", &error); - return Err(error); - } - if let Err(e) = self.tx.send(event).await { - let error = Error::ChannelSend(Box::new(e)); - self.handle_error("send_event", &error); - return Err(error); - } - self.log(tracing::Level::INFO, "send_event", "Event sent successfully"); - Ok(()) - } - - /// Shuts down the notification system. - /// This method is used to cancel the event bus and producer tasks. - #[instrument(skip(self))] - pub async fn shutdown(&mut self) -> Result<(), Error> { - tracing::info!("Shutting down the notification system"); - self.shutdown.cancel(); - // wait for the event bus to be completely closed - if let Some(receiver) = self.shutdown_receiver.take() { - match receiver.await { - Ok(_) => { - tracing::info!("Event bus shutdown completed successfully"); - Ok(()) - } - Err(e) => { - let error = Error::custom(format!("Failed to receive shutdown completion: {}", e).as_str()); - self.handle_error("shutdown", &error); - Err(error) - } - } - } else { - tracing::warn!("Shutdown receiver not available, the event bus might still be running"); - Err(Error::custom("Shutdown receiver not available")) - } - } - - /// shutdown state - pub fn shutdown_cancelled(&self) -> bool { - self.shutdown.is_cancelled() - } - - #[instrument(skip(self))] - pub fn handle_error(&self, context: &str, error: &Error) { - self.log(tracing::Level::ERROR, context, &format!("{:?}", error)); - // TODO Can be extended to record to files or send to monitoring systems - } - - #[instrument(skip(self))] - fn log(&self, level: tracing::Level, context: &str, message: &str) { - match level { - tracing::Level::ERROR => tracing::error!("[{}] {}", context, message), - tracing::Level::WARN => tracing::warn!("[{}] {}", context, message), - tracing::Level::INFO => tracing::info!("[{}] {}", context, message), - tracing::Level::DEBUG => tracing::debug!("[{}] {}", context, message), - tracing::Level::TRACE => tracing::trace!("[{}] {}", context, message), - } - } -} diff --git a/crates/event/tests/integration.rs b/crates/event/tests/integration.rs deleted file mode 100644 index b4ede35c..00000000 --- a/crates/event/tests/integration.rs +++ /dev/null @@ -1,177 +0,0 @@ -use rustfs_event::{AdapterCommon, AdapterConfig, ChannelAdapterType, NotifierSystem, WebhookConfig}; -use rustfs_event::{Bucket, Event, EventBuilder, Identity, Metadata, Name, Object, Source}; -use rustfs_event::{ChannelAdapter, WebhookAdapter}; -use std::collections::HashMap; -use std::sync::Arc; - -#[tokio::test] -async fn test_webhook_adapter() { - let adapter = WebhookAdapter::new(WebhookConfig { - common: AdapterCommon { - identifier: "webhook".to_string(), - comment: "webhook".to_string(), - enable: true, - queue_dir: "./deploy/logs/event_queue".to_string(), - queue_limit: 100, - }, - endpoint: "http://localhost:8080/webhook".to_string(), - auth_token: None, - custom_headers: None, - max_retries: 1, - timeout: Some(5), - retry_interval: Some(5), - client_cert: None, - client_key: None, - }); - - // create an s3 metadata object - let metadata = Metadata { - schema_version: "1.0".to_string(), - configuration_id: "test-config".to_string(), - bucket: Bucket { - name: "my-bucket".to_string(), - owner_identity: Identity { - principal_id: "owner123".to_string(), - }, - arn: "arn:aws:s3:::my-bucket".to_string(), - }, - object: Object { - key: "test.txt".to_string(), - size: Some(1024), - etag: Some("abc123".to_string()), - content_type: Some("text/plain".to_string()), - user_metadata: None, - version_id: None, - sequencer: "1234567890".to_string(), - }, - }; - - // create source object - let source = Source { - host: "localhost".to_string(), - port: "80".to_string(), - user_agent: "curl/7.68.0".to_string(), - }; - - // Create events using builder mode - let event = Event::builder() - .event_version("2.0") - .event_source("aws:s3") - .aws_region("us-east-1") - .event_time("2023-10-01T12:00:00.000Z") - .event_name(Name::ObjectCreatedPut) - .user_identity(Identity { - principal_id: "user123".to_string(), - }) - .request_parameters(HashMap::new()) - .response_elements(HashMap::new()) - .s3(metadata) - .source(source) - .channels(vec![ChannelAdapterType::Webhook.to_string()]) - .build() - .expect("failed to create event"); - - let result = adapter.send(&event).await; - assert!(result.is_err()); -} - -#[tokio::test] -async fn test_notification_system() { - let config = rustfs_event::NotifierConfig { - store_path: "./test_events".to_string(), - channel_capacity: 100, - adapters: vec![AdapterConfig::Webhook(WebhookConfig { - common: Default::default(), - endpoint: "http://localhost:8080/webhook".to_string(), - auth_token: None, - custom_headers: None, - max_retries: 1, - timeout: Some(5), - retry_interval: Some(5), - client_cert: None, - client_key: None, - })], - }; - let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await.unwrap())); - let adapters: Vec> = vec![Arc::new(WebhookAdapter::new(WebhookConfig { - common: Default::default(), - endpoint: "http://localhost:8080/webhook".to_string(), - auth_token: None, - custom_headers: None, - max_retries: 1, - timeout: Some(5), - retry_interval: Some(5), - client_cert: None, - client_key: None, - }))]; - - // create an s3 metadata object - let metadata = Metadata { - schema_version: "1.0".to_string(), - configuration_id: "test-config".to_string(), - bucket: Bucket { - name: "my-bucket".to_string(), - owner_identity: Identity { - principal_id: "owner123".to_string(), - }, - arn: "arn:aws:s3:::my-bucket".to_string(), - }, - object: Object { - key: "test.txt".to_string(), - size: Some(1024), - etag: Some("abc123".to_string()), - content_type: Some("text/plain".to_string()), - user_metadata: None, - version_id: None, - sequencer: "1234567890".to_string(), - }, - }; - - // create source object - let source = Source { - host: "localhost".to_string(), - port: "80".to_string(), - user_agent: "curl/7.68.0".to_string(), - }; - - // create a preconfigured builder with objects - let event = EventBuilder::for_object_creation(metadata, source) - .user_identity(Identity { - principal_id: "user123".to_string(), - }) - .event_time("2023-10-01T12:00:00.000Z") - .channels(vec![ChannelAdapterType::Webhook.to_string()]) - .build() - .expect("failed to create event"); - - { - let system_lock = system.lock().await; - system_lock.send_event(event).await.unwrap(); - } - - let system_clone = Arc::clone(&system); - let system_handle = tokio::spawn(async move { - let mut system = system_clone.lock().await; - system.start(adapters).await - }); - - // set 10 seconds timeout - match tokio::time::timeout(std::time::Duration::from_secs(10), system_handle).await { - Ok(result) => { - println!("System started successfully"); - assert!(result.is_ok()); - } - Err(_) => { - println!("System operation timed out, forcing shutdown"); - // create a new task to handle the timeout - let system = Arc::clone(&system); - tokio::spawn(async move { - if let Ok(mut guard) = system.try_lock() { - guard.shutdown().await.unwrap(); - } - }); - // give the system some time to clean up resources - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } -} diff --git a/crates/event/Cargo.toml b/crates/notify/Cargo.toml similarity index 98% rename from crates/event/Cargo.toml rename to crates/notify/Cargo.toml index 8256f54f..ce65e1f8 100644 --- a/crates/event/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "rustfs-event" +name = "rustfs-notify" edition.workspace = true license.workspace = true repository.workspace = true diff --git a/crates/event/examples/.env.example b/crates/notify/examples/.env.example similarity index 100% rename from crates/event/examples/.env.example rename to crates/notify/examples/.env.example diff --git a/crates/event/examples/.env.zh.example b/crates/notify/examples/.env.zh.example similarity index 100% rename from crates/event/examples/.env.zh.example rename to crates/notify/examples/.env.zh.example diff --git a/crates/event/examples/event.toml b/crates/notify/examples/event.toml similarity index 100% rename from crates/event/examples/event.toml rename to crates/notify/examples/event.toml diff --git a/crates/event/examples/webhook.rs b/crates/notify/examples/webhook.rs similarity index 100% rename from crates/event/examples/webhook.rs rename to crates/notify/examples/webhook.rs diff --git a/crates/event/src/adapter/kafka.rs b/crates/notify/src/adapter/kafka.rs similarity index 100% rename from crates/event/src/adapter/kafka.rs rename to crates/notify/src/adapter/kafka.rs diff --git a/crates/event/src/adapter/mod.rs b/crates/notify/src/adapter/mod.rs similarity index 100% rename from crates/event/src/adapter/mod.rs rename to crates/notify/src/adapter/mod.rs diff --git a/crates/event/src/adapter/mqtt.rs b/crates/notify/src/adapter/mqtt.rs similarity index 100% rename from crates/event/src/adapter/mqtt.rs rename to crates/notify/src/adapter/mqtt.rs diff --git a/crates/event/src/adapter/webhook.rs b/crates/notify/src/adapter/webhook.rs similarity index 98% rename from crates/event/src/adapter/webhook.rs rename to crates/notify/src/adapter/webhook.rs index 1a8e171d..93a703e3 100644 --- a/crates/event/src/adapter/webhook.rs +++ b/crates/notify/src/adapter/webhook.rs @@ -7,6 +7,7 @@ use crate::{Event, DEFAULT_RETRY_INTERVAL}; use async_trait::async_trait; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest::{self, Client, Identity, RequestBuilder}; +use serde_json::to_string; use std::fs; use std::path::PathBuf; use std::sync::Arc; @@ -105,7 +106,8 @@ impl WebhookAdapter { } else { crate::config::default_queue_limit() }; - let store = QueueStore::new(store_path, queue_limit, Some(".event".to_string())); + let name = config.common.identifier.clone(); + let store = QueueStore::new(store_path, name, queue_limit, Some(".event".to_string())); if let Err(e) = store.open() { tracing::error!("Unable to open queue storage: {}", e); None diff --git a/crates/event/src/config/adapter.rs b/crates/notify/src/config/adapter.rs similarity index 100% rename from crates/event/src/config/adapter.rs rename to crates/notify/src/config/adapter.rs diff --git a/crates/event/src/config/kafka.rs b/crates/notify/src/config/kafka.rs similarity index 100% rename from crates/event/src/config/kafka.rs rename to crates/notify/src/config/kafka.rs diff --git a/crates/event/src/config/mod.rs b/crates/notify/src/config/mod.rs similarity index 100% rename from crates/event/src/config/mod.rs rename to crates/notify/src/config/mod.rs diff --git a/crates/event/src/config/mqtt.rs b/crates/notify/src/config/mqtt.rs similarity index 100% rename from crates/event/src/config/mqtt.rs rename to crates/notify/src/config/mqtt.rs diff --git a/crates/event/src/config/notifier.rs b/crates/notify/src/config/notifier.rs similarity index 100% rename from crates/event/src/config/notifier.rs rename to crates/notify/src/config/notifier.rs diff --git a/crates/event/src/config/webhook.rs b/crates/notify/src/config/webhook.rs similarity index 100% rename from crates/event/src/config/webhook.rs rename to crates/notify/src/config/webhook.rs diff --git a/crates/event/src/error.rs b/crates/notify/src/error.rs similarity index 100% rename from crates/event/src/error.rs rename to crates/notify/src/error.rs diff --git a/crates/event/src/event.rs b/crates/notify/src/event.rs similarity index 100% rename from crates/event/src/event.rs rename to crates/notify/src/event.rs diff --git a/crates/event/src/lib.rs b/crates/notify/src/lib.rs similarity index 85% rename from crates/event/src/lib.rs rename to crates/notify/src/lib.rs index 4b4759b4..fc31e172 100644 --- a/crates/event/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -2,11 +2,9 @@ mod adapter; mod config; mod error; mod event; -mod event_notifier; -mod event_system; -mod global; mod notifier; mod store; +mod system; pub use adapter::create_adapters; #[cfg(all(feature = "kafka", target_os = "linux"))] @@ -15,21 +13,21 @@ pub use adapter::kafka::KafkaAdapter; pub use adapter::mqtt::MqttAdapter; #[cfg(feature = "webhook")] pub use adapter::webhook::WebhookAdapter; + pub use adapter::ChannelAdapter; pub use adapter::ChannelAdapterType; pub use config::adapter::AdapterCommon; pub use config::adapter::AdapterConfig; +pub use config::notifier::EventNotifierConfig; +pub use config::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL}; +pub use error::Error; +pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source}; +pub use store::queue::QueueStore; + #[cfg(all(feature = "kafka", target_os = "linux"))] pub use config::kafka::KafkaConfig; #[cfg(feature = "mqtt")] pub use config::mqtt::MqttConfig; -pub use config::notifier::EventNotifierConfig; + #[cfg(feature = "webhook")] pub use config::webhook::WebhookConfig; -pub use config::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL}; -pub use error::Error; - -pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source}; -pub use global::{initialize, is_initialized, is_ready, send_event, shutdown}; -pub use notifier::NotifierSystem; -pub use store::queue::QueueStore; diff --git a/crates/event/src/event_notifier.rs b/crates/notify/src/notifier.rs similarity index 61% rename from crates/event/src/event_notifier.rs rename to crates/notify/src/notifier.rs index c9c80d2e..469b6fd9 100644 --- a/crates/event/src/event_notifier.rs +++ b/crates/notify/src/notifier.rs @@ -6,79 +6,80 @@ use std::sync::Arc; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; -use tracing_subscriber::util::SubscriberInitExt; -/// 事件通知器 +/// Event Notifier pub struct EventNotifier { - /// 事件发送通道 + /// The event sending channel sender: mpsc::Sender, - /// 接收器任务句柄 + /// Receiver task handle task_handle: Option>, - /// 配置信息 + /// Configuration information config: EventNotifierConfig, - /// 关闭标记 + /// Turn off tagging shutdown: CancellationToken, - /// 关闭通知通道 + /// Close the notification channel shutdown_complete_tx: Option>, } impl EventNotifier { - /// 创建新的事件通知器 + /// Create a new event notifier #[instrument(skip_all)] pub async fn new(store: Arc) -> Result { let manager = crate::store::manager::EventManager::new(store); - // 初始化配置 - let config = manager.init().await?; + let manager = Arc::new(manager.await); - // 创建适配器 - let adapters = manager.create_adapters().await?; - info!("创建了 {} 个适配器", adapters.len()); + // Initialize the configuration + let config = manager.clone().init().await?; - // 创建关闭标记 + // Create adapters + let adapters = manager.clone().create_adapters().await?; + info!("Created {} adapters", adapters.len()); + + // Create a close marker let shutdown = CancellationToken::new(); let (shutdown_complete_tx, _) = broadcast::channel(1); // 创建事件通道 - 使用默认容量,因为每个适配器都有自己的队列 // 这里使用较小的通道容量,因为事件会被快速分发到适配器 - let (sender, mut receiver) = mpsc::channel(100); + let (sender, mut receiver) = mpsc::channel::(100); let shutdown_clone = shutdown.clone(); let shutdown_complete_tx_clone = shutdown_complete_tx.clone(); let adapters_clone = adapters.clone(); - // 启动事件处理任务 + // Start the event processing task let task_handle = tokio::spawn(async move { - debug!("事件处理任务启动"); + debug!("The event processing task starts"); loop { tokio::select! { Some(event) = receiver.recv() => { - debug!("收到事件:{}", event.id); + debug!("The event is received:{}", event.id); - // 分发到所有适配器 + // Distribute to all adapters for adapter in &adapters_clone { let adapter_name = adapter.name(); match adapter.send(&event).await { Ok(_) => { - debug!("事件 {} 成功发送到适配器 {}", event.id, adapter_name); + debug!("Event {} Successfully sent to the adapter {}", event.id, adapter_name); } Err(e) => { - error!("事件 {} 发送到适配器 {} 失败:{}", event.id, adapter_name, e); + error!("Event {} send to adapter {} failed:{}", event.id, adapter_name, e); } } } } _ = shutdown_clone.cancelled() => { - info!("接收到关闭信号,事件处理任务停止"); + info!("A shutdown signal is received, and the event processing task is stopped"); let _ = shutdown_complete_tx_clone.send(()); break; } } } - debug!("事件处理任务已停止"); + debug!("The event processing task has been stopped"); }); Ok(Self { @@ -90,21 +91,21 @@ impl EventNotifier { }) } - /// 关闭事件通知器 + /// Turn off the event notifier pub async fn shutdown(&mut self) -> Result<()> { - info!("关闭事件通知器"); + info!("Turn off the event notifier"); self.shutdown.cancel(); if let Some(shutdown_tx) = self.shutdown_complete_tx.take() { let mut rx = shutdown_tx.subscribe(); - // 等待关闭完成信号或超时 + // Wait for the shutdown to complete the signal or time out tokio::select! { _ = rx.recv() => { - debug!("收到关闭完成信号"); + debug!("A shutdown completion signal is received"); } _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - warn!("关闭超时,强制终止"); + warn!("Shutdown timeout and forced termination"); } } } @@ -112,30 +113,30 @@ impl EventNotifier { if let Some(handle) = self.task_handle.take() { handle.abort(); match handle.await { - Ok(_) => debug!("事件处理任务已正常终止"), + Ok(_) => debug!("The event processing task has been terminated gracefully"), Err(e) => { if e.is_cancelled() { - debug!("事件处理任务已取消"); + debug!("The event processing task has been canceled"); } else { - error!("等待事件处理任务终止时出错:{}", e); + error!("An error occurred while waiting for the event processing task to terminate:{}", e); } } } } - info!("事件通知器已完全关闭"); + info!("The event notifier is completely turned off"); Ok(()) } - /// 发送事件 + /// Send events pub async fn send(&self, event: Event) -> Result<()> { self.sender .send(event) .await - .map_err(|e| Error::msg(format!("发送事件到通道失败:{}", e))) + .map_err(|e| Error::msg(format!("Failed to send events to channel:{}", e))) } - /// 获取当前配置 + /// Get the current configuration pub fn config(&self) -> &EventNotifierConfig { &self.config } diff --git a/crates/event/src/store/manager.rs b/crates/notify/src/store/manager.rs similarity index 100% rename from crates/event/src/store/manager.rs rename to crates/notify/src/store/manager.rs diff --git a/crates/event/src/store/mod.rs b/crates/notify/src/store/mod.rs similarity index 100% rename from crates/event/src/store/mod.rs rename to crates/notify/src/store/mod.rs diff --git a/crates/event/src/store/queue.rs b/crates/notify/src/store/queue.rs similarity index 93% rename from crates/event/src/store/queue.rs rename to crates/notify/src/store/queue.rs index 9ffb6617..16a76818 100644 --- a/crates/event/src/store/queue.rs +++ b/crates/notify/src/store/queue.rs @@ -1,5 +1,4 @@ use common::error::{Error, Result}; -use ecstore::utils::path::dir; use serde::{de::DeserializeOwned, Serialize}; use snap::raw::{Decoder, Encoder}; use std::collections::HashMap; @@ -168,7 +167,7 @@ where } Self { - directory: directory.into(), + directory: directory.as_ref().to_path_buf(), name, entry_limit: limit, file_ext: ext, @@ -298,7 +297,10 @@ where // Update the item mapping let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64; - let mut entries = self.entries.write().map_err(|_| Error::msg("获取写锁失败"))?; + let mut entries = self + .entries + .write() + .map_err(|_| Error::msg("Failed to obtain a write lock"))?; entries.insert(key.to_string(), now); Ok(()) @@ -323,48 +325,24 @@ where Ok(data) } } + + /// Check whether the storage limit is reached + fn check_entry_limit(&self) -> Result<()> { + let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?; + if entries.len() as u64 >= self.entry_limit { + return Err(Error::msg("The storage limit has been reached")); + } + Ok(()) + } } impl Store for QueueStore where T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, { - fn open(&self) -> Result<()> { - // Create a directory (if it doesn't exist) - fs::create_dir_all(&self.directory)?; - - // Read existing files - let files = self.list_files()?; - - let mut entries = self - .entries - .write() - .map_err(|_| Error::msg("Failed to obtain a write lock"))?; - - for file in files { - if let Ok(meta) = file.metadata() { - if let Ok(modified) = meta.modified() { - if let Ok(since_epoch) = modified.duration_since(UNIX_EPOCH) { - entries.insert(file.file_name().to_string_lossy().to_string(), since_epoch.as_nanos() as i64); - } - } - } - } - - Ok(()) - } - - fn delete(&self) -> Result<()> { - fs::remove_dir_all(&self.directory)?; - Ok(()) - } - fn put(&self, item: T) -> Result { { - let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?; - if entries.len() as u64 >= self.entry_limit { - return Err(Error::msg("The storage limit has been reached")); - } + self.check_entry_limit()?; } // generate a new uuid @@ -375,17 +353,13 @@ where Ok(key) } - fn put_multiple(&self, items: Vec) -> Result { if items.is_empty() { return Err(Error::msg("The list of items is empty")); } { - let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?; - if entries.len() as u64 >= self.entry_limit { - return Err(Error::msg("The storage limit has been reached")); - } + self.check_entry_limit()?; } // Generate a new UUID @@ -419,19 +393,19 @@ where // if the read fails try parsing it once if reader.read_to_end(&mut buffer).is_err() { // Try to parse the entire data as a single object - match serde_json::from_slice::(&data) { + return match serde_json::from_slice::(&data) { Ok(item) => { items.push(item); - return Ok(items); + Ok(items) } Err(_) => { // An attempt was made to resolve to an array of objects match serde_json::from_slice::>(&data) { - Ok(array_items) => return Ok(array_items), - Err(e) => return Err(Error::msg(format!("Failed to parse the data:{}", e))), + Ok(array_items) => Ok(array_items), + Err(e) => Err(Error::msg(format!("Failed to parse the data:{}", e))), } } - } + }; } // Read JSON objects by row @@ -517,4 +491,34 @@ where Ok(()) } + + fn open(&self) -> Result<()> { + // Create a directory (if it doesn't exist) + fs::create_dir_all(&self.directory)?; + + // Read existing files + let files = self.list_files()?; + + let mut entries = self + .entries + .write() + .map_err(|_| Error::msg("Failed to obtain a write lock"))?; + + for file in files { + if let Ok(meta) = file.metadata() { + if let Ok(modified) = meta.modified() { + if let Ok(since_epoch) = modified.duration_since(UNIX_EPOCH) { + entries.insert(file.file_name().to_string_lossy().to_string(), since_epoch.as_nanos() as i64); + } + } + } + } + + Ok(()) + } + + fn delete(&self) -> Result<()> { + fs::remove_dir_all(&self.directory)?; + Ok(()) + } } diff --git a/crates/event/src/event_system.rs b/crates/notify/src/system.rs similarity index 60% rename from crates/event/src/event_system.rs rename to crates/notify/src/system.rs index 77fd6636..97e9c362 100644 --- a/crates/event/src/event_system.rs +++ b/crates/notify/src/system.rs @@ -1,81 +1,81 @@ use crate::config::notifier::EventNotifierConfig; -use crate::event_notifier::EventNotifier; +use crate::notifier::EventNotifier; use common::error::Result; use ecstore::store::ECStore; use once_cell::sync::OnceCell; use std::sync::{Arc, Mutex}; use tracing::{debug, error, info}; -/// 全局事件系统 +/// Global event system pub struct EventSystem { - /// 事件通知器 + /// Event Notifier notifier: Mutex>, } impl EventSystem { - /// 创建一个新的事件系统 + /// Create a new event system pub fn new() -> Self { Self { notifier: Mutex::new(None), } } - /// 初始化事件系统 + /// Initialize the event system pub async fn init(&self, store: Arc) -> Result { - info!("初始化事件系统"); + info!("Initialize the event system"); let notifier = EventNotifier::new(store).await?; let config = notifier.config().clone(); let mut guard = self .notifier .lock() - .map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?; + .map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?; *guard = Some(notifier); - debug!("事件系统初始化完成"); + debug!("The event system initialization is complete"); Ok(config) } - /// 发送事件 + /// Send events pub async fn send_event(&self, event: crate::Event) -> Result<()> { let guard = self .notifier .lock() - .map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?; + .map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?; if let Some(notifier) = &*guard { notifier.send(event).await } else { - error!("事件系统未初始化"); - Err(common::error::Error::msg("事件系统未初始化")) + error!("The event system is not initialized"); + Err(common::error::Error::msg("The event system is not initialized")) } } - /// 关闭事件系统 + /// Shut down the event system pub async fn shutdown(&self) -> Result<()> { - info!("关闭事件系统"); + info!("Shut down the event system"); let mut guard = self .notifier .lock() - .map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?; + .map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?; if let Some(ref mut notifier) = *guard { notifier.shutdown().await?; *guard = None; - info!("事件系统已关闭"); + info!("The event system is down"); Ok(()) } else { - debug!("事件系统已经关闭"); + debug!("The event system has been shut down"); Ok(()) } } } -/// 全局事件系统实例 +/// A global event system instance pub static GLOBAL_EVENT_SYS: OnceCell = OnceCell::new(); -/// 初始化全局事件系统 +/// Initialize the global event system pub fn init_global_event_system() -> &'static EventSystem { GLOBAL_EVENT_SYS.get_or_init(EventSystem::new) } diff --git a/ecstore/src/config/mod.rs b/ecstore/src/config/mod.rs index cfeded80..ed5c2908 100644 --- a/ecstore/src/config/mod.rs +++ b/ecstore/src/config/mod.rs @@ -78,11 +78,7 @@ impl KVS { KVS(Vec::new()) } pub fn get(&self, key: &str) -> String { - if let Some(v) = self.lookup(key) { - v - } else { - "".to_owned() - } + if let Some(v) = self.lookup(key) { v } else { "".to_owned() } } pub fn lookup(&self, key: &str) -> Option { for kv in self.0.iter() { diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index ae9e65dd..48d58586 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -53,7 +53,7 @@ protos.workspace = true query = { workspace = true } rmp-serde.workspace = true rustfs-config = { workspace = true } -rustfs-event = { workspace = true } +rustfs-notify = { workspace = true } rustfs-obs = { workspace = true } rustfs-utils = { workspace = true, features = ["full"] } rustls.workspace = true diff --git a/rustfs/src/event.rs b/rustfs/src/event.rs index dbbe11e4..7c47f101 100644 --- a/rustfs/src/event.rs +++ b/rustfs/src/event.rs @@ -1,4 +1,5 @@ -use rustfs_event::NotifierConfig; +use rustfs_config::NotifierConfig; +use rustfs_event::EventNotifierConfig; use tracing::{error, info, instrument}; #[instrument] @@ -7,26 +8,26 @@ pub(crate) async fn init_event_notifier(notifier_config: Option) { let notifier_config_present = notifier_config.is_some(); let config = if notifier_config_present { info!("event_config is not empty, path: {:?}", notifier_config); - NotifierConfig::event_load_config(notifier_config) + EventNotifierConfig::event_load_config(notifier_config) } else { info!("event_config is empty"); // rustfs_event::get_event_notifier_config().clone() - NotifierConfig::default() + EventNotifierConfig::default() }; info!("using event_config: {:?}", config); tokio::spawn(async move { - let result = rustfs_event::initialize(&config).await; - match result { - Ok(_) => info!( - "event notifier initialized successfully {}", - if notifier_config_present { - "by config file" - } else { - "by sys config" - } - ), - Err(e) => error!("Failed to initialize event notifier: {}", e), - } + // let result = rustfs_event::initialize(&config).await; + // match result { + // Ok(_) => info!( + // "event notifier initialized successfully {}", + // if notifier_config_present { + // "by config file" + // } else { + // "by sys config" + // } + // ), + // Err(e) => error!("Failed to initialize event notifier: {}", e), + // } }); } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index aa34f961..1146a9fb 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -569,14 +569,14 @@ async fn run(opt: config::Opt) -> Result<()> { // update the status to stopping first state_manager.update(ServiceState::Stopping); - // Stop the notification system - if rustfs_event::is_ready() { - // stop event notifier - rustfs_event::shutdown().await.map_err(|err| { - error!("Failed to shut down the notification system: {}", err); - Error::from_string(err.to_string()) - })?; - } + // // Stop the notification system + // if rustfs_event::is_ready() { + // // stop event notifier + // rustfs_event::shutdown().await.map_err(|err| { + // error!("Failed to shut down the notification system: {}", err); + // Error::from_string(err.to_string()) + // })?; + // } info!("Server is stopping..."); let _ = shutdown_tx.send(()); diff --git a/rustfs/src/storage/event.rs b/rustfs/src/storage/event.rs index 59d17dba..5fc5bef0 100644 --- a/rustfs/src/storage/event.rs +++ b/rustfs/src/storage/event.rs @@ -13,5 +13,6 @@ pub(crate) fn create_metadata() -> Metadata { /// Create a new event object #[allow(dead_code)] pub(crate) async fn send_event(event: Event) -> Result<(), Box> { - rustfs_event::send_event(event).await.map_err(|e| e.into()) + // rustfs_event::send_event(event).await.map_err(|e| e.into()) + Ok(()) }