modify crates name

houseme
2025-06-06 11:14:36 +08:00
parent 8555d48ca2
commit 4199bf2ba4
35 changed files with 172 additions and 980 deletions

Cargo.lock (generated)

@@ -7655,7 +7655,7 @@ dependencies = [
"rmp-serde",
"rust-embed",
"rustfs-config",
"rustfs-event",
"rustfs-notify",
"rustfs-obs",
"rustfs-utils",
"rustfs-zip",
@@ -7693,7 +7693,28 @@ dependencies = [
]
[[package]]
name = "rustfs-event"
name = "rustfs-gui"
version = "0.0.1"
dependencies = [
"chrono",
"dioxus",
"dirs",
"hex",
"keyring",
"lazy_static",
"rfd 0.15.3",
"rust-embed",
"rust-i18n",
"serde",
"serde_json",
"sha2 0.10.9",
"tokio",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "rustfs-notify"
version = "0.0.1"
dependencies = [
"async-trait",
@@ -7721,27 +7742,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "rustfs-gui"
version = "0.0.1"
dependencies = [
"chrono",
"dioxus",
"dirs",
"hex",
"keyring",
"lazy_static",
"rfd 0.15.3",
"rust-embed",
"rust-i18n",
"serde",
"serde_json",
"sha2 0.10.9",
"tokio",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "rustfs-obs"
version = "0.0.1"


@@ -7,7 +7,7 @@ members = [
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"crates/config", # Configuration management
"crates/event", # Event notification system
"crates/notify", # Event notification system
"crates/obs", # Observability utilities
"crates/utils", # Utility functions and helpers
"crypto", # Cryptography and security features
@@ -51,7 +51,7 @@ rustfs = { path = "./rustfs", version = "0.0.1" }
rustfs-zip = { path = "./crates/zip", version = "0.0.1" }
rustfs-config = { path = "./crates/config", version = "0.0.1" }
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rustfs-event = { path = "crates/event", version = "0.0.1" }
rustfs-notify = { path = "crates/notify", version = "0.0.1" }
rustfs-utils = { path = "crates/utils", version = "0.0.1" }
workers = { path = "./common/workers", version = "0.0.1" }
tokio-tar = "0.3.1"


@@ -1,144 +0,0 @@
use rustfs_event::{
AdapterConfig, Bucket, ChannelAdapterType, Error as NotifierError, Event, Identity, Metadata, Name, NotifierConfig, Object,
Source, WebhookConfig,
};
use std::collections::HashMap;
use tokio::signal;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
async fn setup_notification_system() -> Result<(), NotifierError> {
let config = NotifierConfig {
store_path: "./deploy/logs/event_store".into(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://127.0.0.1:3020/webhook".into(),
auth_token: Some("your-auth-token".into()),
custom_headers: Some(HashMap::new()),
max_retries: 3,
timeout: Some(30),
retry_interval: Some(5),
client_cert: None,
client_key: None,
common: rustfs_event::AdapterCommon {
identifier: "webhook".into(),
comment: "webhook".into(),
enable: true,
queue_dir: "./deploy/logs/event_queue".into(),
queue_limit: 100,
},
})],
};
rustfs_event::initialize(&config).await?;
// wait for the system to be ready
for _ in 0..50 {
// wait up to 5 seconds
if rustfs_event::is_ready() {
return Ok(());
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(NotifierError::custom("notification system initialization timed out"))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialization log
// tracing_subscriber::fmt::init();
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG) // set to debug or lower level
.with_target(false) // simplify output
.finish();
tracing::subscriber::set_global_default(subscriber).expect("failed to set up log subscriber");
// set up notification system
if let Err(e) = setup_notification_system().await {
eprintln!("unable to initialize notification system:{}", e);
return Err(e.into());
}
// create a shutdown signal processing
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
// start signal processing task
tokio::spawn(async move {
let _ = signal::ctrl_c().await;
println!("Received the shutdown signal and prepared to exit...");
let _ = shutdown_tx.send(());
});
// main application logic
tokio::select! {
_ = async {
loop {
// application logic
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create events using builder mode
let event = Event::builder()
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.s3(metadata)
.source(source)
.channels(vec![ChannelAdapterType::Webhook.to_string()])
.build()
.expect("failed to create event");
if let Err(e) = rustfs_event::send_event(event).await {
eprintln!("send event failed:{}", e);
}
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
} => {},
_ = &mut shutdown_rx => {
println!("close the app");
}
}
// Gracefully shut down the notification system
println!("shutting down the notification system");
if let Err(e) = rustfs_event::shutdown().await {
eprintln!("An error occurred while shutting down the notification system:{}", e);
} else {
println!("the notification system has been closed safely");
}
println!("the application has been closed safely");
Ok(())
}


@@ -1,120 +0,0 @@
use rustfs_event::NotifierSystem;
use rustfs_event::{create_adapters, ChannelAdapterType};
use rustfs_event::{AdapterConfig, NotifierConfig, WebhookConfig};
use rustfs_event::{Bucket, Event, Identity, Metadata, Name, Object, Source};
use std::collections::HashMap;
use std::error;
use std::sync::Arc;
use tokio::signal;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG) // set to debug or lower level
.with_target(false) // simplify output
.finish();
tracing::subscriber::set_global_default(subscriber).expect("failed to set up log subscriber");
let config = NotifierConfig {
store_path: "./events".to_string(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://127.0.0.1:3020/webhook".to_string(),
auth_token: Some("secret-token".to_string()),
custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])),
max_retries: 3,
timeout: Some(30),
retry_interval: Some(5),
client_cert: None,
client_key: None,
common: rustfs_event::AdapterCommon {
identifier: "webhook".to_string(),
comment: "webhook".to_string(),
enable: true,
queue_dir: "./deploy/logs/event_queue".to_string(),
queue_limit: 100,
},
})],
};
// event_load_config
// loading configuration from environment variables
let _config = NotifierConfig::event_load_config(Some("./crates/event/examples/event.toml".to_string()));
tracing::info!("event_load_config config: {:?} \n", _config);
dotenvy::dotenv()?;
let _config = NotifierConfig::event_load_config(None);
tracing::info!("event_load_config config: {:?} \n", _config);
let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await?));
let adapters = create_adapters(&config.adapters)?;
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create events using builder mode
let event = Event::builder()
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.s3(metadata)
.source(source)
.channels(vec![ChannelAdapterType::Webhook.to_string()])
.build()
.expect("failed to create event");
{
let system = system.lock().await;
system.send_event(event).await?;
}
let system_clone = Arc::clone(&system);
let system_handle = tokio::spawn(async move {
let mut system = system_clone.lock().await;
system.start(adapters).await
});
signal::ctrl_c().await?;
tracing::info!("Received shutdown signal");
let result = {
let mut system = system.lock().await;
system.shutdown().await
};
if let Err(e) = result {
tracing::error!("Failed to shut down the notification system: {}", e);
} else {
tracing::info!("Notification system shut down successfully");
}
system_handle.await??;
Ok(())
}


@@ -1,234 +0,0 @@
use crate::{create_adapters, Error, Event, NotifierConfig, NotifierSystem};
use std::sync::{atomic, Arc};
use tokio::sync::{Mutex, OnceCell};
use tracing::instrument;
static GLOBAL_SYSTEM: OnceCell<Arc<Mutex<NotifierSystem>>> = OnceCell::const_new();
static INITIALIZED: atomic::AtomicBool = atomic::AtomicBool::new(false);
static READY: atomic::AtomicBool = atomic::AtomicBool::new(false);
static INIT_LOCK: Mutex<()> = Mutex::const_new(());
/// Initializes the global notification system.
///
/// This function performs the following steps:
/// 1. Checks if the system is already initialized.
/// 2. Creates a new `NotificationSystem` instance.
/// 3. Creates adapters based on the provided configuration.
/// 4. Starts the notification system with the created adapters.
/// 5. Sets the global system instance.
///
/// # Errors
///
/// Returns an error if:
/// - The system is already initialized.
/// - Creating the `NotificationSystem` fails.
/// - Creating adapters fails.
/// - Starting the notification system fails.
/// - Setting the global system instance fails.
#[instrument]
pub async fn initialize(config: &NotifierConfig) -> Result<(), Error> {
let _lock = INIT_LOCK.lock().await;
// Check if the system is already initialized.
if INITIALIZED.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("Notification system has already been initialized"));
}
// Check if the system is already ready.
if READY.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("Notification system is already ready"));
}
// Check if the system is shutting down.
if let Some(system) = GLOBAL_SYSTEM.get() {
let system_guard = system.lock().await;
if system_guard.shutdown_cancelled() {
return Err(Error::custom("Notification system is shutting down"));
}
}
// check that at least one adapter is configured
if config.adapters.is_empty() {
return Err(Error::custom("No adapters configured"));
}
// Attempt to initialize, and reset the INITIALIZED flag if it fails.
let result: Result<(), Error> = async {
let system = NotifierSystem::new(config.clone()).await.map_err(|e| {
tracing::error!("Failed to create NotificationSystem: {:?}", e);
e
})?;
let adapters = create_adapters(&config.adapters).map_err(|e| {
tracing::error!("Failed to create adapters: {:?}", e);
e
})?;
tracing::info!("adapters len:{:?}", adapters.len());
let system_clone = Arc::new(Mutex::new(system));
let adapters_clone = adapters.clone();
GLOBAL_SYSTEM.set(system_clone.clone()).map_err(|_| {
let err = Error::custom("Unable to set up global notification system");
tracing::error!("{:?}", err);
err
})?;
tokio::spawn(async move {
if let Err(e) = system_clone.lock().await.start(adapters_clone).await {
tracing::error!("Notification system failed to start: {}", e);
}
tracing::info!("Notification system started in background");
});
tracing::info!("system start success,start set READY value");
READY.store(true, atomic::Ordering::SeqCst);
tracing::info!("Notification system is ready to process events");
Ok(())
}
.await;
if result.is_err() {
INITIALIZED.store(false, atomic::Ordering::SeqCst);
READY.store(false, atomic::Ordering::SeqCst);
return result;
}
INITIALIZED.store(true, atomic::Ordering::SeqCst);
Ok(())
}
/// Checks if the notification system is initialized.
pub fn is_initialized() -> bool {
INITIALIZED.load(atomic::Ordering::SeqCst)
}
/// Checks if the notification system is ready.
pub fn is_ready() -> bool {
READY.load(atomic::Ordering::SeqCst)
}
/// Sends an event to the notification system.
///
/// # Errors
///
/// Returns an error if:
/// - The system is not initialized.
/// - The system is not ready.
/// - Sending the event fails.
#[instrument(fields(event))]
pub async fn send_event(event: Event) -> Result<(), Error> {
if !READY.load(atomic::Ordering::SeqCst) {
return Err(Error::custom("Notification system not ready, please wait for initialization to complete"));
}
let system = get_system().await?;
let system_guard = system.lock().await;
system_guard.send_event(event).await
}
/// Shuts down the notification system.
#[instrument]
pub async fn shutdown() -> Result<(), Error> {
if let Some(system) = GLOBAL_SYSTEM.get() {
tracing::info!("Shutting down notification system start");
let result = {
let mut system_guard = system.lock().await;
system_guard.shutdown().await
};
if let Err(e) = &result {
tracing::error!("Notification system shutdown failed: {}", e);
} else {
tracing::info!("Event bus shutdown completed");
}
tracing::info!(
"Shutdown method called set static value start, READY: {}, INITIALIZED: {}",
READY.load(atomic::Ordering::SeqCst),
INITIALIZED.load(atomic::Ordering::SeqCst)
);
READY.store(false, atomic::Ordering::SeqCst);
INITIALIZED.store(false, atomic::Ordering::SeqCst);
tracing::info!(
"Shutdown method called set static value end, READY: {}, INITIALIZED: {}",
READY.load(atomic::Ordering::SeqCst),
INITIALIZED.load(atomic::Ordering::SeqCst)
);
result
} else {
Err(Error::custom("Notification system not initialized"))
}
}
/// Retrieves the global notification system instance.
///
/// # Errors
///
/// Returns an error if the system is not initialized.
async fn get_system() -> Result<Arc<Mutex<NotifierSystem>>, Error> {
GLOBAL_SYSTEM
.get()
.cloned()
.ok_or_else(|| Error::custom("Notification system not initialized"))
}
#[cfg(test)]
mod tests {
use crate::{initialize, is_initialized, is_ready, NotifierConfig};
fn init_tracing() {
// Use try_init to avoid panic if already initialized
let _ = tracing_subscriber::fmt::try_init();
}
#[tokio::test]
async fn test_initialize_success() {
init_tracing();
let config = NotifierConfig::default(); // assume there is a default configuration
let result = initialize(&config).await;
assert!(result.is_err(), "Initialization should not succeed");
assert!(!is_initialized(), "System should not be marked as initialized");
assert!(!is_ready(), "System should not be marked as ready");
}
#[tokio::test]
async fn test_initialize_twice() {
init_tracing();
let config = NotifierConfig::default();
let _ = initialize(&config.clone()).await; // first initialization
let result = initialize(&config).await; // second initialization
assert!(result.is_err(), "Initialization should succeed");
assert!(result.is_err(), "Re-initialization should fail");
}
#[tokio::test]
async fn test_initialize_failure_resets_state() {
init_tracing();
// Test with empty adapters to force failure
let config = NotifierConfig {
adapters: Vec::new(),
..Default::default()
};
let result = initialize(&config).await;
assert!(result.is_err(), "Initialization should fail with empty adapters");
assert!(!is_initialized(), "System should not be marked as initialized after failure");
assert!(!is_ready(), "System should not be marked as ready after failure");
}
#[tokio::test]
async fn test_is_initialized_and_is_ready() {
init_tracing();
// Initially, the system should not be initialized or ready
assert!(!is_initialized(), "System should not be initialized initially");
assert!(!is_ready(), "System should not be ready initially");
// Test with empty adapters to ensure failure
let config = NotifierConfig {
adapters: Vec::new(),
..Default::default()
};
let result = initialize(&config).await;
assert!(result.is_err(), "Initialization should fail with empty adapters");
assert!(!is_initialized(), "System should not be initialized after failed init");
assert!(!is_ready(), "System should not be ready after failed init");
}
}


@@ -1,136 +0,0 @@
use crate::{event_bus, ChannelAdapter, Error, Event, EventStore, NotifierConfig};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
/// It is responsible for sending and receiving events.
/// It also handles the shutdown process.
pub struct NotifierSystem {
tx: mpsc::Sender<Event>,
rx: Option<mpsc::Receiver<Event>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
shutdown_complete: Option<tokio::sync::oneshot::Sender<()>>,
shutdown_receiver: Option<tokio::sync::oneshot::Receiver<()>>,
}
impl NotifierSystem {
/// Creates a new `NotificationSystem` instance.
#[instrument(skip(config))]
pub async fn new(config: NotifierConfig) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity.try_into().unwrap());
let store = Arc::new(EventStore::new(&config.store_path).await?);
let shutdown = CancellationToken::new();
let restored_logs = store.load_logs().await?;
for log in restored_logs {
for event in log.records {
// send() may return a SendError here; wrap it as Error::ChannelSend
tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
}
}
// Initialize shutdown_complete to Some(tx)
let (complete_tx, complete_rx) = tokio::sync::oneshot::channel();
Ok(Self {
tx,
rx: Some(rx),
store,
shutdown,
shutdown_complete: Some(complete_tx),
shutdown_receiver: Some(complete_rx),
})
}
/// Starts the notification system.
/// It initializes the event bus and the producer.
#[instrument(skip_all)]
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
if self.shutdown.is_cancelled() {
let error = Error::custom("System is shutting down");
self.handle_error("start", &error);
return Err(error);
}
self.log(tracing::Level::INFO, "start", "Starting the notification system");
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
let shutdown_clone = self.shutdown.clone();
let store_clone = self.store.clone();
let shutdown_complete = self.shutdown_complete.take();
tokio::spawn(async move {
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone, shutdown_complete).await {
tracing::error!("Event bus failed: {}", e);
}
});
self.log(tracing::Level::INFO, "start", "Notification system started successfully");
Ok(())
}
/// Sends an event to the notification system.
/// This method is used to send events to the event bus.
#[instrument(skip(self))]
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
self.log(tracing::Level::DEBUG, "send_event", &format!("Sending event: {:?}", event));
if self.shutdown.is_cancelled() {
let error = Error::custom("System is shutting down");
self.handle_error("send_event", &error);
return Err(error);
}
if let Err(e) = self.tx.send(event).await {
let error = Error::ChannelSend(Box::new(e));
self.handle_error("send_event", &error);
return Err(error);
}
self.log(tracing::Level::INFO, "send_event", "Event sent successfully");
Ok(())
}
/// Shuts down the notification system.
/// This method is used to cancel the event bus and producer tasks.
#[instrument(skip(self))]
pub async fn shutdown(&mut self) -> Result<(), Error> {
tracing::info!("Shutting down the notification system");
self.shutdown.cancel();
// wait for the event bus to be completely closed
if let Some(receiver) = self.shutdown_receiver.take() {
match receiver.await {
Ok(_) => {
tracing::info!("Event bus shutdown completed successfully");
Ok(())
}
Err(e) => {
let error = Error::custom(format!("Failed to receive shutdown completion: {}", e).as_str());
self.handle_error("shutdown", &error);
Err(error)
}
}
} else {
tracing::warn!("Shutdown receiver not available, the event bus might still be running");
Err(Error::custom("Shutdown receiver not available"))
}
}
/// Returns whether shutdown has been requested
pub fn shutdown_cancelled(&self) -> bool {
self.shutdown.is_cancelled()
}
#[instrument(skip(self))]
pub fn handle_error(&self, context: &str, error: &Error) {
self.log(tracing::Level::ERROR, context, &format!("{:?}", error));
// TODO Can be extended to record to files or send to monitoring systems
}
#[instrument(skip(self))]
fn log(&self, level: tracing::Level, context: &str, message: &str) {
match level {
tracing::Level::ERROR => tracing::error!("[{}] {}", context, message),
tracing::Level::WARN => tracing::warn!("[{}] {}", context, message),
tracing::Level::INFO => tracing::info!("[{}] {}", context, message),
tracing::Level::DEBUG => tracing::debug!("[{}] {}", context, message),
tracing::Level::TRACE => tracing::trace!("[{}] {}", context, message),
}
}
}


@@ -1,177 +0,0 @@
use rustfs_event::{AdapterCommon, AdapterConfig, ChannelAdapterType, NotifierSystem, WebhookConfig};
use rustfs_event::{Bucket, Event, EventBuilder, Identity, Metadata, Name, Object, Source};
use rustfs_event::{ChannelAdapter, WebhookAdapter};
use std::collections::HashMap;
use std::sync::Arc;
#[tokio::test]
async fn test_webhook_adapter() {
let adapter = WebhookAdapter::new(WebhookConfig {
common: AdapterCommon {
identifier: "webhook".to_string(),
comment: "webhook".to_string(),
enable: true,
queue_dir: "./deploy/logs/event_queue".to_string(),
queue_limit: 100,
},
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: Some(5),
retry_interval: Some(5),
client_cert: None,
client_key: None,
});
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// Create events using builder mode
let event = Event::builder()
.event_version("2.0")
.event_source("aws:s3")
.aws_region("us-east-1")
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.request_parameters(HashMap::new())
.response_elements(HashMap::new())
.s3(metadata)
.source(source)
.channels(vec![ChannelAdapterType::Webhook.to_string()])
.build()
.expect("failed to create event");
let result = adapter.send(&event).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_notification_system() {
let config = rustfs_event::NotifierConfig {
store_path: "./test_events".to_string(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
common: Default::default(),
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: Some(5),
retry_interval: Some(5),
client_cert: None,
client_key: None,
})],
};
let system = Arc::new(tokio::sync::Mutex::new(NotifierSystem::new(config.clone()).await.unwrap()));
let adapters: Vec<Arc<dyn ChannelAdapter>> = vec![Arc::new(WebhookAdapter::new(WebhookConfig {
common: Default::default(),
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: Some(5),
retry_interval: Some(5),
client_cert: None,
client_key: None,
}))];
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create a preconfigured builder with objects
let event = EventBuilder::for_object_creation(metadata, source)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.event_time("2023-10-01T12:00:00.000Z")
.channels(vec![ChannelAdapterType::Webhook.to_string()])
.build()
.expect("failed to create event");
{
let system_lock = system.lock().await;
system_lock.send_event(event).await.unwrap();
}
let system_clone = Arc::clone(&system);
let system_handle = tokio::spawn(async move {
let mut system = system_clone.lock().await;
system.start(adapters).await
});
// set 10 seconds timeout
match tokio::time::timeout(std::time::Duration::from_secs(10), system_handle).await {
Ok(result) => {
println!("System started successfully");
assert!(result.is_ok());
}
Err(_) => {
println!("System operation timed out, forcing shutdown");
// create a new task to handle the timeout
let system = Arc::clone(&system);
tokio::spawn(async move {
if let Ok(mut guard) = system.try_lock() {
guard.shutdown().await.unwrap();
}
});
// give the system some time to clean up resources
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}


@@ -1,5 +1,5 @@
[package]
name = "rustfs-event"
name = "rustfs-notify"
edition.workspace = true
license.workspace = true
repository.workspace = true


@@ -7,6 +7,7 @@ use crate::{Event, DEFAULT_RETRY_INTERVAL};
use async_trait::async_trait;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::{self, Client, Identity, RequestBuilder};
use serde_json::to_string;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
@@ -105,7 +106,8 @@ impl WebhookAdapter {
} else {
crate::config::default_queue_limit()
};
let store = QueueStore::new(store_path, queue_limit, Some(".event".to_string()));
let name = config.common.identifier.clone();
let store = QueueStore::new(store_path, name, queue_limit, Some(".event".to_string()));
if let Err(e) = store.open() {
tracing::error!("Unable to open queue storage: {}", e);
None


@@ -2,11 +2,9 @@ mod adapter;
mod config;
mod error;
mod event;
mod event_notifier;
mod event_system;
mod global;
mod notifier;
mod store;
mod system;
pub use adapter::create_adapters;
#[cfg(all(feature = "kafka", target_os = "linux"))]
@@ -15,21 +13,21 @@ pub use adapter::kafka::KafkaAdapter;
pub use adapter::mqtt::MqttAdapter;
#[cfg(feature = "webhook")]
pub use adapter::webhook::WebhookAdapter;
pub use adapter::ChannelAdapter;
pub use adapter::ChannelAdapterType;
pub use config::adapter::AdapterCommon;
pub use config::adapter::AdapterConfig;
pub use config::notifier::EventNotifierConfig;
pub use config::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL};
pub use error::Error;
pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
pub use store::queue::QueueStore;
#[cfg(all(feature = "kafka", target_os = "linux"))]
pub use config::kafka::KafkaConfig;
#[cfg(feature = "mqtt")]
pub use config::mqtt::MqttConfig;
pub use config::notifier::EventNotifierConfig;
#[cfg(feature = "webhook")]
pub use config::webhook::WebhookConfig;
pub use config::{DEFAULT_MAX_RETRIES, DEFAULT_RETRY_INTERVAL};
pub use error::Error;
pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
pub use global::{initialize, is_initialized, is_ready, send_event, shutdown};
pub use notifier::NotifierSystem;
pub use store::queue::QueueStore;
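For reference, a minimal sketch of building an event against the renamed crate: with the package now called rustfs-notify, downstream code would import it as rustfs_notify, and the re-exported types above keep their names. Field values are placeholders and for_object_creation is taken from the integration test removed above; this is an illustration, not part of the commit.

    use rustfs_notify::{Event, EventBuilder, Metadata, Source};

    // Placeholder inputs; Metadata/Source construction is shown in the removed examples above.
    fn build_put_event(metadata: Metadata, source: Source) -> Event {
        EventBuilder::for_object_creation(metadata, source)
            .event_time("2023-10-01T12:00:00.000Z")
            .build()
            .expect("failed to create event")
    }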


@@ -6,79 +6,80 @@ use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing_subscriber::util::SubscriberInitExt;
/// 事件通知器
/// Event Notifier
pub struct EventNotifier {
/// 事件发送通道
/// The event sending channel
sender: mpsc::Sender<Event>,
/// 接收器任务句柄
/// Receiver task handle
task_handle: Option<tokio::task::JoinHandle<()>>,
/// 配置信息
/// Configuration information
config: EventNotifierConfig,
/// 关闭标记
/// Shutdown flag (cancellation token)
shutdown: CancellationToken,
/// 关闭通知通道
/// Channel used to signal that shutdown has completed
shutdown_complete_tx: Option<broadcast::Sender<()>>,
}
impl EventNotifier {
/// 创建新的事件通知器
/// Create a new event notifier
#[instrument(skip_all)]
pub async fn new(store: Arc<ECStore>) -> Result<Self> {
let manager = crate::store::manager::EventManager::new(store);
// 初始化配置
let config = manager.init().await?;
let manager = Arc::new(manager.await);
// 创建适配器
let adapters = manager.create_adapters().await?;
info!("创建了 {} 个适配器", adapters.len());
// Initialize the configuration
let config = manager.clone().init().await?;
// 创建关闭标记
// Create adapters
let adapters = manager.clone().create_adapters().await?;
info!("Created {} adapters", adapters.len());
// Create the shutdown token
let shutdown = CancellationToken::new();
let (shutdown_complete_tx, _) = broadcast::channel(1);
// Create the event channel - use the default capacity, since each adapter has its own queue
// A small channel capacity is enough here because events are dispatched to the adapters quickly
let (sender, mut receiver) = mpsc::channel(100);
let (sender, mut receiver) = mpsc::channel::<Event>(100);
let shutdown_clone = shutdown.clone();
let shutdown_complete_tx_clone = shutdown_complete_tx.clone();
let adapters_clone = adapters.clone();
// 启动事件处理任务
// Start the event processing task
let task_handle = tokio::spawn(async move {
debug!("事件处理任务启动");
debug!("The event processing task starts");
loop {
tokio::select! {
Some(event) = receiver.recv() => {
debug!("收到事件:{}", event.id);
debug!("The event is received:{}", event.id);
// 分发到所有适配器
// Distribute to all adapters
for adapter in &adapters_clone {
let adapter_name = adapter.name();
match adapter.send(&event).await {
Ok(_) => {
debug!("事件 {} 成功发送到适配器 {}", event.id, adapter_name);
debug!("Event {} Successfully sent to the adapter {}", event.id, adapter_name);
}
Err(e) => {
error!("事件 {} 发送到适配器 {} 失败:{}", event.id, adapter_name, e);
error!("Event {} send to adapter {} failed:{}", event.id, adapter_name, e);
}
}
}
}
_ = shutdown_clone.cancelled() => {
info!("接收到关闭信号,事件处理任务停止");
info!("A shutdown signal is received, and the event processing task is stopped");
let _ = shutdown_complete_tx_clone.send(());
break;
}
}
}
debug!("事件处理任务已停止");
debug!("The event processing task has been stopped");
});
Ok(Self {
@@ -90,21 +91,21 @@ impl EventNotifier {
})
}
/// 关闭事件通知器
/// Turn off the event notifier
pub async fn shutdown(&mut self) -> Result<()> {
info!("关闭事件通知器");
info!("Turn off the event notifier");
self.shutdown.cancel();
if let Some(shutdown_tx) = self.shutdown_complete_tx.take() {
let mut rx = shutdown_tx.subscribe();
// 等待关闭完成信号或超时
// Wait for the shutdown to complete the signal or time out
tokio::select! {
_ = rx.recv() => {
debug!("收到关闭完成信号");
debug!("A shutdown completion signal is received");
}
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
warn!("关闭超时,强制终止");
warn!("Shutdown timeout and forced termination");
}
}
}
@@ -112,30 +113,30 @@ impl EventNotifier {
if let Some(handle) = self.task_handle.take() {
handle.abort();
match handle.await {
Ok(_) => debug!("事件处理任务已正常终止"),
Ok(_) => debug!("The event processing task has been terminated gracefully"),
Err(e) => {
if e.is_cancelled() {
debug!("事件处理任务已取消");
debug!("The event processing task has been canceled");
} else {
error!("等待事件处理任务终止时出错:{}", e);
error!("An error occurred while waiting for the event processing task to terminate:{}", e);
}
}
}
}
info!("事件通知器已完全关闭");
info!("The event notifier is completely turned off");
Ok(())
}
/// 发送事件
/// Send events
pub async fn send(&self, event: Event) -> Result<()> {
self.sender
.send(event)
.await
.map_err(|e| Error::msg(format!("发送事件到通道失败:{}", e)))
.map_err(|e| Error::msg(format!("Failed to send events to channel:{}", e)))
}
/// 获取当前配置
/// Get the current configuration
pub fn config(&self) -> &EventNotifierConfig {
&self.config
}
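For reference, a minimal sketch of the notifier lifecycle implied by this file, assuming crate-internal paths (event_system.rs below consumes it the same way); the ECStore and Result imports are assumptions, and this is an illustration rather than part of the commit.

    use crate::notifier::EventNotifier;
    use crate::Event;
    use common::error::Result;
    use ecstore::store::ECStore;
    use std::sync::Arc;

    async fn run_notifier(store: Arc<ECStore>, event: Event) -> Result<()> {
        // new() loads the configuration and spawns the dispatch task shown above
        let mut notifier = EventNotifier::new(store).await?;
        tracing::debug!("active notifier config: {:?}", notifier.config());
        // send() queues the event on the internal channel; the task fans it out to every adapter
        notifier.send(event).await?;
        // shutdown() cancels the token and waits (up to 10 seconds) for the task to finish
        notifier.shutdown().await
    }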


@@ -1,5 +1,4 @@
use common::error::{Error, Result};
use ecstore::utils::path::dir;
use serde::{de::DeserializeOwned, Serialize};
use snap::raw::{Decoder, Encoder};
use std::collections::HashMap;
@@ -168,7 +167,7 @@ where
}
Self {
directory: directory.into(),
directory: directory.as_ref().to_path_buf(),
name,
entry_limit: limit,
file_ext: ext,
@@ -298,7 +297,10 @@ where
// Update the item mapping
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
let mut entries = self.entries.write().map_err(|_| Error::msg("获取写锁失败"))?;
let mut entries = self
.entries
.write()
.map_err(|_| Error::msg("Failed to obtain a write lock"))?;
entries.insert(key.to_string(), now);
Ok(())
@@ -323,48 +325,24 @@ where
Ok(data)
}
}
/// Check whether the storage limit is reached
fn check_entry_limit(&self) -> Result<()> {
let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?;
if entries.len() as u64 >= self.entry_limit {
return Err(Error::msg("The storage limit has been reached"));
}
Ok(())
}
}
impl<T> Store<T> for QueueStore<T>
where
T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
{
fn open(&self) -> Result<()> {
// Create a directory (if it doesn't exist)
fs::create_dir_all(&self.directory)?;
// Read existing files
let files = self.list_files()?;
let mut entries = self
.entries
.write()
.map_err(|_| Error::msg("Failed to obtain a write lock"))?;
for file in files {
if let Ok(meta) = file.metadata() {
if let Ok(modified) = meta.modified() {
if let Ok(since_epoch) = modified.duration_since(UNIX_EPOCH) {
entries.insert(file.file_name().to_string_lossy().to_string(), since_epoch.as_nanos() as i64);
}
}
}
}
Ok(())
}
fn delete(&self) -> Result<()> {
fs::remove_dir_all(&self.directory)?;
Ok(())
}
fn put(&self, item: T) -> Result<Key> {
{
let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?;
if entries.len() as u64 >= self.entry_limit {
return Err(Error::msg("The storage limit has been reached"));
}
self.check_entry_limit()?;
}
// generate a new uuid
@@ -375,17 +353,13 @@ where
Ok(key)
}
fn put_multiple(&self, items: Vec<T>) -> Result<Key> {
if items.is_empty() {
return Err(Error::msg("The list of items is empty"));
}
{
let entries = self.entries.read().map_err(|_| Error::msg("Failed to obtain a read lock"))?;
if entries.len() as u64 >= self.entry_limit {
return Err(Error::msg("The storage limit has been reached"));
}
self.check_entry_limit()?;
}
// Generate a new UUID
@@ -419,19 +393,19 @@ where
// if the read fails try parsing it once
if reader.read_to_end(&mut buffer).is_err() {
// Try to parse the entire data as a single object
match serde_json::from_slice::<T>(&data) {
return match serde_json::from_slice::<T>(&data) {
Ok(item) => {
items.push(item);
return Ok(items);
Ok(items)
}
Err(_) => {
// An attempt was made to resolve to an array of objects
match serde_json::from_slice::<Vec<T>>(&data) {
Ok(array_items) => return Ok(array_items),
Err(e) => return Err(Error::msg(format!("Failed to parse the data:{}", e))),
Ok(array_items) => Ok(array_items),
Err(e) => Err(Error::msg(format!("Failed to parse the data:{}", e))),
}
}
}
};
}
// Read JSON objects by row
@@ -517,4 +491,34 @@ where
Ok(())
}
fn open(&self) -> Result<()> {
// Create a directory (if it doesn't exist)
fs::create_dir_all(&self.directory)?;
// Read existing files
let files = self.list_files()?;
let mut entries = self
.entries
.write()
.map_err(|_| Error::msg("Failed to obtain a write lock"))?;
for file in files {
if let Ok(meta) = file.metadata() {
if let Ok(modified) = meta.modified() {
if let Ok(since_epoch) = modified.duration_since(UNIX_EPOCH) {
entries.insert(file.file_name().to_string_lossy().to_string(), since_epoch.as_nanos() as i64);
}
}
}
}
Ok(())
}
fn delete(&self) -> Result<()> {
fs::remove_dir_all(&self.directory)?;
Ok(())
}
}
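For reference, a usage sketch of the queue store after this refactor. The constructor arguments (directory, adapter identifier, entry limit, file extension) are inferred from the webhook adapter hunk above and the methods from this diff; module paths and trait bounds are assumptions, not part of the commit.

    use crate::store::queue::QueueStore;
    use crate::store::Store; // assumed location of the Store trait
    use crate::Event;
    use common::error::Result;

    fn queue_events(events: Vec<Event>) -> Result<()> {
        // directory, adapter identifier, entry limit, file extension
        let store: QueueStore<Event> =
            QueueStore::new("./deploy/logs/event_queue", "webhook".to_string(), 100, Some(".event".to_string()));
        // open() creates the directory and indexes any existing *.event files
        store.open()?;
        for event in events {
            // put() is rejected once entry_limit is reached (see check_entry_limit above)
            let _key = store.put(event)?;
        }
        Ok(())
    }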


@@ -1,81 +1,81 @@
use crate::config::notifier::EventNotifierConfig;
use crate::event_notifier::EventNotifier;
use crate::notifier::EventNotifier;
use common::error::Result;
use ecstore::store::ECStore;
use once_cell::sync::OnceCell;
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info};
/// 全局事件系统
/// Global event system
pub struct EventSystem {
/// 事件通知器
/// Event Notifier
notifier: Mutex<Option<EventNotifier>>,
}
impl EventSystem {
/// 创建一个新的事件系统
/// Create a new event system
pub fn new() -> Self {
Self {
notifier: Mutex::new(None),
}
}
/// 初始化事件系统
/// Initialize the event system
pub async fn init(&self, store: Arc<ECStore>) -> Result<EventNotifierConfig> {
info!("初始化事件系统");
info!("Initialize the event system");
let notifier = EventNotifier::new(store).await?;
let config = notifier.config().clone();
let mut guard = self
.notifier
.lock()
.map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?;
.map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?;
*guard = Some(notifier);
debug!("事件系统初始化完成");
debug!("The event system initialization is complete");
Ok(config)
}
/// 发送事件
/// Send events
pub async fn send_event(&self, event: crate::Event) -> Result<()> {
let guard = self
.notifier
.lock()
.map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?;
.map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?;
if let Some(notifier) = &*guard {
notifier.send(event).await
} else {
error!("事件系统未初始化");
Err(common::error::Error::msg("事件系统未初始化"))
error!("The event system is not initialized");
Err(common::error::Error::msg("The event system is not initialized"))
}
}
/// 关闭事件系统
/// Shut down the event system
pub async fn shutdown(&self) -> Result<()> {
info!("关闭事件系统");
info!("Shut down the event system");
let mut guard = self
.notifier
.lock()
.map_err(|e| common::error::Error::msg(format!("获取锁失败:{}", e)))?;
.map_err(|e| common::error::Error::msg(format!("Failed to acquire locks:{}", e)))?;
if let Some(ref mut notifier) = *guard {
notifier.shutdown().await?;
*guard = None;
info!("事件系统已关闭");
info!("The event system is down");
Ok(())
} else {
debug!("事件系统已经关闭");
debug!("The event system has been shut down");
Ok(())
}
}
}
/// 全局事件系统实例
/// A global event system instance
pub static GLOBAL_EVENT_SYS: OnceCell<EventSystem> = OnceCell::new();
/// 初始化全局事件系统
/// Initialize the global event system
pub fn init_global_event_system() -> &'static EventSystem {
GLOBAL_EVENT_SYS.get_or_init(EventSystem::new)
}
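For reference, a sketch of how a hosting process might drive this global event system; the crate-root paths for init_global_event_system and Event are assumptions, while the method calls mirror the code above.

    use common::error::Result;
    use ecstore::store::ECStore;
    use std::sync::Arc;

    async fn wire_global_events(store: Arc<ECStore>, event: rustfs_notify::Event) -> Result<()> {
        // get-or-init the OnceCell-backed instance
        let sys = rustfs_notify::init_global_event_system();
        // init() builds an EventNotifier, stores it behind the Mutex, and returns its config
        let _config = sys.init(store).await?;
        // send_event() returns an error if init() has not been called yet
        sys.send_event(event).await?;
        // shutdown() drops the notifier; it is a no-op if the system is already shut down
        sys.shutdown().await
    }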

View File

@@ -78,11 +78,7 @@ impl KVS {
KVS(Vec::new())
}
pub fn get(&self, key: &str) -> String {
if let Some(v) = self.lookup(key) {
v
} else {
"".to_owned()
}
if let Some(v) = self.lookup(key) { v } else { "".to_owned() }
}
pub fn lookup(&self, key: &str) -> Option<String> {
for kv in self.0.iter() {


@@ -53,7 +53,7 @@ protos.workspace = true
query = { workspace = true }
rmp-serde.workspace = true
rustfs-config = { workspace = true }
rustfs-event = { workspace = true }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustls.workspace = true


@@ -1,4 +1,5 @@
use rustfs_event::NotifierConfig;
use rustfs_config::NotifierConfig;
use rustfs_event::EventNotifierConfig;
use tracing::{error, info, instrument};
#[instrument]
@@ -7,26 +8,26 @@ pub(crate) async fn init_event_notifier(notifier_config: Option<String>) {
let notifier_config_present = notifier_config.is_some();
let config = if notifier_config_present {
info!("event_config is not empty, path: {:?}", notifier_config);
NotifierConfig::event_load_config(notifier_config)
EventNotifierConfig::event_load_config(notifier_config)
} else {
info!("event_config is empty");
// rustfs_event::get_event_notifier_config().clone()
NotifierConfig::default()
EventNotifierConfig::default()
};
info!("using event_config: {:?}", config);
tokio::spawn(async move {
let result = rustfs_event::initialize(&config).await;
match result {
Ok(_) => info!(
"event notifier initialized successfully {}",
if notifier_config_present {
"by config file"
} else {
"by sys config"
}
),
Err(e) => error!("Failed to initialize event notifier: {}", e),
}
// let result = rustfs_event::initialize(&config).await;
// match result {
// Ok(_) => info!(
// "event notifier initialized successfully {}",
// if notifier_config_present {
// "by config file"
// } else {
// "by sys config"
// }
// ),
// Err(e) => error!("Failed to initialize event notifier: {}", e),
// }
});
}
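For reference, the two configuration-loading paths used above collected in one helper; the event_load_config signature is taken from this diff, while the rustfs_notify import path and any file name are assumptions rather than part of the commit.

    use rustfs_notify::EventNotifierConfig;

    fn load_notifier_config(path: Option<String>) -> EventNotifierConfig {
        // Some(path): read the named TOML file, e.g. Some("./deploy/notify.toml".to_string());
        // None: fall back to environment variables (the removed example loads a .env via dotenvy first)
        EventNotifierConfig::event_load_config(path)
    }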


@@ -569,14 +569,14 @@ async fn run(opt: config::Opt) -> Result<()> {
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Stop the notification system
if rustfs_event::is_ready() {
// stop event notifier
rustfs_event::shutdown().await.map_err(|err| {
error!("Failed to shut down the notification system: {}", err);
Error::from_string(err.to_string())
})?;
}
// // Stop the notification system
// if rustfs_event::is_ready() {
// // stop event notifier
// rustfs_event::shutdown().await.map_err(|err| {
// error!("Failed to shut down the notification system: {}", err);
// Error::from_string(err.to_string())
// })?;
// }
info!("Server is stopping...");
let _ = shutdown_tx.send(());


@@ -13,5 +13,6 @@ pub(crate) fn create_metadata() -> Metadata {
/// Create a new event object
#[allow(dead_code)]
pub(crate) async fn send_event(event: Event) -> Result<(), Box<dyn std::error::Error>> {
rustfs_event::send_event(event).await.map_err(|e| e.into())
// rustfs_event::send_event(event).await.map_err(|e| e.into())
Ok(())
}