improve code notify

This commit is contained in:
houseme
2025-06-23 12:47:58 +08:00
parent 5155a3d544
commit 7bb7f9e309
18 changed files with 306 additions and 304 deletions

1
Cargo.lock generated
View File

@@ -8389,6 +8389,7 @@ dependencies = [
"axum",
"chrono",
"const-str",
"dashmap 6.1.0",
"ecstore",
"form_urlencoded",
"once_cell",

View File

@@ -87,6 +87,7 @@ clap = { version = "4.5.40", features = ["derive", "env"] }
config = "0.15.11"
const-str = { version = "0.6.2", features = ["std", "proc"] }
crc32fast = "1.4.2"
dashmap = "6.1.0"
datafusion = "46.0.1"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }

View File

@@ -11,6 +11,7 @@ rustfs-utils = { workspace = true, features = ["path", "sys"] }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
const-str = { workspace = true }
dashmap = { workspace = true }
ecstore = { workspace = true }
form_urlencoded = { workspace = true }
once_cell = { workspace = true }

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
use rustfs_notify::arn::TargetID;
use rustfs_notify::factory::{
@@ -159,10 +160,8 @@ async fn main() -> Result<(), NotificationError> {
system.load_bucket_notification_config("my-bucket", &bucket_config).await?;
info!("\n---> Sending an event...");
let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut);
system
.send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event)
.await;
let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut));
system.send_event(event).await;
info!("✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target.");
tokio::time::sleep(Duration::from_secs(2)).await;

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use ecstore::config::{Config, ENABLE_KEY, ENABLE_ON, KV, KVS};
// Using the global accessor functions
use rustfs_notify::arn::TargetID;
@@ -153,10 +154,8 @@ async fn main() -> Result<(), NotificationError> {
// --- Send events ---
info!("\n---> Sending an event...");
let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut);
system
.send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event)
.await;
let event = Arc::new(Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut));
system.send_event(event).await;
info!("✅ Event sent. Both Webhook and MQTT targets should receive it.");
tokio::time::sleep(Duration::from_secs(2)).await;

View File

