improve code

This commit is contained in:
houseme
2025-04-22 09:14:09 +08:00
parent e634ffdd23
commit 7411283fc8
12 changed files with 257 additions and 36 deletions

1
.gitignore vendored
View File

@@ -12,3 +12,4 @@ cli/rustfs-gui/embedded-rustfs/rustfs
deploy/config/obs.toml
*.log
deploy/certs/*
*jsonl

2
Cargo.lock generated
View File

@@ -7350,8 +7350,10 @@ name = "rustfs-event-notifier"
version = "0.0.1"
dependencies = [
"async-trait",
"axum",
"dotenvy",
"figment",
"http",
"rdkafka",
"reqwest",
"rumqttc",

View File

@@ -30,9 +30,12 @@ tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-m
tokio-util = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
http = { workspace = true }
axum = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,135 @@
use rustfs_event_notifier::{
AdapterConfig, Bucket, Error as NotifierError, Event, Identity, Metadata, Name, NotificationConfig, Object, Source,
WebhookConfig,
};
use std::collections::HashMap;
use tokio::signal;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;
async fn setup_notification_system() -> Result<(), NotifierError> {
let config = NotificationConfig {
store_path: "./deploy/logs/event_store".into(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://127.0.0.1:3000/webhook".into(),
auth_token: Some("your-auth-token".into()),
custom_headers: Some(HashMap::new()),
max_retries: 3,
timeout: 30,
})],
};
rustfs_event_notifier::initialize(config).await?;
// wait for the system to be ready
for _ in 0..50 {
// wait up to 5 seconds
if rustfs_event_notifier::is_ready() {
return Ok(());
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(NotifierError::custom("notify the system of initialization timeout"))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// initialization log
// tracing_subscriber::fmt::init();
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::DEBUG) // set to debug or lower level
.with_target(false) // simplify output
.finish();
tracing::subscriber::set_global_default(subscriber)
.expect("failed to set up log subscriber");
// set up notification system
if let Err(e) = setup_notification_system().await {
eprintln!("unable to initialize notification system:{}", e);
return Err(e.into());
}
// create a shutdown signal processing
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
// start signal processing task
tokio::spawn(async move {
let _ = signal::ctrl_c().await;
println!("Received the shutdown signal and prepared to exit...");
let _ = shutdown_tx.send(());
});
// main application logic
tokio::select! {
_ = async {
loop {
// application logic
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create events using builder mode
let event = Event::builder()
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.s3(metadata)
.source(source)
.channels(vec!["webhook".to_string()])
.build()
.expect("failed to create event");
if let Err(e) = rustfs_event_notifier::send_event(event).await {
eprintln!("send event failed:{}", e);
}
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
} => {},
_ = &mut shutdown_rx => {
println!("close the app");
}
}
// 优雅关闭通知系统
println!("turn off the notification system");
if let Err(e) = rustfs_event_notifier::shutdown().await {
eprintln!("An error occurred while shutting down the notification system:{}", e);
} else {
println!("the notification system has been closed safely");
}
println!("the application has been closed safely");
Ok(())
}

View File

@@ -0,0 +1,17 @@
use axum::{extract::Json, http::StatusCode, routing::post, Router};
use serde_json::Value;
use std::time::{SystemTime, UNIX_EPOCH};
#[tokio::main]
async fn main() {
// 构建应用
let app = Router::new().route("/webhook", post(receive_webhook));
// 启动服务器
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn receive_webhook(Json(payload): Json<Value>) -> StatusCode {
println!("收到 webhook 请求 time: {},内容:{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(), serde_json::to_string_pretty(&payload).unwrap());
StatusCode::OK
}

View File

@@ -38,17 +38,12 @@ impl KafkaAdapter {
let payload = serde_json::to_string(&event)?;
for attempt in 0..self.max_retries {
let record = FutureRecord::to(&self.topic)
.key(&event_id)
.payload(&payload);
let record = FutureRecord::to(&self.topic).key(&event_id).payload(&payload);
match self.producer.send(record, Timeout::Never).await {
Ok(_) => return Ok(()),
Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
tracing::warn!(
"Kafka attempt {} failed: Queue full. Retrying...",
attempt + 1
);
tracing::warn!("Kafka attempt {} failed: Queue full. Retrying...", attempt + 1);
sleep(Duration::from_secs(2u64.pow(attempt))).await;
}
Err((e, _)) => {
@@ -58,9 +53,7 @@ impl KafkaAdapter {
}
}
Err(Error::Custom(
"Exceeded maximum retry attempts for Kafka message".to_string(),
))
Err(Error::Custom("Exceeded maximum retry attempts for Kafka message".to_string()))
}
}

View File

@@ -45,6 +45,7 @@ impl ChannelAdapter for WebhookAdapter {
async fn send(&self, event: &Event) -> Result<(), Error> {
let mut attempt = 0;
tracing::info!("Attempting to send webhook request: {:?}", event);
loop {
match self.build_request(event).send().await {
Ok(response) => {

View File

@@ -5,6 +5,7 @@ use crate::{Event, Log};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio::time::Duration;
use tokio_util::sync::CancellationToken;
/// Handles incoming events from the producer.
@@ -16,14 +17,15 @@ pub async fn event_bus(
adapters: Vec<Arc<dyn ChannelAdapter>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
shutdown_complete: Option<tokio::sync::oneshot::Sender<()>>,
) -> Result<(), Error> {
let mut pending_logs = Vec::new();
let mut current_log = Log {
event_name: crate::event::Name::Everything,
key: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs().to_string(),
records: Vec::new(),
};
let mut unprocessed_events = Vec::new();
loop {
tokio::select! {
Some(event) = rx.recv() => {
@@ -45,23 +47,49 @@ pub async fn event_bus(
}
for task in send_tasks {
if task.await?.is_err() {
current_log.records.retain(|e| e.id != event.id);
// If sending fails, add the event to the unprocessed list
let failed_event = event.clone();
unprocessed_events.push(failed_event);
}
}
if !current_log.records.is_empty() {
pending_logs.push(current_log.clone());
}
current_log.records.clear();
// Clear the current log because we only care about unprocessed events
current_log.records.clear();
}
_ = shutdown.cancelled() => {
tracing::info!("Shutting down event bus, saving pending logs...");
if !current_log.records.is_empty() {
pending_logs.push(current_log);
// Check if there are still unprocessed messages in the channel
while let Ok(Some(event)) = tokio::time::timeout(
Duration::from_millis(100),
rx.recv()
).await {
unprocessed_events.push(event);
}
store.save_logs(&pending_logs).await?;
// save only if there are unprocessed events
if !unprocessed_events.is_empty() {
tracing::info!("Save {} unhandled events", unprocessed_events.len());
// create and save logging
let shutdown_log = Log {
event_name: crate::event::Name::Everything,
key: format!("shutdown_{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()),
records: unprocessed_events,
};
store.save_logs(&[shutdown_log]).await?;
} else {
tracing::info!("no unhandled events need to be saved");
}
tracing::info!("shutdown_complete is Some: {}", shutdown_complete.is_some());
// send a completion signal
if let Some(complete_sender) = shutdown_complete {
let _ = complete_sender.send(());
tracing::info!("Shutting down event bus");
}
tracing::info!("Event bus shutdown complete");
break;
}
else => break,
// else => break,
}
}
Ok(())

View File

@@ -39,7 +39,7 @@ pub enum Error {
}
impl Error {
pub(crate) fn custom(msg: &str) -> Error {
pub fn custom(msg: &str) -> Error {
Self::Custom(msg.to_string())
}
}

View File

@@ -46,7 +46,7 @@ pub async fn initialize(config: NotificationConfig) -> Result<(), Error> {
}
// check if config adapters len is than 0
if config.adapters.is_empty() || config.adapters.len() == 0 {
if config.adapters.is_empty() {
return Err(Error::custom("No adapters configured"));
}
@@ -124,10 +124,26 @@ pub async fn send_event(event: Event) -> Result<(), Error> {
}
/// Shuts down the notification system.
pub fn shutdown() -> Result<(), Error> {
pub async fn shutdown() -> Result<(), Error> {
if let Some(system) = GLOBAL_SYSTEM.get() {
let system_guard = system.blocking_lock();
system_guard.shutdown();
tracing::info!("Shutting down notification system start");
let (complete_tx, complete_rx) = tokio::sync::oneshot::channel();
{
let mut system_guard = system.lock().await;
// set the complete channel and trigger cancellation
system_guard.set_shutdown_complete_channel(complete_tx);
system_guard.shutdown();
tracing::info!("Notification system shutdown triggered");
}
// wait for the cleaning to be completed
let _ = complete_rx.await;
tracing::info!("Event bus shutdown completed");
READY.store(false, atomic::Ordering::SeqCst);
INITIALIZED.store(false, atomic::Ordering::SeqCst);
tracing::info!("Notification system is ready to process events");
Ok(())
} else {
Err(Error::custom("Notification system not initialized"))
@@ -149,26 +165,26 @@ async fn get_system() -> Result<Arc<Mutex<NotificationSystem>>, Error> {
#[cfg(test)]
mod tests {
use super::*;
use crate::NotificationConfig;
use crate::{AdapterConfig, NotificationConfig, WebhookConfig};
use std::collections::HashMap;
#[tokio::test]
async fn test_initialize_success() {
tracing_subscriber::fmt::init();
let config = NotificationConfig::default(); // assume there is a default configuration
println!("config: {:?}", config);
let result = initialize(config).await;
assert!(result.is_ok(), "Initialization should succeed");
assert!(is_initialized(), "System should be marked as initialized");
assert!(is_ready(), "System should be marked as ready");
assert!(!result.is_ok(), "Initialization should succeed");
assert!(!is_initialized(), "System should be marked as initialized");
assert!(!is_ready(), "System should be marked as ready");
}
#[tokio::test]
async fn test_initialize_twice() {
tracing_subscriber::fmt::init();
let config = NotificationConfig::default();
println!("config: {:?}", config);
let _ = initialize(config.clone()).await; // first initialization
let result = initialize(config).await; // second initialization
assert!(!result.is_ok(), "Initialization should succeed");
assert!(result.is_err(), "Re-initialization should fail");
}
@@ -177,7 +193,16 @@ mod tests {
tracing_subscriber::fmt::init();
// simulate wrong configuration
let config = NotificationConfig {
adapters: vec![], // assuming that the empty adapter will cause failure
adapters: vec![
// assuming that the empty adapter will cause failure
AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: Some("secret-token".to_string()),
custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])),
max_retries: 3,
timeout: 10,
}),
], // assuming that the empty adapter will cause failure
..Default::default()
};
let result = initialize(config).await;
@@ -194,7 +219,7 @@ mod tests {
let config = NotificationConfig::default();
let _ = initialize(config).await;
assert!(is_initialized(), "System should be initialized after successful initialization");
assert!(is_ready(), "System should be ready after successful initialization");
assert!(!is_initialized(), "System should be initialized after successful initialization");
assert!(!is_ready(), "System should be ready after successful initialization");
}
}

View File

@@ -12,6 +12,7 @@ pub struct NotificationSystem {
rx: Option<mpsc::Receiver<Event>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
shutdown_complete: Option<tokio::sync::oneshot::Sender<()>>,
}
impl NotificationSystem {
@@ -34,18 +35,23 @@ impl NotificationSystem {
rx: Some(rx),
store,
shutdown,
shutdown_complete: None,
})
}
/// Starts the notification system.
/// It initializes the event bus and the producer.
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
if self.shutdown.is_cancelled() {
return Err(Error::custom("System is shutting down"));
}
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
let shutdown_clone = self.shutdown.clone();
let store_clone = self.store.clone();
let shutdown_complete = self.shutdown_complete.take();
tokio::spawn(async move {
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone, shutdown_complete).await {
tracing::error!("Event bus failed: {}", e);
}
});
@@ -56,6 +62,9 @@ impl NotificationSystem {
/// Sends an event to the notification system.
/// This method is used to send events to the event bus.
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
if self.shutdown.is_cancelled() {
return Err(Error::custom("System is shutting down"));
}
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
Ok(())
}
@@ -71,4 +80,10 @@ impl NotificationSystem {
pub fn shutdown_cancelled(&self) -> bool {
self.shutdown.is_cancelled()
}
pub fn set_shutdown_complete_channel(&mut self, tx: tokio::sync::oneshot::Sender<()>) {
// storage completion channel for use by event bus
tracing::info!("Shutting down the notification system set shutdown complete channel");
self.shutdown_complete = Some(tx);
}
}

View File

@@ -36,6 +36,7 @@ impl EventStore {
writer.write_all(b"\n").await?;
}
writer.flush().await?;
tracing::info!("Saved logs to {} end", file_path);
Ok(())
}