houseme
2025-06-20 10:51:36 +08:00
parent c658d88d25
commit d3cc36f6e0
22 changed files with 353 additions and 544 deletions

3 .gitignore vendored
View File

@@ -17,5 +17,4 @@ deploy/certs/*
.rustfs.sys
.cargo
profile.json
.docker/openobserve-otel/data
rustfs
.docker/openobserve-otel/data

52 Cargo.lock generated
View File

@@ -506,16 +506,6 @@ dependencies = [
"zbus 5.7.1",
]
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-broadcast"
version = "0.7.2"
@@ -1839,15 +1829,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "colored"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"
@@ -5989,30 +5970,6 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "mockito"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7760e0e418d9b7e5777c0374009ca4c93861b9066f18cb334a20ce50ab63aa48"
dependencies = [
"assert-json-diff",
"bytes",
"colored",
"futures-util",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"log",
"rand 0.9.1",
"regex",
"serde_json",
"serde_urlencoded",
"similar",
"tokio",
]
[[package]]
name = "muda"
version = "0.11.5"
@@ -8431,13 +8388,12 @@ dependencies = [
"chrono",
"const-str",
"ecstore",
"libc",
"mockito",
"once_cell",
"quick-xml",
"reqwest",
"rumqttc",
"rustfs-config",
"rustfs-utils",
"serde",
"serde_json",
"snap",
@@ -9288,12 +9244,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
[[package]]
name = "similar"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa"
[[package]]
name = "simple_asn1"
version = "0.6.3"

View File

@@ -121,14 +121,12 @@ keyring = { version = "3.6.2", features = [
"sync-secret-service",
] }
lazy_static = "1.5.0"
libc = "0.2.174"
libsystemd = { version = "0.7.2" }
local-ip-address = "0.6.5"
matchit = "0.8.4"
md-5 = "0.10.6"
mime = "0.3.17"
mime_guess = "2.0.5"
mockito = "1.7.0"
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
@@ -171,7 +169,7 @@ rdkafka = { version = "0.37.0", features = ["tokio"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-simd = { version = "3.0.0" }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.19", default-features = false, features = [
reqwest = { version = "0.12.20", default-features = false, features = [
"rustls-tls",
"charset",
"http2",

View File

@@ -10,11 +10,10 @@ use bytes::Bytes;
use rmp::Marker;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::convert::TryFrom;
use std::hash::Hasher;
use std::io::{Read, Write};
use std::{collections::HashMap, io::Cursor};
use std::convert::TryFrom;
use std::fs::File;
use time::OffsetDateTime;
use tokio::io::AsyncRead;
use uuid::Uuid;
@@ -518,11 +517,7 @@ impl FileMeta {
let has_vid = {
if !version_id.is_empty() {
let id = Uuid::parse_str(version_id)?;
if !id.is_nil() {
Some(id)
} else {
None
}
if !id.is_nil() { Some(id) } else { None }
} else {
None
}
@@ -1253,11 +1248,7 @@ impl FileMetaVersionHeader {
cur.read_exact(&mut buf)?;
self.version_id = {
let id = Uuid::from_bytes(buf);
if id.is_nil() {
None
} else {
Some(id)
}
if id.is_nil() { None } else { Some(id) }
};
// mod_time
@@ -1431,11 +1422,7 @@ impl MetaObject {
cur.read_exact(&mut buf)?;
self.version_id = {
let id = Uuid::from_bytes(buf);
if id.is_nil() {
None
} else {
Some(id)
}
if id.is_nil() { None } else { Some(id) }
};
}
"DDir" => {
@@ -1444,11 +1431,7 @@ impl MetaObject {
cur.read_exact(&mut buf)?;
self.data_dir = {
let id = Uuid::from_bytes(buf);
if id.is_nil() {
None
} else {
Some(id)
}
if id.is_nil() { None } else { Some(id) }
};
}
"EcAlgo" => {
@@ -2017,11 +2000,7 @@ impl MetaDeleteMarker {
cur.read_exact(&mut buf)?;
self.version_id = {
let id = Uuid::from_bytes(buf);
if id.is_nil() {
None
} else {
Some(id)
}
if id.is_nil() { None } else { Some(id) }
};
}
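
The hunks above repeatedly collapse the same nil-UUID guard into a single line. A minimal standalone sketch of that pattern (the helper name is hypothetical; only uuid's Uuid::parse_str and Uuid::is_nil are assumed):

    use uuid::Uuid;

    /// Hypothetical helper mirroring the has_vid logic above: an empty or nil
    /// version id means "no version"; anything else must parse as a UUID.
    fn parse_version_id(version_id: &str) -> Result<Option<Uuid>, uuid::Error> {
        if version_id.is_empty() {
            return Ok(None);
        }
        let id = Uuid::parse_str(version_id)?;
        Ok(if id.is_nil() { None } else { Some(id) })
    }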

View File

@@ -12,7 +12,6 @@ async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
const-str = { workspace = true }
ecstore = { workspace = true }
libc = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
reqwest = { workspace = true }
@@ -31,9 +30,9 @@ wildmatch = { workspace = true, features = ["serde"] }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
mockito = "1.7"
reqwest = { workspace = true, default-features = false, features = ["rustls-tls", "charset", "http2", "system-proxy", "stream", "json", "blocking"] }
axum = { workspace = true }
rustfs-utils = { workspace = true, features = ["path"] }
[lints]
workspace = true

View File

@@ -1,8 +1,6 @@
use notify::arn::TargetID;
use notify::global::notification_system;
use notify::{
init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError,
};
use rustfs_notify::arn::TargetID;
use rustfs_notify::global::notification_system;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError};
use std::time::Duration;
use tracing::info;
@@ -12,18 +10,24 @@ async fn main() -> Result<(), NotificationError> {
let system = notification_system();
// --- 初始配置 (Webhook MQTT) ---
let mut config = notify::Config::new();
// --- Initial configuration (Webhook and MQTT) ---
let mut config = rustfs_notify::Config::new();
let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root");
println!("Current project root: {}", current_root.display());
// Webhook target configuration
let mut webhook_kvs = notify::KVS::new();
let mut webhook_kvs = rustfs_notify::KVS::new();
webhook_kvs.set("enable", "on");
webhook_kvs.set("endpoint", "http://127.0.0.1:3020/webhook");
webhook_kvs.set("auth_token", "secret-token");
// webhook_kvs.set("queue_dir", "/tmp/data/webhook");
webhook_kvs.set(
"queue_dir",
"/Users/qun/Documents/rust/rustfs/notify/logs/webhook",
current_root
.clone()
.join("../../deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
);
webhook_kvs.set("queue_limit", "10000");
let mut webhook_targets = std::collections::HashMap::new();
@@ -31,7 +35,7 @@ async fn main() -> Result<(), NotificationError> {
config.insert("notify_webhook".to_string(), webhook_targets);
// MQTT target configuration
let mut mqtt_kvs = notify::KVS::new();
let mut mqtt_kvs = rustfs_notify::KVS::new();
mqtt_kvs.set("enable", "on");
mqtt_kvs.set("broker", "mqtt://localhost:1883");
mqtt_kvs.set("topic", "rustfs/events");
@@ -41,7 +45,11 @@ async fn main() -> Result<(), NotificationError> {
// webhook_kvs.set("queue_dir", "/tmp/data/mqtt");
mqtt_kvs.set(
"queue_dir",
"/Users/qun/Documents/rust/rustfs/notify/logs/mqtt",
current_root
.join("../../deploy/logs/notify/mqtt")
.to_str()
.unwrap()
.to_string(),
);
mqtt_kvs.set("queue_limit", "10000");
@@ -49,35 +57,32 @@ async fn main() -> Result<(), NotificationError> {
mqtt_targets.insert("1".to_string(), mqtt_kvs);
config.insert("notify_mqtt".to_string(), mqtt_targets);
// 加载配置并初始化系统
// Load the configuration and initialize the system
*system.config.write().await = config;
system.init().await?;
info!("✅ System initialized with Webhook and MQTT targets.");
// --- 1. 查询当前活动的 Target ---
// --- Query the currently active Targets ---
let active_targets = system.get_active_targets().await;
info!("\n---> Currently active targets: {:?}", active_targets);
assert_eq!(active_targets.len(), 2);
tokio::time::sleep(Duration::from_secs(1)).await;
// --- 2. 精确删除一个 Target (例如 MQTT) ---
// --- Precisely remove a single Target (e.g. MQTT) ---
info!("\n---> Removing MQTT target...");
let mqtt_target_id = TargetID::new("1".to_string(), "mqtt".to_string());
system.remove_target(&mqtt_target_id, "notify_mqtt").await?;
info!("✅ MQTT target removed.");
// --- 3. 再次查询活动的 Target ---
// --- Query the active Targets again ---
let active_targets_after_removal = system.get_active_targets().await;
info!(
"\n---> Active targets after removal: {:?}",
active_targets_after_removal
);
info!("\n---> Active targets after removal: {:?}", active_targets_after_removal);
assert_eq!(active_targets_after_removal.len(), 1);
assert_eq!(active_targets_after_removal[0].id, "1".to_string());
// --- 4. 发送事件进行验证 ---
// 配置一个规则,指向 Webhook 和已删除的 MQTT
// --- Send events for verification ---
// Configure rules pointing to the Webhook and to the deleted MQTT target
let mut bucket_config = BucketNotificationConfig::new("us-east-1");
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
@@ -87,20 +92,16 @@ async fn main() -> Result<(), NotificationError> {
bucket_config.add_rule(
&[EventName::ObjectCreatedPut],
"*".to_string(),
TargetID::new("1".to_string(), "mqtt".to_string()), // 这个规则会匹配,但找不到 Target
TargetID::new("1".to_string(), "mqtt".to_string()), // This rule will match, but the Target cannot be found
);
system
.load_bucket_notification_config("my-bucket", &bucket_config)
.await?;
system.load_bucket_notification_config("my-bucket", &bucket_config).await?;
info!("\n---> Sending an event...");
let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut);
system
.send_event("my-bucket", "s3:ObjectCreated:Put", "document.pdf", event)
.await;
info!(
"✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target."
);
info!("✅ Event sent. Only the Webhook target should receive it. Check logs for warnings about the missing MQTT target.");
tokio::time::sleep(Duration::from_secs(2)).await;

View File

@@ -1,9 +1,8 @@
use notify::arn::TargetID;
use notify::global::notification_system;
// 1. 使用全局访问器
use notify::{
init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS,
};
// Use the global accessor
use rustfs_config::notify;
use rustfs_notify::arn::TargetID;
use rustfs_notify::global::notification_system;
use rustfs_notify::{init_logger, BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, KVS};
use std::time::Duration;
use tracing::info;
@@ -11,11 +10,12 @@ use tracing::info;
async fn main() -> Result<(), NotificationError> {
init_logger(LogLevel::Debug);
// 获取全局 NotificationSystem 实例
// Get global NotificationSystem instance
let system = notification_system();
// --- 初始配置 ---
let mut config = notify::Config::new();
// --- Initial configuration ---
let mut config = rustfs_notify::Config::new();
let current_root = rustfs_utils::dirs::get_project_root().expect("failed to get project root");
// Webhook target
let mut webhook_kvs = KVS::new();
webhook_kvs.set("enable", "on");
@@ -23,20 +23,25 @@ async fn main() -> Result<(), NotificationError> {
// webhook_kvs.set("queue_dir", "./logs/webhook");
webhook_kvs.set(
"queue_dir",
"/Users/qun/Documents/rust/rustfs/notify/logs/webhook",
current_root
.clone()
.join("/deploy/logs/notify/webhook")
.to_str()
.unwrap()
.to_string(),
);
let mut webhook_targets = std::collections::HashMap::new();
webhook_targets.insert("1".to_string(), webhook_kvs);
config.insert("notify_webhook".to_string(), webhook_targets);
// 加载初始配置并初始化系统
// Load the initial configuration and initialize the system
*system.config.write().await = config;
system.init().await?;
info!("✅ System initialized with Webhook target.");
tokio::time::sleep(Duration::from_secs(1)).await;
// --- 2. 动态更新系统配置:添加一个 MQTT Target ---
// --- Dynamically update system configuration: Add an MQTT Target ---
info!("\n---> Dynamically adding MQTT target...");
let mut mqtt_kvs = KVS::new();
mqtt_kvs.set("enable", "on");
@@ -47,18 +52,13 @@ async fn main() -> Result<(), NotificationError> {
mqtt_kvs.set("password", "123456");
mqtt_kvs.set("queue_limit", "10000");
// mqtt_kvs.set("queue_dir", "./logs/mqtt");
mqtt_kvs.set(
"queue_dir",
"/Users/qun/Documents/rust/rustfs/notify/logs/mqtt",
);
system
.set_target_config("notify_mqtt", "1", mqtt_kvs)
.await?;
mqtt_kvs.set("queue_dir", current_root.join("/deploy/logs/notify/mqtt").to_str().unwrap().to_string());
system.set_target_config("notify_mqtt", "1", mqtt_kvs).await?;
info!("✅ MQTT target added and system reloaded.");
tokio::time::sleep(Duration::from_secs(1)).await;
// --- 3. 加载和管理 Bucket 配置 ---
// --- Loading and managing Bucket configurations ---
info!("\n---> Loading bucket notification config...");
let mut bucket_config = BucketNotificationConfig::new("us-east-1");
bucket_config.add_rule(
@@ -71,12 +71,10 @@ async fn main() -> Result<(), NotificationError> {
"*".to_string(),
TargetID::new("1".to_string(), "mqtt".to_string()),
);
system
.load_bucket_notification_config("my-bucket", &bucket_config)
.await?;
system.load_bucket_notification_config("my-bucket", &bucket_config).await?;
info!("✅ Bucket 'my-bucket' config loaded.");
// --- 发送事件 ---
// --- Send events ---
info!("\n---> Sending an event...");
let event = Event::new_test_event("my-bucket", "document.pdf", EventName::ObjectCreatedPut);
system
@@ -86,7 +84,7 @@ async fn main() -> Result<(), NotificationError> {
tokio::time::sleep(Duration::from_secs(2)).await;
// --- 动态移除配置 ---
// --- Dynamically remove configuration ---
info!("\n---> Dynamically removing Webhook target...");
system.remove_target_config("notify_webhook", "1").await?;
info!("✅ Webhook target removed and system reloaded.");

View File

@@ -16,40 +16,37 @@ struct ResetParams {
reason: Option<String>,
}
// 定义一个全局变量 统计接受到数据条数
// Define a global variable to count the number of received payloads
use std::sync::atomic::{AtomicU64, Ordering};
static WEBHOOK_COUNT: AtomicU64 = AtomicU64::new(0);
#[tokio::main]
async fn main() {
// 构建应用
// Build an application
let app = Router::new()
.route("/webhook", post(receive_webhook))
.route(
"/webhook/reset/{reason}",
get(reset_webhook_count_with_path),
)
.route("/webhook/reset/{reason}", get(reset_webhook_count_with_path))
.route("/webhook/reset", get(reset_webhook_count))
.route("/webhook", get(receive_webhook));
// 启动服务器
// Start the server
let addr = "0.0.0.0:3020";
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
println!("Server running on {}", addr);
// 服务启动后进行自检
// Self-checking after the service is started
tokio::spawn(async move {
// 给服务器一点时间启动
// Give the server some time to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
match is_service_active(addr).await {
Ok(true) => println!("服务健康检查:成功 - 服务正常运行"),
Ok(false) => eprintln!("服务健康检查:失败 - 服务未响应"),
Err(e) => eprintln!("服务健康检查错误:{}", e),
Ok(true) => println!("Service health check: success - service is running normally"),
Ok(false) => eprintln!("Service health check: failed - service not responding"),
Err(e) => eprintln!("Service health check error: {}", e),
}
});
// 创建关闭信号处理
// Set up shutdown signal handling
tokio::select! {
result = axum::serve(listener, app) => {
if let Err(e) = result {
@@ -62,16 +59,14 @@ async fn main() {
}
}
/// 创建一个方法重置 WEBHOOK_COUNT 的值
async fn reset_webhook_count_with_path(
axum::extract::Path(reason): axum::extract::Path<String>,
) -> Response<String> {
// 输出当前计数器的值
/// Reset the value of WEBHOOK_COUNT
async fn reset_webhook_count_with_path(axum::extract::Path(reason): axum::extract::Path<String>) -> Response<String> {
// Output the value of the current counter
let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst);
println!("Current webhook count: {}", current_count);
println!("Reset webhook count, reason: {}", reason);
// 将计数器重置为 0
// Reset the counter to 0
WEBHOOK_COUNT.store(0, Ordering::SeqCst);
println!("Webhook count has been reset to 0.");
@@ -85,17 +80,14 @@ async fn reset_webhook_count_with_path(
.unwrap()
}
/// 创建一个方法重置 WEBHOOK_COUNT 的值
/// 可以通过调用此方法来重置计数器
async fn reset_webhook_count(
Query(params): Query<ResetParams>,
headers: HeaderMap,
) -> Response<String> {
// 输出当前计数器的值
/// Reset the value of WEBHOOK_COUNT
/// The counter can be reset by calling this handler
async fn reset_webhook_count(Query(params): Query<ResetParams>, headers: HeaderMap) -> Response<String> {
// Output the value of the current counter
let current_count = WEBHOOK_COUNT.load(Ordering::SeqCst);
println!("Current webhook count: {}", current_count);
let reason = params.reason.unwrap_or_else(|| "未提供原因".to_string());
let reason = params.reason.unwrap_or_else(|| "Reason not provided".to_string());
println!("Reset webhook count, reason: {}", reason);
for header in headers {
@@ -104,51 +96,41 @@ async fn reset_webhook_count(
}
println!("Reset webhook count printed headers");
// 将计数器重置为 0
// Reset the counter to 0
WEBHOOK_COUNT.store(0, Ordering::SeqCst);
println!("Webhook count has been reset to 0.");
Response::builder()
.header("Foo", "Bar")
.status(StatusCode::OK)
.body(format!(
"Webhook count reset successfully current_count:{}",
current_count
))
.body(format!("Webhook count reset successfully current_count:{}", current_count))
.unwrap()
}
async fn is_service_active(addr: &str) -> Result<bool, String> {
let socket_addr = tokio::net::lookup_host(addr)
.await
.map_err(|e| format!("无法解析主机:{}", e))?
.map_err(|e| format!("Unable to resolve host:{}", e))?
.next()
.ok_or_else(|| "未找到地址".to_string())?;
.ok_or_else(|| "Address not found".to_string())?;
println!("正在检查服务状态:{}", socket_addr);
println!("Checking service status:{}", socket_addr);
match tokio::time::timeout(
std::time::Duration::from_secs(5),
tokio::net::TcpStream::connect(socket_addr),
)
.await
{
match tokio::time::timeout(std::time::Duration::from_secs(5), tokio::net::TcpStream::connect(socket_addr)).await {
Ok(Ok(_)) => Ok(true),
Ok(Err(e)) => {
if e.kind() == std::io::ErrorKind::ConnectionRefused {
Ok(false)
} else {
Err(format!("连接失败:{}", e))
Err(format!("Connection failed:{}", e))
}
}
Err(_) => Err("连接超时".to_string()),
Err(_) => Err("Connection timeout".to_string()),
}
}
async fn receive_webhook(Json(payload): Json<Value>) -> StatusCode {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let since_the_epoch = start.duration_since(UNIX_EPOCH).expect("Time went backwards");
// get the number of seconds since the unix era
let seconds = since_the_epoch.as_secs();
@@ -157,20 +139,14 @@ async fn receive_webhook(Json(payload): Json<Value>) -> StatusCode {
let (year, month, day, hour, minute, second) = convert_seconds_to_date(seconds);
// output result
println!(
"current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
year, month, day, hour, minute, second
);
println!("current time:{:04}-{:02}-{:02} {:02}:{:02}:{:02}", year, month, day, hour, minute, second);
println!(
"received a webhook request time:{} content:\n {}",
seconds,
serde_json::to_string_pretty(&payload).unwrap()
);
WEBHOOK_COUNT.fetch_add(1, Ordering::SeqCst);
println!(
"Total webhook requests received: {}",
WEBHOOK_COUNT.load(Ordering::SeqCst)
);
println!("Total webhook requests received: {}", WEBHOOK_COUNT.load(Ordering::SeqCst));
StatusCode::OK
}
@@ -221,12 +197,5 @@ fn convert_seconds_to_date(seconds: u64) -> (u32, u32, u32, u32, u32, u32) {
// calculate the number of seconds
second += total_seconds;
(
year as u32,
month as u32,
day as u32,
hour as u32,
minute as u32,
second as u32,
)
(year as u32, month as u32, day as u32, hour as u32, minute as u32, second as u32)
}
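
To exercise the reset endpoint above by hand, a hypothetical client snippet (reqwest is already a workspace dependency; the URL matches the route registered in main):

    // Assumes the server above is running on 127.0.0.1:3020.
    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        let body = reqwest::get("http://127.0.0.1:3020/webhook/reset?reason=manual-test")
            .await?
            .text()
            .await?;
        // Expected shape: "Webhook count reset successfully current_count:<n>"
        println!("{}", body);
        Ok(())
    }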

View File

@@ -3,23 +3,23 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
/// 当解析事件名称字符串失败时返回的错误
/// Error returned when parsing an event name string fails
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseEventNameError(String);
impl fmt::Display for ParseEventNameError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "无效的事件名称:{}", self.0)
write!(f, "Invalid event name:{}", self.0)
}
}
impl std::error::Error for ParseEventNameError {}
/// 表示对象上发生的事件类型。
/// 基于 AWS S3 事件类型,并包含 RustFS 扩展。
/// Represents the type of event that occurs on the object.
/// Based on AWS S3 event type and includes RustFS extension.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EventName {
// 单一事件类型 (值为 1-32 以兼容掩码逻辑)
// Single event types (values 1-32, to stay compatible with the mask logic)
ObjectAccessedGet = 1,
ObjectAccessedGetRetention = 2,
ObjectAccessedGetLegalHold = 3,
@@ -48,23 +48,23 @@ pub enum EventName {
ObjectRestoreCompleted = 26,
ObjectTransitionFailed = 27,
ObjectTransitionComplete = 28,
ScannerManyVersions = 29, // 对应 Go 的 ObjectManyVersions
ScannerLargeVersions = 30, // 对应 Go 的 ObjectLargeVersions
ScannerBigPrefix = 31, // 对应 Go 的 PrefixManyFolders
LifecycleDelMarkerExpirationDelete = 32, // 对应 Go 的 ILMDelMarkerExpirationDelete
ScannerManyVersions = 29, // corresponds to Go's ObjectManyVersions
ScannerLargeVersions = 30, // corresponds to Go's ObjectLargeVersions
ScannerBigPrefix = 31, // corresponds to Go's PrefixManyFolders
LifecycleDelMarkerExpirationDelete = 32, // corresponds to Go's ILMDelMarkerExpirationDelete
// 复合 "All" 事件类型 (没有用于掩码的顺序值)
// Compound "All" event type (no sequential value for mask)
ObjectAccessedAll,
ObjectCreatedAll,
ObjectRemovedAll,
ObjectReplicationAll,
ObjectRestoreAll,
ObjectTransitionAll,
ObjectScannerAll, // 新增,来自 Go
Everything, // 新增,来自 Go
ObjectScannerAll, // New, from Go
Everything, // New, from Go
}
// 用于 Everything.expand() 的单一事件类型顺序数组
// Ordered array of single event types, used by Everything.expand()
const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [
EventName::ObjectAccessedGet,
EventName::ObjectAccessedGetRetention,
@@ -103,7 +103,7 @@ const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [
const LAST_SINGLE_TYPE_VALUE: u32 = EventName::LifecycleDelMarkerExpirationDelete as u32;
impl EventName {
/// 解析字符串为 EventName
/// Parses a string into an EventName.
pub fn parse(s: &str) -> Result<Self, ParseEventNameError> {
match s {
"s3:BucketCreated:*" => Ok(EventName::BucketCreated),
@@ -115,9 +115,7 @@ impl EventName {
"s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead),
"s3:ObjectAccessed:Attributes" => Ok(EventName::ObjectAccessedAttributes),
"s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll),
"s3:ObjectCreated:CompleteMultipartUpload" => {
Ok(EventName::ObjectCreatedCompleteMultipartUpload)
}
"s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload),
"s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy),
"s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost),
"s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut),
@@ -127,25 +125,15 @@ impl EventName {
"s3:ObjectCreated:DeleteTagging" => Ok(EventName::ObjectCreatedDeleteTagging),
"s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll),
"s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete),
"s3:ObjectRemoved:DeleteMarkerCreated" => {
Ok(EventName::ObjectRemovedDeleteMarkerCreated)
}
"s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated),
"s3:ObjectRemoved:NoOP" => Ok(EventName::ObjectRemovedNoOP),
"s3:ObjectRemoved:DeleteAllVersions" => Ok(EventName::ObjectRemovedDeleteAllVersions),
"s3:LifecycleDelMarkerExpiration:Delete" => {
Ok(EventName::LifecycleDelMarkerExpirationDelete)
}
"s3:LifecycleDelMarkerExpiration:Delete" => Ok(EventName::LifecycleDelMarkerExpirationDelete),
"s3:Replication:*" => Ok(EventName::ObjectReplicationAll),
"s3:Replication:OperationFailedReplication" => Ok(EventName::ObjectReplicationFailed),
"s3:Replication:OperationCompletedReplication" => {
Ok(EventName::ObjectReplicationComplete)
}
"s3:Replication:OperationMissedThreshold" => {
Ok(EventName::ObjectReplicationMissedThreshold)
}
"s3:Replication:OperationReplicatedAfterThreshold" => {
Ok(EventName::ObjectReplicationReplicatedAfterThreshold)
}
"s3:Replication:OperationCompletedReplication" => Ok(EventName::ObjectReplicationComplete),
"s3:Replication:OperationMissedThreshold" => Ok(EventName::ObjectReplicationMissedThreshold),
"s3:Replication:OperationReplicatedAfterThreshold" => Ok(EventName::ObjectReplicationReplicatedAfterThreshold),
"s3:Replication:OperationNotTracked" => Ok(EventName::ObjectReplicationNotTracked),
"s3:ObjectRestore:*" => Ok(EventName::ObjectRestoreAll),
"s3:ObjectRestore:Post" => Ok(EventName::ObjectRestorePost),
@@ -156,12 +144,12 @@ impl EventName {
"s3:Scanner:ManyVersions" => Ok(EventName::ScannerManyVersions),
"s3:Scanner:LargeVersions" => Ok(EventName::ScannerLargeVersions),
"s3:Scanner:BigPrefix" => Ok(EventName::ScannerBigPrefix),
// ObjectScannerAll Everything 不能从字符串解析,因为 Go 版本也没有定义它们的字符串表示
// ObjectScannerAll and Everything cannot be parsed from strings, because the Go version also does not define their string representation.
_ => Err(ParseEventNameError(s.to_string())),
}
}
/// 返回事件类型的字符串表示。
/// Returns a string representation of the event type.
pub fn as_str(&self) -> &'static str {
match self {
EventName::BucketCreated => "s3:BucketCreated:*",
@@ -173,9 +161,7 @@ impl EventName {
EventName::ObjectAccessedHead => "s3:ObjectAccessed:Head",
EventName::ObjectAccessedAttributes => "s3:ObjectAccessed:Attributes",
EventName::ObjectCreatedAll => "s3:ObjectCreated:*",
EventName::ObjectCreatedCompleteMultipartUpload => {
"s3:ObjectCreated:CompleteMultipartUpload"
}
EventName::ObjectCreatedCompleteMultipartUpload => "s3:ObjectCreated:CompleteMultipartUpload",
EventName::ObjectCreatedCopy => "s3:ObjectCreated:Copy",
EventName::ObjectCreatedPost => "s3:ObjectCreated:Post",
EventName::ObjectCreatedPut => "s3:ObjectCreated:Put",
@@ -188,19 +174,13 @@ impl EventName {
EventName::ObjectRemovedDeleteMarkerCreated => "s3:ObjectRemoved:DeleteMarkerCreated",
EventName::ObjectRemovedNoOP => "s3:ObjectRemoved:NoOP",
EventName::ObjectRemovedDeleteAllVersions => "s3:ObjectRemoved:DeleteAllVersions",
EventName::LifecycleDelMarkerExpirationDelete => {
"s3:LifecycleDelMarkerExpiration:Delete"
}
EventName::LifecycleDelMarkerExpirationDelete => "s3:LifecycleDelMarkerExpiration:Delete",
EventName::ObjectReplicationAll => "s3:Replication:*",
EventName::ObjectReplicationFailed => "s3:Replication:OperationFailedReplication",
EventName::ObjectReplicationComplete => "s3:Replication:OperationCompletedReplication",
EventName::ObjectReplicationNotTracked => "s3:Replication:OperationNotTracked",
EventName::ObjectReplicationMissedThreshold => {
"s3:Replication:OperationMissedThreshold"
}
EventName::ObjectReplicationReplicatedAfterThreshold => {
"s3:Replication:OperationReplicatedAfterThreshold"
}
EventName::ObjectReplicationMissedThreshold => "s3:Replication:OperationMissedThreshold",
EventName::ObjectReplicationReplicatedAfterThreshold => "s3:Replication:OperationReplicatedAfterThreshold",
EventName::ObjectRestoreAll => "s3:ObjectRestore:*",
EventName::ObjectRestorePost => "s3:ObjectRestore:Post",
EventName::ObjectRestoreCompleted => "s3:ObjectRestore:Completed",
@@ -210,13 +190,13 @@ impl EventName {
EventName::ScannerManyVersions => "s3:Scanner:ManyVersions",
EventName::ScannerLargeVersions => "s3:Scanner:LargeVersions",
EventName::ScannerBigPrefix => "s3:Scanner:BigPrefix",
// Go String() ObjectScannerAll Everything 返回 ""
EventName::ObjectScannerAll => "s3:Scanner:*", // 遵循 Go Expand 中的模式
EventName::Everything => "", // Go String() 对未处理的返回 ""
// Go's String() returns "" for ObjectScannerAll and Everything
EventName::ObjectScannerAll => "s3:Scanner:*", // Follow the pattern in Go Expand
EventName::Everything => "", // Go's String() returns "" for unhandled variants
}
}
/// 返回缩写事件类型的扩展值。
/// Returns the expanded list of single event types for a compound event type.
pub fn expand(&self) -> Vec<Self> {
match self {
EventName::ObjectAccessedAll => vec![
@@ -249,41 +229,35 @@ impl EventName {
EventName::ObjectReplicationMissedThreshold,
EventName::ObjectReplicationReplicatedAfterThreshold,
],
EventName::ObjectRestoreAll => vec![
EventName::ObjectRestorePost,
EventName::ObjectRestoreCompleted,
],
EventName::ObjectTransitionAll => vec![
EventName::ObjectTransitionFailed,
EventName::ObjectTransitionComplete,
],
EventName::ObjectRestoreAll => vec![EventName::ObjectRestorePost, EventName::ObjectRestoreCompleted],
EventName::ObjectTransitionAll => vec![EventName::ObjectTransitionFailed, EventName::ObjectTransitionComplete],
EventName::ObjectScannerAll => vec![
// 新增
// New
EventName::ScannerManyVersions,
EventName::ScannerLargeVersions,
EventName::ScannerBigPrefix,
],
EventName::Everything => {
// 新增
// New
SINGLE_EVENT_NAMES_IN_ORDER.to_vec()
}
// 单一类型直接返回自身
// A single type returns itself directly
_ => vec![*self],
}
}
/// 返回类型的掩码。
/// 复合 "All" 类型会被展开。
/// Returns the bitmask for this type.
/// Compound "All" types are expanded first.
pub fn mask(&self) -> u64 {
let value = *self as u32;
if value > 0 && value <= LAST_SINGLE_TYPE_VALUE {
// 是单一类型
// It's a single type
1u64 << (value - 1)
} else {
// 是复合类型
// It's a compound type
let mut mask = 0u64;
for n in self.expand() {
mask |= n.mask(); // 递归调用 mask
mask |= n.mask(); // Recursively call mask
}
mask
}
@@ -296,7 +270,7 @@ impl fmt::Display for EventName {
}
}
/// 根据字符串转换为 `EventName`
/// Converts a string into an `EventName`
impl From<&str> for EventName {
fn from(event_str: &str) -> Self {
EventName::parse(event_str).unwrap_or_else(|e| panic!("{}", e))
@@ -399,28 +373,16 @@ impl Event {
pub fn new_test_event(bucket: &str, key: &str, event_name: EventName) -> Self {
let mut user_metadata = HashMap::new();
user_metadata.insert("x-amz-meta-test".to_string(), "value".to_string());
user_metadata.insert(
"x-amz-storage-storage-options".to_string(),
"value".to_string(),
);
user_metadata.insert("x-amz-storage-storage-options".to_string(), "value".to_string());
user_metadata.insert("x-amz-meta-".to_string(), "value".to_string());
user_metadata.insert("x-rustfs-meta-".to_string(), "rustfs-value".to_string());
user_metadata.insert("x-request-id".to_string(), "request-id-123".to_string());
user_metadata.insert("x-bucket".to_string(), "bucket".to_string());
user_metadata.insert("x-object".to_string(), "object".to_string());
user_metadata.insert(
"x-rustfs-origin-endpoint".to_string(),
"http://127.0.0.1".to_string(),
);
user_metadata.insert("x-rustfs-origin-endpoint".to_string(), "http://127.0.0.1".to_string());
user_metadata.insert("x-rustfs-user-metadata".to_string(), "metadata".to_string());
user_metadata.insert(
"x-rustfs-deployment-id".to_string(),
"deployment-id-123".to_string(),
);
user_metadata.insert(
"x-rustfs-origin-endpoint-code".to_string(),
"http://127.0.0.1".to_string(),
);
user_metadata.insert("x-rustfs-deployment-id".to_string(), "deployment-id-123".to_string());
user_metadata.insert("x-rustfs-origin-endpoint-code".to_string(), "http://127.0.0.1".to_string());
user_metadata.insert("x-rustfs-bucket-name".to_string(), "bucket".to_string());
user_metadata.insert("x-rustfs-object-key".to_string(), key.to_string());
user_metadata.insert("x-rustfs-object-size".to_string(), "1024".to_string());
@@ -466,7 +428,7 @@ impl Event {
},
}
}
/// 返回事件掩码
/// Return event mask
pub fn mask(&self) -> u64 {
self.event_name.mask()
}
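
Given the mask logic above (a single type sets bit value - 1; a compound type ORs the masks of its expansion), these relationships should hold — a sketch for orientation, not part of the diff:

    // ObjectAccessedGet has value 1, so its mask is the lowest bit.
    assert_eq!(EventName::ObjectAccessedGet.mask(), 1u64 << 0);

    // A compound mask covers every bit of its expanded members.
    let all = EventName::ObjectCreatedAll.mask();
    for e in EventName::ObjectCreatedAll.expand() {
        assert_eq!(all & e.mask(), e.mask());
    }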

View File

@@ -1,4 +1,4 @@
//! RustFs Notify - A flexible and extensible event notification system for object storage.
//! RustFS Notify - A flexible and extensible event notification system for object storage.
//!
//! This library provides a Rust implementation of a storage bucket notification system,
//! similar to RustFS's notification system. It supports sending events to various targets
@@ -35,7 +35,7 @@ use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter};
///
/// # Example
/// ```
/// notify::init_logger(notify::LogLevel::Info);
/// rustfs_notify::init_logger(rustfs_notify::LogLevel::Info);
/// ```
pub fn init_logger(level: LogLevel) {
let filter = EnvFilter::default().add_directive(level.into());

View File

@@ -39,7 +39,7 @@ impl BucketNotificationConfig {
/// Parses notification configuration from XML.
/// `arn_list` is a list of valid ARN strings for validation.
pub fn from_xml<R: Read>(
pub fn from_xml<R: Read + std::io::BufRead>(
reader: R,
current_region: &str,
arn_list: &[String],
@@ -72,11 +72,7 @@ impl BucketNotificationConfig {
/// However, Go's Config has a Validate method.
/// The primary validation now happens during `from_xml` via `NotificationConfiguration::validate`.
/// This method could re-check against an updated arn_list or region if needed.
pub fn validate(
&self,
current_region: &str,
arn_list: &[String],
) -> Result<(), BucketNotificationConfigError> {
pub fn validate(&self, current_region: &str, arn_list: &[String]) -> Result<(), BucketNotificationConfigError> {
if self.region != current_region {
return Err(BucketNotificationConfigError::RegionMismatch {
config_region: self.region.clone(),
@@ -93,9 +89,7 @@ impl BucketNotificationConfig {
// Construct the ARN string for this target_id and self.region
let arn_to_check = target_id.to_arn(&self.region); // Assuming TargetID has to_arn
if !arn_list.contains(&arn_to_check.to_arn_string()) {
return Err(BucketNotificationConfigError::ArnNotFound(
arn_to_check.to_arn_string(),
));
return Err(BucketNotificationConfigError::ArnNotFound(arn_to_check.to_arn_string()));
}
}
}
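
The tightened bound R: Read + std::io::BufRead lines up with quick_xml::de::from_reader, which requires BufRead. A minimal call-site sketch (the XML body is a placeholder and the ARN list is left abstract):

    use std::io::Cursor;

    let xml = r#"<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"></NotificationConfiguration>"#;
    let arn_list: Vec<String> = vec![/* valid target ARNs for this deployment */];
    // Cursor<&[u8]> implements both Read and BufRead.
    let config = BucketNotificationConfig::from_xml(Cursor::new(xml.as_bytes()), "us-east-1", &arn_list)?;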

View File

@@ -22,12 +22,7 @@ impl RulesMap {
/// target_id: Notify the target.
///
/// This method expands the composite event name.
pub fn add_rule_config(
&mut self,
event_names: &[EventName],
pattern: String,
target_id: TargetID,
) {
pub fn add_rule_config(&mut self, event_names: &[EventName], pattern: String, target_id: TargetID) {
let mut effective_pattern = pattern;
if effective_pattern.is_empty() {
effective_pattern = "*".to_string(); // Match all by default
@@ -55,8 +50,7 @@ impl RulesMap {
}
}
/// 从当前 RulesMap 中移除另一个 RulesMap 中定义的规则。
/// 对应 Go 的 `RulesMap.Remove(rulesMap2 RulesMap)`
/// Removes the rules defined in another RulesMap from the current one.
pub fn remove_map(&mut self, other_map: &Self) {
let mut events_to_remove = Vec::new();
for (event_name, self_pattern_rules) in &mut self.map {
@@ -72,24 +66,24 @@ impl RulesMap {
}
}
/// 匹配给定事件名称和对象键的规则,返回所有匹配的 TargetID
/// Matches rules against the given event name and object key, returning all matching TargetIDs.
pub fn match_rules(&self, event_name: EventName, object_key: &str) -> TargetIdSet {
// 首先尝试直接匹配事件名称
// First try to directly match the event name
if let Some(pattern_rules) = self.map.get(&event_name) {
let targets = pattern_rules.match_targets(object_key);
if !targets.is_empty() {
return targets;
}
}
// Go RulesMap[eventName] 直接获取,如果不存在则为空 Rules
// Rust HashMap::get 返回 Option。如果事件名不存在则没有规则。
// 复合事件(如 ObjectCreatedAll add_rule_config 时已展开为单一事件。
// 因此,查询时应使用单一事件名称。
// 如果 event_name 本身就是单一类型,则直接查找。
// 如果 event_name 是复合类型Go 的逻辑是在添加时展开。
// 这里的 match_rules 应该接收已经可能是单一的事件。
// 如果调用者传入的是复合事件,它应该先自行展开或此函数处理。
// 假设 event_name 已经是具体的、可用于查找的事件。
// Go's RulesMap[eventName] lookup yields an empty Rules when the key is absent;
// Rust's HashMap::get returns an Option, so a missing event name simply means no rules.
// Compound events (such as ObjectCreatedAll) were already expanded into single events by add_rule_config,
// so queries should use single event names.
// If event_name is a single type, it is looked up directly.
// If the caller holds a compound type, it should expand it first (Go expands at insertion time);
// this function assumes event_name is already a concrete, queryable event.
self.map
.get(&event_name)
.map_or_else(TargetIdSet::new, |pr| pr.match_targets(object_key))
@@ -99,7 +93,7 @@ impl RulesMap {
self.map.is_empty()
}
/// 返回内部规则的克隆,用于 BucketNotificationConfig::validate 等场景。
/// Returns a reference to the internal rules, for scenarios such as BucketNotificationConfig::validate.
pub fn inner(&self) -> &HashMap<EventName, PatternRules> {
&self.map
}
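
A sketch of how the expansion in add_rule_config pairs with match_rules — compound names are expanded at insert time, so lookups use single event names (assumes RulesMap implements Default):

    let mut rules = RulesMap::default(); // assumption: Default is derived
    let target = TargetID::new("1".to_string(), "webhook".to_string());

    // ObjectCreatedAll is expanded into its single members on insertion.
    rules.add_rule_config(&[EventName::ObjectCreatedAll], "images/*".to_string(), target);

    // Querying with a concrete single event name finds the expanded rule.
    let matched = rules.match_rules(EventName::ObjectCreatedPut, "images/cat.png");
    assert!(!matched.is_empty());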

View File

@@ -9,7 +9,7 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum ParseConfigError {
#[error("XML parsing error:{0}")]
XmlError(#[from] quick_xml::errors::Error),
XmlError(#[from] quick_xml::errors::serialize::DeError),
#[error("Invalid filter value:{0}")]
InvalidFilterValue(String),
#[error("Invalid filter name: {0}, only 'prefix' or 'suffix' is allowed")]
@@ -193,10 +193,10 @@ impl QueueConfig {
pub struct LambdaConfigDetail {
#[serde(rename = "CloudFunction")]
pub arn: String,
// 根据 AWS S3 文档,<CloudFunctionConfiguration> 通常还包含 Id, Event, Filter
// 但为了严格对应提供的 Go `lambda` 结构体,这里只包含 ARN。
// 如果需要完整支持,可以添加其他字段。
// 例如:
// According to AWS S3 documentation, <CloudFunctionConfiguration> usually also contains Id, Event, Filter
// But in order to strictly correspond to the Go `lambda` structure provided, only ARN is included here.
// If full support is required, additional fields can be added.
// For example:
// #[serde(rename = "Id", skip_serializing_if = "Option::is_none")]
// pub id: Option<String>,
// #[serde(rename = "Event", default, skip_serializing_if = "Vec::is_empty")]
@@ -211,7 +211,7 @@ pub struct LambdaConfigDetail {
pub struct TopicConfigDetail {
#[serde(rename = "Topic")]
pub arn: String,
// 类似于 LambdaConfigDetail,可以根据需要扩展以包含 Id, Event, Filter 等字段。
// Similar to LambdaConfigDetail, it can be extended to include fields such as Id, Event, Filter, etc.
}
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
@@ -236,8 +236,8 @@ pub struct NotificationConfiguration {
}
impl NotificationConfiguration {
pub fn from_reader<R: Read>(reader: R) -> Result<Self, ParseConfigError> {
let config: NotificationConfiguration = quick_xml::reader::Reader::from_reader(reader)?;
pub fn from_reader<R: Read + std::io::BufRead>(reader: R) -> Result<Self, ParseConfigError> {
let config: NotificationConfiguration = quick_xml::de::from_reader(reader)?;
Ok(config)
}
@@ -268,7 +268,7 @@ impl NotificationConfiguration {
if self.xmlns.is_none() {
self.xmlns = Some("http://s3.amazonaws.com/doc/2006-03-01/".to_string());
}
// 注意:如果 LambdaConfigDetail TopicConfigDetail 将来包含区域等信息,
// 也可能需要在这里设置默认值。但根据当前定义,它们只包含 ARN 字符串。
// Note: if LambdaConfigDetail or TopicConfigDetail gain fields such as a region in the future,
// defaults may need to be set here as well; under the current definition they only contain the ARN string.
}
}

View File

@@ -13,7 +13,7 @@ use tracing::{debug, error, info, warn};
/// Streams events from the store to the target
pub async fn stream_events(
store: &mut (dyn Store<crate::event::Event, Error = StoreError, Key = Key> + Send),
store: &mut (dyn Store<Event, Error = StoreError, Key = Key> + Send),
target: &dyn Target,
mut cancel_rx: mpsc::Receiver<()>,
) {
@@ -42,10 +42,7 @@ pub async fn stream_events(
for key in keys {
// Check for cancellation before processing each event
if cancel_rx.try_recv().is_ok() {
info!(
"Cancellation received during processing for target: {}",
target.name()
);
info!("Cancellation received during processing for target: {}", target.name());
return;
}
@@ -70,7 +67,7 @@ pub async fn stream_events(
TargetError::Timeout(_) => {
warn!("Timeout for target {}, retrying...", target.name());
retry_count += 1;
sleep(Duration::from_secs((retry_count * 5) as u64)).await; // 指数退避
sleep(Duration::from_secs((retry_count * 5) as u64)).await; // Linear backoff (retry_count * 5 seconds)
}
_ => {
// Permanent error, skip this event
@@ -84,11 +81,7 @@ pub async fn stream_events(
// Remove event from store if successfully sent
if retry_count >= MAX_RETRIES && !success {
warn!(
"Max retries exceeded for event {}, target: {}, skipping",
key.to_string(),
target.name()
);
warn!("Max retries exceeded for event {}, target: {}, skipping", key.to_string(), target.name());
}
}
@@ -120,10 +113,7 @@ pub fn start_event_stream_with_batching(
semaphore: Arc<Semaphore>,
) -> mpsc::Sender<()> {
let (cancel_tx, cancel_rx) = mpsc::channel(1);
debug!(
"Starting event stream with batching for target: {}",
target.name()
);
debug!("Starting event stream with batching for target: {}", target.name());
tokio::spawn(async move {
stream_events_with_batching(&mut *store, &*target, cancel_rx, metrics, semaphore).await;
info!("Event stream stopped for target: {}", target.name());
@@ -132,7 +122,7 @@ pub fn start_event_stream_with_batching(
cancel_tx
}
/// 带批处理的事件流处理
/// Event stream processing with batching
pub async fn stream_events_with_batching(
store: &mut (dyn Store<Event, Error = StoreError, Key = Key> + Send),
target: &dyn Target,
@@ -140,10 +130,7 @@ pub async fn stream_events_with_batching(
metrics: Arc<NotificationMetrics>,
semaphore: Arc<Semaphore>,
) {
info!(
"Starting event stream with batching for target: {}",
target.name()
);
info!("Starting event stream with batching for target: {}", target.name());
// Configuration parameters
const DEFAULT_BATCH_SIZE: usize = 1;
@@ -160,106 +147,64 @@ pub async fn stream_events_with_batching(
let mut last_flush = Instant::now();
loop {
// 检查取消信号
// Check the cancel signal
if cancel_rx.try_recv().is_ok() {
info!("Cancellation received for target: {}", target.name());
return;
}
// 获取存储中的事件列表
// Get a list of events in storage
let keys = store.list();
debug!(
"Found {} keys in store for target: {}",
keys.len(),
target.name()
);
debug!("Found {} keys in store for target: {}", keys.len(), target.name());
if keys.is_empty() {
// 如果批处理中有数据且超时,则刷新批处理
// If the batch has pending data and the timeout has elapsed, flush it
if !batch.is_empty() && last_flush.elapsed() >= BATCH_TIMEOUT {
process_batch(
&mut batch,
&mut batch_keys,
target,
MAX_RETRIES,
BASE_RETRY_DELAY,
&metrics,
&semaphore,
)
.await;
process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore).await;
last_flush = Instant::now();
}
// 无事件,等待后再检查
// No event, wait before checking
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
// 处理每个事件
// Handle each event
for key in keys {
// 再次检查取消信号
// Check the cancel signal again
if cancel_rx.try_recv().is_ok() {
info!(
"Cancellation received during processing for target: {}",
target.name()
);
info!("Cancellation received during processing for target: {}", target.name());
// 在退出前处理已收集的批次
// Process the collected batch before exiting
if !batch.is_empty() {
process_batch(
&mut batch,
&mut batch_keys,
target,
MAX_RETRIES,
BASE_RETRY_DELAY,
&metrics,
&semaphore,
)
.await;
process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore).await;
}
return;
}
// 尝试从存储中获取事件
// Try to get events from storage
match store.get(&key) {
Ok(event) => {
// 添加到批处理
// Add to batch
batch.push(event);
batch_keys.push(key);
metrics.increment_processing();
// 如果批次已满或距离上次刷新已经过了足够时间,则处理批次
// Process the batch once it is full or enough time has passed since the last flush
if batch.len() >= batch_size || last_flush.elapsed() >= BATCH_TIMEOUT {
process_batch(
&mut batch,
&mut batch_keys,
target,
MAX_RETRIES,
BASE_RETRY_DELAY,
&metrics,
&semaphore,
)
.await;
process_batch(&mut batch, &mut batch_keys, target, MAX_RETRIES, BASE_RETRY_DELAY, &metrics, &semaphore)
.await;
last_flush = Instant::now();
}
}
Err(e) => {
error!(
"Failed to target: {}, get event {} from store: {}",
target.name(),
key.to_string(),
e
);
// 可以考虑删除无法读取的事件,防止无限循环尝试读取
error!("Failed to target: {}, get event {} from store: {}", target.name(), key.to_string(), e);
// Delete unreadable events to avoid an infinite read-retry loop
match store.del(&key) {
Ok(_) => {
info!("Deleted corrupted event {} from store", key.to_string());
}
Err(del_err) => {
error!(
"Failed to delete corrupted event {}: {}",
key.to_string(),
del_err
);
error!("Failed to delete corrupted event {}: {}", key.to_string(), del_err);
}
}
@@ -268,12 +213,12 @@ pub async fn stream_events_with_batching(
}
}
// 小延迟再进行下一轮检查
// Wait briefly before the next round of checks
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
/// 处理事件批次
/// Processing event batches
async fn process_batch(
batch: &mut Vec<Event>,
batch_keys: &mut Vec<Key>,
@@ -283,16 +228,12 @@ async fn process_batch(
metrics: &Arc<NotificationMetrics>,
semaphore: &Arc<Semaphore>,
) {
debug!(
"Processing batch of {} events for target: {}",
batch.len(),
target.name()
);
debug!("Processing batch of {} events for target: {}", batch.len(), target.name());
if batch.is_empty() {
return;
}
// 获取信号量许可,限制并发
// Acquire a semaphore permit to limit concurrency
let permit = match semaphore.clone().acquire_owned().await {
Ok(permit) => permit,
Err(e) => {
@@ -301,30 +242,26 @@ async fn process_batch(
}
};
// 处理批次中的每个事件
// Handle every event in the batch
for (_event, key) in batch.iter().zip(batch_keys.iter()) {
let mut retry_count = 0;
let mut success = false;
// 重试逻辑
// Retry logic
while retry_count < max_retries && !success {
match target.send_from_store(key.clone()).await {
Ok(_) => {
info!(
"Successfully sent event for target: {}, Key: {}",
target.name(),
key.to_string()
);
info!("Successfully sent event for target: {}, Key: {}", target.name(), key.to_string());
success = true;
metrics.increment_processed();
}
Err(e) => {
// 根据错误类型采用不同的重试策略
// Choose a retry strategy based on the error type
match &e {
TargetError::NotConnected => {
warn!("Target {} not connected, retrying...", target.name());
retry_count += 1;
tokio::time::sleep(base_delay * (1 << retry_count)).await; // 指数退避
tokio::time::sleep(base_delay * (1 << retry_count)).await; // Exponential backoff
}
TargetError::Timeout(_) => {
warn!("Timeout for target {}, retrying...", target.name());
@@ -332,7 +269,7 @@ async fn process_batch(
tokio::time::sleep(base_delay * (1 << retry_count)).await;
}
_ => {
// 永久性错误,跳过此事件
// Permanent error, skip this event
error!("Permanent error for target {}: {}", target.name(), e);
metrics.increment_failed();
break;
@@ -342,21 +279,17 @@ async fn process_batch(
}
}
// 处理最大重试次数耗尽的情况
// Handle the case where the maximum number of retries has been exhausted
if retry_count >= max_retries && !success {
warn!(
"Max retries exceeded for event {}, target: {}, skipping",
key.to_string(),
target.name()
);
warn!("Max retries exceeded for event {}, target: {}, skipping", key.to_string(), target.name());
metrics.increment_failed();
}
}
// 清空已处理的批次
// Clear processed batches
batch.clear();
batch_keys.clear();
// 释放信号量许可(通过 drop
// Release the semaphore permit (via drop)
drop(permit);
}
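
For reference, the retry delay in process_batch grows geometrically: the count is incremented first, then the task sleeps for base_delay * (1 << retry_count), i.e. 2s, 4s, 8s with a one-second base. A self-contained sketch of that schedule:

    use std::time::Duration;

    let base_delay = Duration::from_secs(1);
    for retry_count in 1u32..=3 {
        let delay = base_delay * (1u32 << retry_count);
        println!("retry {} -> wait {:?}", retry_count, delay); // 2s, 4s, 8s
    }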

View File

@@ -1,22 +1,22 @@
#[allow(dead_code)]
const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka";
#[allow(dead_code)]
const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt";
#[allow(dead_code)]
const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
pub const NOTIFY_MY_SQL_SUB_SYS: &str = "notify_mysql";
#[allow(dead_code)]
const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
pub const NOTIFY_NATS_SUB_SYS: &str = "notify_nats";
#[allow(dead_code)]
const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
pub const NOTIFY_NSQ_SUB_SYS: &str = "notify_nsq";
#[allow(dead_code)]
const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
pub const NOTIFY_ES_SUB_SYS: &str = "notify_elasticsearch";
#[allow(dead_code)]
const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
pub const NOTIFY_AMQP_SUB_SYS: &str = "notify_amqp";
#[allow(dead_code)]
const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
pub const NOTIFY_POSTGRES_SUB_SYS: &str = "notify_postgres";
#[allow(dead_code)]
const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
pub const NOTIFY_REDIS_SUB_SYS: &str = "notify_redis";
pub const NOTIFY_WEBHOOK_SUB_SYS: &str = "notify_webhook";
// Webhook constants
pub const WEBHOOK_ENDPOINT: &str = "endpoint";

View File

@@ -1,17 +1,14 @@
use std::env;
use std::fmt;
#[cfg(unix)]
use libc::uname;
#[cfg(unix)]
use std::ffi::CStr;
use std::os::unix::process::ExitStatusExt;
#[cfg(windows)]
use std::process::Command;
use std::os::windows::process::ExitStatusExt;
use std::{env, process};
// 定义 Rustfs 版本
// Define Rustfs version
const RUSTFS_VERSION: &str = "1.0.0";
// 业务类型枚举
// Business Type Enumeration
#[derive(Debug, Clone, PartialEq)]
pub enum ServiceType {
Basis,
@@ -33,7 +30,7 @@ impl ServiceType {
}
}
// UserAgent 结构体
// UserAgent structure
struct UserAgent {
os_platform: String,
arch: String,
@@ -42,7 +39,7 @@ struct UserAgent {
}
impl UserAgent {
// 创建新的 UserAgent 实例,接受业务类型参数
// Create a new UserAgent instance for the given service type
fn new(service: ServiceType) -> Self {
let os_platform = Self::get_os_platform();
let arch = env::consts::ARCH.to_string();
@@ -56,7 +53,7 @@ impl UserAgent {
}
}
// 获取操作系统平台信息
// Obtain operating system platform information
fn get_os_platform() -> String {
if cfg!(target_os = "windows") {
Self::get_windows_platform()
@@ -69,14 +66,18 @@ impl UserAgent {
}
}
// 获取 Windows 平台信息
// Get Windows platform information
#[cfg(windows)]
fn get_windows_platform() -> String {
// 使用 cmd /c ver 获取版本
let output = Command::new("cmd")
// Use cmd /c ver to get the version
let output = process::Command::new("cmd")
.args(&["/C", "ver"])
.output()
.unwrap_or_default();
.unwrap_or_else(|_| process::Output {
status: process::ExitStatus::from_raw(0),
stdout: Vec::new(),
stderr: Vec::new(),
});
let version = String::from_utf8_lossy(&output.stdout);
let version = version
.lines()
@@ -92,27 +93,29 @@ impl UserAgent {
"N/A".to_string()
}
// 获取 macOS 平台信息
// Get macOS platform information
#[cfg(target_os = "macos")]
fn get_macos_platform() -> String {
unsafe {
let mut name = std::mem::zeroed();
if uname(&mut name) == 0 {
let release = CStr::from_ptr(name.release.as_ptr()).to_string_lossy();
// 映射内核版本(如 23.5.0)到 User-Agent 格式(如 14_5_0
let major = release
.split('.')
.next()
.unwrap_or("14")
.parse::<i32>()
.unwrap_or(14);
let minor = if major >= 20 { major - 9 } else { 14 };
let patch = release.split('.').nth(1).unwrap_or("0");
format!("Macintosh; Intel Mac OS X {}_{}_{}", minor, patch, 0)
} else {
"Macintosh; Intel Mac OS X 14_5_0".to_string()
}
}
let output = process::Command::new("sw_vers")
.args(&["-productVersion"])
.output()
.unwrap_or_else(|_| process::Output {
status: process::ExitStatus::from_raw(0),
stdout: Vec::new(),
stderr: Vec::new(),
});
let version = String::from_utf8_lossy(&output.stdout).trim().to_string();
let parts: Vec<&str> = version.split('.').collect();
let major = parts.get(0).unwrap_or(&"10").parse::<i32>().unwrap_or(10);
let minor = parts.get(1).map_or("15", |&m| m);
let patch = parts.get(2).map_or("0", |&p| p);
// Detect whether it is an Apple Silicon chip
let arch = env::consts::ARCH;
let cpu_info = if arch == "aarch64" { "Apple" } else { "Intel" };
// Convert to User-Agent format
format!("Macintosh; {} Mac OS X {}_{}_{}", cpu_info, major, minor, patch)
}
#[cfg(not(target_os = "macos"))]
@@ -120,17 +123,22 @@ impl UserAgent {
"N/A".to_string()
}
// 获取 Linux 平台信息
// Get Linux platform information
#[cfg(target_os = "linux")]
fn get_linux_platform() -> String {
unsafe {
let mut name = std::mem::zeroed();
if uname(&mut name) == 0 {
let release = CStr::from_ptr(name.release.as_ptr()).to_string_lossy();
format!("X11; Linux {}", release)
} else {
"X11; Linux Unknown".to_string()
}
let output = process::Command::new("uname")
.arg("-r")
.output()
.unwrap_or_else(|_| process::Output {
status: process::ExitStatus::from_raw(0),
stdout: Vec::new(),
stderr: Vec::new(),
});
if output.status.success() {
let release = String::from_utf8_lossy(&output.stdout).trim().to_string();
format!("X11; Linux {}", release)
} else {
"X11; Linux Unknown".to_string()
}
}
@@ -140,15 +148,11 @@ impl UserAgent {
}
}
// 实现 Display trait 以格式化 User-Agent
// Implement Display trait to format User-Agent
impl fmt::Display for UserAgent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.service == ServiceType::Basis {
return write!(
f,
"Mozilla/5.0 ({}; {}) Rustfs/{}",
self.os_platform, self.arch, self.version
);
return write!(f, "Mozilla/5.0 ({}; {}) Rustfs/{}", self.os_platform, self.arch, self.version);
}
write!(
f,
@@ -161,7 +165,7 @@ impl fmt::Display for UserAgent {
}
}
// 获取 User-Agent 字符串,接受业务类型参数
// Get the User-Agent string for the given service type
pub fn get_user_agent(service: ServiceType) -> String {
UserAgent::new(service).to_string()
}
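
A usage sketch for the rewritten module; the platform segment depends on the host, so the output below is illustrative only:

    let ua = get_user_agent(ServiceType::Basis);
    // e.g. "Mozilla/5.0 (X11; Linux 6.8.0; x86_64) Rustfs/1.0.0" on a Linux host
    println!("{}", ua);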

60 crates/utils/src/dirs.rs Normal file
View File

@@ -0,0 +1,60 @@
use std::env;
use std::path::{Path, PathBuf};
/// Get the absolute path to the current project
///
/// This function will try the following method to get the project path:
/// 1. Use the `CARGO_MANIFEST_DIR` environment variable to get the project root directory.
/// 2. Use `std::env::current_exe()` to get the executable file path and deduce the project root directory.
/// 3. Use `std::env::current_dir()` to get the current working directory and try to deduce the project root directory.
///
/// If all methods fail, an error is returned.
///
/// # Returns
/// - `Ok(PathBuf)`: The absolute path of the project that was successfully obtained.
/// - `Err(String)`: Error message for the failed path.
pub fn get_project_root() -> Result<PathBuf, String> {
// Try to get the project root directory through the CARGO_MANIFEST_DIR environment variable
if let Ok(manifest_dir) = env::var("CARGO_MANIFEST_DIR") {
let project_root = Path::new(&manifest_dir).to_path_buf();
println!("Get the project root directory with CARGO_MANIFEST_DIR:{}", project_root.display());
return Ok(project_root);
}
// Try to deduce the project root directory through the current executable file path
if let Ok(current_exe) = env::current_exe() {
let mut project_root = current_exe;
// Assume that the project root directory is in the parent directory of the parent directory of the executable path (usually target/debug or target/release)
project_root.pop(); // Remove the executable file name
project_root.pop(); // Remove target/debug or target/release
println!("Deduce the project root directory through current_exe:{}", project_root.display());
return Ok(project_root);
}
// Try to deduce the project root directory from the current working directory
if let Ok(mut current_dir) = env::current_dir() {
// Assume that the project root directory is in the parent directory of the current working directory
current_dir.pop();
println!("Deduce the project root directory through current_dir:{}", current_dir.display());
return Ok(current_dir);
}
// If all methods fail, return an error
Err("The project root directory cannot be obtained. Please check the running environment and project structure.".to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_project_root() {
match get_project_root() {
Ok(path) => {
assert!(path.exists(), "The project root directory does not exist:{}", path.display());
println!("The test is passed, the project root directory:{}", path.display());
}
Err(e) => panic!("Failed to get the project root directory:{}", e),
}
}
}
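
The notify examples earlier in this commit use the new helper to build queue directories relative to the repository, roughly like this sketch (the subpath is illustrative):

    let root = rustfs_utils::dirs::get_project_root().expect("failed to get project root");
    let queue_dir = root.join("deploy/logs/notify/webhook");
    println!("queue_dir: {}", queue_dir.display());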

View File

@@ -28,6 +28,9 @@ pub mod crypto;
#[cfg(feature = "compress")]
pub mod compress;
#[cfg(feature = "path")]
pub mod dirs;
#[cfg(feature = "tls")]
pub use certs::*;
#[cfg(feature = "hash")]

View File

@@ -8,7 +8,6 @@ use rustfs_utils::path::SLASH_SEPARATOR;
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{error, warn};
use crate::disk::fs::SLASH_SEPARATOR;
pub const CONFIG_PREFIX: &str = "config";
const CONFIG_FILE: &str = "config.json";

View File

@@ -1,9 +1,8 @@
use core::slice::SlicePattern;
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::disk::error_reduce::{reduce_read_quorum_errs, reduce_write_quorum_errs, OBJECT_OP_IGNORED_ERRS};
use crate::disk::{
self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT, CHECK_PART_FILE_NOT_FOUND,
CHECK_PART_SUCCESS,
self, conv_part_err_to_int, has_part_err, CHECK_PART_DISK_NOT_FOUND, CHECK_PART_FILE_CORRUPT,
CHECK_PART_FILE_NOT_FOUND, CHECK_PART_SUCCESS,
};
use crate::erasure_coding;
use crate::erasure_coding::bitrot_verify;
@@ -17,8 +16,8 @@ use crate::{
config::{storageclass, GLOBAL_StorageClass},
disk::{
endpoint::Endpoint, error::DiskError, format::FormatV3, new_disk, CheckPartsResp, DeleteOptions, DiskAPI, DiskInfo,
DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp, ReadOptions,
UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
DiskInfoOptions, DiskOption, DiskStore, FileInfoVersions, ReadMultipleReq, ReadMultipleResp,
ReadOptions, UpdateMetadataOpts, RUSTFS_META_BUCKET, RUSTFS_META_MULTIPART_BUCKET, RUSTFS_META_TMP_BUCKET,
},
error::{to_object_err, StorageError},
global::{
@@ -58,10 +57,10 @@ use md5::{Digest as Md5Digest, Md5};
use rand::{seq::SliceRandom, Rng};
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::{
file_info_from_raw,
headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS},
merge_file_meta_versions, FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry,
MetadataResolutionParams, ObjectPartInfo, RawFileInfo,
file_info_from_raw, headers::{AMZ_OBJECT_TAGGING, AMZ_STORAGE_CLASS}, merge_file_meta_versions, FileInfo, FileMeta,
FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo, RawFileInfo,
};
use rustfs_rio::{EtagResolvable, HashReader, TryGetIndex as _, WarpReader};
use rustfs_utils::{
@@ -80,7 +79,6 @@ use std::{
sync::Arc,
time::Duration,
};
use sha2::digest::HashReader;
use time::OffsetDateTime;
use tokio::{
io::AsyncWrite,
@@ -95,7 +93,6 @@ use tracing::error;
use tracing::{debug, info, warn};
use uuid::Uuid;
use workers::workers::Workers;
use crate::disk::fs::SLASH_SEPARATOR;
pub const DEFAULT_READ_BUFFER_SIZE: usize = 1024 * 1024;
@@ -404,11 +401,7 @@ impl SetDisks {
}
}
if max >= write_quorum {
data_dir
} else {
None
}
if max >= write_quorum { data_dir } else { None }
}
#[allow(dead_code)]
@@ -739,11 +732,7 @@ impl SetDisks {
fn common_time(times: &[Option<OffsetDateTime>], quorum: usize) -> Option<OffsetDateTime> {
let (time, count) = Self::common_time_and_occurrence(times);
if count >= quorum {
time
} else {
None
}
if count >= quorum { time } else { None }
}
fn common_time_and_occurrence(times: &[Option<OffsetDateTime>]) -> (Option<OffsetDateTime>, usize) {
@@ -784,11 +773,7 @@ impl SetDisks {
fn common_etag(etags: &[Option<String>], quorum: usize) -> Option<String> {
let (etag, count) = Self::common_etags(etags);
if count >= quorum {
etag
} else {
None
}
if count >= quorum { etag } else { None }
}
fn common_etags(etags: &[Option<String>]) -> (Option<String>, usize) {
@@ -4069,7 +4054,7 @@ impl StorageAPI for SetDisks {
async fn local_storage_info(&self) -> madmin::StorageInfo {
let disks = self.get_disks_internal().await;
let mut local_disks: Vec<Option<Arc<crate::disk::Disk>>> = Vec::new();
let mut local_disks: Vec<Option<Arc<disk::Disk>>> = Vec::new();
let mut local_endpoints = Vec::new();
for (i, ep) in self.set_endpoints.iter().enumerate() {

View File

@@ -18,7 +18,6 @@ use std::fmt::Debug;
use std::io::Cursor;
use std::str::FromStr as _;
use std::sync::Arc;
use sha2::digest::HashReader;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::warn;
@@ -904,7 +903,7 @@ pub trait StorageAPI: ObjectIO {
opts: &HealOpts,
) -> Result<(HealResultItem, Option<Error>)>;
async fn heal_objects(&self, bucket: &str, prefix: &str, opts: &HealOpts, hs: Arc<HealSequence>, is_meta: bool)
-> Result<()>;
-> Result<()>;
async fn get_pool_and_set(&self, id: &str) -> Result<(Option<usize>, Option<usize>, Option<usize>)>;
async fn check_abandoned_parts(&self, bucket: &str, object: &str, opts: &HealOpts) -> Result<()>;
}

View File

@@ -25,7 +25,6 @@ use tokio::sync::broadcast::{self, Receiver as B_Receiver};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{error, warn};
use uuid::Uuid;
use crate::disk::fs::SLASH_SEPARATOR;
const MAX_OBJECT_LIST: i32 = 1000;
// const MAX_DELETE_LIST: i32 = 1000;
@@ -838,11 +837,7 @@ impl ECStore {
if fiter(&fi) {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(&fi, &bucket, &fi.name, {
if let Some(v) = &vcf {
v.versioned(&fi.name)
} else {
false
}
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
})),
err: None,
};
@@ -854,11 +849,7 @@ impl ECStore {
} else {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(&fi, &bucket, &fi.name, {
if let Some(v) = &vcf {
v.versioned(&fi.name)
} else {
false
}
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
})),
err: None,
};
@@ -894,11 +885,7 @@ impl ECStore {
if fiter(fi) {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(fi, &bucket, &fi.name, {
if let Some(v) = &vcf {
v.versioned(&fi.name)
} else {
false
}
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
})),
err: None,
};
@@ -910,11 +897,7 @@ impl ECStore {
} else {
let item = ObjectInfoOrErr {
item: Some(ObjectInfo::from_file_info(fi, &bucket, &fi.name, {
if let Some(v) = &vcf {
v.versioned(&fi.name)
} else {
false
}
if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
})),
err: None,
};
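
The repeated guard collapsed above is equivalent to an Option combinator; a sketch of the alternative form (vcf's concrete type is assumed to expose versioned(&str) -> bool, as used here):

    // Equivalent to: if let Some(v) = &vcf { v.versioned(&fi.name) } else { false }
    let versioned = vcf.as_ref().map_or(false, |v| v.versioned(&fi.name));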