mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
feat: implement event notification system
- Add core event notification interfaces - Support multiple notification backends: - Webhook (default) - Kafka - MQTT - HTTP Producer - Implement configurable event filtering - Add async event dispatching with backpressure handling - Provide serialization/deserialization for event payloads This module enables system events to be published to various endpoints with consistent delivery guarantees and failure handling.
This commit is contained in:
397
Cargo.lock
generated
397
Cargo.lock
generated
@@ -371,7 +371,7 @@ dependencies = [
|
||||
"arrow-buffer",
|
||||
"arrow-data",
|
||||
"arrow-schema",
|
||||
"flatbuffers",
|
||||
"flatbuffers 24.12.23",
|
||||
"lz4_flex",
|
||||
]
|
||||
|
||||
@@ -664,6 +664,15 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
@@ -800,11 +809,11 @@ dependencies = [
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"pin-project-lite",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"rustls-pemfile",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
@@ -1017,6 +1026,12 @@ version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
|
||||
|
||||
[[package]]
|
||||
name = "bytemuck"
|
||||
version = "1.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.5.0"
|
||||
@@ -1088,6 +1103,38 @@ dependencies = [
|
||||
"system-deps",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "camino"
|
||||
version = "1.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo-platform"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e35af189006b9c0f00a064685c727031e3ed2d8020f7ba284d78cc2671bd36ea"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo_metadata"
|
||||
version = "0.19.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd5eb614ed4c27c5d706420e4320fbe3216ab31fa1c33cd8246ac36dae4479ba"
|
||||
dependencies = [
|
||||
"camino",
|
||||
"cargo-platform",
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.12",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.19"
|
||||
@@ -1111,7 +1158,7 @@ version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||
dependencies = [
|
||||
"nom",
|
||||
"nom 7.1.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2998,6 +3045,12 @@ dependencies = [
|
||||
"const-random",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dotenv"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
|
||||
|
||||
[[package]]
|
||||
name = "dpi"
|
||||
version = "0.1.1"
|
||||
@@ -3031,7 +3084,7 @@ version = "0.0.1"
|
||||
dependencies = [
|
||||
"common",
|
||||
"ecstore",
|
||||
"flatbuffers",
|
||||
"flatbuffers 25.2.10",
|
||||
"futures",
|
||||
"lazy_static",
|
||||
"lock",
|
||||
@@ -3060,7 +3113,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"common",
|
||||
"crc32fast",
|
||||
"flatbuffers",
|
||||
"flatbuffers 25.2.10",
|
||||
"futures",
|
||||
"glob",
|
||||
"hex-simd",
|
||||
@@ -3071,7 +3124,7 @@ dependencies = [
|
||||
"madmin",
|
||||
"md-5",
|
||||
"netif",
|
||||
"nix 0.29.0",
|
||||
"nix",
|
||||
"num",
|
||||
"num_cpus",
|
||||
"path-absolutize",
|
||||
@@ -3246,6 +3299,21 @@ dependencies = [
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "figment"
|
||||
version = "0.10.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"pear",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
"toml",
|
||||
"uncased",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.5.7"
|
||||
@@ -3262,6 +3330,16 @@ dependencies = [
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flatbuffers"
|
||||
version = "25.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1"
|
||||
dependencies = [
|
||||
"bitflags 2.9.0",
|
||||
"rustc_version",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.1.1"
|
||||
@@ -3272,6 +3350,17 @@ dependencies = [
|
||||
"miniz_oxide",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flume"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@@ -4087,10 +4176,10 @@ dependencies = [
|
||||
"http",
|
||||
"hyper",
|
||||
"hyper-util",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tower-service",
|
||||
"webpki-roots",
|
||||
]
|
||||
@@ -4334,6 +4423,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4356,6 +4446,12 @@ dependencies = [
|
||||
"cfb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inlinable_string"
|
||||
version = "0.1.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
|
||||
|
||||
[[package]]
|
||||
name = "inout"
|
||||
version = "0.1.4"
|
||||
@@ -4737,19 +4833,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libsystemd"
|
||||
version = "0.7.0"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c592dc396b464005f78a5853555b9f240bc5378bf5221acc4e129910b2678869"
|
||||
checksum = "b85fe9dc49de659d05829fdf72b5770c0a5952d1055c34a39f6d4e932bce175d"
|
||||
dependencies = [
|
||||
"hmac 0.12.1",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.27.1",
|
||||
"nom",
|
||||
"nix",
|
||||
"nom 8.0.0",
|
||||
"once_cell",
|
||||
"serde",
|
||||
"sha2 0.10.8",
|
||||
"thiserror 1.0.69",
|
||||
"thiserror 2.0.12",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
@@ -5169,18 +5265,6 @@ version = "1.0.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086"
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.27.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
|
||||
dependencies = [
|
||||
"bitflags 2.9.0",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.29.0"
|
||||
@@ -5210,6 +5294,15 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "8.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
@@ -5606,6 +5699,12 @@ version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.29.1"
|
||||
@@ -5959,6 +6058,29 @@ dependencies = [
|
||||
"hmac 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pear"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467"
|
||||
dependencies = [
|
||||
"inlinable_string",
|
||||
"pear_codegen",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pear_codegen"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"proc-macro2-diagnostics",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pem"
|
||||
version = "3.0.5"
|
||||
@@ -6437,6 +6559,7 @@ dependencies = [
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
"version_check",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6531,7 +6654,7 @@ name = "protos"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"common",
|
||||
"flatbuffers",
|
||||
"flatbuffers 25.2.10",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"protobuf",
|
||||
@@ -6589,7 +6712,7 @@ dependencies = [
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"socket2",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
@@ -6608,7 +6731,7 @@ dependencies = [
|
||||
"rand 0.9.0",
|
||||
"ring",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"rustls-pki-types",
|
||||
"slab",
|
||||
"thiserror 2.0.12",
|
||||
@@ -6946,7 +7069,7 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"quinn",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"rustls-pemfile",
|
||||
"rustls-pki-types",
|
||||
"serde",
|
||||
@@ -6955,7 +7078,7 @@ dependencies = [
|
||||
"sync_wrapper",
|
||||
"system-configuration",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tokio-util",
|
||||
"tower 0.5.2",
|
||||
"tower-service",
|
||||
@@ -7083,6 +7206,24 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rumqttc"
|
||||
version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"flume",
|
||||
"futures-util",
|
||||
"log",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile",
|
||||
"rustls-webpki 0.102.8",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-rustls 0.25.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rust-embed"
|
||||
version = "8.7.0"
|
||||
@@ -7176,7 +7317,7 @@ dependencies = [
|
||||
"crypto",
|
||||
"datafusion",
|
||||
"ecstore",
|
||||
"flatbuffers",
|
||||
"flatbuffers 25.2.10",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"http",
|
||||
@@ -7202,7 +7343,7 @@ dependencies = [
|
||||
"rust-embed",
|
||||
"rustfs-event-notifier",
|
||||
"rustfs-obs",
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"rustls-pemfile",
|
||||
"rustls-pki-types",
|
||||
"s3s",
|
||||
@@ -7213,7 +7354,7 @@ dependencies = [
|
||||
"tikv-jemallocator",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.0",
|
||||
@@ -7230,12 +7371,24 @@ name = "rustfs-event-notifier"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"parking_lot 0.12.3",
|
||||
"axum",
|
||||
"chrono",
|
||||
"dotenv",
|
||||
"figment",
|
||||
"rdkafka",
|
||||
"reqwest",
|
||||
"rumqttc",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"smallvec",
|
||||
"strum",
|
||||
"thiserror 2.0.12",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
"wildmatch",
|
||||
]
|
||||
|
||||
@@ -7314,6 +7467,20 @@ dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.22.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.102.8",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.26"
|
||||
@@ -7325,11 +7492,24 @@ dependencies = [
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki",
|
||||
"rustls-webpki 0.103.1",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile",
|
||||
"rustls-pki-types",
|
||||
"schannel",
|
||||
"security-framework 2.11.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "2.2.0"
|
||||
@@ -7348,6 +7528,17 @@ dependencies = [
|
||||
"web-time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.102.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
|
||||
dependencies = [
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.1"
|
||||
@@ -7400,7 +7591,7 @@ dependencies = [
|
||||
"md-5",
|
||||
"memchr",
|
||||
"mime",
|
||||
"nom",
|
||||
"nom 7.1.3",
|
||||
"nugine-rust-utils",
|
||||
"numeric_cast",
|
||||
"pin-project-lite",
|
||||
@@ -7442,6 +7633,15 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d"
|
||||
dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
@@ -7509,6 +7709,9 @@ name = "semver"
|
||||
version = "1.0.26"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "send_wrapper"
|
||||
@@ -7621,6 +7824,49 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_with"
|
||||
version = "3.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
"hex",
|
||||
"indexmap 1.9.3",
|
||||
"indexmap 2.9.0",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"serde_with_macros",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_with_macros"
|
||||
version = "3.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
|
||||
dependencies = [
|
||||
"darling",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_yaml"
|
||||
version = "0.9.34+deprecated"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
|
||||
dependencies = [
|
||||
"indexmap 2.9.0",
|
||||
"itoa 1.0.15",
|
||||
"ryu",
|
||||
"serde",
|
||||
"unsafe-libyaml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "server_fn"
|
||||
version = "0.6.15"
|
||||
@@ -7730,13 +7976,16 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "shadow-rs"
|
||||
version = "0.38.1"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ec14cc798c29f4bf74a6c4299c657c04d4e9fba03875c1f0eec569af03aed89"
|
||||
checksum = "6d5625ed609cf66d7e505e7d487aca815626dc4ebb6c0dd07637ca61a44651a6"
|
||||
dependencies = [
|
||||
"cargo_metadata",
|
||||
"const_format",
|
||||
"is_debug",
|
||||
"serde_json",
|
||||
"time",
|
||||
"tzdb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7881,6 +8130,9 @@ name = "smallvec"
|
||||
version = "1.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
@@ -7951,6 +8203,9 @@ name = "spin"
|
||||
version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spki"
|
||||
@@ -8452,13 +8707,24 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.25.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
|
||||
dependencies = [
|
||||
"rustls 0.22.4",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.26.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
|
||||
dependencies = [
|
||||
"rustls",
|
||||
"rustls 0.23.26",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -8888,6 +9154,32 @@ version = "1.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
|
||||
|
||||
[[package]]
|
||||
name = "tz-rs"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1450bf2b99397e72070e7935c89facaa80092ac812502200375f1f7d33c71a1"
|
||||
|
||||
[[package]]
|
||||
name = "tzdb"
|
||||
version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0be2ea5956f295449f47c0b825c5e109022ff1a6a53bb4f77682a87c2341fbf5"
|
||||
dependencies = [
|
||||
"iana-time-zone",
|
||||
"tz-rs",
|
||||
"tzdb_data",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tzdb_data"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c4c81d75033770e40fbd3643ce7472a1a9fd301f90b7139038228daf8af03ec"
|
||||
dependencies = [
|
||||
"tz-rs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ucd-trie"
|
||||
version = "0.1.7"
|
||||
@@ -8905,6 +9197,15 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uncased"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.8.1"
|
||||
@@ -8945,6 +9246,12 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unsafe-libyaml"
|
||||
version = "0.2.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
|
||||
|
||||
[[package]]
|
||||
name = "untrusted"
|
||||
version = "0.9.0"
|
||||
@@ -9944,6 +10251,12 @@ dependencies = [
|
||||
"hashlink",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yansi"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
|
||||
|
||||
[[package]]
|
||||
name = "yoke"
|
||||
version = "0.7.5"
|
||||
@@ -9984,7 +10297,7 @@ dependencies = [
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"nix 0.29.0",
|
||||
"nix",
|
||||
"ordered-stream",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
@@ -10015,7 +10328,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"futures-lite",
|
||||
"hex",
|
||||
"nix 0.29.0",
|
||||
"nix",
|
||||
"ordered-stream",
|
||||
"serde",
|
||||
"serde_repr",
|
||||
|
||||
25
Cargo.toml
25
Cargo.toml
@@ -51,8 +51,10 @@ datafusion = "46.0.0"
|
||||
derive_builder = "0.20.2"
|
||||
dioxus = { version = "0.6.3", features = ["router"] }
|
||||
dirs = "6.0.0"
|
||||
dotenv = "0.15.0"
|
||||
ecstore = { path = "./ecstore" }
|
||||
flatbuffers = "24.12.23"
|
||||
figment = { version = "0.10.19", features = ["toml", "yaml", "env"] }
|
||||
flatbuffers = "25.2.10"
|
||||
futures = "0.3.31"
|
||||
futures-util = "0.3.31"
|
||||
common = { path = "./common/common" }
|
||||
@@ -71,7 +73,7 @@ jsonwebtoken = "9.3.1"
|
||||
keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] }
|
||||
lock = { path = "./common/lock" }
|
||||
lazy_static = "1.5.0"
|
||||
libsystemd = { version = "0.7" }
|
||||
libsystemd = { version = "0.7.1" }
|
||||
local-ip-address = "0.6.3"
|
||||
matchit = "0.8.4"
|
||||
md-5 = "0.10.6"
|
||||
@@ -79,13 +81,13 @@ mime = "0.3.17"
|
||||
netif = "0.1.6"
|
||||
opentelemetry = { version = "0.29.1" }
|
||||
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
|
||||
opentelemetry_sdk = { version = "0.29" }
|
||||
opentelemetry_sdk = { version = "0.29.0" }
|
||||
opentelemetry-stdout = { version = "0.29.0" }
|
||||
opentelemetry-otlp = { version = "0.29" }
|
||||
opentelemetry-otlp = { version = "0.29.0" }
|
||||
opentelemetry-prometheus = { version = "0.29.1" }
|
||||
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
|
||||
parking_lot = "0.12.3"
|
||||
pin-project-lite = "0.2"
|
||||
pin-project-lite = "0.2.16"
|
||||
prometheus = "0.14.0"
|
||||
# pin-utils = "0.1.0"
|
||||
prost = "0.13.5"
|
||||
@@ -94,11 +96,12 @@ prost-types = "0.13.5"
|
||||
protobuf = "3.7"
|
||||
protos = { path = "./common/protos" }
|
||||
rand = "0.8.5"
|
||||
rdkafka = { version = "0.37", features = ["tokio"] }
|
||||
rdkafka = { version = "0.37.0", features = ["tokio"] }
|
||||
reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] }
|
||||
rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] }
|
||||
rmp = "0.8.14"
|
||||
rmp-serde = "1.3.0"
|
||||
rumqttc = { version = "0.24" }
|
||||
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
|
||||
rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" }
|
||||
rust-embed = "8.7.0"
|
||||
@@ -107,10 +110,13 @@ rustls-pki-types = "1.11.0"
|
||||
rustls-pemfile = "2.2.0"
|
||||
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
|
||||
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
|
||||
shadow-rs = { version = "0.38.1", default-features = false }
|
||||
shadow-rs = { version = "1.1.1", default-features = false, features = ["metadata"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
serde_urlencoded = "0.7.1"
|
||||
serde_with = "3.12.0"
|
||||
smallvec = { version = "1.15.0", features = ["serde"] }
|
||||
strum = { version = "0.27.1", features = ["derive"] }
|
||||
sha2 = "0.10.8"
|
||||
snafu = "0.8.5"
|
||||
tempfile = "3.19.1"
|
||||
@@ -126,7 +132,7 @@ time = { version = "0.3.41", features = [
|
||||
tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] }
|
||||
tonic = { version = "0.13.0", features = ["gzip"] }
|
||||
tonic-build = "0.13.0"
|
||||
tokio-rustls = { version = "0.26", default-features = false }
|
||||
tokio-rustls = { version = "0.26.2", default-features = false }
|
||||
tokio-stream = "0.1.17"
|
||||
tokio-util = { version = "0.7.14", features = ["io", "compat"] }
|
||||
tower = { version = "0.5.2", features = ["timeout"] }
|
||||
@@ -136,7 +142,7 @@ tracing-core = "0.1.33"
|
||||
tracing-error = "0.2.1"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
|
||||
tracing-appender = "0.2.3"
|
||||
tracing-opentelemetry = "0.30"
|
||||
tracing-opentelemetry = "0.30.0"
|
||||
transform-stream = "0.3.1"
|
||||
url = "2.5.4"
|
||||
uuid = { version = "1.16.0", features = [
|
||||
@@ -144,7 +150,6 @@ uuid = { version = "1.16.0", features = [
|
||||
"fast-rng",
|
||||
"macro-diagnostics",
|
||||
] }
|
||||
wildmatch = "2.4.0"
|
||||
workers = { path = "./common/workers" }
|
||||
|
||||
[profile.wasm-dev]
|
||||
|
||||
@@ -6,16 +6,36 @@ repository.workspace = true
|
||||
rust-version.workspace = true
|
||||
version.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["webhook"]
|
||||
webhook = ["dep:reqwest"]
|
||||
kafka = ["rdkafka"]
|
||||
mqtt = ["rumqttc"]
|
||||
http-producer = ["dep:axum"]
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
axum = { workspace = true, optional = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
dotenv = { workspace = true }
|
||||
figment = { workspace = true, features = ["toml", "yaml", "env"] }
|
||||
rdkafka = { workspace = true, features = ["tokio"], optional = true }
|
||||
reqwest = { workspace = true, optional = true }
|
||||
rumqttc = { workspace = true, optional = true }
|
||||
serde = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
thiserror = { workspace = true }
|
||||
wildmatch = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde_with = { workspace = true }
|
||||
smallvec = { workspace = true, features = ["serde"] }
|
||||
strum = { workspace = true, features = ["derive"] }
|
||||
tracing = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v4", "serde"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
tracing-subscriber = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
27
crates/event-notifier/examples/.env.example
Normal file
27
crates/event-notifier/examples/.env.example
Normal file
@@ -0,0 +1,27 @@
|
||||
# basic configuration
|
||||
EVENT_NOTIF_STORE_PATH=/var/log/event-notification
|
||||
EVENT_NOTIF_CHANNEL_CAPACITY=5000
|
||||
|
||||
# webhook adapter configuration
|
||||
EVENT_NOTIF_ADAPTERS__0__TYPE=Webhook
|
||||
EVENT_NOTIF_ADAPTERS__0__ENDPOINT=https://api.example.com/webhook
|
||||
EVENT_NOTIF_ADAPTERS__0__AUTH_TOKEN=your-secret-token
|
||||
EVENT_NOTIF_ADAPTERS__0__MAX_RETRIES=3
|
||||
EVENT_NOTIF_ADAPTERS__0__TIMEOUT=5000
|
||||
|
||||
# kafka adapter configuration
|
||||
EVENT_NOTIF_ADAPTERS__1__TYPE=Kafka
|
||||
EVENT_NOTIF_ADAPTERS__1__BROKERS=localhost:9092
|
||||
EVENT_NOTIF_ADAPTERS__1__TOPIC=notifications
|
||||
EVENT_NOTIF_ADAPTERS__1__MAX_RETRIES=3
|
||||
EVENT_NOTIF_ADAPTERS__1__TIMEOUT=5000
|
||||
|
||||
# mqtt adapter configuration
|
||||
EVENT_NOTIF_ADAPTERS__2__TYPE=Mqtt
|
||||
EVENT_NOTIF_ADAPTERS__2__BROKER=mqtt.example.com
|
||||
EVENT_NOTIF_ADAPTERS__2__PORT=1883
|
||||
EVENT_NOTIF_ADAPTERS__2__CLIENT_ID=event-notifier
|
||||
EVENT_NOTIF_ADAPTERS__2__TOPIC=events
|
||||
EVENT_NOTIF_ADAPTERS__2__MAX_RETRIES=3
|
||||
|
||||
EVENT_NOTIF_HTTP__PORT=8080
|
||||
28
crates/event-notifier/examples/event.toml
Normal file
28
crates/event-notifier/examples/event.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
# config.toml
|
||||
store_path = "/var/log/event-notification"
|
||||
channel_capacity = 5000
|
||||
|
||||
[[adapters]]
|
||||
type = "Webhook"
|
||||
endpoint = "https://api.example.com/webhook"
|
||||
auth_token = "your-auth-token"
|
||||
max_retries = 3
|
||||
timeout = 5000
|
||||
|
||||
[[adapters]]
|
||||
type = "Kafka"
|
||||
brokers = "localhost:9092"
|
||||
topic = "notifications"
|
||||
max_retries = 3
|
||||
timeout = 5000
|
||||
|
||||
[[adapters]]
|
||||
type = "Mqtt"
|
||||
broker = "mqtt.example.com"
|
||||
port = 1883
|
||||
client_id = "event-notifier"
|
||||
topic = "events"
|
||||
max_retries = 3
|
||||
|
||||
[http]
|
||||
port = 8080
|
||||
102
crates/event-notifier/examples/simple.rs
Normal file
102
crates/event-notifier/examples/simple.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use rustfs_event_notifier::create_adapters;
|
||||
use rustfs_event_notifier::NotificationSystem;
|
||||
use rustfs_event_notifier::{AdapterConfig, NotificationConfig, WebhookConfig};
|
||||
use rustfs_event_notifier::{Bucket, Event, Identity, Metadata, Name, Object, Source};
|
||||
use std::collections::HashMap;
|
||||
use std::error;
|
||||
use std::sync::Arc;
|
||||
use tokio::signal;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn error::Error>> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let mut config = NotificationConfig {
|
||||
store_path: "./events".to_string(),
|
||||
channel_capacity: 100,
|
||||
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
|
||||
endpoint: "http://localhost:8080/webhook".to_string(),
|
||||
auth_token: Some("secret-token".to_string()),
|
||||
custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])),
|
||||
max_retries: 3,
|
||||
timeout: 10,
|
||||
})],
|
||||
http: Default::default(),
|
||||
};
|
||||
config.http.port = 8080;
|
||||
|
||||
// loading configuration from specific env files
|
||||
let _config = NotificationConfig::from_env_file(".env.example")?;
|
||||
|
||||
// loading from a specific file
|
||||
let _config = NotificationConfig::from_file("event.toml")?;
|
||||
|
||||
// Automatically load from multiple sources (Priority: Environment Variables > YAML > TOML)
|
||||
let _config = NotificationConfig::load()?;
|
||||
|
||||
let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await?));
|
||||
let adapters = create_adapters(&config.adapters)?;
|
||||
|
||||
// create an s3 metadata object
|
||||
let metadata = Metadata {
|
||||
schema_version: "1.0".to_string(),
|
||||
configuration_id: "test-config".to_string(),
|
||||
bucket: Bucket {
|
||||
name: "my-bucket".to_string(),
|
||||
owner_identity: Identity {
|
||||
principal_id: "owner123".to_string(),
|
||||
},
|
||||
arn: "arn:aws:s3:::my-bucket".to_string(),
|
||||
},
|
||||
object: Object {
|
||||
key: "test.txt".to_string(),
|
||||
size: Some(1024),
|
||||
etag: Some("abc123".to_string()),
|
||||
content_type: Some("text/plain".to_string()),
|
||||
user_metadata: None,
|
||||
version_id: None,
|
||||
sequencer: "1234567890".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
// create source object
|
||||
let source = Source {
|
||||
host: "localhost".to_string(),
|
||||
port: "80".to_string(),
|
||||
user_agent: "curl/7.68.0".to_string(),
|
||||
};
|
||||
|
||||
// create events using builder mode
|
||||
let event = Event::builder()
|
||||
.event_time("2023-10-01T12:00:00.000Z")
|
||||
.event_name(Name::ObjectCreatedPut)
|
||||
.user_identity(Identity {
|
||||
principal_id: "user123".to_string(),
|
||||
})
|
||||
.s3(metadata)
|
||||
.source(source)
|
||||
.channels(vec!["webhook".to_string()])
|
||||
.build()
|
||||
.expect("failed to create event");
|
||||
|
||||
{
|
||||
let system = system.lock().await;
|
||||
system.send_event(event).await?;
|
||||
}
|
||||
|
||||
let system_clone = Arc::clone(&system);
|
||||
let system_handle = tokio::spawn(async move {
|
||||
let mut system = system_clone.lock().await;
|
||||
system.start(adapters).await
|
||||
});
|
||||
|
||||
signal::ctrl_c().await?;
|
||||
tracing::info!("Received shutdown signal");
|
||||
{
|
||||
let system = system.lock().await;
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
system_handle.await??;
|
||||
Ok(())
|
||||
}
|
||||
76
crates/event-notifier/src/adapter/kafka.rs
Normal file
76
crates/event-notifier/src/adapter/kafka.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
use crate::ChannelAdapter;
|
||||
use crate::Error;
|
||||
use crate::Event;
|
||||
use crate::KafkaConfig;
|
||||
use async_trait::async_trait;
|
||||
use rdkafka::error::KafkaError;
|
||||
use rdkafka::producer::{FutureProducer, FutureRecord};
|
||||
use rdkafka::types::RDKafkaErrorCode;
|
||||
use rdkafka::util::Timeout;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
/// Kafka adapter for sending events to a Kafka topic.
|
||||
pub struct KafkaAdapter {
|
||||
producer: FutureProducer,
|
||||
topic: String,
|
||||
max_retries: u32,
|
||||
}
|
||||
|
||||
impl KafkaAdapter {
|
||||
/// Creates a new Kafka adapter.
|
||||
pub fn new(config: &KafkaConfig) -> Result<Self, Error> {
|
||||
// Create a Kafka producer with the provided configuration.
|
||||
let producer = rdkafka::config::ClientConfig::new()
|
||||
.set("bootstrap.servers", &config.brokers)
|
||||
.set("message.timeout.ms", config.timeout.to_string())
|
||||
.create()?;
|
||||
|
||||
Ok(Self {
|
||||
producer,
|
||||
topic: config.topic.clone(),
|
||||
max_retries: config.max_retries,
|
||||
})
|
||||
}
|
||||
/// Sends an event to the Kafka topic with retry logic.
|
||||
async fn send_with_retry(&self, event: &Event) -> Result<(), Error> {
|
||||
let event_id = event.id.to_string();
|
||||
let payload = serde_json::to_string(&event)?;
|
||||
|
||||
for attempt in 0..self.max_retries {
|
||||
let record = FutureRecord::to(&self.topic)
|
||||
.key(&event_id)
|
||||
.payload(&payload);
|
||||
|
||||
match self.producer.send(record, Timeout::Never).await {
|
||||
Ok(_) => return Ok(()),
|
||||
Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
|
||||
tracing::warn!(
|
||||
"Kafka attempt {} failed: Queue full. Retrying...",
|
||||
attempt + 1
|
||||
);
|
||||
sleep(Duration::from_secs(2u64.pow(attempt))).await;
|
||||
}
|
||||
Err((e, _)) => {
|
||||
tracing::error!("Kafka send error: {}", e);
|
||||
return Err(Error::Kafka(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::Custom(
|
||||
"Exceeded maximum retry attempts for Kafka message".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChannelAdapter for KafkaAdapter {
|
||||
fn name(&self) -> String {
|
||||
"kafka".to_string()
|
||||
}
|
||||
|
||||
async fn send(&self, event: &Event) -> Result<(), Error> {
|
||||
self.send_with_retry(event).await
|
||||
}
|
||||
}
|
||||
54
crates/event-notifier/src/adapter/mod.rs
Normal file
54
crates/event-notifier/src/adapter/mod.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use crate::AdapterConfig;
|
||||
use crate::Error;
|
||||
use crate::Event;
|
||||
use async_trait::async_trait;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(feature = "kafka")]
|
||||
pub(crate) mod kafka;
|
||||
#[cfg(feature = "mqtt")]
|
||||
pub(crate) mod mqtt;
|
||||
#[cfg(feature = "webhook")]
|
||||
pub(crate) mod webhook;
|
||||
|
||||
/// The `ChannelAdapter` trait defines the interface for all channel adapters.
|
||||
#[async_trait]
|
||||
pub trait ChannelAdapter: Send + Sync + 'static {
|
||||
/// Sends an event to the channel.
|
||||
fn name(&self) -> String;
|
||||
/// Sends an event to the channel.
|
||||
async fn send(&self, event: &Event) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
/// Creates channel adapters based on the provided configuration.
|
||||
pub fn create_adapters(configs: &[AdapterConfig]) -> Result<Vec<Arc<dyn ChannelAdapter>>, Box<Error>> {
|
||||
let mut adapters: Vec<Arc<dyn ChannelAdapter>> = Vec::new();
|
||||
|
||||
for config in configs {
|
||||
match config {
|
||||
#[cfg(feature = "webhook")]
|
||||
AdapterConfig::Webhook(webhook_config) => {
|
||||
webhook_config.validate().map_err(|e| Box::new(Error::ConfigError(e)))?;
|
||||
adapters.push(Arc::new(webhook::WebhookAdapter::new(webhook_config.clone())));
|
||||
}
|
||||
#[cfg(feature = "kafka")]
|
||||
AdapterConfig::Kafka(kafka_config) => {
|
||||
adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?));
|
||||
}
|
||||
#[cfg(feature = "mqtt")]
|
||||
AdapterConfig::Mqtt(mqtt_config) => {
|
||||
let (mqtt, mut event_loop) = mqtt::MqttAdapter::new(mqtt_config);
|
||||
tokio::spawn(async move { while event_loop.poll().await.is_ok() {} });
|
||||
adapters.push(Arc::new(mqtt));
|
||||
}
|
||||
#[cfg(not(feature = "webhook"))]
|
||||
AdapterConfig::Webhook(_) => return Err(Box::new(Error::FeatureDisabled("webhook"))),
|
||||
#[cfg(not(feature = "kafka"))]
|
||||
AdapterConfig::Kafka(_) => return Err(Box::new(Error::FeatureDisabled("kafka"))),
|
||||
#[cfg(not(feature = "mqtt"))]
|
||||
AdapterConfig::Mqtt(_) => return Err(Box::new(Error::FeatureDisabled("mqtt"))),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(adapters)
|
||||
}
|
||||
58
crates/event-notifier/src/adapter/mqtt.rs
Normal file
58
crates/event-notifier/src/adapter/mqtt.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use crate::ChannelAdapter;
|
||||
use crate::Error;
|
||||
use crate::Event;
|
||||
use crate::MqttConfig;
|
||||
use async_trait::async_trait;
|
||||
use rumqttc::{AsyncClient, MqttOptions, QoS};
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
/// MQTT adapter for sending events to an MQTT broker.
|
||||
pub struct MqttAdapter {
|
||||
client: AsyncClient,
|
||||
topic: String,
|
||||
max_retries: u32,
|
||||
}
|
||||
|
||||
impl MqttAdapter {
|
||||
/// Creates a new MQTT adapter.
|
||||
pub fn new(config: &MqttConfig) -> (Self, rumqttc::EventLoop) {
|
||||
let mqtt_options = MqttOptions::new(&config.client_id, &config.broker, config.port);
|
||||
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10);
|
||||
(
|
||||
Self {
|
||||
client,
|
||||
topic: config.topic.clone(),
|
||||
max_retries: config.max_retries,
|
||||
},
|
||||
event_loop,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChannelAdapter for MqttAdapter {
|
||||
fn name(&self) -> String {
|
||||
"mqtt".to_string()
|
||||
}
|
||||
|
||||
async fn send(&self, event: &Event) -> Result<(), Error> {
|
||||
let payload = serde_json::to_string(event).map_err(Error::Serde)?;
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match self
|
||||
.client
|
||||
.publish(&self.topic, QoS::AtLeastOnce, false, payload.clone())
|
||||
.await
|
||||
{
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) if attempt < self.max_retries => {
|
||||
attempt += 1;
|
||||
tracing::warn!("MQTT attempt {} failed: {}. Retrying...", attempt, e);
|
||||
sleep(Duration::from_secs(2u64.pow(attempt))).await;
|
||||
}
|
||||
Err(e) => return Err(Error::Mqtt(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
63
crates/event-notifier/src/adapter/webhook.rs
Normal file
63
crates/event-notifier/src/adapter/webhook.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use crate::ChannelAdapter;
|
||||
use crate::Error;
|
||||
use crate::Event;
|
||||
use crate::WebhookConfig;
|
||||
use async_trait::async_trait;
|
||||
use reqwest::{Client, RequestBuilder};
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
/// Webhook adapter for sending events to a webhook endpoint.
|
||||
pub struct WebhookAdapter {
|
||||
config: WebhookConfig,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl WebhookAdapter {
|
||||
/// Creates a new Webhook adapter.
|
||||
pub fn new(config: WebhookConfig) -> Self {
|
||||
let client = Client::builder()
|
||||
.timeout(Duration::from_secs(config.timeout))
|
||||
.build()
|
||||
.expect("Failed to build reqwest client");
|
||||
Self { config, client }
|
||||
}
|
||||
/// Builds the request to send the event.
|
||||
fn build_request(&self, event: &Event) -> RequestBuilder {
|
||||
let mut request = self.client.post(&self.config.endpoint).json(event);
|
||||
if let Some(token) = &self.config.auth_token {
|
||||
request = request.header("Authorization", format!("Bearer {}", token));
|
||||
}
|
||||
if let Some(headers) = &self.config.custom_headers {
|
||||
for (key, value) in headers {
|
||||
request = request.header(key, value);
|
||||
}
|
||||
}
|
||||
request
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChannelAdapter for WebhookAdapter {
|
||||
fn name(&self) -> String {
|
||||
"webhook".to_string()
|
||||
}
|
||||
|
||||
async fn send(&self, event: &Event) -> Result<(), Error> {
|
||||
let mut attempt = 0;
|
||||
loop {
|
||||
match self.build_request(event).send().await {
|
||||
Ok(response) => {
|
||||
response.error_for_status().map_err(Error::Http)?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) if attempt < self.config.max_retries => {
|
||||
attempt += 1;
|
||||
tracing::warn!("Webhook attempt {} failed: {}. Retrying...", attempt, e);
|
||||
sleep(Duration::from_secs(2u64.pow(attempt))).await;
|
||||
}
|
||||
Err(e) => return Err(Error::Http(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
68
crates/event-notifier/src/bus.rs
Normal file
68
crates/event-notifier/src/bus.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use crate::ChannelAdapter;
|
||||
use crate::Error;
|
||||
use crate::EventStore;
|
||||
use crate::{Event, Log};
|
||||
use chrono::Utc;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Handles incoming events from the producer.
|
||||
///
|
||||
/// This function is responsible for receiving events from the producer and sending them to the appropriate adapters.
|
||||
/// It also handles the shutdown process and saves any pending logs to the event store.
|
||||
pub async fn event_bus(
|
||||
mut rx: mpsc::Receiver<Event>,
|
||||
adapters: Vec<Arc<dyn ChannelAdapter>>,
|
||||
store: Arc<EventStore>,
|
||||
shutdown: CancellationToken,
|
||||
) -> Result<(), Error> {
|
||||
let mut pending_logs = Vec::new();
|
||||
let mut current_log = Log {
|
||||
event_name: crate::event::Name::Everything,
|
||||
key: Utc::now().timestamp().to_string(),
|
||||
records: Vec::new(),
|
||||
};
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(event) = rx.recv() => {
|
||||
current_log.records.push(event.clone());
|
||||
let mut send_tasks = Vec::new();
|
||||
for adapter in &adapters {
|
||||
if event.channels.contains(&adapter.name()) {
|
||||
let adapter = adapter.clone();
|
||||
let event = event.clone();
|
||||
send_tasks.push(tokio::spawn(async move {
|
||||
if let Err(e) = adapter.send(&event).await {
|
||||
tracing::error!("Failed to send event to {}: {}", adapter.name(), e);
|
||||
Err(e)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
for task in send_tasks {
|
||||
if task.await?.is_err() {
|
||||
current_log.records.retain(|e| e.id != event.id);
|
||||
}
|
||||
}
|
||||
if !current_log.records.is_empty() {
|
||||
pending_logs.push(current_log.clone());
|
||||
}
|
||||
current_log.records.clear();
|
||||
}
|
||||
_ = shutdown.cancelled() => {
|
||||
tracing::info!("Shutting down event bus, saving pending logs...");
|
||||
if !current_log.records.is_empty() {
|
||||
pending_logs.push(current_log);
|
||||
}
|
||||
store.save_logs(&pending_logs).await?;
|
||||
break;
|
||||
}
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
161
crates/event-notifier/src/config.rs
Normal file
161
crates/event-notifier/src/config.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
use crate::Error;
|
||||
use figment::providers::Format;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Configuration for the notification system.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WebhookConfig {
|
||||
pub endpoint: String,
|
||||
pub auth_token: Option<String>,
|
||||
pub custom_headers: Option<HashMap<String, String>>,
|
||||
pub max_retries: u32,
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
impl WebhookConfig {
|
||||
/// verify that the configuration is valid
|
||||
pub fn validate(&self) -> Result<(), String> {
|
||||
// verify that endpoint cannot be empty
|
||||
if self.endpoint.trim().is_empty() {
|
||||
return Err("Webhook endpoint cannot be empty".to_string());
|
||||
}
|
||||
|
||||
// verification timeout must be reasonable
|
||||
if self.timeout == 0 {
|
||||
return Err("Webhook timeout must be greater than 0".to_string());
|
||||
}
|
||||
|
||||
// Verify that the maximum number of retry is reasonable
|
||||
if self.max_retries > 10 {
|
||||
return Err("Maximum retry count cannot exceed 10".to_string());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for the Kafka adapter.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct KafkaConfig {
|
||||
pub brokers: String,
|
||||
pub topic: String,
|
||||
pub max_retries: u32,
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
/// Configuration for the MQTT adapter.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MqttConfig {
|
||||
pub broker: String,
|
||||
pub port: u16,
|
||||
pub client_id: String,
|
||||
pub topic: String,
|
||||
pub max_retries: u32,
|
||||
}
|
||||
|
||||
/// Configuration for the notification system.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum AdapterConfig {
|
||||
Webhook(WebhookConfig),
|
||||
Kafka(KafkaConfig),
|
||||
Mqtt(MqttConfig),
|
||||
}
|
||||
|
||||
/// http producer configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HttpProducerConfig {
|
||||
#[serde(default = "default_http_port")]
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl Default for HttpProducerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
port: default_http_port(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_http_port() -> u16 {
|
||||
3000
|
||||
}
|
||||
|
||||
/// Configuration for the notification system.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NotificationConfig {
|
||||
#[serde(default = "default_store_path")]
|
||||
pub store_path: String,
|
||||
#[serde(default = "default_channel_capacity")]
|
||||
pub channel_capacity: usize,
|
||||
pub adapters: Vec<AdapterConfig>,
|
||||
#[serde(default)]
|
||||
pub http: HttpProducerConfig,
|
||||
}
|
||||
|
||||
impl Default for NotificationConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
store_path: default_store_path(),
|
||||
channel_capacity: default_channel_capacity(),
|
||||
adapters: Vec::new(),
|
||||
http: HttpProducerConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationConfig {
|
||||
/// create a new configuration with default values
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// create a configuration from a configuration file
|
||||
pub fn from_file(path: &str) -> Result<Self, Error> {
|
||||
let config = figment::Figment::new()
|
||||
.merge(figment::providers::Toml::file(path))
|
||||
.extract()?;
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
/// Read configuration from multiple sources (support TOML, YAML, .env)
|
||||
pub fn load() -> Result<Self, Error> {
|
||||
let figment = figment::Figment::new()
|
||||
// First try to read the config.toml of the current directory
|
||||
.merge(figment::providers::Toml::file("event.toml"))
|
||||
// Then try to read the config.yaml of the current directory
|
||||
.merge(figment::providers::Yaml::file("event.yaml"))
|
||||
// Finally read the environment variable and overwrite the previous value
|
||||
.merge(figment::providers::Env::prefixed("EVENT_NOTIF_"));
|
||||
|
||||
Ok(figment.extract()?)
|
||||
}
|
||||
|
||||
/// loading configuration from env file
|
||||
pub fn from_env_file(path: &str) -> Result<Self, Error> {
|
||||
// loading env files
|
||||
dotenv::from_path(path)
|
||||
.map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?;
|
||||
|
||||
// Extract configuration from environment variables using figurement
|
||||
let figment =
|
||||
figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_"));
|
||||
|
||||
Ok(figment.extract()?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Provide temporary directories as default storage paths
|
||||
fn default_store_path() -> String {
|
||||
std::env::temp_dir()
|
||||
.join("event-notification")
|
||||
.to_string_lossy()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
/// Provides the recommended default channel capacity for high concurrency systems
|
||||
fn default_channel_capacity() -> usize {
|
||||
10000 // Reasonable default values for high concurrency systems
|
||||
}
|
||||
@@ -1,25 +1,45 @@
|
||||
// src/error.rs
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc::error;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
/// The `Error` enum represents all possible errors that can occur in the application.
|
||||
/// It implements the `std::error::Error` trait and provides a way to convert various error types into a single error type.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("target not found: {0}")]
|
||||
TargetNotFound(String),
|
||||
|
||||
#[error("send failed: {0}")]
|
||||
SendError(String),
|
||||
|
||||
#[error("target error: {0}")]
|
||||
TargetError(String),
|
||||
|
||||
#[error("invalid configuration: {0}")]
|
||||
#[error("Join error: {0}")]
|
||||
JoinError(#[from] JoinError),
|
||||
#[error("IO error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("Serialization error: {0}")]
|
||||
Serde(#[from] serde_json::Error),
|
||||
#[error("HTTP error: {0}")]
|
||||
Http(#[from] reqwest::Error),
|
||||
#[cfg(feature = "kafka")]
|
||||
#[error("Kafka error: {0}")]
|
||||
Kafka(#[from] rdkafka::error::KafkaError),
|
||||
#[cfg(feature = "mqtt")]
|
||||
#[error("MQTT error: {0}")]
|
||||
Mqtt(#[from] rumqttc::ClientError),
|
||||
#[error("Channel send error: {0}")]
|
||||
ChannelSend(#[from] Box<error::SendError<crate::event::Event>>),
|
||||
#[error("Feature disabled: {0}")]
|
||||
FeatureDisabled(&'static str),
|
||||
#[error("Event bus already started")]
|
||||
EventBusStarted,
|
||||
#[error("necessary fields are missing:{0}")]
|
||||
MissingField(&'static str),
|
||||
#[error("field verification failed:{0}")]
|
||||
ValidationError(&'static str),
|
||||
#[error("{0}")]
|
||||
Custom(String),
|
||||
#[error("Configuration error: {0}")]
|
||||
ConfigError(String),
|
||||
|
||||
#[error("store error: {0}")]
|
||||
StoreError(String),
|
||||
|
||||
#[error("invalid event name: {0}")]
|
||||
InvalidEventName(String), // 添加此变体
|
||||
#[error("Configuration loading error: {0}")]
|
||||
Figment(#[from] figment::Error),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
impl Error {
|
||||
pub(crate) fn custom(msg: &str) -> Error {
|
||||
Self::Custom(msg.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,53 +1,442 @@
|
||||
use crate::Error;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{DeserializeFromStr, SerializeDisplay};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use strum::{Display, EnumString};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
pub event_version: String,
|
||||
pub event_source: String,
|
||||
pub aws_region: String,
|
||||
pub event_time: String,
|
||||
pub event_name: String,
|
||||
pub user_identity: Identity,
|
||||
pub request_parameters: HashMap<String, String>,
|
||||
pub response_elements: HashMap<String, String>,
|
||||
pub s3: Metadata,
|
||||
pub source: Source,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Identity {
|
||||
#[serde(rename = "principalId")]
|
||||
pub principal_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Bucket {
|
||||
pub name: String,
|
||||
#[serde(rename = "ownerIdentity")]
|
||||
pub owner_identity: Identity,
|
||||
pub arn: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Object {
|
||||
pub key: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub size: Option<i64>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none", rename = "eTag")]
|
||||
pub etag: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none", rename = "contentType")]
|
||||
pub content_type: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none", rename = "userMetadata")]
|
||||
pub user_metadata: Option<HashMap<String, String>>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none", rename = "versionId")]
|
||||
pub version_id: Option<String>,
|
||||
pub sequencer: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Metadata {
|
||||
#[serde(rename = "s3SchemaVersion")]
|
||||
pub schema_version: String,
|
||||
#[serde(rename = "configurationId")]
|
||||
pub configuration_id: String,
|
||||
pub bucket: Bucket,
|
||||
pub object: Object,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Bucket {
|
||||
pub name: String,
|
||||
pub owner_identity: Identity,
|
||||
pub arn: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Object {
|
||||
pub key: String,
|
||||
pub version_id: Option<String>,
|
||||
pub sequencer: String,
|
||||
pub size: Option<u64>,
|
||||
pub etag: Option<String>,
|
||||
pub content_type: Option<String>,
|
||||
pub user_metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Source {
|
||||
pub host: String,
|
||||
pub port: String,
|
||||
#[serde(rename = "userAgent")]
|
||||
pub user_agent: String,
|
||||
}
|
||||
|
||||
/// Builder for creating an Event.
|
||||
///
|
||||
/// This struct is used to build an Event object with various parameters.
|
||||
/// It provides methods to set each parameter and a build method to create the Event.
|
||||
#[derive(Default, Clone)]
|
||||
pub struct EventBuilder {
|
||||
event_version: Option<String>,
|
||||
event_source: Option<String>,
|
||||
aws_region: Option<String>,
|
||||
event_time: Option<String>,
|
||||
event_name: Option<Name>,
|
||||
user_identity: Option<Identity>,
|
||||
request_parameters: Option<HashMap<String, String>>,
|
||||
response_elements: Option<HashMap<String, String>>,
|
||||
s3: Option<Metadata>,
|
||||
source: Option<Source>,
|
||||
channels: Option<SmallVec<[String; 2]>>,
|
||||
}
|
||||
|
||||
impl EventBuilder {
|
||||
/// create a builder that pre filled default values
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
event_version: Some(Cow::Borrowed("2.0").to_string()),
|
||||
event_source: Some(Cow::Borrowed("aws:s3").to_string()),
|
||||
aws_region: Some("us-east-1".to_string()),
|
||||
event_time: Some(Utc::now().to_rfc3339()),
|
||||
event_name: None,
|
||||
user_identity: Some(Identity {
|
||||
principal_id: "anonymous".to_string(),
|
||||
}),
|
||||
request_parameters: Some(HashMap::new()),
|
||||
response_elements: Some(HashMap::new()),
|
||||
s3: None,
|
||||
source: None,
|
||||
channels: Some(Vec::new().into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// verify and set the event version
|
||||
pub fn event_version(mut self, event_version: impl Into<String>) -> Self {
|
||||
let event_version = event_version.into();
|
||||
if !event_version.is_empty() {
|
||||
self.event_version = Some(event_version);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// verify and set the event source
|
||||
pub fn event_source(mut self, event_source: impl Into<String>) -> Self {
|
||||
let event_source = event_source.into();
|
||||
if !event_source.is_empty() {
|
||||
self.event_source = Some(event_source);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// set up aws regions
|
||||
pub fn aws_region(mut self, aws_region: impl Into<String>) -> Self {
|
||||
self.aws_region = Some(aws_region.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// set event time
|
||||
pub fn event_time(mut self, event_time: impl Into<String>) -> Self {
|
||||
self.event_time = Some(event_time.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// set event name
|
||||
pub fn event_name(mut self, event_name: Name) -> Self {
|
||||
self.event_name = Some(event_name);
|
||||
self
|
||||
}
|
||||
|
||||
/// set user identity
|
||||
pub fn user_identity(mut self, user_identity: Identity) -> Self {
|
||||
self.user_identity = Some(user_identity);
|
||||
self
|
||||
}
|
||||
|
||||
/// set request parameters
|
||||
pub fn request_parameters(mut self, request_parameters: HashMap<String, String>) -> Self {
|
||||
self.request_parameters = Some(request_parameters);
|
||||
self
|
||||
}
|
||||
|
||||
/// set response elements
|
||||
pub fn response_elements(mut self, response_elements: HashMap<String, String>) -> Self {
|
||||
self.response_elements = Some(response_elements);
|
||||
self
|
||||
}
|
||||
|
||||
/// setting up s3 metadata
|
||||
pub fn s3(mut self, s3: Metadata) -> Self {
|
||||
self.s3 = Some(s3);
|
||||
self
|
||||
}
|
||||
|
||||
/// set event source information
|
||||
pub fn source(mut self, source: Source) -> Self {
|
||||
self.source = Some(source);
|
||||
self
|
||||
}
|
||||
|
||||
/// set up the sending channel
|
||||
pub fn channels(mut self, channels: Vec<String>) -> Self {
|
||||
self.channels = Some(channels.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a preconfigured builder for common object event scenarios
|
||||
pub fn for_object_creation(s3: Metadata, source: Source) -> Self {
|
||||
Self::new().event_name(Name::ObjectCreatedPut).s3(s3).source(source)
|
||||
}
|
||||
|
||||
/// Create a preconfigured builder for object deletion events
|
||||
pub fn for_object_removal(s3: Metadata, source: Source) -> Self {
|
||||
Self::new().event_name(Name::ObjectRemovedDelete).s3(s3).source(source)
|
||||
}
|
||||
|
||||
/// build event instance
|
||||
///
|
||||
/// Verify the required fields and create a complete Event object
|
||||
pub fn build(self) -> Result<Event, Error> {
|
||||
let event_version = self.event_version.ok_or(Error::MissingField("event_version"))?;
|
||||
|
||||
let event_source = self.event_source.ok_or(Error::MissingField("event_source"))?;
|
||||
|
||||
let aws_region = self.aws_region.ok_or(Error::MissingField("aws_region"))?;
|
||||
|
||||
let event_time = self.event_time.ok_or(Error::MissingField("event_time"))?;
|
||||
|
||||
let event_name = self.event_name.ok_or(Error::MissingField("event_name"))?;
|
||||
|
||||
let user_identity = self.user_identity.ok_or(Error::MissingField("user_identity"))?;
|
||||
|
||||
let request_parameters = self.request_parameters.unwrap_or_default();
|
||||
let response_elements = self.response_elements.unwrap_or_default();
|
||||
|
||||
let s3 = self.s3.ok_or(Error::MissingField("s3"))?;
|
||||
|
||||
let source = self.source.ok_or(Error::MissingField("source"))?;
|
||||
|
||||
let channels = self.channels.unwrap_or_else(|| smallvec![]);
|
||||
|
||||
Ok(Event {
|
||||
event_version,
|
||||
event_source,
|
||||
aws_region,
|
||||
event_time,
|
||||
event_name,
|
||||
user_identity,
|
||||
request_parameters,
|
||||
response_elements,
|
||||
s3,
|
||||
source,
|
||||
id: Uuid::new_v4(),
|
||||
timestamp: Utc::now(),
|
||||
channels,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Event {
|
||||
#[serde(rename = "eventVersion")]
|
||||
pub event_version: String,
|
||||
#[serde(rename = "eventSource")]
|
||||
pub event_source: String,
|
||||
#[serde(rename = "awsRegion")]
|
||||
pub aws_region: String,
|
||||
#[serde(rename = "eventTime")]
|
||||
pub event_time: String,
|
||||
#[serde(rename = "eventName")]
|
||||
pub event_name: Name,
|
||||
#[serde(rename = "userIdentity")]
|
||||
pub user_identity: Identity,
|
||||
#[serde(rename = "requestParameters")]
|
||||
pub request_parameters: HashMap<String, String>,
|
||||
#[serde(rename = "responseElements")]
|
||||
pub response_elements: HashMap<String, String>,
|
||||
pub s3: Metadata,
|
||||
pub source: Source,
|
||||
pub id: Uuid,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
pub channels: SmallVec<[String; 2]>,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
/// create a new event builder
|
||||
///
|
||||
/// Returns an EventBuilder instance pre-filled with default values
|
||||
pub fn builder() -> EventBuilder {
|
||||
EventBuilder::new()
|
||||
}
|
||||
|
||||
/// Quickly create Event instances with necessary fields
|
||||
///
|
||||
/// suitable for common s3 event scenarios
|
||||
pub fn create(event_name: Name, s3: Metadata, source: Source, channels: Vec<String>) -> Self {
|
||||
Self::builder()
|
||||
.event_name(event_name)
|
||||
.s3(s3)
|
||||
.source(source)
|
||||
.channels(channels)
|
||||
.build()
|
||||
.expect("Failed to create event, missing necessary parameters")
|
||||
}
|
||||
|
||||
/// a convenient way to create a preconfigured builder
|
||||
pub fn for_object_creation(s3: Metadata, source: Source) -> EventBuilder {
|
||||
EventBuilder::for_object_creation(s3, source)
|
||||
}
|
||||
|
||||
/// a convenient way to create a preconfigured builder
|
||||
pub fn for_object_removal(s3: Metadata, source: Source) -> EventBuilder {
|
||||
EventBuilder::for_object_removal(s3, source)
|
||||
}
|
||||
|
||||
/// Determine whether an event belongs to a specific type
|
||||
pub fn is_type(&self, event_type: Name) -> bool {
|
||||
let mask = event_type.mask();
|
||||
(self.event_name.mask() & mask) != 0
|
||||
}
|
||||
|
||||
/// Determine whether an event needs to be sent to a specific channel
|
||||
pub fn is_for_channel(&self, channel: &str) -> bool {
|
||||
self.channels.iter().any(|c| c == channel)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Log {
|
||||
#[serde(rename = "eventName")]
|
||||
pub event_name: Name,
|
||||
pub key: String,
|
||||
pub records: Vec<Event>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, SerializeDisplay, DeserializeFromStr, Display, EnumString)]
|
||||
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum Name {
|
||||
ObjectAccessedGet,
|
||||
ObjectAccessedGetRetention,
|
||||
ObjectAccessedGetLegalHold,
|
||||
ObjectAccessedHead,
|
||||
ObjectAccessedAttributes,
|
||||
ObjectCreatedCompleteMultipartUpload,
|
||||
ObjectCreatedCopy,
|
||||
ObjectCreatedPost,
|
||||
ObjectCreatedPut,
|
||||
ObjectCreatedPutRetention,
|
||||
ObjectCreatedPutLegalHold,
|
||||
ObjectCreatedPutTagging,
|
||||
ObjectCreatedDeleteTagging,
|
||||
ObjectRemovedDelete,
|
||||
ObjectRemovedDeleteMarkerCreated,
|
||||
ObjectRemovedDeleteAllVersions,
|
||||
ObjectRemovedNoOp,
|
||||
BucketCreated,
|
||||
BucketRemoved,
|
||||
ObjectReplicationFailed,
|
||||
ObjectReplicationComplete,
|
||||
ObjectReplicationMissedThreshold,
|
||||
ObjectReplicationReplicatedAfterThreshold,
|
||||
ObjectReplicationNotTracked,
|
||||
ObjectRestorePost,
|
||||
ObjectRestoreCompleted,
|
||||
ObjectTransitionFailed,
|
||||
ObjectTransitionComplete,
|
||||
ObjectManyVersions,
|
||||
ObjectLargeVersions,
|
||||
PrefixManyFolders,
|
||||
IlmDelMarkerExpirationDelete,
|
||||
ObjectAccessedAll,
|
||||
ObjectCreatedAll,
|
||||
ObjectRemovedAll,
|
||||
ObjectReplicationAll,
|
||||
ObjectRestoreAll,
|
||||
ObjectTransitionAll,
|
||||
ObjectScannerAll,
|
||||
Everything,
|
||||
}
|
||||
|
||||
impl Name {
|
||||
pub fn expand(&self) -> Vec<Name> {
|
||||
match self {
|
||||
Name::ObjectAccessedAll => vec![
|
||||
Name::ObjectAccessedGet,
|
||||
Name::ObjectAccessedHead,
|
||||
Name::ObjectAccessedGetRetention,
|
||||
Name::ObjectAccessedGetLegalHold,
|
||||
Name::ObjectAccessedAttributes,
|
||||
],
|
||||
Name::ObjectCreatedAll => vec![
|
||||
Name::ObjectCreatedCompleteMultipartUpload,
|
||||
Name::ObjectCreatedCopy,
|
||||
Name::ObjectCreatedPost,
|
||||
Name::ObjectCreatedPut,
|
||||
Name::ObjectCreatedPutRetention,
|
||||
Name::ObjectCreatedPutLegalHold,
|
||||
Name::ObjectCreatedPutTagging,
|
||||
Name::ObjectCreatedDeleteTagging,
|
||||
],
|
||||
Name::ObjectRemovedAll => vec![
|
||||
Name::ObjectRemovedDelete,
|
||||
Name::ObjectRemovedDeleteMarkerCreated,
|
||||
Name::ObjectRemovedNoOp,
|
||||
Name::ObjectRemovedDeleteAllVersions,
|
||||
],
|
||||
Name::ObjectReplicationAll => vec![
|
||||
Name::ObjectReplicationFailed,
|
||||
Name::ObjectReplicationComplete,
|
||||
Name::ObjectReplicationNotTracked,
|
||||
Name::ObjectReplicationMissedThreshold,
|
||||
Name::ObjectReplicationReplicatedAfterThreshold,
|
||||
],
|
||||
Name::ObjectRestoreAll => vec![Name::ObjectRestorePost, Name::ObjectRestoreCompleted],
|
||||
Name::ObjectTransitionAll => {
|
||||
vec![Name::ObjectTransitionFailed, Name::ObjectTransitionComplete]
|
||||
}
|
||||
Name::ObjectScannerAll => vec![Name::ObjectManyVersions, Name::ObjectLargeVersions, Name::PrefixManyFolders],
|
||||
Name::Everything => (1..=Name::IlmDelMarkerExpirationDelete as u32)
|
||||
.map(|i| Name::from_repr(i).unwrap())
|
||||
.collect(),
|
||||
_ => vec![*self],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mask(&self) -> u64 {
|
||||
if (*self as u32) < Name::ObjectAccessedAll as u32 {
|
||||
1 << (*self as u32 - 1)
|
||||
} else {
|
||||
self.expand().iter().fold(0, |acc, n| acc | (1 << (*n as u32 - 1)))
|
||||
}
|
||||
}
|
||||
|
||||
fn from_repr(discriminant: u32) -> Option<Self> {
|
||||
match discriminant {
|
||||
1 => Some(Name::ObjectAccessedGet),
|
||||
2 => Some(Name::ObjectAccessedGetRetention),
|
||||
3 => Some(Name::ObjectAccessedGetLegalHold),
|
||||
4 => Some(Name::ObjectAccessedHead),
|
||||
5 => Some(Name::ObjectAccessedAttributes),
|
||||
6 => Some(Name::ObjectCreatedCompleteMultipartUpload),
|
||||
7 => Some(Name::ObjectCreatedCopy),
|
||||
8 => Some(Name::ObjectCreatedPost),
|
||||
9 => Some(Name::ObjectCreatedPut),
|
||||
10 => Some(Name::ObjectCreatedPutRetention),
|
||||
11 => Some(Name::ObjectCreatedPutLegalHold),
|
||||
12 => Some(Name::ObjectCreatedPutTagging),
|
||||
13 => Some(Name::ObjectCreatedDeleteTagging),
|
||||
14 => Some(Name::ObjectRemovedDelete),
|
||||
15 => Some(Name::ObjectRemovedDeleteMarkerCreated),
|
||||
16 => Some(Name::ObjectRemovedDeleteAllVersions),
|
||||
17 => Some(Name::ObjectRemovedNoOp),
|
||||
18 => Some(Name::BucketCreated),
|
||||
19 => Some(Name::BucketRemoved),
|
||||
20 => Some(Name::ObjectReplicationFailed),
|
||||
21 => Some(Name::ObjectReplicationComplete),
|
||||
22 => Some(Name::ObjectReplicationMissedThreshold),
|
||||
23 => Some(Name::ObjectReplicationReplicatedAfterThreshold),
|
||||
24 => Some(Name::ObjectReplicationNotTracked),
|
||||
25 => Some(Name::ObjectRestorePost),
|
||||
26 => Some(Name::ObjectRestoreCompleted),
|
||||
27 => Some(Name::ObjectTransitionFailed),
|
||||
28 => Some(Name::ObjectTransitionComplete),
|
||||
29 => Some(Name::ObjectManyVersions),
|
||||
30 => Some(Name::ObjectLargeVersions),
|
||||
31 => Some(Name::PrefixManyFolders),
|
||||
32 => Some(Name::IlmDelMarkerExpirationDelete),
|
||||
33 => Some(Name::ObjectAccessedAll),
|
||||
34 => Some(Name::ObjectCreatedAll),
|
||||
35 => Some(Name::ObjectRemovedAll),
|
||||
36 => Some(Name::ObjectReplicationAll),
|
||||
37 => Some(Name::ObjectRestoreAll),
|
||||
38 => Some(Name::ObjectTransitionAll),
|
||||
39 => Some(Name::ObjectScannerAll),
|
||||
40 => Some(Name::Everything),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
use crate::error::Error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::str::FromStr;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
#[repr(u64)]
|
||||
pub enum EventName {
|
||||
// 对象操作事件
|
||||
ObjectCreatedAll = 1 << 0,
|
||||
ObjectCreatedPut = 1 << 1,
|
||||
ObjectCreatedPost = 1 << 2,
|
||||
ObjectCreatedCopy = 1 << 3,
|
||||
ObjectCreatedCompleteMultipartUpload = 1 << 4,
|
||||
|
||||
ObjectRemovedAll = 1 << 5,
|
||||
ObjectRemovedDelete = 1 << 6,
|
||||
ObjectRemovedDeleteMarkerCreated = 1 << 7,
|
||||
|
||||
ObjectAccessedAll = 1 << 8,
|
||||
ObjectAccessedGet = 1 << 9,
|
||||
ObjectAccessedHead = 1 << 10,
|
||||
|
||||
ObjectRestoredAll = 1 << 11,
|
||||
ObjectRestoredPost = 1 << 12,
|
||||
ObjectRestoredCompleted = 1 << 13,
|
||||
|
||||
ReplicationAll = 1 << 14,
|
||||
ReplicationFailed = 1 << 15,
|
||||
ReplicationComplete = 1 << 16,
|
||||
}
|
||||
|
||||
impl EventName {
|
||||
pub fn mask(&self) -> u64 {
|
||||
*self as u64
|
||||
}
|
||||
|
||||
pub fn expand(&self) -> Vec<EventName> {
|
||||
match self {
|
||||
EventName::ObjectCreatedAll => vec![
|
||||
EventName::ObjectCreatedPut,
|
||||
EventName::ObjectCreatedPost,
|
||||
EventName::ObjectCreatedCopy,
|
||||
EventName::ObjectCreatedCompleteMultipartUpload,
|
||||
],
|
||||
EventName::ObjectRemovedAll => vec![EventName::ObjectRemovedDelete, EventName::ObjectRemovedDeleteMarkerCreated],
|
||||
EventName::ObjectAccessedAll => vec![EventName::ObjectAccessedGet, EventName::ObjectAccessedHead],
|
||||
EventName::ObjectRestoredAll => vec![EventName::ObjectRestoredPost, EventName::ObjectRestoredCompleted],
|
||||
EventName::ReplicationAll => vec![EventName::ReplicationFailed, EventName::ReplicationComplete],
|
||||
_ => vec![*self],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for EventName {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll),
|
||||
"s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut),
|
||||
"s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost),
|
||||
"s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy),
|
||||
"s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload),
|
||||
"s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll),
|
||||
"s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete),
|
||||
"s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated),
|
||||
"s3:ObjectAccessed:*" => Ok(EventName::ObjectAccessedAll),
|
||||
"s3:ObjectAccessed:Get" => Ok(EventName::ObjectAccessedGet),
|
||||
"s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead),
|
||||
"s3:ObjectRestored:*" => Ok(EventName::ObjectRestoredAll),
|
||||
"s3:ObjectRestored:Post" => Ok(EventName::ObjectRestoredPost),
|
||||
"s3:ObjectRestored:Completed" => Ok(EventName::ObjectRestoredCompleted),
|
||||
"s3:Replication:*" => Ok(EventName::ReplicationAll),
|
||||
"s3:Replication:Failed" => Ok(EventName::ReplicationFailed),
|
||||
"s3:Replication:Complete" => Ok(EventName::ReplicationComplete),
|
||||
_ => Err(Error::InvalidEventName(format!("Unrecognized event name: {}", s))),
|
||||
}
|
||||
}
|
||||
}
|
||||
100
crates/event-notifier/src/global.rs
Normal file
100
crates/event-notifier/src/global.rs
Normal file
@@ -0,0 +1,100 @@
|
||||
use crate::{ChannelAdapter, Error, Event, NotificationConfig, NotificationSystem};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, OnceCell};
|
||||
|
||||
static GLOBAL_SYSTEM: OnceCell<Arc<Mutex<NotificationSystem>>> = OnceCell::const_new();
|
||||
static INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
|
||||
|
||||
/// initialize the global notification system
|
||||
pub async fn initialize(config: NotificationConfig) -> Result<(), Error> {
|
||||
if INITIALIZED.swap(true, std::sync::atomic::Ordering::SeqCst) {
|
||||
return Err(Error::custom("notify the system has been initialized"));
|
||||
}
|
||||
|
||||
let system = Arc::new(Mutex::new(NotificationSystem::new(config).await?));
|
||||
GLOBAL_SYSTEM
|
||||
.set(system)
|
||||
.map_err(|_| Error::custom("unable to set up global notification system"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// start the global notification system
|
||||
pub async fn start(adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
|
||||
let system = get_system().await?;
|
||||
|
||||
// create a new task to run the system
|
||||
let system_clone = Arc::clone(&system);
|
||||
tokio::spawn(async move {
|
||||
let mut system_guard = system_clone.lock().await;
|
||||
if let Err(e) = system_guard.start(adapters).await {
|
||||
tracing::error!("notify the system to start failed: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize and start the global notification system
|
||||
///
|
||||
/// This method combines the functions of `initialize` and `start` to provide one-step setup:
|
||||
/// - initialize system configuration
|
||||
/// - create an adapter
|
||||
/// - start event listening
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use rustfs_event_notifier::{initialize_and_start, NotificationConfig};
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() -> Result<(), rustfs_event_notifier::Error> {
|
||||
/// let config = NotificationConfig {
|
||||
/// store_path: "./events".to_string(),
|
||||
/// channel_capacity: 100,
|
||||
/// adapters: vec![/* 适配器配置 */],
|
||||
/// http: Default::default(),
|
||||
/// };
|
||||
///
|
||||
/// // complete initialization and startup in one step
|
||||
/// initialize_and_start(config).await?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn initialize_and_start(config: NotificationConfig) -> Result<(), Error> {
|
||||
// initialize the system first
|
||||
initialize(config.clone()).await?;
|
||||
|
||||
// create an adapter
|
||||
let adapters = crate::create_adapters(&config.adapters).expect("failed to create adapters");
|
||||
|
||||
// start the system
|
||||
start(adapters).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// send events to notification system
|
||||
pub async fn send_event(event: Event) -> Result<(), Error> {
|
||||
let system = get_system().await?;
|
||||
let system_guard = system.lock().await;
|
||||
system_guard.send_event(event).await
|
||||
}
|
||||
|
||||
/// turn off the notification system
|
||||
pub fn shutdown() -> Result<(), Error> {
|
||||
if let Some(system) = GLOBAL_SYSTEM.get() {
|
||||
let system_guard = system.blocking_lock();
|
||||
system_guard.shutdown();
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::custom("notification system not initialized"))
|
||||
}
|
||||
}
|
||||
|
||||
/// get system instance
|
||||
async fn get_system() -> Result<Arc<Mutex<NotificationSystem>>, Error> {
|
||||
GLOBAL_SYSTEM
|
||||
.get()
|
||||
.cloned()
|
||||
.ok_or_else(|| Error::custom("notification system not initialized"))
|
||||
}
|
||||
@@ -1,80 +1,131 @@
|
||||
/// RustFS Event Notifier
|
||||
/// This crate provides a simple event notification system for RustFS.
|
||||
/// It allows for the registration of event handlers and the triggering of events.
|
||||
///
|
||||
mod adapter;
|
||||
mod bus;
|
||||
mod config;
|
||||
mod error;
|
||||
mod event;
|
||||
mod event_name;
|
||||
mod notifier;
|
||||
mod rules;
|
||||
mod stats;
|
||||
mod target;
|
||||
mod target_entry;
|
||||
mod global;
|
||||
mod producer;
|
||||
mod store;
|
||||
|
||||
pub fn add(left: u64, right: u64) -> u64 {
|
||||
left + right
|
||||
pub use adapter::create_adapters;
|
||||
#[cfg(feature = "kafka")]
|
||||
pub use adapter::kafka::KafkaAdapter;
|
||||
#[cfg(feature = "mqtt")]
|
||||
pub use adapter::mqtt::MqttAdapter;
|
||||
#[cfg(feature = "webhook")]
|
||||
pub use adapter::webhook::WebhookAdapter;
|
||||
pub use adapter::ChannelAdapter;
|
||||
pub use bus::event_bus;
|
||||
#[cfg(feature = "http-producer")]
|
||||
pub use config::HttpProducerConfig;
|
||||
#[cfg(feature = "kafka")]
|
||||
pub use config::KafkaConfig;
|
||||
#[cfg(feature = "mqtt")]
|
||||
pub use config::MqttConfig;
|
||||
#[cfg(feature = "webhook")]
|
||||
pub use config::WebhookConfig;
|
||||
pub use config::{AdapterConfig, NotificationConfig};
|
||||
pub use error::Error;
|
||||
|
||||
pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source};
|
||||
pub use global::{initialize, initialize_and_start, send_event, shutdown, start};
|
||||
pub use store::EventStore;
|
||||
|
||||
#[cfg(feature = "http-producer")]
|
||||
pub use producer::http::HttpProducer;
|
||||
#[cfg(feature = "http-producer")]
|
||||
pub use producer::EventProducer;
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// The `NotificationSystem` struct represents the notification system.
|
||||
/// It manages the event bus and the adapters.
|
||||
/// It is responsible for sending and receiving events.
|
||||
/// It also handles the shutdown process.
|
||||
pub struct NotificationSystem {
|
||||
tx: mpsc::Sender<Event>,
|
||||
rx: Option<mpsc::Receiver<Event>>,
|
||||
store: Arc<EventStore>,
|
||||
shutdown: CancellationToken,
|
||||
#[cfg(feature = "http-producer")]
|
||||
http_config: HttpProducerConfig,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::event::{Bucket, Event, Identity, Metadata, Object, Source};
|
||||
use crate::target::{TargetID, TargetList};
|
||||
use std::collections::HashMap;
|
||||
impl NotificationSystem {
|
||||
/// Creates a new `NotificationSystem` instance.
|
||||
pub async fn new(config: NotificationConfig) -> Result<Self, Error> {
|
||||
let (tx, rx) = mpsc::channel::<Event>(config.channel_capacity);
|
||||
let store = Arc::new(EventStore::new(&config.store_path).await?);
|
||||
let shutdown = CancellationToken::new();
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let result = add(2, 2);
|
||||
assert_eq!(result, 4);
|
||||
let restored_logs = store.load_logs().await?;
|
||||
for log in restored_logs {
|
||||
for event in log.records {
|
||||
// For example, where the send method may return a SendError when calling it
|
||||
tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
tx,
|
||||
rx: Some(rx),
|
||||
store,
|
||||
shutdown,
|
||||
#[cfg(feature = "http-producer")]
|
||||
http_config: config.http,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let target_list = TargetList::new();
|
||||
let event = Event {
|
||||
event_version: "1.0".to_string(),
|
||||
event_source: "aws:s3".to_string(),
|
||||
aws_region: "us-west-2".to_string(),
|
||||
event_time: "2023-10-01T12:00:00Z".to_string(),
|
||||
event_name: "PutObject".to_string(),
|
||||
user_identity: Identity {
|
||||
principal_id: "user123".to_string(),
|
||||
/// Starts the notification system.
|
||||
/// It initializes the event bus and the producer.
|
||||
pub async fn start(&mut self, adapters: Vec<Arc<dyn ChannelAdapter>>) -> Result<(), Error> {
|
||||
let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?;
|
||||
|
||||
let shutdown_clone = self.shutdown.clone();
|
||||
let store_clone = self.store.clone();
|
||||
let bus_handle = tokio::spawn(async move {
|
||||
if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await {
|
||||
tracing::error!("Event bus failed: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(feature = "http-producer")]
|
||||
{
|
||||
let producer = HttpProducer::new(self.tx.clone(), self.http_config.port);
|
||||
producer.start().await?;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
result = bus_handle => {
|
||||
result.map_err(Error::JoinError)?;
|
||||
Ok(())
|
||||
},
|
||||
request_parameters: HashMap::new(),
|
||||
response_elements: HashMap::new(),
|
||||
s3: Metadata {
|
||||
schema_version: "1.0".to_string(),
|
||||
configuration_id: "config123".to_string(),
|
||||
bucket: Bucket {
|
||||
name: "my-bucket".to_string(),
|
||||
owner_identity: Identity {
|
||||
principal_id: "owner123".to_string(),
|
||||
},
|
||||
arn: "arn:aws:s3:::my-bucket".to_string(),
|
||||
},
|
||||
object: Object {
|
||||
key: "my-object.txt".to_string(),
|
||||
version_id: Some("version123".to_string()),
|
||||
sequencer: "seq123".to_string(),
|
||||
size: Some(1024),
|
||||
etag: Some("etag123".to_string()),
|
||||
content_type: Some("text/plain".to_string()),
|
||||
user_metadata: HashMap::new(),
|
||||
},
|
||||
},
|
||||
source: Source {
|
||||
host: "localhost".to_string(),
|
||||
user_agent: "RustFS/1.0".to_string(),
|
||||
},
|
||||
};
|
||||
let target_ids: &[TargetID] = &["".to_string()];
|
||||
// 发送事件
|
||||
let results = target_list.send(event, &target_ids).await;
|
||||
println!("result len:{:?}", results.len());
|
||||
// 获取统计信息
|
||||
let stats = target_list.get_stats();
|
||||
for (id, stat) in stats {
|
||||
println!("Target {}: {} events, {} failed", id, stat.total_events, stat.failed_events);
|
||||
_ = self.shutdown.cancelled() => {
|
||||
tracing::info!("System shutdown triggered");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends an event to the notification system.
|
||||
/// This method is used to send events to the event bus.
|
||||
pub async fn send_event(&self, event: Event) -> Result<(), Error> {
|
||||
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Shuts down the notification system.
|
||||
/// This method is used to cancel the event bus and producer tasks.
|
||||
pub fn shutdown(&self) {
|
||||
self.shutdown.cancel();
|
||||
}
|
||||
|
||||
/// Sets the HTTP port for the notification system.
|
||||
/// This method is used to change the port for the HTTP producer.
|
||||
#[cfg(feature = "http-producer")]
|
||||
pub fn set_http_port(&mut self, port: u16) {
|
||||
self.http_config.port = port;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
// src/notifier.rs
|
||||
use crate::error::Error;
|
||||
use crate::event::Event;
|
||||
use crate::rules::{RulesMap, TargetIDSet};
|
||||
use crate::target::TargetList;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
pub struct EventNotifier {
|
||||
target_list: Arc<TargetList>,
|
||||
rules: RwLock<HashMap<String, RulesMap>>,
|
||||
tx: broadcast::Sender<Event>,
|
||||
}
|
||||
|
||||
impl EventNotifier {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
let (tx, _) = broadcast::channel(capacity);
|
||||
Self {
|
||||
target_list: Arc::new(TargetList::new()),
|
||||
rules: RwLock::new(HashMap::new()),
|
||||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, event: Event) -> Result<(), Error> {
|
||||
let rules = self
|
||||
.rules
|
||||
.read()
|
||||
.map_err(|_| Error::StoreError("Failed to read rules".to_string()))?;
|
||||
// 检查规则匹配
|
||||
let target_ids = if let Some(rules_map) = rules.get(&event.s3.bucket.name) {
|
||||
rules_map.match_targets(event.s3.event_name, &event.s3.object.key)
|
||||
} else {
|
||||
TargetIDSet::new()
|
||||
};
|
||||
|
||||
// 发送事件
|
||||
if !target_ids.is_empty() {
|
||||
self.target_list.send(event.clone(), &target_ids).await;
|
||||
}
|
||||
|
||||
// 广播给监听者
|
||||
let _ = self.tx.send(event);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
83
crates/event-notifier/src/producer.rs
Normal file
83
crates/event-notifier/src/producer.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use crate::Error;
|
||||
use crate::Event;
|
||||
use async_trait::async_trait;
|
||||
|
||||
/// event producer characteristics
|
||||
#[allow(dead_code)]
|
||||
#[async_trait]
|
||||
pub trait EventProducer: Send + Sync {
|
||||
/// start producer services
|
||||
async fn start(&self) -> Result<(), Error>;
|
||||
/// stop producer services
|
||||
async fn stop(&self) -> Result<(), Error>;
|
||||
/// send a single event
|
||||
async fn send_event(&self, event: Event) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
#[cfg(feature = "http-producer")]
|
||||
pub mod http {
|
||||
use super::*;
|
||||
use axum::{routing::post, Json, Router};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpProducer {
|
||||
tx: mpsc::Sender<Event>,
|
||||
port: u16,
|
||||
shutdown: Arc<tokio::sync::Notify>,
|
||||
}
|
||||
|
||||
impl HttpProducer {
|
||||
pub fn new(tx: mpsc::Sender<Event>, port: u16) -> Self {
|
||||
Self {
|
||||
tx,
|
||||
port,
|
||||
shutdown: Arc::new(tokio::sync::Notify::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventProducer for HttpProducer {
|
||||
async fn start(&self) -> Result<(), Error> {
|
||||
let producer = self.clone();
|
||||
let app = Router::new().route(
|
||||
"/event",
|
||||
post(move |event| {
|
||||
let prod = producer.clone();
|
||||
async move { handle_event(event, prod).await }
|
||||
}),
|
||||
);
|
||||
|
||||
let addr = format!("0.0.0.0:{}", self.port);
|
||||
let listener = tokio::net::TcpListener::bind(&addr).await?;
|
||||
|
||||
let shutdown = self.shutdown.clone();
|
||||
tokio::select! {
|
||||
result = axum::serve(listener, app) => {
|
||||
result?;
|
||||
Ok(())
|
||||
}
|
||||
_ = shutdown.notified() => Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop(&self) -> Result<(), Error> {
|
||||
self.shutdown.notify_one();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_event(&self, event: Event) -> Result<(), Error> {
|
||||
self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(Json(event): Json<Event>, producer: HttpProducer) -> Result<(), axum::http::StatusCode> {
|
||||
producer
|
||||
.send_event(event)
|
||||
.await
|
||||
.map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
@@ -1,191 +0,0 @@
|
||||
// src/rules.rs
|
||||
use crate::event_name::EventName;
|
||||
use crate::target::TargetID;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use wildmatch::WildMatch;
|
||||
|
||||
/// TargetIDSet represents a set of target IDs
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TargetIDSet(HashSet<TargetID>);
|
||||
|
||||
impl TargetIDSet {
|
||||
pub fn new() -> Self {
|
||||
Self(HashSet::new())
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, target_id: TargetID) -> bool {
|
||||
self.0.insert(target_id)
|
||||
}
|
||||
|
||||
pub fn contains(&self, target_id: &TargetID) -> bool {
|
||||
self.0.contains(target_id)
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, target_id: &TargetID) -> bool {
|
||||
self.0.remove(target_id)
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.0.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.0.clear()
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &TargetID> {
|
||||
self.0.iter()
|
||||
}
|
||||
|
||||
pub fn extend<I: IntoIterator<Item = TargetID>>(&mut self, iter: I) {
|
||||
self.0.extend(iter)
|
||||
}
|
||||
|
||||
pub fn union(&self, other: &TargetIDSet) -> TargetIDSet {
|
||||
let mut result = self.clone();
|
||||
result.extend(other.0.iter().cloned());
|
||||
result
|
||||
}
|
||||
|
||||
pub fn difference(&self, other: &TargetIDSet) -> TargetIDSet {
|
||||
let mut result = TargetIDSet::new();
|
||||
for id in self.0.iter() {
|
||||
if !other.contains(id) {
|
||||
result.insert(id.clone());
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl FromIterator<TargetID> for TargetIDSet {
|
||||
fn from_iter<I: IntoIterator<Item = TargetID>>(iter: I) -> Self {
|
||||
let mut set = TargetIDSet::new();
|
||||
set.extend(iter);
|
||||
set
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Rules {
|
||||
patterns: HashMap<String, TargetIDSet>,
|
||||
}
|
||||
|
||||
impl Rules {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
patterns: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, pattern: String, target_id: TargetID) {
|
||||
let entry = self.patterns.entry(pattern).or_insert_with(TargetIDSet::new);
|
||||
entry.insert(target_id);
|
||||
}
|
||||
|
||||
pub fn match_object(&self, object_name: &str) -> bool {
|
||||
for pattern in self.patterns.keys() {
|
||||
if WildMatch::new(pattern).matches(object_name) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn match_targets(&self, object_name: &str) -> TargetIDSet {
|
||||
let mut target_ids = TargetIDSet::new();
|
||||
|
||||
for (pattern, ids) in &self.patterns {
|
||||
if WildMatch::new(pattern).matches(object_name) {
|
||||
target_ids.extend(ids.iter().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
target_ids
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct RulesMap {
|
||||
rules: HashMap<EventName, Rules>,
|
||||
}
|
||||
|
||||
impl RulesMap {
|
||||
pub fn new() -> Self {
|
||||
Self { rules: HashMap::new() }
|
||||
}
|
||||
|
||||
pub fn add(&mut self, events: &[EventName], pattern: String, target_id: TargetID) {
|
||||
for &event in events {
|
||||
for expanded_event in event.expand() {
|
||||
let rules = self.rules.entry(expanded_event).or_insert_with(Rules::new);
|
||||
rules.add(pattern.clone(), target_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn match_simple(&self, event: EventName, object_name: &str) -> bool {
|
||||
match self.rules.get(&event) {
|
||||
Some(rules) => rules.match_object(object_name),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn match_targets(&self, event: EventName, object_name: &str) -> TargetIDSet {
|
||||
match self.rules.get(&event) {
|
||||
Some(rules) => rules.match_targets(object_name),
|
||||
None => TargetIDSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for RulesMap {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
rules: self.rules.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Rules {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::event_name::EventName;
|
||||
|
||||
#[test]
|
||||
fn test_rules() {
|
||||
let mut rules = Rules::new();
|
||||
rules.add("*.txt".to_string(), "target1".to_string());
|
||||
rules.add("*.jpg".to_string(), "target2".to_string());
|
||||
|
||||
assert!(rules.match_object("file.txt"));
|
||||
assert!(!rules.match_object("file.pdf"));
|
||||
assert!(rules.match_object("image.jpg"));
|
||||
assert!(!rules.match_object("image.png"));
|
||||
|
||||
let target_ids = rules.match_targets("file.txt");
|
||||
assert_eq!(target_ids.len(), 1);
|
||||
assert!(target_ids.iter().any(|id| id == "target1"));
|
||||
|
||||
let mut rules_map = RulesMap::new();
|
||||
let target_id = "target1".to_string();
|
||||
// 添加规则
|
||||
rules_map.add(&[EventName::ObjectCreatedAll], String::from("images/*"), target_id);
|
||||
|
||||
// 匹配对象
|
||||
let matches = rules_map.match_simple(EventName::ObjectCreatedPut, "images/photo.jpg"); // returns true
|
||||
|
||||
// 获取匹配的目标
|
||||
let target_ids = rules_map.match_targets(EventName::ObjectCreatedPut, "images/photo.jpg");
|
||||
}
|
||||
}
|
||||
@@ -1,81 +0,0 @@
|
||||
// src/stats.rs
|
||||
use crate::target::TargetID;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TargetStat {
|
||||
pub current_send_calls: AtomicI64,
|
||||
pub total_events: AtomicI64,
|
||||
pub failed_events: AtomicI64,
|
||||
pub current_queue: AtomicI64,
|
||||
}
|
||||
|
||||
impl TargetStat {
|
||||
pub fn inc_send_calls(&self) {
|
||||
self.current_send_calls.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn dec_send_calls(&self) {
|
||||
self.current_send_calls.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn inc_total_events(&self) {
|
||||
self.total_events.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn inc_failed_events(&self) {
|
||||
self.failed_events.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn set_queue_size(&self, size: i64) {
|
||||
self.current_queue.store(size, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TargetStats {
|
||||
stats: RwLock<HashMap<TargetID, Arc<TargetStat>>>,
|
||||
}
|
||||
|
||||
impl TargetStats {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn get_or_create(&self, id: &TargetID) -> Arc<TargetStat> {
|
||||
let mut stats = self.stats.write();
|
||||
stats
|
||||
.entry(id.clone())
|
||||
.or_insert_with(|| Arc::new(TargetStat::default()))
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn get_stats(&self) -> HashMap<TargetID, TargetSnapshot> {
|
||||
self.stats
|
||||
.read()
|
||||
.iter()
|
||||
.map(|(id, stat)| {
|
||||
(
|
||||
id.clone(),
|
||||
TargetSnapshot {
|
||||
current_send_calls: stat.current_send_calls.load(Ordering::Relaxed),
|
||||
total_events: stat.total_events.load(Ordering::Relaxed),
|
||||
failed_events: stat.failed_events.load(Ordering::Relaxed),
|
||||
current_queue: stat.current_queue.load(Ordering::Relaxed),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TargetSnapshot {
|
||||
pub current_send_calls: i64,
|
||||
pub total_events: i64,
|
||||
pub failed_events: i64,
|
||||
pub current_queue: i64,
|
||||
}
|
||||
53
crates/event-notifier/src/store.rs
Normal file
53
crates/event-notifier/src/store.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use crate::Error;
|
||||
use crate::Log;
|
||||
use chrono::Utc;
|
||||
use std::sync::Arc;
|
||||
use tokio::fs::{create_dir_all, File, OpenOptions};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// `EventStore` is a struct that manages the storage of event logs.
|
||||
pub struct EventStore {
|
||||
path: String,
|
||||
lock: Arc<RwLock<()>>,
|
||||
}
|
||||
|
||||
impl EventStore {
|
||||
pub async fn new(path: &str) -> Result<Self, Error> {
|
||||
create_dir_all(path).await?;
|
||||
Ok(Self {
|
||||
path: path.to_string(),
|
||||
lock: Arc::new(RwLock::new(())),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> {
|
||||
let _guard = self.lock.write().await;
|
||||
let file_path = format!("{}/events_{}.jsonl", self.path, Utc::now().timestamp());
|
||||
let file = OpenOptions::new().create(true).append(true).open(&file_path).await?;
|
||||
let mut writer = BufWriter::new(file);
|
||||
for log in logs {
|
||||
let line = serde_json::to_string(log)?;
|
||||
writer.write_all(line.as_bytes()).await?;
|
||||
writer.write_all(b"\n").await?;
|
||||
}
|
||||
writer.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_logs(&self) -> Result<Vec<Log>, Error> {
|
||||
let _guard = self.lock.read().await;
|
||||
let mut logs = Vec::new();
|
||||
let mut entries = tokio::fs::read_dir(&self.path).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file = File::open(entry.path()).await?;
|
||||
let reader = BufReader::new(file);
|
||||
let mut lines = reader.lines();
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
let log: Log = serde_json::from_str(&line)?;
|
||||
logs.push(log);
|
||||
}
|
||||
}
|
||||
Ok(logs)
|
||||
}
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
// src/target.rs
|
||||
use crate::error::{Error, Result};
|
||||
use crate::event::Event;
|
||||
use crate::stats::{TargetSnapshot, TargetStats};
|
||||
use async_trait::async_trait;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
pub type TargetID = String;
|
||||
|
||||
#[async_trait]
|
||||
pub trait Target: Send + Sync {
|
||||
fn id(&self) -> TargetID;
|
||||
async fn send(&self, event: Event) -> Result<()>;
|
||||
async fn is_active(&self) -> bool;
|
||||
async fn close(&self) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct TargetList {
|
||||
targets: Arc<RwLock<HashMap<TargetID, Box<dyn Target>>>>,
|
||||
stats: Arc<TargetStats>,
|
||||
}
|
||||
|
||||
impl TargetList {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
targets: Arc::new(RwLock::new(HashMap::new())),
|
||||
stats: Arc::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, event: Event, target_ids: &[TargetID]) -> Vec<Result<()>> {
|
||||
let mut results = Vec::with_capacity(target_ids.len());
|
||||
let targets = self.targets.read().unwrap();
|
||||
|
||||
for id in target_ids {
|
||||
let target = match targets.get(id) {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
results.push(Err(Error::TargetNotFound(id.to_string())));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let stats = self.stats.get_or_create(id);
|
||||
stats.inc_send_calls();
|
||||
|
||||
let result = target.send(event.clone()).await;
|
||||
|
||||
stats.dec_send_calls();
|
||||
stats.inc_total_events();
|
||||
|
||||
if result.is_err() {
|
||||
stats.inc_failed_events();
|
||||
}
|
||||
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
pub fn get_stats(&self) -> HashMap<TargetID, TargetSnapshot> {
|
||||
self.stats.get_stats()
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
mod webhook;
|
||||
@@ -1,161 +0,0 @@
|
||||
use crate::event::Event;
|
||||
use crate::target::{Target, TargetID};
|
||||
use async_trait::async_trait;
|
||||
use reqwest::{header, Client};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum WebhookError {
|
||||
#[error("HTTP client error: {0}")]
|
||||
HttpClient(#[from] reqwest::Error),
|
||||
#[error("JSON serialization error: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
#[error("Store error: {0}")]
|
||||
Store(#[from] Box<dyn std::error::Error + Send + Sync>),
|
||||
#[error("Webhook request failed after retries")]
|
||||
RequestFailed,
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, WebhookError>;
|
||||
|
||||
/// Webhook 目标配置
|
||||
|
||||
/// Webhook 目标配置
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WebhookConfig {
|
||||
/// Webhook endpoint URL
|
||||
pub endpoint: String,
|
||||
/// 认证令牌
|
||||
pub auth_token: Option<String>,
|
||||
/// 自定义请求头
|
||||
pub custom_headers: Option<HashMap<String, String>>,
|
||||
/// 重试次数
|
||||
pub max_retries: u32,
|
||||
/// 连接超时时间 (秒)
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
/// Webhook Target 实现
|
||||
pub struct Webhook {
|
||||
/// 目标 ID
|
||||
id: TargetID,
|
||||
/// HTTP 客户端
|
||||
client: Client,
|
||||
/// 配置信息
|
||||
config: WebhookConfig,
|
||||
/// 事件存储
|
||||
store: Arc<Mutex<dyn Store>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Target for Webhook {
|
||||
/// 返回目标 ID
|
||||
fn id(&self) -> &TargetID {
|
||||
&self.id
|
||||
}
|
||||
|
||||
/// 检查 Webhook 是否可用
|
||||
async fn is_active(&self) -> Result<bool> {
|
||||
// 发送测试请求验证连接
|
||||
let resp = self.client.get(&self.config.endpoint).send().await?;
|
||||
|
||||
Ok(resp.status().is_success())
|
||||
}
|
||||
|
||||
/// 保存事件到存储并异步发送
|
||||
async fn save(&self, event: Event) -> Result<()> {
|
||||
// 序列化事件
|
||||
let event_json = serde_json::to_string(&event)?;
|
||||
|
||||
// 存储事件
|
||||
self.store.lock().await.save(event_json)?;
|
||||
|
||||
// 异步发送
|
||||
tokio::spawn(self.send_event(event));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 从存储发送事件
|
||||
async fn send(&self, key: StoreKey) -> Result<()> {
|
||||
// 从存储获取事件
|
||||
let event_json = self.store.lock().await.get(&key)?;
|
||||
let event: Event = serde_json::from_str(&event_json)?;
|
||||
|
||||
// 发送事件到 webhook
|
||||
self.send_event(event).await?;
|
||||
|
||||
// 发送成功后删除
|
||||
self.store.lock().await.delete(&key)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 返回事件存储
|
||||
fn store(&self) -> Arc<Mutex<dyn Store>> {
|
||||
self.store.clone()
|
||||
}
|
||||
|
||||
/// 关闭 Target
|
||||
async fn close(&self) -> Result<()> {
|
||||
// 等待所有事件发送完成
|
||||
self.store.lock().await.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Webhook {
|
||||
/// 创建新的 Webhook Target
|
||||
pub fn new(id: TargetID, config: WebhookConfig, store: Arc<Mutex<dyn Store>>) -> Result<Self> {
|
||||
// 构建 HTTP 客户端
|
||||
let client = Client::builder().timeout(Duration::from_secs(config.timeout)).build()?;
|
||||
|
||||
Ok(Self {
|
||||
id,
|
||||
client,
|
||||
config,
|
||||
store,
|
||||
})
|
||||
}
|
||||
|
||||
/// 发送事件到 Webhook endpoint
|
||||
async fn send_event(&self, event: Event) -> Result<()> {
|
||||
let mut retries = 0;
|
||||
loop {
|
||||
// 构建请求
|
||||
let mut req = self.client.post(&self.config.endpoint).json(&event);
|
||||
|
||||
// 添加认证头
|
||||
if let Some(token) = &self.config.auth_token {
|
||||
req = req.header(header::AUTHORIZATION, format!("Bearer {}", token));
|
||||
}
|
||||
|
||||
// 添加自定义头
|
||||
if let Some(headers) = &self.config.custom_headers {
|
||||
for (key, value) in headers {
|
||||
req = req.header(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
match req.send().await {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
return Ok(());
|
||||
}
|
||||
_ if retries < self.config.max_retries => {
|
||||
retries += 1;
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(WebhookError::HttpClient(e)),
|
||||
_ => return Err(WebhookError::RequestFailed),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
160
crates/event-notifier/tests/integration.rs
Normal file
160
crates/event-notifier/tests/integration.rs
Normal file
@@ -0,0 +1,160 @@
|
||||
use rustfs_event_notifier::{AdapterConfig, NotificationSystem, WebhookConfig};
|
||||
use rustfs_event_notifier::{Bucket, Event, EventBuilder, Identity, Metadata, Name, Object, Source};
|
||||
use rustfs_event_notifier::{ChannelAdapter, WebhookAdapter};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_webhook_adapter() {
|
||||
let adapter = WebhookAdapter::new(WebhookConfig {
|
||||
endpoint: "http://localhost:8080/webhook".to_string(),
|
||||
auth_token: None,
|
||||
custom_headers: None,
|
||||
max_retries: 1,
|
||||
timeout: 5,
|
||||
});
|
||||
|
||||
// create an s3 metadata object
|
||||
let metadata = Metadata {
|
||||
schema_version: "1.0".to_string(),
|
||||
configuration_id: "test-config".to_string(),
|
||||
bucket: Bucket {
|
||||
name: "my-bucket".to_string(),
|
||||
owner_identity: Identity {
|
||||
principal_id: "owner123".to_string(),
|
||||
},
|
||||
arn: "arn:aws:s3:::my-bucket".to_string(),
|
||||
},
|
||||
object: Object {
|
||||
key: "test.txt".to_string(),
|
||||
size: Some(1024),
|
||||
etag: Some("abc123".to_string()),
|
||||
content_type: Some("text/plain".to_string()),
|
||||
user_metadata: None,
|
||||
version_id: None,
|
||||
sequencer: "1234567890".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
// create source object
|
||||
let source = Source {
|
||||
host: "localhost".to_string(),
|
||||
port: "80".to_string(),
|
||||
user_agent: "curl/7.68.0".to_string(),
|
||||
};
|
||||
|
||||
// Create events using builder mode
|
||||
let event = Event::builder()
|
||||
.event_version("2.0")
|
||||
.event_source("aws:s3")
|
||||
.aws_region("us-east-1")
|
||||
.event_time("2023-10-01T12:00:00.000Z")
|
||||
.event_name(Name::ObjectCreatedPut)
|
||||
.user_identity(Identity {
|
||||
principal_id: "user123".to_string(),
|
||||
})
|
||||
.request_parameters(HashMap::new())
|
||||
.response_elements(HashMap::new())
|
||||
.s3(metadata)
|
||||
.source(source)
|
||||
.channels(vec!["webhook".to_string()])
|
||||
.build()
|
||||
.expect("failed to create event");
|
||||
|
||||
let result = adapter.send(&event).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_notification_system() {
|
||||
let config = rustfs_event_notifier::NotificationConfig {
|
||||
store_path: "./test_events".to_string(),
|
||||
channel_capacity: 100,
|
||||
adapters: vec![AdapterConfig::Webhook(WebhookConfig {
|
||||
endpoint: "http://localhost:8080/webhook".to_string(),
|
||||
auth_token: None,
|
||||
custom_headers: None,
|
||||
max_retries: 1,
|
||||
timeout: 5,
|
||||
})],
|
||||
http: Default::default(),
|
||||
};
|
||||
let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await.unwrap()));
|
||||
let adapters: Vec<Arc<dyn ChannelAdapter>> = vec![Arc::new(WebhookAdapter::new(WebhookConfig {
|
||||
endpoint: "http://localhost:8080/webhook".to_string(),
|
||||
auth_token: None,
|
||||
custom_headers: None,
|
||||
max_retries: 1,
|
||||
timeout: 5,
|
||||
}))];
|
||||
|
||||
// create an s3 metadata object
|
||||
let metadata = Metadata {
|
||||
schema_version: "1.0".to_string(),
|
||||
configuration_id: "test-config".to_string(),
|
||||
bucket: Bucket {
|
||||
name: "my-bucket".to_string(),
|
||||
owner_identity: Identity {
|
||||
principal_id: "owner123".to_string(),
|
||||
},
|
||||
arn: "arn:aws:s3:::my-bucket".to_string(),
|
||||
},
|
||||
object: Object {
|
||||
key: "test.txt".to_string(),
|
||||
size: Some(1024),
|
||||
etag: Some("abc123".to_string()),
|
||||
content_type: Some("text/plain".to_string()),
|
||||
user_metadata: None,
|
||||
version_id: None,
|
||||
sequencer: "1234567890".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
// create source object
|
||||
let source = Source {
|
||||
host: "localhost".to_string(),
|
||||
port: "80".to_string(),
|
||||
user_agent: "curl/7.68.0".to_string(),
|
||||
};
|
||||
|
||||
// create a preconfigured builder with objects
|
||||
let event = EventBuilder::for_object_creation(metadata, source)
|
||||
.user_identity(Identity {
|
||||
principal_id: "user123".to_string(),
|
||||
})
|
||||
.event_time("2023-10-01T12:00:00.000Z")
|
||||
.channels(vec!["webhook".to_string()])
|
||||
.build()
|
||||
.expect("failed to create event");
|
||||
|
||||
{
|
||||
let system_lock = system.lock().await;
|
||||
system_lock.send_event(event).await.unwrap();
|
||||
}
|
||||
|
||||
let system_clone = Arc::clone(&system);
|
||||
let system_handle = tokio::spawn(async move {
|
||||
let mut system = system_clone.lock().await;
|
||||
system.start(adapters).await
|
||||
});
|
||||
|
||||
// set 10 seconds timeout
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(10), system_handle).await {
|
||||
Ok(result) => {
|
||||
println!("System started successfully");
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
Err(_) => {
|
||||
println!("System operation timed out, forcing shutdown");
|
||||
// create a new task to handle the timeout
|
||||
let system = Arc::clone(&system);
|
||||
tokio::spawn(async move {
|
||||
if let Ok(guard) = system.try_lock() {
|
||||
guard.shutdown();
|
||||
}
|
||||
});
|
||||
// give the system some time to clean up resources
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,7 +60,7 @@ s3s.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_urlencoded = { workspace = true }
|
||||
shadow-rs.workspace = true
|
||||
shadow-rs = { workspace = true, features = ["build"] }
|
||||
tracing.workspace = true
|
||||
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
|
||||
tokio-util.workspace = true
|
||||
@@ -103,5 +103,5 @@ hyper-util = { workspace = true, features = [
|
||||
] }
|
||||
transform-stream = { workspace = true }
|
||||
netif = "0.1.6"
|
||||
shadow-rs.workspace = true
|
||||
shadow-rs = { workspace = true, features = ["build"] }
|
||||
# pin-utils = "0.1.0"
|
||||
|
||||
@@ -103,6 +103,10 @@ pub struct Opt {
|
||||
|
||||
#[arg(long, env = "RUSTFS_LICENSE")]
|
||||
pub license: Option<String>,
|
||||
|
||||
/// event notifier config file
|
||||
#[arg(long, env = "RUSTFS_EVENT_CONFIG")]
|
||||
pub event_config: Option<String>,
|
||||
}
|
||||
|
||||
// lazy_static::lazy_static! {
|
||||
|
||||
Reference in New Issue
Block a user