@@ -1,5 +1,6 @@
use std::io;
use thiserror::Error;
use crate::arn::TargetID;
/// Error types for the store
#[derive(Debug, Error)]
@@ -96,8 +97,20 @@ pub enum NotificationError {
#[error("Notification system has already been initialized")]
AlreadyInitialized,
#[error("Io error: {0}")]
#[error("I/O error: {0}")]
Io(std::io::Error),
#[error("Failed to read configuration: {0}")]
ReadConfig(String),
#[error("Failed to save configuration: {0}")]
SaveConfig(String),
#[error("Target '{0}' not found")]
TargetNotFound(TargetID),
#[error("Server not initialized")]
ServerNotInitialized,
}
impl From<url::ParseError> for TargetError {

View File

@@ -445,29 +445,20 @@ impl Event {
};
let mut resp_elements = args.resp_elements.clone();
resp_elements
.entry("x-amz-request-id".to_string())
.or_insert_with(|| "".to_string());
resp_elements
.entry("x-amz-id-2".to_string())
.or_insert_with(|| "".to_string());
// ... Filling of other response elements
initialize_response_elements(&mut resp_elements, &["x-amz-request-id", "x-amz-id-2"]);
// URL encoding of object keys
let key_name = form_urlencoded::byte_serialize(args.object.name.as_bytes()).collect::<String>();
let principal_id = args.req_params.get("principalId").cloned().unwrap_or_default();
let owner_identity = Identity {
principal_id: principal_id.clone(),
};
let user_identity = Identity { principal_id };
let principal_id = args.req_params.get("principalId").unwrap_or(&String::new()).to_string();
let mut s3_metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "Config".to_string(), // or from args
bucket: Bucket {
name: args.bucket_name.clone(),
owner_identity,
owner_identity: Identity {
principal_id: principal_id.clone(),
},
arn: format!("arn:aws:s3:::{}", args.bucket_name),
},
object: Object {
@@ -503,7 +494,7 @@ impl Event {
aws_region: args.req_params.get("region").cloned().unwrap_or_default(),
event_time: event_time.and_utc(),
event_name: args.event_name,
user_identity,
user_identity: Identity { principal_id },
request_parameters: args.req_params,
response_elements: resp_elements,
s3: s3_metadata,
@@ -516,6 +507,12 @@ impl Event {
}
}
/// Ensures every key in `keys` exists in `elements`, inserting an empty
/// string for any key that is missing; values already present are untouched.
fn initialize_response_elements(elements: &mut HashMap<String, String>, keys: &[&str]) {
    keys.iter().for_each(|key| {
        elements.entry((*key).to_string()).or_default();
    });
}
/// Represents a log of events for sending to targets
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventLog {

View File

@@ -52,9 +52,7 @@ impl Notifier {
}
// Create an event and send it
let event = Event::new(args.clone());
notification_sys
.send_event(&args.bucket_name, &args.event_name.as_str(), &args.object.name.clone(), event)
.await;
let event = Arc::new(Event::new(args));
notification_sys.send_event(event).await;
}
}

View File

@@ -1,7 +1,7 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event,
error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, Event, EventName,
StoreError, Target,
};
use ecstore::config::{Config, KVS};
@@ -173,6 +173,38 @@ impl NotificationSystem {
self.notifier.target_list().read().await.keys()
}
/// Checks if there are active subscribers for the given bucket and event name.
///
/// Thin delegate to the inner `EventNotifier::has_subscriber`; returns `true`
/// when at least one notification rule for `bucket` matches `event_name`.
pub async fn has_subscriber(&self, bucket: &str, event_name: &EventName) -> bool {
    self.notifier.has_subscriber(bucket, event_name).await
}
async fn update_config_and_reload<F>(&self, mut modifier: F) -> Result<(), NotificationError>
where
F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed
{
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::ServerNotInitialized);
};
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())
.await
.map_err(|e| NotificationError::ReadConfig(e.to_string()))?;
if !modifier(&mut new_config) {
// If the closure indicates that the configuration has not changed, return early
info!("Configuration not changed, skipping save and reload.");
return Ok(());
}
if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await {
error!("Failed to save config: {}", e);
return Err(NotificationError::SaveConfig(e.to_string()));
}
info!("Configuration updated. Reloading system...");
self.reload_config(new_config).await
}
/// Accurately remove a Target and its related resources through TargetID.
///
/// This process includes:
@@ -188,43 +220,23 @@ impl NotificationSystem {
pub async fn remove_target(&self, target_id: &TargetID, target_type: &str) -> Result<(), NotificationError> {
info!("Attempting to remove target: {}", target_id);
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
};
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())
.await
.map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?;
let mut changed = false;
if let Some(targets_of_type) = new_config.0.get_mut(target_type) {
if targets_of_type.remove(&target_id.name).is_some() {
info!("Removed target {} from the configuration.", target_id);
changed = true;
self.update_config_and_reload(|config| {
let mut changed = false;
if let Some(targets_of_type) = config.0.get_mut(target_type) {
if targets_of_type.remove(&target_id.name).is_some() {
info!("Remove target from configuration {}", target_id);
changed = true;
}
if targets_of_type.is_empty() {
config.0.remove(target_type);
}
}
if targets_of_type.is_empty() {
new_config.0.remove(target_type);
if !changed {
warn!("Target {} not found in configuration", target_id);
}
}
if !changed {
warn!("Target {} was not found in the configuration.", target_id);
return Ok(());
}
if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await {
error!("Failed to save config for target removal: {}", e);
return Err(NotificationError::Configuration(format!("Failed to save config: {}", e)));
}
info!(
"Configuration updated and persisted for target {} removal. Reloading system...",
target_id
);
self.reload_config(new_config).await
changed
})
.await
}
/// Set or update a Target configuration.
@@ -241,49 +253,15 @@ impl NotificationSystem {
/// If the target configuration is invalid, it returns Err(NotificationError::Configuration).
pub async fn set_target_config(&self, target_type: &str, target_name: &str, kvs: KVS) -> Result<(), NotificationError> {
info!("Setting config for target {} of type {}", target_name, target_type);
// 1. Get the storage handle
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
};
// 2. Read the latest configuration from storage
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())
.await
.map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?;
// 3. Modify the configuration copy
new_config
.0
.entry(target_type.to_string())
.or_default()
.insert(target_name.to_string(), kvs);
// 4. Persist the new configuration
if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await {
error!("Failed to save notification config: {}", e);
return Err(NotificationError::Configuration(format!("Failed to save notification config: {}", e)));
}
// 5. After the persistence is successful, the system will be reloaded to apply changes.
match self.reload_config(new_config).await {
Ok(_) => {
info!(
"Target {} of type {} configuration updated and reloaded successfully",
target_name, target_type
);
Ok(())
}
Err(e) => {
error!("Failed to reload config for target {} of type {}: {}", target_name, target_type, e);
Err(NotificationError::Configuration(format!(
"Configuration saved, but failed to reload: {}",
e
)))
}
}
self.update_config_and_reload(|config| {
config
.0
.entry(target_type.to_string())
.or_default()
.insert(target_name.to_string(), kvs.clone());
true // The configuration is always modified
})
.await
}
/// Removes all notification configurations for a bucket.
@@ -305,42 +283,22 @@ impl NotificationSystem {
/// If the target configuration does not exist, it returns Ok(()) without making any changes.
pub async fn remove_target_config(&self, target_type: &str, target_name: &str) -> Result<(), NotificationError> {
info!("Removing config for target {} of type {}", target_name, target_type);
let Some(store) = ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
"errServerNotInitialized",
)));
};
let mut new_config = ecstore::config::com::read_config_without_migrate(store.clone())
.await
.map_err(|e| NotificationError::Configuration(format!("Failed to read notification config: {}", e)))?;
let mut changed = false;
if let Some(targets) = new_config.0.get_mut(target_type) {
if targets.remove(target_name).is_some() {
changed = true;
self.update_config_and_reload(|config| {
let mut changed = false;
if let Some(targets) = config.0.get_mut(target_type) {
if targets.remove(target_name).is_some() {
changed = true;
}
if targets.is_empty() {
config.0.remove(target_type);
}
}
if targets.is_empty() {
new_config.0.remove(target_type);
if !changed {
info!("Target {} of type {} not found, no changes made.", target_name, target_type);
}
}
if !changed {
info!("Target {} of type {} not found, no changes made.", target_name, target_type);
return Ok(());
}
if let Err(e) = ecstore::config::com::save_server_config(store, &new_config).await {
error!("Failed to save config for target removal: {}", e);
return Err(NotificationError::Configuration(format!("Failed to save config: {}", e)));
}
info!(
"Configuration updated and persisted for target {} removal. Reloading system...",
target_name
);
self.reload_config(new_config).await
changed
})
.await
}
/// Enhanced event stream startup function, including monitoring and concurrency control
@@ -355,6 +313,12 @@ impl NotificationSystem {
stream::start_event_stream_with_batching(store, target, metrics, semaphore)
}
/// Swaps the currently held notification configuration for `new_config`,
/// holding the write lock only for the duration of the assignment.
async fn update_config(&self, new_config: Config) {
    *self.config.write().await = new_config;
}
/// Reloads the configuration
pub async fn reload_config(&self, new_config: Config) -> Result<(), NotificationError> {
info!("Reload notification configuration starts");
@@ -367,10 +331,7 @@ impl NotificationSystem {
}
// Update the config
{
let mut config = self.config.write().await;
*config = new_config.clone();
}
self.update_config(new_config.clone()).await;
// Create a new target from configuration
let targets: Vec<Box<dyn Target + Send + Sync>> = self
@@ -459,8 +420,8 @@ impl NotificationSystem {
}
/// Sends an event
pub async fn send_event(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) {
self.notifier.send(bucket_name, event_name, object_key, event).await;
pub async fn send_event(&self, event: Arc<Event>) {
self.notifier.send(event).await;
}
/// Obtain system status information

View File

@@ -1,5 +1,6 @@
use crate::arn::TargetID;
use crate::{error::NotificationError, event::Event, rules::RulesMap, target::Target, EventName};
use dashmap::DashMap;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};
@@ -7,7 +8,7 @@ use tracing::{debug, error, info, instrument, warn};
/// Manages event notification to targets based on rules
pub struct EventNotifier {
target_list: Arc<RwLock<TargetList>>,
bucket_rules_map: Arc<RwLock<HashMap<String, RulesMap>>>,
bucket_rules_map: Arc<DashMap<String, RulesMap>>,
}
impl Default for EventNotifier {
@@ -21,7 +22,7 @@ impl EventNotifier {
pub fn new() -> Self {
EventNotifier {
target_list: Arc::new(RwLock::new(TargetList::new())),
bucket_rules_map: Arc::new(RwLock::new(HashMap::new())),
bucket_rules_map: Arc::new(DashMap::new()),
}
}
@@ -40,8 +41,7 @@ impl EventNotifier {
/// This method removes all rules associated with the specified bucket name.
/// It will log a message indicating the removal of rules.
pub async fn remove_rules_map(&self, bucket_name: &str) {
let mut rules_map = self.bucket_rules_map.write().await;
if rules_map.remove(bucket_name).is_some() {
if self.bucket_rules_map.remove(bucket_name).is_some() {
info!("Removed all notification rules for bucket: {}", bucket_name);
}
}
@@ -58,19 +58,17 @@ impl EventNotifier {
/// Adds a rules map for a bucket
pub async fn add_rules_map(&self, bucket_name: &str, rules_map: RulesMap) {
let mut bucket_rules_guard = self.bucket_rules_map.write().await;
if rules_map.is_empty() {
bucket_rules_guard.remove(bucket_name);
self.bucket_rules_map.remove(bucket_name);
} else {
bucket_rules_guard.insert(bucket_name.to_string(), rules_map);
self.bucket_rules_map.insert(bucket_name.to_string(), rules_map);
}
info!("Added rules for bucket: {}", bucket_name);
}
/// Removes notification rules for a bucket
pub async fn remove_notification(&self, bucket_name: &str) {
let mut bucket_rules_guard = self.bucket_rules_map.write().await;
bucket_rules_guard.remove(bucket_name);
self.bucket_rules_map.remove(bucket_name);
info!("Removed notification rules for bucket: {}", bucket_name);
}
@@ -83,12 +81,34 @@ impl EventNotifier {
info!("Removed all targets and their streams");
}
/// Checks if there are active subscribers for the given bucket and event name.
///
/// # Parameters
/// * `bucket_name` - bucket name.
/// * `event_name` - Event name.
///
/// # Return value
/// Return `true` if at least one matching notification rule exists.
pub async fn has_subscriber(&self, bucket_name: &str, event_name: &EventName) -> bool {
    // Compound events (such as ObjectCreatedAll) are resolved inside
    // RulesMap::has_subscriber; a missing bucket entry means no subscribers.
    self.bucket_rules_map
        .get(bucket_name)
        .map(|rules_map| rules_map.has_subscriber(event_name))
        .unwrap_or(false)
}
/// Sends an event to the appropriate targets based on the bucket rules
#[instrument(skip(self, event))]
pub async fn send(&self, bucket_name: &str, event_name: &str, object_key: &str, event: Event) {
let bucket_rules_guard = self.bucket_rules_map.read().await;
if let Some(rules) = bucket_rules_guard.get(bucket_name) {
let target_ids = rules.match_rules(EventName::from(event_name), object_key);
pub async fn send(&self, event: Arc<Event>) {
let bucket_name = &event.s3.bucket.name;
let object_key = &event.s3.object.key;
let event_name = event.event_name;
if let Some(rules) = self.bucket_rules_map.get(bucket_name) {
let target_ids = rules.match_rules(event_name, object_key);
if target_ids.is_empty() {
debug!("No matching targets for event in bucket: {}", bucket_name);
return;

View File

@@ -8,7 +8,6 @@ use crate::rules::NotificationConfiguration;
use crate::EventName;
use std::collections::HashMap;
use std::io::Read;
// Assuming this is the XML config structure
/// Configuration for bucket notifications.
/// This struct now holds the parsed and validated rules in the new RulesMap format.
@@ -98,11 +97,7 @@ impl BucketNotificationConfig {
}
// Expose the RulesMap for the notifier
pub fn get_rules_map(&self) -> &RulesMap {
&self.rules
}
pub fn to_rules_map(&self) -> RulesMap {
pub fn get_rules_map(&self) -> RulesMap {
self.rules.clone()
}

View File

@@ -9,32 +9,43 @@ use std::collections::HashMap;
#[derive(Debug, Clone, Default)]
pub struct RulesMap {
map: HashMap<EventName, PatternRules>,
/// A bitmask that represents the union of all event types in this map.
/// Used for quick checks in `has_subscriber`.
total_events_mask: u64,
}
impl RulesMap {
/// Create a new, empty RulesMap.
pub fn new() -> Self {
Default::default()
}
/// Add rule configuration.
/// event_names: A set of event names.
/// pattern: Object key pattern.
/// target_id: Notify the target.
/// Add a rule configuration to the map.
///
/// This method expands the composite event name.
/// This method handles composite event names (such as `s3:ObjectCreated:*`), expanding them as
/// Multiple specific event types and add rules for each event type.
///
/// # Parameters
/// * `event_names` - List of event names associated with this rule.
/// * `pattern` - Matching pattern for object keys. If empty, the default is `*` (match all).
/// * `target_id` - The target ID of the notification.
pub fn add_rule_config(&mut self, event_names: &[EventName], pattern: String, target_id: TargetID) {
let mut effective_pattern = pattern;
if effective_pattern.is_empty() {
effective_pattern = "*".to_string(); // Match all by default
}
let effective_pattern = if pattern.is_empty() {
"*".to_string() // Match all by default
} else {
pattern
};
for event_name_spec in event_names {
// Expand compound event types, for example ObjectCreatedAll -> [ObjectCreatedPut, ObjectCreatedPost, ...]
for expanded_event_name in event_name_spec.expand() {
// Make sure EventName::expand() returns Vec<EventName>
self.map
.entry(expanded_event_name)
.or_default()
.add(effective_pattern.clone(), target_id.clone());
// Update the total_events_mask to include this event type
self.total_events_mask |= expanded_event_name.mask();
}
}
}
@@ -44,13 +55,17 @@ impl RulesMap {
pub fn add_map(&mut self, other_map: &Self) {
for (event_name, other_pattern_rules) in &other_map.map {
let self_pattern_rules = self.map.entry(*event_name).or_default();
// PatternRules::union returns a new PatternRules; we replace the existing entry with the merged result
let merged_rules = self_pattern_rules.union(other_pattern_rules);
*self_pattern_rules = merged_rules;
}
// Directly merge two masks.
self.total_events_mask |= other_map.total_events_mask;
}
/// Remove another rule defined in the RulesMap from the current RulesMap.
///
/// After the rule is removed, `total_events_mask` is recalculated to ensure its accuracy.
pub fn remove_map(&mut self, other_map: &Self) {
let mut events_to_remove = Vec::new();
for (event_name, self_pattern_rules) in &mut self.map {
@@ -64,10 +79,30 @@ impl RulesMap {
for event_name in events_to_remove {
self.map.remove(&event_name);
}
// After removing the rule, recalculate total_events_mask.
self.recalculate_mask();
}
/// Rules matching the given event name and object key, returning all matching TargetIDs.
/// Checks whether any configured rules exist for a given event type.
///
/// This method uses a bitmask for a quick check of O(1) complexity.
/// `event_name` can be a compound type, such as `ObjectCreatedAll`.
pub fn has_subscriber(&self, event_name: &EventName) -> bool {
    // event_name.mask() expands compound events into their combined bitmask,
    // so a single AND against the cached total covers all member events.
    (self.total_events_mask & event_name.mask()) != 0
}
/// Rules matching the given event and object keys and return all matching target IDs.
///
/// # Notice
/// The `event_name` parameter should be a specific, non-compound event type.
/// Because this is taken from the `Event` object that actually occurs.
pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet {
// Use bitmask to quickly determine whether there is a matching rule
if (self.total_events_mask & event_name.mask()) == 0 {
return TargetIdSet::new(); // No matching rules
}
// First try to directly match the event name
if let Some(pattern_rules) = self.map.get(&event_name) {
let targets = pattern_rules.match_targets(object_key);
@@ -89,6 +124,7 @@ impl RulesMap {
.map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key))
}
/// Check if RulesMap is empty.
///
/// Emptiness is determined from the underlying event map itself, not from
/// the cached `total_events_mask`.
pub fn is_empty(&self) -> bool {
    self.map.is_empty()
}
@@ -97,4 +133,42 @@ impl RulesMap {
/// Returns a read-only reference to the underlying map of
/// event name → pattern rules.
pub fn inner(&self) -> &HashMap<EventName, PatternRules> {
    &self.map
}
/// Rebuilds `total_events_mask` from scratch by OR-ing the mask of every
/// event currently present in the map. Called after removal operations,
/// where bits may need to be cleared and an incremental update is not possible.
fn recalculate_mask(&mut self) {
    self.total_events_mask = self.map.keys().fold(0u64, |acc, event| acc | event.mask());
}
/// Removes a single pattern rule for `event_name`, dropping the whole event
/// entry when no patterns remain for it.
///
/// Performance fix: `total_events_mask` can only change when an entire event
/// entry disappears from the map, so the O(n) `recalculate_mask` is now run
/// only in that case instead of unconditionally on every call.
#[allow(dead_code)]
pub fn remove_rule(&mut self, event_name: &EventName, pattern: &str) {
    let mut entry_removed = false;
    if let Some(pattern_rules) = self.map.get_mut(event_name) {
        pattern_rules.rules.remove(pattern);
        if pattern_rules.is_empty() {
            self.map.remove(event_name);
            entry_removed = true;
        }
    }
    if entry_removed {
        // Only a removed event entry can clear bits from the cached mask.
        self.recalculate_mask();
    }
}
/// Removes every entry for the given event names in one pass, then rebuilds
/// the cached event mask a single time once the batch is done.
#[allow(dead_code)]
pub fn remove_rules(&mut self, event_names: &[EventName]) {
    event_names.iter().for_each(|name| {
        self.map.remove(name);
    });
    // One mask rebuild after the whole batch, not per removal.
    self.recalculate_mask();
}
/// Adds (or extends) a rule binding `pattern` to `target_id` under
/// `event_name`, updating only that event's bit in the cached mask —
/// an insertion can never clear bits, so no full recalculation is needed.
#[allow(dead_code)]
pub fn update_rule(&mut self, event_name: EventName, pattern: String, target_id: TargetID) {
    let pattern_rules = self.map.entry(event_name).or_default();
    pattern_rules.add(pattern, target_id);
    self.total_events_mask |= event_name.mask();
}
}

View File

@@ -125,7 +125,7 @@ pub trait Store<T>: Send + Sync {
fn open(&self) -> Result<(), Self::Error>;
/// Stores a single item
fn put(&self, item: T) -> Result<Self::Key, Self::Error>;
fn put(&self, item: Arc<T>) -> Result<Self::Key, Self::Error>;
/// Stores multiple items in a single batch
fn put_multiple(&self, items: Vec<T>) -> Result<Self::Key, Self::Error>;
@@ -195,11 +195,7 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
/// Reads a file for the given key
fn read_file(&self, key: &Key) -> Result<Vec<u8>, StoreError> {
let path = self.file_path(key);
debug!(
"Reading file for key: {},path: {}",
key.to_string(),
path.display()
);
debug!("Reading file for key: {},path: {}", key.to_string(), path.display());
let data = std::fs::read(&path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
StoreError::NotFound
@@ -240,13 +236,11 @@ impl<T: Serialize + DeserializeOwned + Send + Sync> QueueStore<T> {
};
std::fs::write(&path, &data).map_err(StoreError::Io)?;
let modified = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64;
let mut entries = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let modified = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
let mut entries = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
entries.insert(key.to_string(), modified);
debug!("Wrote event to store: {}", key.to_string());
Ok(())
@@ -265,18 +259,16 @@ where
let entries = std::fs::read_dir(&self.directory).map_err(StoreError::Io)?;
// Get the write lock to update the internal state
let mut entries_map = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let mut entries_map = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
for entry in entries {
let entry = entry.map_err(StoreError::Io)?;
let metadata = entry.metadata().map_err(StoreError::Io)?;
if metadata.is_file() {
let modified = metadata.modified().map_err(StoreError::Io)?;
let unix_nano = modified
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as i64;
let unix_nano = modified.duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos() as i64;
let file_name = entry.file_name().to_string_lossy().to_string();
entries_map.insert(file_name, unix_nano);
@@ -287,12 +279,13 @@ where
Ok(())
}
fn put(&self, item: T) -> Result<Self::Key, Self::Error> {
fn put(&self, item: Arc<T>) -> Result<Self::Key, Self::Error> {
// Check storage limits
{
let entries = self.entries.read().map_err(|_| {
StoreError::Internal("Failed to acquire read lock on entries".to_string())
})?;
let entries = self
.entries
.read()
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
if entries.len() as u64 >= self.entry_limit {
return Err(StoreError::LimitExceeded);
@@ -307,8 +300,7 @@ where
compress: true,
};
let data =
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
let data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
self.write_file(&key, &data)?;
Ok(key)
@@ -317,9 +309,10 @@ where
fn put_multiple(&self, items: Vec<T>) -> Result<Self::Key, Self::Error> {
// Check storage limits
{
let entries = self.entries.read().map_err(|_| {
StoreError::Internal("Failed to acquire read lock on entries".to_string())
})?;
let entries = self
.entries
.read()
.map_err(|_| StoreError::Internal("Failed to acquire read lock on entries".to_string()))?;
if entries.len() as u64 >= self.entry_limit {
return Err(StoreError::LimitExceeded);
@@ -327,9 +320,7 @@ where
}
if items.is_empty() {
// Or return an error, or a special key?
return Err(StoreError::Internal(
"Cannot put_multiple with empty items list".to_string(),
));
return Err(StoreError::Internal("Cannot put_multiple with empty items list".to_string()));
}
let uuid = Uuid::new_v4();
let key = Key {
@@ -348,8 +339,7 @@ where
for item in items {
// If items are Vec<Event>, and Event is large, this could be inefficient.
// The current get_multiple deserializes one by one.
let item_data =
serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
let item_data = serde_json::to_vec(&item).map_err(|e| StoreError::Serialization(e.to_string()))?;
buffer.extend_from_slice(&item_data);
// If using JSON array: buffer = serde_json::to_vec(&items)?
}
@@ -374,9 +364,7 @@ where
debug!("Reading items from store for key: {}", key.to_string());
let data = self.read_file(key)?;
if data.is_empty() {
return Err(StoreError::Deserialization(
"Cannot deserialize empty data".to_string(),
));
return Err(StoreError::Deserialization("Cannot deserialize empty data".to_string()));
}
let mut items = Vec::with_capacity(key.item_count);
@@ -395,10 +383,7 @@ where
match deserializer.next() {
Some(Ok(item)) => items.push(item),
Some(Err(e)) => {
return Err(StoreError::Deserialization(format!(
"Failed to deserialize item in batch: {}",
e
)));
return Err(StoreError::Deserialization(format!("Failed to deserialize item in batch: {}", e)));
}
None => {
// Reached end of stream sooner than item_count
@@ -435,7 +420,10 @@ where
std::fs::remove_file(&path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
// If file not found, still try to remove from entries map in case of inconsistency
warn!("File not found for key {} during del, but proceeding to remove from entries map.", key.to_string());
warn!(
"File not found for key {} during del, but proceeding to remove from entries map.",
key.to_string()
);
StoreError::NotFound
} else {
StoreError::Io(e)
@@ -443,17 +431,15 @@ where
})?;
// Get the write lock to update the internal state
let mut entries = self.entries.write().map_err(|_| {
StoreError::Internal("Failed to acquire write lock on entries".to_string())
})?;
let mut entries = self
.entries
.write()
.map_err(|_| StoreError::Internal("Failed to acquire write lock on entries".to_string()))?;
if entries.remove(&key.to_string()).is_none() {
// Key was not in the map, could be an inconsistency or already deleted.
// This is not necessarily an error if the file deletion succeeded or was NotFound.
debug!(
"Key {} not found in entries map during del, might have been already removed.",
key
);
debug!("Key {} not found in entries map during del, might have been already removed.", key);
}
debug!("Deleted event from store: {}", key.to_string());
Ok(())
@@ -492,7 +478,6 @@ where
}
fn boxed_clone(&self) -> Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync> {
Box::new(self.clone())
as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
Box::new(self.clone()) as Box<dyn Store<T, Error = Self::Error, Key = Self::Key> + Send + Sync>
}
}

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{Event, StoreError, TargetError};
@@ -21,7 +22,7 @@ pub trait Target: Send + Sync + 'static {
async fn is_active(&self) -> Result<bool, TargetError>;
/// Saves an event (either sends it immediately or stores it for later)
async fn save(&self, event: Event) -> Result<(), TargetError>;
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError>;
/// Sends an event from the store
async fn send_from_store(&self, key: Key) -> Result<(), TargetError>;

View File

@@ -58,24 +58,19 @@ impl MQTTArgs {
match self.broker.scheme() {
"ws" | "wss" | "tcp" | "ssl" | "tls" | "tcps" | "mqtt" | "mqtts" => {}
_ => {
return Err(TargetError::Configuration(
"unknown protocol in broker address".to_string(),
));
return Err(TargetError::Configuration("unknown protocol in broker address".to_string()));
}
}
if !self.queue_dir.is_empty() {
let path = std::path::Path::new(&self.queue_dir);
if !path.is_absolute() {
return Err(TargetError::Configuration(
"mqtt queueDir path should be absolute".to_string(),
));
return Err(TargetError::Configuration("mqtt queueDir path should be absolute".to_string()));
}
if self.qos == QoS::AtMostOnce {
return Err(TargetError::Configuration(
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set"
.to_string(),
"QoS should be AtLeastOnce (1) or ExactlyOnce (2) if queueDir is set".to_string(),
));
}
}
@@ -107,21 +102,12 @@ impl MQTTTarget {
let target_id = TargetID::new(id.clone(), ChannelTargetType::Mqtt.as_str().to_string());
let queue_store = if !args.queue_dir.is_empty() {
let base_path = PathBuf::from(&args.queue_dir);
let unique_dir_name = format!(
"rustfs-{}-{}-{}",
ChannelTargetType::Mqtt.as_str(),
target_id.name,
target_id.id
)
.replace(":", "_");
let unique_dir_name =
format!("rustfs-{}-{}-{}", ChannelTargetType::Mqtt.as_str(), target_id.name, target_id.id).replace(":", "_");
// Ensure the directory name is valid for filesystem
let specific_queue_path = base_path.join(unique_dir_name);
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target");
let store = crate::store::QueueStore::<Event>::new(
specific_queue_path,
args.queue_limit,
STORE_EXTENSION,
);
let store = crate::store::QueueStore::<Event>::new(specific_queue_path, args.queue_limit, STORE_EXTENSION);
if let Err(e) = store.open() {
error!(
target_id = %target_id,
@@ -130,10 +116,7 @@ impl MQTTTarget {
);
return Err(TargetError::Storage(format!("{}", e)));
}
Some(Box::new(store)
as Box<
dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync,
>)
Some(Box::new(store) as Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>)
} else {
None
};
@@ -175,18 +158,13 @@ impl MQTTTarget {
debug!(target_id = %target_id_clone, "Initializing MQTT background task.");
let host = args_clone.broker.host_str().unwrap_or("localhost");
let port = args_clone.broker.port().unwrap_or(1883);
let mut mqtt_options = MqttOptions::new(
format!("rustfs_notify_{}", uuid::Uuid::new_v4()),
host,
port,
);
let mut mqtt_options = MqttOptions::new(format!("rustfs_notify_{}", uuid::Uuid::new_v4()), host, port);
mqtt_options
.set_keep_alive(args_clone.keep_alive)
.set_max_packet_size(100 * 1024 * 1024, 100 * 1024 * 1024); // 100MB
if !args_clone.username.is_empty() {
mqtt_options
.set_credentials(args_clone.username.clone(), args_clone.password.clone());
mqtt_options.set_credentials(args_clone.username.clone(), args_clone.password.clone());
}
let (new_client, eventloop) = AsyncClient::new(mqtt_options, 10);
@@ -206,12 +184,8 @@ impl MQTTTarget {
*client_arc.lock().await = Some(new_client.clone());
info!(target_id = %target_id_clone, "Spawning MQTT event loop task.");
let task_handle = tokio::spawn(run_mqtt_event_loop(
eventloop,
connected_arc.clone(),
target_id_clone.clone(),
cancel_rx,
));
let task_handle =
tokio::spawn(run_mqtt_event_loop(eventloop, connected_arc.clone(), target_id_clone.clone(), cancel_rx));
Ok(task_handle)
})
.await
@@ -266,17 +240,13 @@ impl MQTTTarget {
records: vec![event.clone()],
};
let data = serde_json::to_vec(&log)
.map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
let data =
serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {}", e)))?;
// Vec<u8> Convert to String, only for printing logs
let data_string = String::from_utf8(data.clone()).map_err(|e| {
TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e))
})?;
debug!(
"Sending event to mqtt target: {}, event log: {}",
self.id, data_string
);
let data_string = String::from_utf8(data.clone())
.map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {}", e)))?;
debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string);
client
.publish(&self.args.topic, self.args.qos, false, data)
@@ -474,9 +444,7 @@ impl Target for MQTTTarget {
if let Some(handle) = self.bg_task_manager.init_cell.get() {
if handle.is_finished() {
error!(target_id = %self.id, "MQTT background task has finished, possibly due to an error. Target is not active.");
return Err(TargetError::Network(
"MQTT background task terminated".to_string(),
));
return Err(TargetError::Network("MQTT background task terminated".to_string()));
}
}
debug!(target_id = %self.id, "MQTT client not yet initialized or task not running/connected.");
@@ -495,7 +463,7 @@ impl Target for MQTTTarget {
}
#[instrument(skip(self, event), fields(target_id = %self.id))]
async fn save(&self, event: Event) -> Result<(), TargetError> {
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError> {
if let Some(store) = &self.store {
debug!(target_id = %self.id, "Event saved to store start");
// If store is configured, ONLY put the event into the store.
@@ -507,10 +475,7 @@ impl Target for MQTTTarget {
}
Err(e) => {
error!(target_id = %self.id, error = %e, "Failed to save event to store");
return Err(TargetError::Storage(format!(
"Failed to save event to store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to save event to store: {}", e)));
}
}
} else {
@@ -581,10 +546,7 @@ impl Target for MQTTTarget {
error = %e,
"Failed to get event from store"
);
return Err(TargetError::Storage(format!(
"Failed to get event from store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to get event from store: {}", e)));
}
};
@@ -608,10 +570,7 @@ impl Target for MQTTTarget {
}
Err(e) => {
error!(target_id = %self.id, error = %e, "Failed to delete event from store after send.");
return Err(TargetError::Storage(format!(
"Failed to delete event from store: {}",
e
)));
return Err(TargetError::Storage(format!("Failed to delete event from store: {}", e)));
}
}

View File

@@ -221,24 +221,24 @@ impl WebhookTarget {
.header("Content-Type", "application/json");
if !self.args.auth_token.is_empty() {
// 分割 auth_token 字符串,检查是否已包含认证类型
// Split auth_token string to check if the authentication type is included
let tokens: Vec<&str> = self.args.auth_token.split_whitespace().collect();
match tokens.len() {
2 => {
// 已经包含认证类型和令牌,如 "Bearer token123"
// Already include authentication type and token, such as "Bearer token123"
req_builder = req_builder.header("Authorization", &self.args.auth_token);
}
1 => {
// 只有令牌,需要添加 "Bearer" 前缀
// Only tokens, need to add "Bearer" prefix
req_builder = req_builder.header("Authorization", format!("Bearer {}", self.args.auth_token));
}
_ => {
// 空字符串或其他情况,不添加认证头
// Empty string or other situations, no authentication header is added
}
}
}
// 发送请求
// Send a request
let resp = req_builder.body(data).send().await.map_err(|e| {
if e.is_timeout() || e.is_connect() {
TargetError::NotConnected
@@ -271,7 +271,7 @@ impl Target for WebhookTarget {
self.id.clone()
}
// 确保 Future Send
// Make sure Future is Send
async fn is_active(&self) -> Result<bool, TargetError> {
let socket_addr = lookup_host(&self.addr)
.await
@@ -296,7 +296,7 @@ impl Target for WebhookTarget {
}
}
async fn save(&self, event: Event) -> Result<(), TargetError> {
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError> {
if let Some(store) = &self.store {
// Call the store method directly, no longer need to acquire the lock
store

View File

@@ -100,7 +100,7 @@ impl UserAgent {
fn get_macos_platform(_sys: &System) -> String {
let binding = System::os_version().unwrap_or("14.5.0".to_string());
let version = binding.split('.').collect::<Vec<&str>>();
let major = version.get(0).unwrap_or(&"14").to_string();
let major = version.first().unwrap_or(&"14").to_string();
let minor = version.get(1).unwrap_or(&"5").to_string();
let patch = version.get(2).unwrap_or(&"0").to_string();

View File

@@ -502,11 +502,9 @@ async fn run(opt: config::Opt) -> Result<()> {
});
// init store
let store = ECStore::new(server_addr.clone(), endpoint_pools.clone())
.await
.inspect_err(|err| {
error!("ECStore::new {:?}", err);
})?;
let store = ECStore::new(server_addr, endpoint_pools.clone()).await.inspect_err(|err| {
error!("ECStore::new {:?}", err);
})?;
ecconfig::init();
// config system configuration