feat: implement event notification system

- Add core event notification interfaces
- Support multiple notification backends:
  - Webhook (default)
  - Kafka
  - MQTT
  - HTTP Producer
- Implement configurable event filtering
- Add async event dispatching with backpressure handling
- Provide serialization/deserialization for event payloads

This module enables system events to be published to various endpoints
with consistent delivery guarantees and failure handling.
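
A minimal startup sketch against the API added here (a hedged example; it assumes the crate-root re-exports introduced later in this diff):

```rust
use rustfs_event_notifier::{create_adapters, NotificationConfig, NotificationSystem};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Layered load: event.toml, then event.yaml, then EVENT_NOTIF_* variables.
    let config = NotificationConfig::load()?;
    // Instantiate only the backends enabled by features and configuration.
    let adapters = create_adapters(&config.adapters)?;
    let mut system = NotificationSystem::new(config).await?;
    // Runs the event bus until shutdown() is requested.
    system.start(adapters).await?;
    Ok(())
}
```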
Author: houseme
Date: 2025-04-21 00:17:27 +08:00
Parent: 9b6170e94f
Commit: bb6324b4e5
28 changed files with 2015 additions and 806 deletions

Cargo.lock generated

@@ -371,7 +371,7 @@ dependencies = [
"arrow-buffer",
"arrow-data",
"arrow-schema",
"flatbuffers",
"flatbuffers 24.12.23",
"lz4_flex",
]
@@ -664,6 +664,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "atomic"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
dependencies = [
"bytemuck",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@@ -800,11 +809,11 @@ dependencies = [
"hyper",
"hyper-util",
"pin-project-lite",
"rustls",
"rustls 0.23.26",
"rustls-pemfile",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tower-service",
]
@@ -1017,6 +1026,12 @@ version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytemuck"
version = "1.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540"
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -1088,6 +1103,38 @@ dependencies = [
"system-deps",
]
[[package]]
name = "camino"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
dependencies = [
"serde",
]
[[package]]
name = "cargo-platform"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e35af189006b9c0f00a064685c727031e3ed2d8020f7ba284d78cc2671bd36ea"
dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.19.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd5eb614ed4c27c5d706420e4320fbe3216ab31fa1c33cd8246ac36dae4479ba"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
"thiserror 2.0.12",
]
[[package]]
name = "cc"
version = "1.2.19"
@@ -1111,7 +1158,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
"nom 7.1.3",
]
[[package]]
@@ -2998,6 +3045,12 @@ dependencies = [
"const-random",
]
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "dpi"
version = "0.1.1"
@@ -3031,7 +3084,7 @@ version = "0.0.1"
dependencies = [
"common",
"ecstore",
"flatbuffers",
"flatbuffers 25.2.10",
"futures",
"lazy_static",
"lock",
@@ -3060,7 +3113,7 @@ dependencies = [
"chrono",
"common",
"crc32fast",
"flatbuffers",
"flatbuffers 25.2.10",
"futures",
"glob",
"hex-simd",
@@ -3071,7 +3124,7 @@ dependencies = [
"madmin",
"md-5",
"netif",
"nix 0.29.0",
"nix",
"num",
"num_cpus",
"path-absolutize",
@@ -3246,6 +3299,21 @@ dependencies = [
"rustc_version",
]
[[package]]
name = "figment"
version = "0.10.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3"
dependencies = [
"atomic",
"pear",
"serde",
"serde_yaml",
"toml",
"uncased",
"version_check",
]
[[package]]
name = "fixedbitset"
version = "0.5.7"
@@ -3262,6 +3330,16 @@ dependencies = [
"rustc_version",
]
[[package]]
name = "flatbuffers"
version = "25.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1"
dependencies = [
"bitflags 2.9.0",
"rustc_version",
]
[[package]]
name = "flate2"
version = "1.1.1"
@@ -3272,6 +3350,17 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"futures-core",
"futures-sink",
"spin",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -4087,10 +4176,10 @@ dependencies = [
"http",
"hyper",
"hyper-util",
"rustls",
"rustls 0.23.26",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tower-service",
"webpki-roots",
]
@@ -4334,6 +4423,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
"serde",
]
[[package]]
@@ -4356,6 +4446,12 @@ dependencies = [
"cfb",
]
[[package]]
name = "inlinable_string"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
[[package]]
name = "inout"
version = "0.1.4"
@@ -4737,19 +4833,19 @@ dependencies = [
[[package]]
name = "libsystemd"
version = "0.7.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c592dc396b464005f78a5853555b9f240bc5378bf5221acc4e129910b2678869"
checksum = "b85fe9dc49de659d05829fdf72b5770c0a5952d1055c34a39f6d4e932bce175d"
dependencies = [
"hmac 0.12.1",
"libc",
"log",
"nix 0.27.1",
"nom",
"nix",
"nom 8.0.0",
"once_cell",
"serde",
"sha2 0.10.8",
"thiserror 1.0.69",
"thiserror 2.0.12",
"uuid",
]
@@ -5169,18 +5265,6 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
[[package]]
name = "nix"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 2.9.0",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "nix"
version = "0.29.0"
@@ -5210,6 +5294,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -5606,6 +5699,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "opentelemetry"
version = "0.29.1"
@@ -5959,6 +6058,29 @@ dependencies = [
"hmac 0.12.1",
]
[[package]]
name = "pear"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467"
dependencies = [
"inlinable_string",
"pear_codegen",
"yansi",
]
[[package]]
name = "pear_codegen"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147"
dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.100",
]
[[package]]
name = "pem"
version = "3.0.5"
@@ -6437,6 +6559,7 @@ dependencies = [
"quote",
"syn 2.0.100",
"version_check",
"yansi",
]
[[package]]
@@ -6531,7 +6654,7 @@ name = "protos"
version = "0.0.1"
dependencies = [
"common",
"flatbuffers",
"flatbuffers 25.2.10",
"prost",
"prost-build",
"protobuf",
@@ -6589,7 +6712,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash 2.1.1",
"rustls",
"rustls 0.23.26",
"socket2",
"thiserror 2.0.12",
"tokio",
@@ -6608,7 +6731,7 @@ dependencies = [
"rand 0.9.0",
"ring",
"rustc-hash 2.1.1",
"rustls",
"rustls 0.23.26",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
@@ -6946,7 +7069,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls 0.23.26",
"rustls-pemfile",
"rustls-pki-types",
"serde",
@@ -6955,7 +7078,7 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tokio-util",
"tower 0.5.2",
"tower-service",
@@ -7083,6 +7206,24 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rumqttc"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9"
dependencies = [
"bytes",
"flume",
"futures-util",
"log",
"rustls-native-certs",
"rustls-pemfile",
"rustls-webpki 0.102.8",
"thiserror 1.0.69",
"tokio",
"tokio-rustls 0.25.0",
]
[[package]]
name = "rust-embed"
version = "8.7.0"
@@ -7176,7 +7317,7 @@ dependencies = [
"crypto",
"datafusion",
"ecstore",
"flatbuffers",
"flatbuffers 25.2.10",
"futures",
"futures-util",
"http",
@@ -7202,7 +7343,7 @@ dependencies = [
"rust-embed",
"rustfs-event-notifier",
"rustfs-obs",
"rustls",
"rustls 0.23.26",
"rustls-pemfile",
"rustls-pki-types",
"s3s",
@@ -7213,7 +7354,7 @@ dependencies = [
"tikv-jemallocator",
"time",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tokio-stream",
"tokio-util",
"tonic 0.13.0",
@@ -7230,12 +7371,24 @@ name = "rustfs-event-notifier"
version = "0.0.1"
dependencies = [
"async-trait",
"parking_lot 0.12.3",
"axum",
"chrono",
"dotenv",
"figment",
"rdkafka",
"reqwest",
"rumqttc",
"serde",
"serde_json",
"serde_with",
"smallvec",
"strum",
"thiserror 2.0.12",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"wildmatch",
]
@@ -7314,6 +7467,20 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "rustls"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
dependencies = [
"log",
"ring",
"rustls-pki-types",
"rustls-webpki 0.102.8",
"subtle",
"zeroize",
]
[[package]]
name = "rustls"
version = "0.23.26"
@@ -7325,11 +7492,24 @@ dependencies = [
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"rustls-webpki 0.103.1",
"subtle",
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"rustls-pki-types",
"schannel",
"security-framework 2.11.1",
]
[[package]]
name = "rustls-pemfile"
version = "2.2.0"
@@ -7348,6 +7528,17 @@ dependencies = [
"web-time",
]
[[package]]
name = "rustls-webpki"
version = "0.102.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
dependencies = [
"ring",
"rustls-pki-types",
"untrusted",
]
[[package]]
name = "rustls-webpki"
version = "0.103.1"
@@ -7400,7 +7591,7 @@ dependencies = [
"md-5",
"memchr",
"mime",
"nom",
"nom 7.1.3",
"nugine-rust-utils",
"numeric_cast",
"pin-project-lite",
@@ -7442,6 +7633,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -7509,6 +7709,9 @@ name = "semver"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
dependencies = [
"serde",
]
[[package]]
name = "send_wrapper"
@@ -7621,6 +7824,49 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
dependencies = [
"base64 0.22.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"serde",
"serde_derive",
"serde_json",
"serde_with_macros",
"time",
]
[[package]]
name = "serde_with_macros"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap 2.9.0",
"itoa 1.0.15",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "server_fn"
version = "0.6.15"
@@ -7730,13 +7976,16 @@ dependencies = [
[[package]]
name = "shadow-rs"
version = "0.38.1"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ec14cc798c29f4bf74a6c4299c657c04d4e9fba03875c1f0eec569af03aed89"
checksum = "6d5625ed609cf66d7e505e7d487aca815626dc4ebb6c0dd07637ca61a44651a6"
dependencies = [
"cargo_metadata",
"const_format",
"is_debug",
"serde_json",
"time",
"tzdb",
]
[[package]]
@@ -7881,6 +8130,9 @@ name = "smallvec"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
dependencies = [
"serde",
]
[[package]]
name = "snafu"
@@ -7951,6 +8203,9 @@ name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
@@ -8452,13 +8707,24 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "tokio-rustls"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.4",
"rustls-pki-types",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls",
"rustls 0.23.26",
"tokio",
]
@@ -8888,6 +9154,32 @@ version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "tz-rs"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1450bf2b99397e72070e7935c89facaa80092ac812502200375f1f7d33c71a1"
[[package]]
name = "tzdb"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0be2ea5956f295449f47c0b825c5e109022ff1a6a53bb4f77682a87c2341fbf5"
dependencies = [
"iana-time-zone",
"tz-rs",
"tzdb_data",
]
[[package]]
name = "tzdb_data"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c4c81d75033770e40fbd3643ce7472a1a9fd301f90b7139038228daf8af03ec"
dependencies = [
"tz-rs",
]
[[package]]
name = "ucd-trie"
version = "0.1.7"
@@ -8905,6 +9197,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "uncased"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
dependencies = [
"version_check",
]
[[package]]
name = "unicase"
version = "2.8.1"
@@ -8945,6 +9246,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "unsafe-libyaml"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "untrusted"
version = "0.9.0"
@@ -9944,6 +10251,12 @@ dependencies = [
"hashlink",
]
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yoke"
version = "0.7.5"
@@ -9984,7 +10297,7 @@ dependencies = [
"futures-sink",
"futures-util",
"hex",
"nix 0.29.0",
"nix",
"ordered-stream",
"rand 0.8.5",
"serde",
@@ -10015,7 +10328,7 @@ dependencies = [
"futures-core",
"futures-lite",
"hex",
"nix 0.29.0",
"nix",
"ordered-stream",
"serde",
"serde_repr",


@@ -51,8 +51,10 @@ datafusion = "46.0.0"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
dotenv = "0.15.0"
ecstore = { path = "./ecstore" }
flatbuffers = "24.12.23"
figment = { version = "0.10.19", features = ["toml", "yaml", "env"] }
flatbuffers = "25.2.10"
futures = "0.3.31"
futures-util = "0.3.31"
common = { path = "./common/common" }
@@ -71,7 +73,7 @@ jsonwebtoken = "9.3.1"
keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] }
lock = { path = "./common/lock" }
lazy_static = "1.5.0"
libsystemd = { version = "0.7" }
libsystemd = { version = "0.7.1" }
local-ip-address = "0.6.3"
matchit = "0.8.4"
md-5 = "0.10.6"
@@ -79,13 +81,13 @@ mime = "0.3.17"
netif = "0.1.6"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { version = "0.29" }
opentelemetry_sdk = { version = "0.29.0" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29" }
opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-prometheus = { version = "0.29.1" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
parking_lot = "0.12.3"
pin-project-lite = "0.2"
pin-project-lite = "0.2.16"
prometheus = "0.14.0"
# pin-utils = "0.1.0"
prost = "0.13.5"
@@ -94,11 +96,12 @@ prost-types = "0.13.5"
protobuf = "3.7"
protos = { path = "./common/protos" }
rand = "0.8.5"
rdkafka = { version = "0.37", features = ["tokio"] }
rdkafka = { version = "0.37.0", features = ["tokio"] }
reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] }
rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rumqttc = { version = "0.24" }
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" }
rust-embed = "8.7.0"
@@ -107,10 +110,13 @@ rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
shadow-rs = { version = "0.38.1", default-features = false }
shadow-rs = { version = "1.1.1", default-features = false, features = ["metadata"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
smallvec = { version = "1.15.0", features = ["serde"] }
strum = { version = "0.27.1", features = ["derive"] }
sha2 = "0.10.8"
snafu = "0.8.5"
tempfile = "3.19.1"
@@ -126,7 +132,7 @@ time = { version = "0.3.41", features = [
tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.13.0", features = ["gzip"] }
tonic-build = "0.13.0"
tokio-rustls = { version = "0.26", default-features = false }
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.14", features = ["io", "compat"] }
tower = { version = "0.5.2", features = ["timeout"] }
@@ -136,7 +142,7 @@ tracing-core = "0.1.33"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
tracing-appender = "0.2.3"
tracing-opentelemetry = "0.30"
tracing-opentelemetry = "0.30.0"
transform-stream = "0.3.1"
url = "2.5.4"
uuid = { version = "1.16.0", features = [
@@ -144,7 +150,6 @@ uuid = { version = "1.16.0", features = [
"fast-rng",
"macro-diagnostics",
] }
wildmatch = "2.4.0"
workers = { path = "./common/workers" }
[profile.wasm-dev]


@@ -6,16 +6,36 @@ repository.workspace = true
rust-version.workspace = true
version.workspace = true
[features]
default = ["webhook"]
webhook = ["dep:reqwest"]
kafka = ["dep:rdkafka"]
mqtt = ["dep:rumqttc"]
http-producer = ["dep:axum"]
[dependencies]
async-trait = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true }
axum = { workspace = true, optional = true }
chrono = { workspace = true, features = ["serde"] }
dotenv = { workspace = true }
figment = { workspace = true, features = ["toml", "yaml", "env"] }
rdkafka = { workspace = true, features = ["tokio"], optional = true }
reqwest = { workspace = true, optional = true }
rumqttc = { workspace = true, optional = true }
serde = { workspace = true }
tokio = { workspace = true, features = ["full"] }
thiserror = { workspace = true }
wildmatch = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { workspace = true, features = ["serde"] }
strum = { workspace = true, features = ["derive"] }
tracing = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-multi-thread"] }
tokio-util = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
[lints]
workspace = true


@@ -0,0 +1,27 @@
# basic configuration
EVENT_NOTIF_STORE_PATH=/var/log/event-notification
EVENT_NOTIF_CHANNEL_CAPACITY=5000
# webhook adapter configuration
EVENT_NOTIF_ADAPTERS__0__TYPE=Webhook
EVENT_NOTIF_ADAPTERS__0__ENDPOINT=https://api.example.com/webhook
EVENT_NOTIF_ADAPTERS__0__AUTH_TOKEN=your-secret-token
EVENT_NOTIF_ADAPTERS__0__MAX_RETRIES=3
EVENT_NOTIF_ADAPTERS__0__TIMEOUT=5000
# kafka adapter configuration
EVENT_NOTIF_ADAPTERS__1__TYPE=Kafka
EVENT_NOTIF_ADAPTERS__1__BROKERS=localhost:9092
EVENT_NOTIF_ADAPTERS__1__TOPIC=notifications
EVENT_NOTIF_ADAPTERS__1__MAX_RETRIES=3
EVENT_NOTIF_ADAPTERS__1__TIMEOUT=5000
# mqtt adapter configuration
EVENT_NOTIF_ADAPTERS__2__TYPE=Mqtt
EVENT_NOTIF_ADAPTERS__2__BROKER=mqtt.example.com
EVENT_NOTIF_ADAPTERS__2__PORT=1883
EVENT_NOTIF_ADAPTERS__2__CLIENT_ID=event-notifier
EVENT_NOTIF_ADAPTERS__2__TOPIC=events
EVENT_NOTIF_ADAPTERS__2__MAX_RETRIES=3
# http producer configuration
EVENT_NOTIF_HTTP__PORT=8080
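
For reference, a sketch of the mapping these variables rely on: the `__` separator nests keys, so `EVENT_NOTIF_ADAPTERS__0__TYPE=Webhook` becomes `adapters[0].type = "Webhook"`. This assumes figment's `Env` provider configured with `split("__")`, matching the loaders added later in this commit:

```rust
use figment::{providers::Env, Figment};
use rustfs_event_notifier::NotificationConfig;

// Hypothetical standalone loader mirroring NotificationConfig::from_env_file.
fn load_from_env() -> Result<NotificationConfig, figment::Error> {
    Figment::new()
        .merge(Env::prefixed("EVENT_NOTIF_").split("__"))
        .extract()
}
```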


@@ -0,0 +1,28 @@
# config.toml
store_path = "/var/log/event-notification"
channel_capacity = 5000
[[adapters]]
type = "Webhook"
endpoint = "https://api.example.com/webhook"
auth_token = "your-auth-token"
max_retries = 3
timeout = 5000
[[adapters]]
type = "Kafka"
brokers = "localhost:9092"
topic = "notifications"
max_retries = 3
timeout = 5000
[[adapters]]
type = "Mqtt"
broker = "mqtt.example.com"
port = 1883
client_id = "event-notifier"
topic = "events"
max_retries = 3
[http]
port = 8080
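
The `type` key on each `[[adapters]]` table is the serde tag that selects the `AdapterConfig` variant (the enum later in this commit is declared with `#[serde(tag = "type")]`). A hedged sketch of parsing a document of this shape through figment, as the crate itself does:

```rust
use figment::{providers::{Format, Toml}, Figment};
use rustfs_event_notifier::NotificationConfig;

fn parse_inline() -> Result<NotificationConfig, figment::Error> {
    // `type = "Webhook"` selects AdapterConfig::Webhook; optional fields
    // such as auth_token default to None when absent.
    Figment::new()
        .merge(Toml::string(r#"
            store_path = "/tmp/events"
            channel_capacity = 100

            [[adapters]]
            type = "Webhook"
            endpoint = "https://api.example.com/webhook"
            max_retries = 3
            timeout = 5000
        "#))
        .extract()
}
```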


@@ -0,0 +1,102 @@
use rustfs_event_notifier::create_adapters;
use rustfs_event_notifier::NotificationSystem;
use rustfs_event_notifier::{AdapterConfig, NotificationConfig, WebhookConfig};
use rustfs_event_notifier::{Bucket, Event, Identity, Metadata, Name, Object, Source};
use std::collections::HashMap;
use std::error;
use std::sync::Arc;
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
tracing_subscriber::fmt::init();
let mut config = NotificationConfig {
store_path: "./events".to_string(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: Some("secret-token".to_string()),
custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])),
max_retries: 3,
timeout: 10,
})],
http: Default::default(),
};
config.http.port = 8080;
// load configuration from a specific env file
let _config = NotificationConfig::from_env_file(".env.example")?;
// or load configuration from a specific TOML file
let _config = NotificationConfig::from_file("event.toml")?;
// Automatically load from multiple sources (Priority: Environment Variables > YAML > TOML)
let _config = NotificationConfig::load()?;
let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await?));
let adapters = create_adapters(&config.adapters)?;
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create an event using the builder pattern
let event = Event::builder()
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.s3(metadata)
.source(source)
.channels(vec!["webhook".to_string()])
.build()
.expect("failed to create event");
{
let system = system.lock().await;
system.send_event(event).await?;
}
let system_clone = Arc::clone(&system);
let system_handle = tokio::spawn(async move {
let mut system = system_clone.lock().await;
system.start(adapters).await
});
signal::ctrl_c().await?;
tracing::info!("Received shutdown signal");
{
let system = system.lock().await;
system.shutdown();
}
system_handle.await??;
Ok(())
}


@@ -0,0 +1,76 @@
use crate::ChannelAdapter;
use crate::Error;
use crate::Event;
use crate::KafkaConfig;
use async_trait::async_trait;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::util::Timeout;
use std::time::Duration;
use tokio::time::sleep;
/// Kafka adapter for sending events to a Kafka topic.
pub struct KafkaAdapter {
producer: FutureProducer,
topic: String,
max_retries: u32,
}
impl KafkaAdapter {
/// Creates a new Kafka adapter.
pub fn new(config: &KafkaConfig) -> Result<Self, Error> {
// Create a Kafka producer with the provided configuration.
let producer = rdkafka::config::ClientConfig::new()
.set("bootstrap.servers", &config.brokers)
.set("message.timeout.ms", config.timeout.to_string())
.create()?;
Ok(Self {
producer,
topic: config.topic.clone(),
max_retries: config.max_retries,
})
}
/// Sends an event to the Kafka topic with retry logic.
async fn send_with_retry(&self, event: &Event) -> Result<(), Error> {
let event_id = event.id.to_string();
let payload = serde_json::to_string(&event)?;
for attempt in 0..self.max_retries {
let record = FutureRecord::to(&self.topic)
.key(&event_id)
.payload(&payload);
match self.producer.send(record, Timeout::Never).await {
Ok(_) => return Ok(()),
Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
tracing::warn!(
"Kafka attempt {} failed: Queue full. Retrying...",
attempt + 1
);
sleep(Duration::from_secs(2u64.pow(attempt))).await;
}
Err((e, _)) => {
tracing::error!("Kafka send error: {}", e);
return Err(Error::Kafka(e));
}
}
}
Err(Error::Custom(
"Exceeded maximum retry attempts for Kafka message".to_string(),
))
}
}
#[async_trait]
impl ChannelAdapter for KafkaAdapter {
fn name(&self) -> String {
"kafka".to_string()
}
async fn send(&self, event: &Event) -> Result<(), Error> {
self.send_with_retry(event).await
}
}
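
A sketch of constructing the adapter on its own, outside `create_adapters` (requires the `kafka` feature; it assumes `Error` is re-exported at the crate root like the other public items):

```rust
use rustfs_event_notifier::{Error, KafkaAdapter, KafkaConfig};

fn build_kafka() -> Result<KafkaAdapter, Error> {
    KafkaAdapter::new(&KafkaConfig {
        brokers: "localhost:9092".to_string(),
        topic: "notifications".to_string(),
        max_retries: 3,
        // milliseconds: forwarded to the producer's message.timeout.ms above
        timeout: 5000,
    })
}
```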


@@ -0,0 +1,54 @@
use crate::AdapterConfig;
use crate::Error;
use crate::Event;
use async_trait::async_trait;
use std::sync::Arc;
#[cfg(feature = "kafka")]
pub(crate) mod kafka;
#[cfg(feature = "mqtt")]
pub(crate) mod mqtt;
#[cfg(feature = "webhook")]
pub(crate) mod webhook;
/// The `ChannelAdapter` trait defines the interface for all channel adapters.
#[async_trait]
pub trait ChannelAdapter: Send + Sync + 'static {
/// Returns the adapter's channel name (e.g. "webhook", "kafka", "mqtt").
fn name(&self) -> String;
/// Sends an event to the channel.
async fn send(&self, event: &Event) -> Result<(), Error>;
}
/// Creates channel adapters based on the provided configuration.
pub fn create_adapters(configs: &[AdapterConfig]) -> Result<Vec<Arc<dyn ChannelAdapter>>, Box<Error>> {
let mut adapters: Vec<Arc<dyn ChannelAdapter>> = Vec::new();
for config in configs {
match config {
#[cfg(feature = "webhook")]
AdapterConfig::Webhook(webhook_config) => {
webhook_config.validate().map_err(|e| Box::new(Error::ConfigError(e)))?;
adapters.push(Arc::new(webhook::WebhookAdapter::new(webhook_config.clone())));
}
#[cfg(feature = "kafka")]
AdapterConfig::Kafka(kafka_config) => {
adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?));
}
#[cfg(feature = "mqtt")]
AdapterConfig::Mqtt(mqtt_config) => {
let (mqtt, mut event_loop) = mqtt::MqttAdapter::new(mqtt_config);
tokio::spawn(async move { while event_loop.poll().await.is_ok() {} });
adapters.push(Arc::new(mqtt));
}
#[cfg(not(feature = "webhook"))]
AdapterConfig::Webhook(_) => return Err(Box::new(Error::FeatureDisabled("webhook"))),
#[cfg(not(feature = "kafka"))]
AdapterConfig::Kafka(_) => return Err(Box::new(Error::FeatureDisabled("kafka"))),
#[cfg(not(feature = "mqtt"))]
AdapterConfig::Mqtt(_) => return Err(Box::new(Error::FeatureDisabled("mqtt"))),
}
}
Ok(adapters)
}
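
Usage sketch, mirroring the example binary later in this commit. Note that for MQTT configs `create_adapters` spawns the rumqttc event-loop poll task as a side effect, so it must be called from within a Tokio runtime:

```rust
use std::sync::Arc;
use rustfs_event_notifier::{create_adapters, ChannelAdapter, NotificationConfig};

fn adapters_from(config: &NotificationConfig) -> Vec<Arc<dyn ChannelAdapter>> {
    // Configs whose backing feature is disabled surface as Error::FeatureDisabled.
    create_adapters(&config.adapters).expect("adapter configuration should be valid")
}
```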


@@ -0,0 +1,58 @@
use crate::ChannelAdapter;
use crate::Error;
use crate::Event;
use crate::MqttConfig;
use async_trait::async_trait;
use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;
use tokio::time::sleep;
/// MQTT adapter for sending events to an MQTT broker.
pub struct MqttAdapter {
client: AsyncClient,
topic: String,
max_retries: u32,
}
impl MqttAdapter {
/// Creates a new MQTT adapter.
pub fn new(config: &MqttConfig) -> (Self, rumqttc::EventLoop) {
let mqtt_options = MqttOptions::new(&config.client_id, &config.broker, config.port);
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
(
Self {
client,
topic: config.topic.clone(),
max_retries: config.max_retries,
},
event_loop,
)
}
}
#[async_trait]
impl ChannelAdapter for MqttAdapter {
fn name(&self) -> String {
"mqtt".to_string()
}
async fn send(&self, event: &Event) -> Result<(), Error> {
let payload = serde_json::to_string(event).map_err(Error::Serde)?;
let mut attempt = 0;
loop {
match self
.client
.publish(&self.topic, QoS::AtLeastOnce, false, payload.clone())
.await
{
Ok(()) => return Ok(()),
Err(e) if attempt < self.max_retries => {
attempt += 1;
tracing::warn!("MQTT attempt {} failed: {}. Retrying...", attempt, e);
sleep(Duration::from_secs(2u64.pow(attempt))).await;
}
Err(e) => return Err(Error::Mqtt(e)),
}
}
}
}
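
A standalone construction sketch (normally `create_adapters` does this). `rumqttc` only makes progress while its `EventLoop` is polled, hence the spawned poll task; the snippet assumes `MqttConfig` is re-exported like the other config types:

```rust
use rustfs_event_notifier::{MqttAdapter, MqttConfig};

async fn build_mqtt() -> MqttAdapter {
    let config = MqttConfig {
        broker: "mqtt.example.com".to_string(),
        port: 1883,
        client_id: "event-notifier".to_string(),
        topic: "events".to_string(),
        max_retries: 3,
    };
    let (adapter, mut event_loop) = MqttAdapter::new(&config);
    // Drive the connection; publishes stall if the event loop is never polled.
    tokio::spawn(async move { while event_loop.poll().await.is_ok() {} });
    adapter
}
```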


@@ -0,0 +1,63 @@
use crate::ChannelAdapter;
use crate::Error;
use crate::Event;
use crate::WebhookConfig;
use async_trait::async_trait;
use reqwest::{Client, RequestBuilder};
use std::time::Duration;
use tokio::time::sleep;
/// Webhook adapter for sending events to a webhook endpoint.
pub struct WebhookAdapter {
config: WebhookConfig,
client: Client,
}
impl WebhookAdapter {
/// Creates a new Webhook adapter.
pub fn new(config: WebhookConfig) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(config.timeout))
.build()
.expect("Failed to build reqwest client");
Self { config, client }
}
/// Builds the request to send the event.
fn build_request(&self, event: &Event) -> RequestBuilder {
let mut request = self.client.post(&self.config.endpoint).json(event);
if let Some(token) = &self.config.auth_token {
request = request.header("Authorization", format!("Bearer {}", token));
}
if let Some(headers) = &self.config.custom_headers {
for (key, value) in headers {
request = request.header(key, value);
}
}
request
}
}
#[async_trait]
impl ChannelAdapter for WebhookAdapter {
fn name(&self) -> String {
"webhook".to_string()
}
async fn send(&self, event: &Event) -> Result<(), Error> {
let mut attempt = 0;
loop {
match self.build_request(event).send().await {
Ok(response) => {
response.error_for_status().map_err(Error::Http)?;
return Ok(());
}
Err(e) if attempt < self.config.max_retries => {
attempt += 1;
tracing::warn!("Webhook attempt {} failed: {}. Retrying...", attempt, e);
sleep(Duration::from_secs(2u64.pow(attempt))).await;
}
Err(e) => return Err(Error::Http(e)),
}
}
}
}
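
One detail worth noting: `timeout` here is interpreted as seconds (`Duration::from_secs`), while the Kafka adapter feeds its `timeout` to `message.timeout.ms` as milliseconds. A construction sketch using the values from the example binary:

```rust
use std::collections::HashMap;
use rustfs_event_notifier::{WebhookAdapter, WebhookConfig};

fn build_webhook() -> WebhookAdapter {
    WebhookAdapter::new(WebhookConfig {
        endpoint: "http://localhost:8080/webhook".to_string(),
        auth_token: Some("secret-token".to_string()),
        custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])),
        max_retries: 3,
        timeout: 10, // seconds, per Client::builder().timeout(from_secs(...))
    })
}
```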


@@ -0,0 +1,68 @@
use crate::ChannelAdapter;
use crate::Error;
use crate::EventStore;
use crate::{Event, Log};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Handles incoming events from the producer.
///
/// This function is responsible for receiving events from the producer and sending them to the appropriate adapters.
/// It also handles the shutdown process and saves any pending logs to the event store.
pub async fn event_bus(
mut rx: mpsc::Receiver<Event>,
adapters: Vec<Arc<dyn ChannelAdapter>>,
store: Arc<EventStore>,
shutdown: CancellationToken,
) -> Result<(), Error> {
let mut pending_logs = Vec::new();
let mut current_log = Log {
event_name: crate::event::Name::Everything,
key: Utc::now().timestamp().to_string(),
records: Vec::new(),
};
loop {
tokio::select! {
Some(event) = rx.recv() => {
current_log.records.push(event.clone());
let mut send_tasks = Vec::new();
for adapter in &adapters {
if event.channels.contains(&adapter.name()) {
let adapter = adapter.clone();
let event = event.clone();
send_tasks.push(tokio::spawn(async move {
if let Err(e) = adapter.send(&event).await {
tracing::error!("Failed to send event to {}: {}", adapter.name(), e);
Err(e)
} else {
Ok(())
}
}));
}
}
for task in send_tasks {
if task.await?.is_err() {
current_log.records.retain(|e| e.id != event.id);
}
}
if !current_log.records.is_empty() {
pending_logs.push(current_log.clone());
}
current_log.records.clear();
}
_ = shutdown.cancelled() => {
tracing::info!("Shutting down event bus, saving pending logs...");
if !current_log.records.is_empty() {
pending_logs.push(current_log);
}
store.save_logs(&pending_logs).await?;
break;
}
else => break,
}
}
Ok(())
}
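
A wiring sketch for running the bus directly; `NotificationSystem` normally does this internally. `EventStore` is taken as a parameter because its constructor is not part of this excerpt, and the snippet assumes it is re-exported at the crate root:

```rust
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use rustfs_event_notifier::{event_bus, ChannelAdapter, Event, EventStore};

fn run_bus(
    adapters: Vec<Arc<dyn ChannelAdapter>>,
    store: Arc<EventStore>, // constructed elsewhere; API not shown here
) -> (mpsc::Sender<Event>, CancellationToken) {
    let (tx, rx) = mpsc::channel(100);
    let shutdown = CancellationToken::new();
    // Cancelling `shutdown` flushes pending logs to the store and stops the loop.
    tokio::spawn(event_bus(rx, adapters, store, shutdown.clone()));
    (tx, shutdown)
}
```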


@@ -0,0 +1,161 @@
use crate::Error;
use figment::providers::Format;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub endpoint: String,
pub auth_token: Option<String>,
pub custom_headers: Option<HashMap<String, String>>,
pub max_retries: u32,
pub timeout: u64,
}
impl WebhookConfig {
/// Verify that the configuration is valid.
pub fn validate(&self) -> Result<(), String> {
// the endpoint must not be empty
if self.endpoint.trim().is_empty() {
return Err("Webhook endpoint cannot be empty".to_string());
}
// the timeout must be non-zero
if self.timeout == 0 {
return Err("Webhook timeout must be greater than 0".to_string());
}
// the maximum number of retries must stay within a sane bound
if self.max_retries > 10 {
return Err("Maximum retry count cannot exceed 10".to_string());
}
}
Ok(())
}
}
/// Configuration for the Kafka adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
pub brokers: String,
pub topic: String,
pub max_retries: u32,
pub timeout: u64,
}
/// Configuration for the MQTT adapter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttConfig {
pub broker: String,
pub port: u16,
pub client_id: String,
pub topic: String,
pub max_retries: u32,
}
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AdapterConfig {
Webhook(WebhookConfig),
Kafka(KafkaConfig),
Mqtt(MqttConfig),
}
/// http producer configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpProducerConfig {
#[serde(default = "default_http_port")]
pub port: u16,
}
impl Default for HttpProducerConfig {
fn default() -> Self {
Self {
port: default_http_port(),
}
}
}
fn default_http_port() -> u16 {
3000
}
/// Configuration for the notification system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationConfig {
#[serde(default = "default_store_path")]
pub store_path: String,
#[serde(default = "default_channel_capacity")]
pub channel_capacity: usize,
pub adapters: Vec<AdapterConfig>,
#[serde(default)]
pub http: HttpProducerConfig,
}
impl Default for NotificationConfig {
fn default() -> Self {
Self {
store_path: default_store_path(),
channel_capacity: default_channel_capacity(),
adapters: Vec::new(),
http: HttpProducerConfig::default(),
}
}
}
impl NotificationConfig {
/// create a new configuration with default values
pub fn new() -> Self {
Self::default()
}
/// Create a configuration from a TOML file
pub fn from_file(path: &str) -> Result<Self, Error> {
let config = figment::Figment::new()
.merge(figment::providers::Toml::file(path))
.extract()?;
Ok(config)
}
/// Read configuration from multiple sources (supports TOML, YAML, and environment variables)
pub fn load() -> Result<Self, Error> {
let figment = figment::Figment::new()
// first try event.toml in the current directory
.merge(figment::providers::Toml::file("event.toml"))
// then try event.yaml in the current directory
.merge(figment::providers::Yaml::file("event.yaml"))
// finally read EVENT_NOTIF_* environment variables, overriding earlier values;
// split("__") maps keys like ADAPTERS__0__TYPE onto nested fields
.merge(figment::providers::Env::prefixed("EVENT_NOTIF_").split("__"));
Ok(figment.extract()?)
}
/// Load configuration from an env file
pub fn from_env_file(path: &str) -> Result<Self, Error> {
// load the env file into the process environment
dotenv::from_path(path)
.map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?;
// extract the configuration from environment variables using figment
let figment =
figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_").split("__"));
Ok(figment.extract()?)
}
}
/// Provides a directory under the system temp dir as the default storage path
fn default_store_path() -> String {
std::env::temp_dir()
.join("event-notification")
.to_string_lossy()
.to_string()
}
/// Provides the recommended default channel capacity for high-concurrency systems
fn default_channel_capacity() -> usize {
10000 // a reasonable default for high-concurrency deployments
}
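
For programmatic construction, the `Default` impl mirrors the serde defaults (temp-dir store path, capacity 10000, HTTP port 3000, no adapters):

```rust
use rustfs_event_notifier::NotificationConfig;

fn default_config() -> NotificationConfig {
    let config = NotificationConfig::default();
    assert_eq!(config.channel_capacity, 10_000);
    assert_eq!(config.http.port, 3000);
    assert!(config.adapters.is_empty());
    config
}
```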


@@ -1,25 +1,45 @@
// src/error.rs
use thiserror::Error;
use tokio::sync::mpsc::error;
use tokio::task::JoinError;
/// The `Error` enum represents all possible errors that can occur in the application.
/// It implements the `std::error::Error` trait and provides a way to convert various error types into a single error type.
#[derive(Error, Debug)]
pub enum Error {
#[error("target not found: {0}")]
TargetNotFound(String),
#[error("send failed: {0}")]
SendError(String),
#[error("target error: {0}")]
TargetError(String),
#[error("invalid configuration: {0}")]
#[error("Join error: {0}")]
JoinError(#[from] JoinError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[cfg(feature = "kafka")]
#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),
#[cfg(feature = "mqtt")]
#[error("MQTT error: {0}")]
Mqtt(#[from] rumqttc::ClientError),
#[error("Channel send error: {0}")]
ChannelSend(#[from] Box<error::SendError<crate::event::Event>>),
#[error("Feature disabled: {0}")]
FeatureDisabled(&'static str),
#[error("Event bus already started")]
EventBusStarted,
#[error("necessary fields are missing:{0}")]
MissingField(&'static str),
#[error("field verification failed:{0}")]
ValidationError(&'static str),
#[error("{0}")]
Custom(String),
#[error("Configuration error: {0}")]
ConfigError(String),
#[error("store error: {0}")]
StoreError(String),
#[error("invalid event name: {0}")]
InvalidEventName(String), // added this variant
#[error("Configuration loading error: {0}")]
Figment(#[from] figment::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl Error {
pub(crate) fn custom(msg: &str) -> Error {
Self::Custom(msg.to_string())
}
}
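
The `#[from]` attributes make `?` conversions automatic; a small sketch (assuming `Error` and `Event` are re-exported at the crate root):

```rust
use rustfs_event_notifier::{Error, Event};

// serde_json::Error converts into Error::Serde through #[from],
// so a bare `?` suffices wherever the crate's Result alias is in scope.
fn encode(event: &Event) -> Result<String, Error> {
    Ok(serde_json::to_string(event)?)
}
```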


@@ -1,53 +1,442 @@
use crate::Error;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use smallvec::{smallvec, SmallVec};
use std::collections::HashMap;
use strum::{Display, EnumString};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub event_version: String,
pub event_source: String,
pub aws_region: String,
pub event_time: String,
pub event_name: String,
pub user_identity: Identity,
pub request_parameters: HashMap<String, String>,
pub response_elements: HashMap<String, String>,
pub s3: Metadata,
pub source: Source,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Identity {
#[serde(rename = "principalId")]
pub principal_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Bucket {
pub name: String,
#[serde(rename = "ownerIdentity")]
pub owner_identity: Identity,
pub arn: String,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Object {
pub key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub size: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "eTag")]
pub etag: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "contentType")]
pub content_type: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "userMetadata")]
pub user_metadata: Option<HashMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "versionId")]
pub version_id: Option<String>,
pub sequencer: String,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Metadata {
#[serde(rename = "s3SchemaVersion")]
pub schema_version: String,
#[serde(rename = "configurationId")]
pub configuration_id: String,
pub bucket: Bucket,
pub object: Object,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bucket {
pub name: String,
pub owner_identity: Identity,
pub arn: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Object {
pub key: String,
pub version_id: Option<String>,
pub sequencer: String,
pub size: Option<u64>,
pub etag: Option<String>,
pub content_type: Option<String>,
pub user_metadata: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Source {
pub host: String,
pub port: String,
#[serde(rename = "userAgent")]
pub user_agent: String,
}
/// Builder for creating an Event.
///
/// This struct is used to build an Event object with various parameters.
/// It provides methods to set each parameter and a build method to create the Event.
#[derive(Default, Clone)]
pub struct EventBuilder {
event_version: Option<String>,
event_source: Option<String>,
aws_region: Option<String>,
event_time: Option<String>,
event_name: Option<Name>,
user_identity: Option<Identity>,
request_parameters: Option<HashMap<String, String>>,
response_elements: Option<HashMap<String, String>>,
s3: Option<Metadata>,
source: Option<Source>,
channels: Option<SmallVec<[String; 2]>>,
}
impl EventBuilder {
/// Create a builder pre-filled with default values
pub fn new() -> Self {
Self {
event_version: Some("2.0".to_string()),
event_source: Some("aws:s3".to_string()),
aws_region: Some("us-east-1".to_string()),
event_time: Some(Utc::now().to_rfc3339()),
event_name: None,
user_identity: Some(Identity {
principal_id: "anonymous".to_string(),
}),
request_parameters: Some(HashMap::new()),
response_elements: Some(HashMap::new()),
s3: None,
source: None,
channels: Some(Vec::new().into()),
}
}
/// verify and set the event version
pub fn event_version(mut self, event_version: impl Into<String>) -> Self {
let event_version = event_version.into();
if !event_version.is_empty() {
self.event_version = Some(event_version);
}
self
}
/// verify and set the event source
pub fn event_source(mut self, event_source: impl Into<String>) -> Self {
let event_source = event_source.into();
if !event_source.is_empty() {
self.event_source = Some(event_source);
}
self
}
/// set up aws regions
pub fn aws_region(mut self, aws_region: impl Into<String>) -> Self {
self.aws_region = Some(aws_region.into());
self
}
/// set event time
pub fn event_time(mut self, event_time: impl Into<String>) -> Self {
self.event_time = Some(event_time.into());
self
}
/// set event name
pub fn event_name(mut self, event_name: Name) -> Self {
self.event_name = Some(event_name);
self
}
/// set user identity
pub fn user_identity(mut self, user_identity: Identity) -> Self {
self.user_identity = Some(user_identity);
self
}
/// set request parameters
pub fn request_parameters(mut self, request_parameters: HashMap<String, String>) -> Self {
self.request_parameters = Some(request_parameters);
self
}
/// set response elements
pub fn response_elements(mut self, response_elements: HashMap<String, String>) -> Self {
self.response_elements = Some(response_elements);
self
}
/// setting up s3 metadata
pub fn s3(mut self, s3: Metadata) -> Self {
self.s3 = Some(s3);
self
}
/// set event source information
pub fn source(mut self, source: Source) -> Self {
self.source = Some(source);
self
}
/// set up the sending channel
pub fn channels(mut self, channels: Vec<String>) -> Self {
self.channels = Some(channels.into());
self
}
/// Create a preconfigured builder for common object event scenarios
pub fn for_object_creation(s3: Metadata, source: Source) -> Self {
Self::new().event_name(Name::ObjectCreatedPut).s3(s3).source(source)
}
/// Create a preconfigured builder for object deletion events
pub fn for_object_removal(s3: Metadata, source: Source) -> Self {
Self::new().event_name(Name::ObjectRemovedDelete).s3(s3).source(source)
}
/// build event instance
///
/// Verify the required fields and create a complete Event object
pub fn build(self) -> Result<Event, Error> {
let event_version = self.event_version.ok_or(Error::MissingField("event_version"))?;
let event_source = self.event_source.ok_or(Error::MissingField("event_source"))?;
let aws_region = self.aws_region.ok_or(Error::MissingField("aws_region"))?;
let event_time = self.event_time.ok_or(Error::MissingField("event_time"))?;
let event_name = self.event_name.ok_or(Error::MissingField("event_name"))?;
let user_identity = self.user_identity.ok_or(Error::MissingField("user_identity"))?;
let request_parameters = self.request_parameters.unwrap_or_default();
let response_elements = self.response_elements.unwrap_or_default();
let s3 = self.s3.ok_or(Error::MissingField("s3"))?;
let source = self.source.ok_or(Error::MissingField("source"))?;
let channels = self.channels.unwrap_or_else(|| smallvec![]);
Ok(Event {
event_version,
event_source,
aws_region,
event_time,
event_name,
user_identity,
request_parameters,
response_elements,
s3,
source,
id: Uuid::new_v4(),
timestamp: Utc::now(),
channels,
})
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Event {
#[serde(rename = "eventVersion")]
pub event_version: String,
#[serde(rename = "eventSource")]
pub event_source: String,
#[serde(rename = "awsRegion")]
pub aws_region: String,
#[serde(rename = "eventTime")]
pub event_time: String,
#[serde(rename = "eventName")]
pub event_name: Name,
#[serde(rename = "userIdentity")]
pub user_identity: Identity,
#[serde(rename = "requestParameters")]
pub request_parameters: HashMap<String, String>,
#[serde(rename = "responseElements")]
pub response_elements: HashMap<String, String>,
pub s3: Metadata,
pub source: Source,
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub channels: SmallVec<[String; 2]>,
}
impl Event {
/// create a new event builder
///
/// Returns an EventBuilder instance pre-filled with default values
pub fn builder() -> EventBuilder {
EventBuilder::new()
}
/// Quickly create Event instances with necessary fields
///
/// suitable for common s3 event scenarios
pub fn create(event_name: Name, s3: Metadata, source: Source, channels: Vec<String>) -> Self {
Self::builder()
.event_name(event_name)
.s3(s3)
.source(source)
.channels(channels)
.build()
.expect("Failed to create event, missing necessary parameters")
}
/// a convenient way to create a preconfigured builder
pub fn for_object_creation(s3: Metadata, source: Source) -> EventBuilder {
EventBuilder::for_object_creation(s3, source)
}
/// a convenient way to create a preconfigured builder
pub fn for_object_removal(s3: Metadata, source: Source) -> EventBuilder {
EventBuilder::for_object_removal(s3, source)
}
/// Determine whether an event belongs to a specific type
pub fn is_type(&self, event_type: Name) -> bool {
let mask = event_type.mask();
(self.event_name.mask() & mask) != 0
}
/// Determine whether an event needs to be sent to a specific channel
pub fn is_for_channel(&self, channel: &str) -> bool {
self.channels.iter().any(|c| c == channel)
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Log {
#[serde(rename = "eventName")]
pub event_name: Name,
pub key: String,
pub records: Vec<Event>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, SerializeDisplay, DeserializeFromStr, Display, EnumString)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum Name {
// discriminants start at 1 so `as u32` agrees with from_repr() and mask()
ObjectAccessedGet = 1,
ObjectAccessedGetRetention,
ObjectAccessedGetLegalHold,
ObjectAccessedHead,
ObjectAccessedAttributes,
ObjectCreatedCompleteMultipartUpload,
ObjectCreatedCopy,
ObjectCreatedPost,
ObjectCreatedPut,
ObjectCreatedPutRetention,
ObjectCreatedPutLegalHold,
ObjectCreatedPutTagging,
ObjectCreatedDeleteTagging,
ObjectRemovedDelete,
ObjectRemovedDeleteMarkerCreated,
ObjectRemovedDeleteAllVersions,
ObjectRemovedNoOp,
BucketCreated,
BucketRemoved,
ObjectReplicationFailed,
ObjectReplicationComplete,
ObjectReplicationMissedThreshold,
ObjectReplicationReplicatedAfterThreshold,
ObjectReplicationNotTracked,
ObjectRestorePost,
ObjectRestoreCompleted,
ObjectTransitionFailed,
ObjectTransitionComplete,
ObjectManyVersions,
ObjectLargeVersions,
PrefixManyFolders,
IlmDelMarkerExpirationDelete,
ObjectAccessedAll,
ObjectCreatedAll,
ObjectRemovedAll,
ObjectReplicationAll,
ObjectRestoreAll,
ObjectTransitionAll,
ObjectScannerAll,
Everything,
}
impl Name {
pub fn expand(&self) -> Vec<Name> {
match self {
Name::ObjectAccessedAll => vec![
Name::ObjectAccessedGet,
Name::ObjectAccessedHead,
Name::ObjectAccessedGetRetention,
Name::ObjectAccessedGetLegalHold,
Name::ObjectAccessedAttributes,
],
Name::ObjectCreatedAll => vec![
Name::ObjectCreatedCompleteMultipartUpload,
Name::ObjectCreatedCopy,
Name::ObjectCreatedPost,
Name::ObjectCreatedPut,
Name::ObjectCreatedPutRetention,
Name::ObjectCreatedPutLegalHold,
Name::ObjectCreatedPutTagging,
Name::ObjectCreatedDeleteTagging,
],
Name::ObjectRemovedAll => vec![
Name::ObjectRemovedDelete,
Name::ObjectRemovedDeleteMarkerCreated,
Name::ObjectRemovedNoOp,
Name::ObjectRemovedDeleteAllVersions,
],
Name::ObjectReplicationAll => vec![
Name::ObjectReplicationFailed,
Name::ObjectReplicationComplete,
Name::ObjectReplicationNotTracked,
Name::ObjectReplicationMissedThreshold,
Name::ObjectReplicationReplicatedAfterThreshold,
],
Name::ObjectRestoreAll => vec![Name::ObjectRestorePost, Name::ObjectRestoreCompleted],
Name::ObjectTransitionAll => {
vec![Name::ObjectTransitionFailed, Name::ObjectTransitionComplete]
}
Name::ObjectScannerAll => vec![Name::ObjectManyVersions, Name::ObjectLargeVersions, Name::PrefixManyFolders],
Name::Everything => (1..=Name::IlmDelMarkerExpirationDelete as u32)
.map(|i| Name::from_repr(i).unwrap())
.collect(),
_ => vec![*self],
}
}
pub fn mask(&self) -> u64 {
if (*self as u32) < Name::ObjectAccessedAll as u32 {
1 << (*self as u32 - 1)
} else {
self.expand().iter().fold(0, |acc, n| acc | (1 << (*n as u32 - 1)))
}
}
fn from_repr(discriminant: u32) -> Option<Self> {
match discriminant {
1 => Some(Name::ObjectAccessedGet),
2 => Some(Name::ObjectAccessedGetRetention),
3 => Some(Name::ObjectAccessedGetLegalHold),
4 => Some(Name::ObjectAccessedHead),
5 => Some(Name::ObjectAccessedAttributes),
6 => Some(Name::ObjectCreatedCompleteMultipartUpload),
7 => Some(Name::ObjectCreatedCopy),
8 => Some(Name::ObjectCreatedPost),
9 => Some(Name::ObjectCreatedPut),
10 => Some(Name::ObjectCreatedPutRetention),
11 => Some(Name::ObjectCreatedPutLegalHold),
12 => Some(Name::ObjectCreatedPutTagging),
13 => Some(Name::ObjectCreatedDeleteTagging),
14 => Some(Name::ObjectRemovedDelete),
15 => Some(Name::ObjectRemovedDeleteMarkerCreated),
16 => Some(Name::ObjectRemovedDeleteAllVersions),
17 => Some(Name::ObjectRemovedNoOp),
18 => Some(Name::BucketCreated),
19 => Some(Name::BucketRemoved),
20 => Some(Name::ObjectReplicationFailed),
21 => Some(Name::ObjectReplicationComplete),
22 => Some(Name::ObjectReplicationMissedThreshold),
23 => Some(Name::ObjectReplicationReplicatedAfterThreshold),
24 => Some(Name::ObjectReplicationNotTracked),
25 => Some(Name::ObjectRestorePost),
26 => Some(Name::ObjectRestoreCompleted),
27 => Some(Name::ObjectTransitionFailed),
28 => Some(Name::ObjectTransitionComplete),
29 => Some(Name::ObjectManyVersions),
30 => Some(Name::ObjectLargeVersions),
31 => Some(Name::PrefixManyFolders),
32 => Some(Name::IlmDelMarkerExpirationDelete),
33 => Some(Name::ObjectAccessedAll),
34 => Some(Name::ObjectCreatedAll),
35 => Some(Name::ObjectRemovedAll),
36 => Some(Name::ObjectReplicationAll),
37 => Some(Name::ObjectRestoreAll),
38 => Some(Name::ObjectTransitionAll),
39 => Some(Name::ObjectScannerAll),
40 => Some(Name::Everything),
_ => None,
}
}
}
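
With discriminants starting at 1 (see the enum above), each concrete event maps to exactly one bit, and the `*All` wildcards resolve to the union of their members:

```rust
use rustfs_event_notifier::Name;

fn mask_examples() {
    // A concrete event occupies exactly one bit.
    assert_eq!(Name::ObjectAccessedGet.mask(), 1 << 0);
    // A wildcard is the OR of the events it expands to.
    let created = Name::ObjectCreatedAll.mask();
    assert_ne!(created & Name::ObjectCreatedPut.mask(), 0);
    assert_eq!(created & Name::ObjectRemovedDelete.mask(), 0);
}
```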


@@ -1,79 +0,0 @@
use crate::error::Error;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[repr(u64)]
pub enum EventName {
// object operation events
ObjectCreatedAll = 1 << 0,
ObjectCreatedPut = 1 << 1,
ObjectCreatedPost = 1 << 2,
ObjectCreatedCopy = 1 << 3,
ObjectCreatedCompleteMultipartUpload = 1 << 4,
ObjectRemovedAll = 1 << 5,
ObjectRemovedDelete = 1 << 6,
ObjectRemovedDeleteMarkerCreated = 1 << 7,
ObjectAccessedAll = 1 << 8,
ObjectAccessedGet = 1 << 9,
ObjectAccessedHead = 1 << 10,
ObjectRestoredAll = 1 << 11,
ObjectRestoredPost = 1 << 12,
ObjectRestoredCompleted = 1 << 13,
ReplicationAll = 1 << 14,
ReplicationFailed = 1 << 15,
ReplicationComplete = 1 << 16,
}
impl EventName {
pub fn mask(&self) -> u64 {
*self as u64
}
pub fn expand(&self) -> Vec<EventName> {
match self {
EventName::ObjectCreatedAll => vec![
EventName::ObjectCreatedPut,
EventName::ObjectCreatedPost,
EventName::ObjectCreatedCopy,
EventName::ObjectCreatedCompleteMultipartUpload,
],
EventName::ObjectRemovedAll => vec![EventName::ObjectRemovedDelete, EventName::ObjectRemovedDeleteMarkerCreated],
EventName::ObjectAccessedAll => vec![EventName::ObjectAccessedGet, EventName::ObjectAccessedHead],
EventName::ObjectRestoredAll => vec![EventName::ObjectRestoredPost, EventName::ObjectRestoredCompleted],
EventName::ReplicationAll => vec![EventName::ReplicationFailed, EventName::ReplicationComplete],
_ => vec![*self],
}
}
}
impl FromStr for EventName {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll),
"s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut),
"s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost),
"s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy),
"s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload),
"s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll),
"s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete),
"s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated),
"s3:ObjectAccessed:*" => Ok(EventName::ObjectAccessedAll),
"s3:ObjectAccessed:Get" => Ok(EventName::ObjectAccessedGet),
"s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead),
"s3:ObjectRestored:*" => Ok(EventName::ObjectRestoredAll),
"s3:ObjectRestored:Post" => Ok(EventName::ObjectRestoredPost),
"s3:ObjectRestored:Completed" => Ok(EventName::ObjectRestoredCompleted),
"s3:Replication:*" => Ok(EventName::ReplicationAll),
"s3:Replication:Failed" => Ok(EventName::ReplicationFailed),
"s3:Replication:Complete" => Ok(EventName::ReplicationComplete),
_ => Err(Error::InvalidEventName(format!("Unrecognized event name: {}", s))),
}
}
}


@@ -0,0 +1,100 @@
use crate::{ChannelAdapter, Error, Event, NotificationConfig, NotificationSystem};
use std::sync::Arc;
use tokio::sync::{Mutex, OnceCell};
static GLOBAL_SYSTEM: OnceCell<Arc<Mutex<NotificationSystem>>> = OnceCell::const_new();
static INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
/// initialize the global notification system
pub async fn initialize(config: NotificationConfig) -> Result<(), Error> {
if INITIALIZED.swap(true, std::sync::atomic::Ordering::SeqCst) {
return Err(Error::custom("notification system has already been initialized"));
}
let system = Arc::new(Mutex::new(NotificationSystem::new(config).await?));
GLOBAL_SYSTEM
.set(system)
.map_err(|_| Error::custom("unable to set up global notification system"))?;
Ok(())
}
/// start the global notification system
pub async fn start(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
let system = get_system().await?;
// create a new task to run the system
let system_clone = Arc::clone(&system);
tokio::spawn(async move {
let mut system_guard = system_clone.lock().await;
if let Err(e) = system_guard.start(adapters).await {
tracing::error!("notify the system to start failed: {}", e);
}
});
Ok(())
}
/// Initialize and start the global notification system
///
/// This method combines the functions of `initialize` and `start` to provide one-step setup:
/// - initialize system configuration
/// - create an adapter
/// - start event listening
///
/// # Example
///
/// ```rust
/// use rustfs_event_notifier::{initialize_and_start, NotificationConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), rustfs_event_notifier::Error> {
/// let config = NotificationConfig {
/// store_path: "./events".to_string(),
/// channel_capacity: 100,
/// adapters: vec![/* adapter configuration */],
/// http: Default::default(),
/// };
///
/// // complete initialization and startup in one step
/// initialize_and_start(config).await?;
/// Ok(())
/// }
/// ```
pub async fn initialize_and_start(config: NotificationConfig) -> Result<(), Error> {
// initialize the system first
initialize(config.clone()).await?;
// create an adapter
let adapters = crate::create_adapters(&config.adapters)?;
// start the system
start(adapters).await?;
Ok(())
}
/// send events to notification system
pub async fn send_event(event: Event) -> Result<(), Error> {
let system = get_system().await?;
let system_guard = system.lock().await;
system_guard.send_event(event).await
}
/// shut down the notification system
pub fn shutdown() -> Result<(), Error> {
if let Some(system) = GLOBAL_SYSTEM.get() {
let system_guard = system.blocking_lock();
system_guard.shutdown();
Ok(())
} else {
Err(Error::custom("notification system not initialized"))
}
}
/// get system instance
async fn get_system() -> Result<Arc<Mutex<NotificationSystem>>, Error> {
GLOBAL_SYSTEM
.get()
.cloned()
.ok_or_else(|| Error::custom("notification system not initialized"))
}
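For reference, a minimal end-to-end flow over this global API could look like the sketch below. The config values are placeholders, and since `shutdown` takes a blocking lock it is driven through `spawn_blocking` rather than called directly on the runtime:

```rust
use rustfs_event_notifier::{initialize_and_start, shutdown, Error, NotificationConfig};

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Placeholder configuration; adjust store_path/adapters for a real deployment.
    let config = NotificationConfig {
        store_path: "./events".to_string(),
        channel_capacity: 100,
        adapters: vec![],
        http: Default::default(),
    };
    initialize_and_start(config).await?;

    // ... build events (e.g. via Event::builder()) and hand them to send_event ...

    // shutdown() uses blocking_lock, so run it off the async runtime.
    tokio::task::spawn_blocking(shutdown)
        .await
        .map_err(Error::JoinError)??;
    Ok(())
}
```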

View File

@@ -1,80 +1,131 @@
//! RustFS Event Notifier
//!
//! This crate provides a simple event notification system for RustFS.
//! It allows for the registration of event handlers and the triggering of events.
mod adapter;
mod bus;
mod config;
mod error;
mod event;
mod event_name;
mod notifier;
mod rules;
mod stats;
mod target;
mod target_entry;
mod global;
mod producer;
mod store;
pub use adapter::create_adapters;
#[cfg(feature = "kafka")]
pub use adapter::kafka::KafkaAdapter;
#[cfg(feature = "mqtt")]
pub use adapter::mqtt::MqttAdapter;
#[cfg(feature = "webhook")]
pub use adapter::webhook::WebhookAdapter;
pub use adapter::ChannelAdapter;
pub use bus::event_bus;
#[cfg(feature = "http-producer")]
pub use config::HttpProducerConfig;
#[cfg(feature = "kafka")]
pub use config::KafkaConfig;
#[cfg(feature = "mqtt")]
pub use config::MqttConfig;
#[cfg(feature = "webhook")]
pub use config::WebhookConfig;
pub use config::{AdapterConfig, NotificationConfig};
pub use error::Error;
pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
pub use store::EventStore;
#[cfg(feature = "http-producer")]
pub use producer::http::HttpProducer;
#[cfg(feature = "http-producer")]
pub use producer::EventProducer;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// The `NotificationSystem` struct represents the notification system.
/// It manages the event bus and the adapters.
/// It is responsible for sending and receiving events.
/// It also handles the shutdown process.
pub struct NotificationSystem {
    tx: mpsc::Sender<Event>,
    rx: Option<mpsc::Receiver<Event>>,
    store: Arc<EventStore>,
    shutdown: CancellationToken,
    #[cfg(feature = "http-producer")]
    http_config: HttpProducerConfig,
}
impl NotificationSystem {
    /// Creates a new `NotificationSystem` instance.
    pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
        let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
        let store = Arc::new(EventStore::new(&config.store_path).await?);
        let shutdown = CancellationToken::new();
        // Replay events persisted by a previous run before accepting new ones.
        let restored_logs = store.load_logs().await?;
        for log in restored_logs {
            for event in log.records {
                // send returns a SendError if the channel has been closed
                tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
            }
        }
        Ok(Self {
            tx,
            rx: Some(rx),
            store,
            shutdown,
            #[cfg(feature = "http-producer")]
            http_config: config.http,
        })
    }
    /// Starts the notification system.
    /// It initializes the event bus and the producer.
    pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
        let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
        let shutdown_clone = self.shutdown.clone();
        let store_clone = self.store.clone();
        let bus_handle = tokio::spawn(async move {
            if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
                tracing::error!("Event bus failed: {}", e);
            }
        });
        #[cfg(feature = "http-producer")]
        {
            let producer = HttpProducer::new(self.tx.clone(), self.http_config.port);
            producer.start().await?;
        }
        tokio::select! {
            result = bus_handle => {
                result.map_err(Error::JoinError)?;
                Ok(())
            },
            _ = self.shutdown.cancelled() => {
                tracing::info!("System shutdown triggered");
                Ok(())
            }
        }
    }
    /// Sends an event to the notification system.
    /// This method is used to send events to the event bus.
    pub async fn send_event(&self, event: Event) -> Result<(), Error> {
        self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
        Ok(())
    }
    /// Shuts down the notification system.
    /// This method is used to cancel the event bus and producer tasks.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
    /// Sets the HTTP port for the notification system.
    /// This method is used to change the port for the HTTP producer.
    #[cfg(feature = "http-producer")]
    pub fn set_http_port(&mut self, port: u16) {
        self.http_config.port = port;
    }
}

View File

@@ -1,47 +0,0 @@
// src/notifier.rs
use crate::error::Error;
use crate::event::Event;
use crate::rules::{RulesMap, TargetIDSet};
use crate::target::TargetList;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;
pub struct EventNotifier {
target_list: Arc<TargetList>,
rules: RwLock<HashMap<String, RulesMap>>,
tx: broadcast::Sender<Event>,
}
impl EventNotifier {
pub fn new(capacity: usize) -> Self {
let (tx, _) = broadcast::channel(capacity);
Self {
target_list: Arc::new(TargetList::new()),
rules: RwLock::new(HashMap::new()),
tx,
}
}
pub async fn send(&self, event: Event) -> Result<(), Error> {
let rules = self
.rules
.read()
.map_err(|_| Error::StoreError("Failed to read rules".to_string()))?;
// Check for matching rules
let target_ids = if let Some(rules_map) = rules.get(&event.s3.bucket.name) {
rules_map.match_targets(event.s3.event_name, &event.s3.object.key)
} else {
TargetIDSet::new()
};
// Send the event
if !target_ids.is_empty() {
self.target_list.send(event.clone(), &target_ids).await;
}
// Broadcast to listeners
let _ = self.tx.send(event);
Ok(())
}
}

View File

@@ -0,0 +1,83 @@
use crate::Error;
use crate::Event;
use async_trait::async_trait;
/// Event producer trait
#[allow(dead_code)]
#[async_trait]
pub trait EventProducer: Send + Sync {
/// start producer services
async fn start(&self) -> Result<(), Error>;
/// stop producer services
async fn stop(&self) -> Result<(), Error>;
/// send a single event
async fn send_event(&self, event: Event) -> Result<(), Error>;
}
#[cfg(feature = "http-producer")]
pub mod http {
use super::*;
use axum::{routing::post, Json, Router};
use std::sync::Arc;
use tokio::sync::mpsc;
#[derive(Clone)]
pub struct HttpProducer {
tx: mpsc::Sender<Event>,
port: u16,
shutdown: Arc<tokio::sync::Notify>,
}
impl HttpProducer {
pub fn new(tx: mpsc::Sender<Event>, port: u16) -> Self {
Self {
tx,
port,
shutdown: Arc::new(tokio::sync::Notify::new()),
}
}
}
#[async_trait]
impl EventProducer for HttpProducer {
async fn start(&self) -> Result<(), Error> {
let producer = self.clone();
let app = Router::new().route(
"/event",
post(move |event| {
let prod = producer.clone();
async move { handle_event(event, prod).await }
}),
);
let addr = format!("0.0.0.0:{}", self.port);
let listener = tokio::net::TcpListener::bind(&addr).await?;
let shutdown = self.shutdown.clone();
tokio::select! {
result = axum::serve(listener, app) => {
result?;
Ok(())
}
_ = shutdown.notified() => Ok(())
}
}
async fn stop(&self) -> Result<(), Error> {
self.shutdown.notify_one();
Ok(())
}
async fn send_event(&self, event: Event) -> Result<(), Error> {
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
Ok(())
}
}
async fn handle_event(Json(event): Json<Event>, producer: HttpProducer) -> Result<(), axum::http::StatusCode> {
producer
.send_event(event)
.await
.map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
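As a usage sketch, any HTTP client can feed the running producer by POSTing a serialized `Event` to `/event`; the port below is a placeholder for whatever `HttpProducerConfig` carries:

```rust
// Illustrative client for the /event route; port 3000 is a placeholder.
async fn push_event(event: &rustfs_event_notifier::Event) -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:3000/event")
        .json(event)
        .send()
        .await?;
    // handle_event answers 500 if the internal channel rejects the event.
    assert!(resp.status().is_success());
    Ok(())
}
```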

View File

@@ -1,191 +0,0 @@
// src/rules.rs
use crate::event_name::EventName;
use crate::target::TargetID;
use std::collections::{HashMap, HashSet};
use wildmatch::WildMatch;
/// TargetIDSet represents a set of target IDs
#[derive(Debug, Clone, Default)]
pub struct TargetIDSet(HashSet<TargetID>);
impl TargetIDSet {
pub fn new() -> Self {
Self(HashSet::new())
}
pub fn insert(&mut self, target_id: TargetID) -> bool {
self.0.insert(target_id)
}
pub fn contains(&self, target_id: &TargetID) -> bool {
self.0.contains(target_id)
}
pub fn remove(&mut self, target_id: &TargetID) -> bool {
self.0.remove(target_id)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn clear(&mut self) {
self.0.clear()
}
pub fn iter(&self) -> impl Iterator<Item = &TargetID> {
self.0.iter()
}
pub fn extend<I: IntoIterator<Item = TargetID>>(&mut self, iter: I) {
self.0.extend(iter)
}
pub fn union(&self, other: &TargetIDSet) -> TargetIDSet {
let mut result = self.clone();
result.extend(other.0.iter().cloned());
result
}
pub fn difference(&self, other: &TargetIDSet) -> TargetIDSet {
let mut result = TargetIDSet::new();
for id in self.0.iter() {
if !other.contains(id) {
result.insert(id.clone());
}
}
result
}
}
impl FromIterator<TargetID> for TargetIDSet {
fn from_iter<I: IntoIterator<Item = TargetID>>(iter: I) -> Self {
let mut set = TargetIDSet::new();
set.extend(iter);
set
}
}
#[derive(Debug, Clone)]
pub struct Rules {
patterns: HashMap<String, TargetIDSet>,
}
impl Rules {
pub fn new() -> Self {
Self {
patterns: HashMap::new(),
}
}
pub fn add(&mut self, pattern: String, target_id: TargetID) {
let entry = self.patterns.entry(pattern).or_insert_with(TargetIDSet::new);
entry.insert(target_id);
}
pub fn match_object(&self, object_name: &str) -> bool {
for pattern in self.patterns.keys() {
if WildMatch::new(pattern).matches(object_name) {
return true;
}
}
false
}
pub fn match_targets(&self, object_name: &str) -> TargetIDSet {
let mut target_ids = TargetIDSet::new();
for (pattern, ids) in &self.patterns {
if WildMatch::new(pattern).matches(object_name) {
target_ids.extend(ids.iter().cloned());
}
}
target_ids
}
}
#[derive(Debug, Default)]
pub struct RulesMap {
rules: HashMap<EventName, Rules>,
}
impl RulesMap {
pub fn new() -> Self {
Self { rules: HashMap::new() }
}
pub fn add(&mut self, events: &[EventName], pattern: String, target_id: TargetID) {
for &event in events {
for expanded_event in event.expand() {
let rules = self.rules.entry(expanded_event).or_insert_with(Rules::new);
rules.add(pattern.clone(), target_id.clone());
}
}
}
pub fn match_simple(&self, event: EventName, object_name: &str) -> bool {
match self.rules.get(&event) {
Some(rules) => rules.match_object(object_name),
None => false,
}
}
pub fn match_targets(&self, event: EventName, object_name: &str) -> TargetIDSet {
match self.rules.get(&event) {
Some(rules) => rules.match_targets(object_name),
None => TargetIDSet::new(),
}
}
}
impl Clone for RulesMap {
fn clone(&self) -> Self {
Self {
rules: self.rules.clone(),
}
}
}
impl Default for Rules {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event_name::EventName;
#[test]
fn test_rules() {
let mut rules = Rules::new();
rules.add("*.txt".to_string(), "target1".to_string());
rules.add("*.jpg".to_string(), "target2".to_string());
assert!(rules.match_object("file.txt"));
assert!(!rules.match_object("file.pdf"));
assert!(rules.match_object("image.jpg"));
assert!(!rules.match_object("image.png"));
let target_ids = rules.match_targets("file.txt");
assert_eq!(target_ids.len(), 1);
assert!(target_ids.iter().any(|id| id == "target1"));
let mut rules_map = RulesMap::new();
let target_id = "target1".to_string();
// Add a rule
rules_map.add(&[EventName::ObjectCreatedAll], String::from("images/*"), target_id);
// Match an object (ObjectCreatedAll expands to cover ObjectCreatedPut)
assert!(rules_map.match_simple(EventName::ObjectCreatedPut, "images/photo.jpg"));
// Collect the matching targets
let target_ids = rules_map.match_targets(EventName::ObjectCreatedPut, "images/photo.jpg");
assert!(!target_ids.is_empty());
}
}

View File

@@ -1,81 +0,0 @@
// src/stats.rs
use crate::target::TargetID;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[derive(Debug, Default)]
pub struct TargetStat {
pub current_send_calls: AtomicI64,
pub total_events: AtomicI64,
pub failed_events: AtomicI64,
pub current_queue: AtomicI64,
}
impl TargetStat {
pub fn inc_send_calls(&self) {
self.current_send_calls.fetch_add(1, Ordering::Relaxed);
}
pub fn dec_send_calls(&self) {
self.current_send_calls.fetch_sub(1, Ordering::Relaxed);
}
pub fn inc_total_events(&self) {
self.total_events.fetch_add(1, Ordering::Relaxed);
}
pub fn inc_failed_events(&self) {
self.failed_events.fetch_add(1, Ordering::Relaxed);
}
pub fn set_queue_size(&self, size: i64) {
self.current_queue.store(size, Ordering::Relaxed);
}
}
#[derive(Default)]
pub struct TargetStats {
stats: RwLock<HashMap<TargetID, Arc<TargetStat>>>,
}
impl TargetStats {
pub fn new() -> Self {
Self::default()
}
pub fn get_or_create(&self, id: &TargetID) -> Arc<TargetStat> {
let mut stats = self.stats.write();
stats
.entry(id.clone())
.or_insert_with(|| Arc::new(TargetStat::default()))
.clone()
}
pub fn get_stats(&self) -> HashMap<TargetID, TargetSnapshot> {
self.stats
.read()
.iter()
.map(|(id, stat)| {
(
id.clone(),
TargetSnapshot {
current_send_calls: stat.current_send_calls.load(Ordering::Relaxed),
total_events: stat.total_events.load(Ordering::Relaxed),
failed_events: stat.failed_events.load(Ordering::Relaxed),
current_queue: stat.current_queue.load(Ordering::Relaxed),
},
)
})
.collect()
}
}
#[derive(Debug, Clone)]
pub struct TargetSnapshot {
pub current_send_calls: i64,
pub total_events: i64,
pub failed_events: i64,
pub current_queue: i64,
}

View File

@@ -0,0 +1,53 @@
use crate::Error;
use crate::Log;
use chrono::Utc;
use std::sync::Arc;
use tokio::fs::{create_dir_all, File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::RwLock;
/// `EventStore` is a struct that manages the storage of event logs.
pub struct EventStore {
path: String,
lock: Arc<RwLock<()>>,
}
impl EventStore {
pub async fn new(path: &str) -> Result<Self, Error> {
create_dir_all(path).await?;
Ok(Self {
path: path.to_string(),
lock: Arc::new(RwLock::new(())),
})
}
pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> {
let _guard = self.lock.write().await;
let file_path = format!("{}/events_{}.jsonl", self.path, Utc::now().timestamp());
let file = OpenOptions::new().create(true).append(true).open(&file_path).await?;
let mut writer = BufWriter::new(file);
for log in logs {
let line = serde_json::to_string(log)?;
writer.write_all(line.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
writer.flush().await?;
Ok(())
}
pub async fn load_logs(&self) -> Result<Vec<Log>, Error> {
let _guard = self.lock.read().await;
let mut logs = Vec::new();
let mut entries = tokio::fs::read_dir(&self.path).await?;
while let Some(entry) = entries.next_entry().await? {
let file = File::open(entry.path()).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let log: Log = serde_json::from_str(&line)?;
logs.push(log);
}
}
Ok(logs)
}
}
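A quick round-trip sketch for this store (assumes `./events` starts empty; `Log` is the crate's serde-serializable log type):

```rust
use rustfs_event_notifier::{Error, EventStore, Log};

// Persist a batch of logs, then replay them as load_logs would on restart.
async fn round_trip(logs: Vec<Log>) -> Result<(), Error> {
    let store = EventStore::new("./events").await?;
    store.save_logs(&logs).await?; // one JSON line per Log in a timestamped .jsonl file
    let replayed = store.load_logs().await?; // reads every file under ./events
    assert_eq!(replayed.len(), logs.len()); // holds only because the directory started empty
    Ok(())
}
```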

View File

@@ -1,66 +0,0 @@
// src/target.rs
use crate::error::{Error, Result};
use crate::event::Event;
use crate::stats::{TargetSnapshot, TargetStats};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub type TargetID = String;
#[async_trait]
pub trait Target: Send + Sync {
fn id(&self) -> TargetID;
async fn send(&self, event: Event) -> Result<()>;
async fn is_active(&self) -> bool;
async fn close(&self) -> Result<()>;
}
pub struct TargetList {
targets: Arc<RwLock<HashMap<TargetID, Box<dyn Target>>>>,
stats: Arc<TargetStats>,
}
impl TargetList {
pub fn new() -> Self {
Self {
targets: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::default(),
}
}
pub async fn send(&self, event: Event, target_ids: &[TargetID]) -> Vec<Result<()>> {
let mut results = Vec::with_capacity(target_ids.len());
let targets = self.targets.read().unwrap();
for id in target_ids {
let target = match targets.get(id) {
Some(t) => t,
None => {
results.push(Err(Error::TargetNotFound(id.to_string())));
continue;
}
};
let stats = self.stats.get_or_create(id);
stats.inc_send_calls();
let result = target.send(event.clone()).await;
stats.dec_send_calls();
stats.inc_total_events();
if result.is_err() {
stats.inc_failed_events();
}
results.push(result);
}
results
}
pub fn get_stats(&self) -> HashMap<TargetID, TargetSnapshot> {
self.stats.get_stats()
}
}
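For context, the removed fan-out API was driven roughly like this (the target id is illustrative; ids with no registered target come back as `Err(TargetNotFound)` and are reflected in the stats):

```rust
// Sketch against the removed TargetList; `event` is any crate Event.
async fn fan_out(event: Event) {
    let list = TargetList::new();
    let ids: Vec<TargetID> = vec!["webhook-1".to_string()]; // illustrative id
    // Nothing is registered under "webhook-1", so this yields Err(TargetNotFound).
    let results = list.send(event, &ids).await;
    println!("dispatched to {} targets", results.len());
    for (id, snap) in list.get_stats() {
        println!("{}: {} events, {} failed", id, snap.total_events, snap.failed_events);
    }
}
```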

View File

@@ -1 +0,0 @@
mod webhook;

View File

@@ -1,161 +0,0 @@
use crate::event::Event;
use crate::target::{Target, TargetID};
use async_trait::async_trait;
use reqwest::{header, Client};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
#[derive(Error, Debug)]
pub enum WebhookError {
#[error("HTTP client error: {0}")]
HttpClient(#[from] reqwest::Error),
#[error("JSON serialization error: {0}")]
Json(#[from] serde_json::Error),
#[error("Store error: {0}")]
Store(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("Webhook request failed after retries")]
RequestFailed,
}
type Result<T> = std::result::Result<T, WebhookError>;
/// Webhook target configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Webhook endpoint URL
pub endpoint: String,
/// Authentication token
pub auth_token: Option<String>,
/// Custom request headers
pub custom_headers: Option<HashMap<String, String>>,
/// Maximum number of retries
pub max_retries: u32,
/// Connection timeout (seconds)
pub timeout: u64,
}
/// Webhook `Target` implementation
pub struct Webhook {
/// Target ID
id: TargetID,
/// HTTP client
client: Client,
/// Configuration
config: WebhookConfig,
/// Event store
store: Arc<Mutex<dyn Store>>,
}
#[async_trait]
impl Target for Webhook {
/// Returns the target ID
fn id(&self) -> &TargetID {
&self.id
}
/// Checks whether the webhook endpoint is reachable
async fn is_active(&self) -> Result<bool> {
// Send a test request to verify connectivity
let resp = self.client.get(&self.config.endpoint).send().await?;
Ok(resp.status().is_success())
}
/// Saves the event to the store, then sends it asynchronously
async fn save(&self, event: Event) -> Result<()> {
// Serialize the event
let event_json = serde_json::to_string(&event)?;
// Persist the event
self.store.lock().await.save(event_json)?;
// Send asynchronously
tokio::spawn(self.send_event(event));
Ok(())
}
/// Sends a stored event
async fn send(&self, key: StoreKey) -> Result<()> {
// Fetch the event from the store
let event_json = self.store.lock().await.get(&key)?;
let event: Event = serde_json::from_str(&event_json)?;
// Send the event to the webhook
self.send_event(event).await?;
// Delete it after a successful send
self.store.lock().await.delete(&key)?;
Ok(())
}
/// Returns the event store
fn store(&self) -> Arc<Mutex<dyn Store>> {
self.store.clone()
}
/// Closes the target
async fn close(&self) -> Result<()> {
// Wait for all pending events to be flushed
self.store.lock().await.flush()?;
Ok(())
}
}
impl Webhook {
/// Creates a new webhook `Target`
pub fn new(id: TargetID, config: WebhookConfig, store: Arc<Mutex<dyn Store>>) -> Result<Self> {
// Build the HTTP client
let client = Client::builder().timeout(Duration::from_secs(config.timeout)).build()?;
Ok(Self {
id,
client,
config,
store,
})
}
/// Sends an event to the webhook endpoint
async fn send_event(&self, event: Event) -> Result<()> {
let mut retries = 0;
loop {
// Build the request
let mut req = self.client.post(&self.config.endpoint).json(&event);
// Add the authentication header
if let Some(token) = &self.config.auth_token {
req = req.header(header::AUTHORIZATION, format!("Bearer {}", token));
}
// Add custom headers
if let Some(headers) = &self.config.custom_headers {
for (key, value) in headers {
req = req.header(key, value);
}
}
// Send the request
match req.send().await {
Ok(resp) if resp.status().is_success() => {
return Ok(());
}
_ if retries < self.config.max_retries => {
retries += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Err(e) => return Err(WebhookError::HttpClient(e)),
_ => return Err(WebhookError::RequestFailed),
}
}
}
}
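The retry loop above sleeps a fixed second between attempts; had backoff been desired, a capped exponential delay would have been a one-line change (illustrative, not part of this commit):

```rust
// Capped exponential backoff: 1s, 2s, 4s, ... up to 32s.
let backoff = Duration::from_secs(1u64 << retries.min(5));
tokio::time::sleep(backoff).await;
```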

View File

@@ -0,0 +1,160 @@
use rustfs_event_notifier::{AdapterConfig, NotificationSystem, WebhookConfig};
use rustfs_event_notifier::{Bucket, Event, EventBuilder, Identity, Metadata, Name, Object, Source};
use rustfs_event_notifier::{ChannelAdapter, WebhookAdapter};
use std::collections::HashMap;
use std::sync::Arc;
#[tokio::test]
async fn test_webhook_adapter() {
let adapter = WebhookAdapter::new(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: 5,
});
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// Create events using builder mode
let event = Event::builder()
.event_version("2.0")
.event_source("aws:s3")
.aws_region("us-east-1")
.event_time("2023-10-01T12:00:00.000Z")
.event_name(Name::ObjectCreatedPut)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.request_parameters(HashMap::new())
.response_elements(HashMap::new())
.s3(metadata)
.source(source)
.channels(vec!["webhook".to_string()])
.build()
.expect("failed to create event");
let result = adapter.send(&event).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_notification_system() {
let config = rustfs_event_notifier::NotificationConfig {
store_path: "./test_events".to_string(),
channel_capacity: 100,
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: 5,
})],
http: Default::default(),
};
let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await.unwrap()));
let adapters: Vec<Arc<dyn ChannelAdapter>> = vec![Arc::new(WebhookAdapter::new(WebhookConfig {
endpoint: "http://localhost:8080/webhook".to_string(),
auth_token: None,
custom_headers: None,
max_retries: 1,
timeout: 5,
}))];
// create an s3 metadata object
let metadata = Metadata {
schema_version: "1.0".to_string(),
configuration_id: "test-config".to_string(),
bucket: Bucket {
name: "my-bucket".to_string(),
owner_identity: Identity {
principal_id: "owner123".to_string(),
},
arn: "arn:aws:s3:::my-bucket".to_string(),
},
object: Object {
key: "test.txt".to_string(),
size: Some(1024),
etag: Some("abc123".to_string()),
content_type: Some("text/plain".to_string()),
user_metadata: None,
version_id: None,
sequencer: "1234567890".to_string(),
},
};
// create source object
let source = Source {
host: "localhost".to_string(),
port: "80".to_string(),
user_agent: "curl/7.68.0".to_string(),
};
// create a preconfigured builder with objects
let event = EventBuilder::for_object_creation(metadata, source)
.user_identity(Identity {
principal_id: "user123".to_string(),
})
.event_time("2023-10-01T12:00:00.000Z")
.channels(vec!["webhook".to_string()])
.build()
.expect("failed to create event");
{
let system_lock = system.lock().await;
system_lock.send_event(event).await.unwrap();
}
let system_clone = Arc::clone(&system);
let system_handle = tokio::spawn(async move {
let mut system = system_clone.lock().await;
system.start(adapters).await
});
// set 10 seconds timeout
match tokio::time::timeout(std::time::Duration::from_secs(10), system_handle).await {
Ok(result) => {
println!("System started successfully");
assert!(result.is_ok());
}
Err(_) => {
println!("System operation timed out, forcing shutdown");
// create a new task to handle the timeout
let system = Arc::clone(&system);
tokio::spawn(async move {
if let Ok(guard) = system.try_lock() {
guard.shutdown();
}
});
// give the system some time to clean up resources
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}

View File

@@ -60,7 +60,7 @@ s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_urlencoded = { workspace = true }
shadow-rs.workspace = true
shadow-rs = { workspace = true, features = ["build"] }
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
tokio-util.workspace = true
@@ -103,5 +103,5 @@ hyper-util = { workspace = true, features = [
] }
transform-stream = { workspace = true }
netif = "0.1.6"
shadow-rs.workspace = true
shadow-rs = { workspace = true, features = ["build"] }
# pin-utils = "0.1.0"

View File

@@ -103,6 +103,10 @@ pub struct Opt {
#[arg(long, env = "RUSTFS_LICENSE")]
pub license: Option<String>,
/// event notifier config file
#[arg(long, env = "RUSTFS_EVENT_CONFIG")]
pub event_config: Option<String>,
}
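One plausible way the server could consume the new option at startup is sketched below; `init_event_notifier` is a hypothetical helper, and the on-disk format is assumed to be a JSON-serialized `NotificationConfig` (which would need to derive `Deserialize`):

```rust
// Hypothetical wiring for --event-config / RUSTFS_EVENT_CONFIG; not part of this diff.
async fn init_event_notifier(opt: &Opt) -> Result<(), rustfs_event_notifier::Error> {
    if let Some(path) = opt.event_config.as_deref() {
        let raw = tokio::fs::read_to_string(path).await?;
        // Assumed format: JSON-serialized NotificationConfig.
        let config: rustfs_event_notifier::NotificationConfig = serde_json::from_str(&raw)?;
        rustfs_event_notifier::initialize_and_start(config).await?;
    }
    Ok(())
}
```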
// lazy_static::lazy_static! {