From bfc165abe0eb3e6bcd93a3ef0b2d3c5484805b02 Mon Sep 17 00:00:00 2001 From: houseme Date: Mon, 21 Apr 2025 00:17:27 +0800 Subject: [PATCH] feat: implement event notification system - Add core event notification interfaces - Support multiple notification backends: - Webhook (default) - Kafka - MQTT - HTTP Producer - Implement configurable event filtering - Add async event dispatching with backpressure handling - Provide serialization/deserialization for event payloads This module enables system events to be published to various endpoints with consistent delivery guarantees and failure handling. --- Cargo.lock | 397 +++++++++++++-- Cargo.toml | 25 +- crates/event-notifier/Cargo.toml | 30 +- crates/event-notifier/examples/.env.example | 27 ++ crates/event-notifier/examples/event.toml | 28 ++ crates/event-notifier/examples/simple.rs | 102 ++++ crates/event-notifier/src/adapter/kafka.rs | 76 +++ crates/event-notifier/src/adapter/mod.rs | 54 +++ crates/event-notifier/src/adapter/mqtt.rs | 58 +++ crates/event-notifier/src/adapter/webhook.rs | 63 +++ crates/event-notifier/src/bus.rs | 68 +++ crates/event-notifier/src/config.rs | 161 ++++++ crates/event-notifier/src/error.rs | 56 ++- crates/event-notifier/src/event.rs | 459 ++++++++++++++++-- crates/event-notifier/src/event_name.rs | 79 --- crates/event-notifier/src/global.rs | 100 ++++ crates/event-notifier/src/lib.rs | 187 ++++--- crates/event-notifier/src/notifier.rs | 47 -- crates/event-notifier/src/producer.rs | 83 ++++ crates/event-notifier/src/rules.rs | 191 -------- crates/event-notifier/src/stats.rs | 81 ---- crates/event-notifier/src/store.rs | 53 ++ crates/event-notifier/src/target.rs | 66 --- crates/event-notifier/src/target_entry/mod.rs | 1 - .../src/target_entry/webhook.rs | 161 ------ crates/event-notifier/tests/integration.rs | 160 ++++++ rustfs/Cargo.toml | 4 +- rustfs/src/config/mod.rs | 4 + 28 files changed, 2015 insertions(+), 806 deletions(-) create mode 100644 crates/event-notifier/examples/.env.example create mode 100644 crates/event-notifier/examples/event.toml create mode 100644 crates/event-notifier/examples/simple.rs create mode 100644 crates/event-notifier/src/adapter/kafka.rs create mode 100644 crates/event-notifier/src/adapter/mod.rs create mode 100644 crates/event-notifier/src/adapter/mqtt.rs create mode 100644 crates/event-notifier/src/adapter/webhook.rs create mode 100644 crates/event-notifier/src/bus.rs create mode 100644 crates/event-notifier/src/config.rs delete mode 100644 crates/event-notifier/src/event_name.rs create mode 100644 crates/event-notifier/src/global.rs delete mode 100644 crates/event-notifier/src/notifier.rs create mode 100644 crates/event-notifier/src/producer.rs delete mode 100644 crates/event-notifier/src/rules.rs delete mode 100644 crates/event-notifier/src/stats.rs create mode 100644 crates/event-notifier/src/store.rs delete mode 100644 crates/event-notifier/src/target.rs delete mode 100644 crates/event-notifier/src/target_entry/mod.rs delete mode 100644 crates/event-notifier/src/target_entry/webhook.rs create mode 100644 crates/event-notifier/tests/integration.rs diff --git a/Cargo.lock b/Cargo.lock index c0e82fdb..799d7b6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,7 +371,7 @@ dependencies = [ "arrow-buffer", "arrow-data", "arrow-schema", - "flatbuffers", + "flatbuffers 24.12.23", "lz4_flex", ] @@ -664,6 +664,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994" +dependencies = [ + "bytemuck", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -800,11 +809,11 @@ dependencies = [ "hyper", "hyper-util", "pin-project-lite", - "rustls", + "rustls 0.23.26", "rustls-pemfile", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", ] @@ -1017,6 +1026,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytemuck" +version = "1.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540" + [[package]] name = "byteorder" version = "1.5.0" @@ -1088,6 +1103,38 @@ dependencies = [ "system-deps", ] +[[package]] +name = "camino" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e35af189006b9c0f00a064685c727031e3ed2d8020f7ba284d78cc2671bd36ea" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd5eb614ed4c27c5d706420e4320fbe3216ab31fa1c33cd8246ac36dae4479ba" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", + "thiserror 2.0.12", +] + [[package]] name = "cc" version = "1.2.19" @@ -1111,7 +1158,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -2998,6 +3045,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dpi" version = "0.1.1" @@ -3031,7 +3084,7 @@ version = "0.0.1" dependencies = [ "common", "ecstore", - "flatbuffers", + "flatbuffers 25.2.10", "futures", "lazy_static", "lock", @@ -3060,7 +3113,7 @@ dependencies = [ "chrono", "common", "crc32fast", - "flatbuffers", + "flatbuffers 25.2.10", "futures", "glob", "hex-simd", @@ -3071,7 +3124,7 @@ dependencies = [ "madmin", "md-5", "netif", - "nix 0.29.0", + "nix", "num", "num_cpus", "path-absolutize", @@ -3246,6 +3299,21 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "figment" +version = "0.10.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3" +dependencies = [ + "atomic", + "pear", + "serde", + "serde_yaml", + "toml", + "uncased", + "version_check", +] + [[package]] name = "fixedbitset" version = "0.5.7" @@ -3262,6 +3330,16 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "flatbuffers" +version = "25.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1" +dependencies = [ + "bitflags 2.9.0", + "rustc_version", +] + [[package]] name = "flate2" version = "1.1.1" @@ -3272,6 +3350,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -4087,10 +4176,10 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "rustls 0.23.26", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots", ] @@ -4334,6 +4423,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -4356,6 +4446,12 @@ dependencies = [ "cfb", ] +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "inout" version = "0.1.4" @@ -4737,19 +4833,19 @@ dependencies = [ [[package]] name = "libsystemd" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c592dc396b464005f78a5853555b9f240bc5378bf5221acc4e129910b2678869" +checksum = "b85fe9dc49de659d05829fdf72b5770c0a5952d1055c34a39f6d4e932bce175d" dependencies = [ "hmac 0.12.1", "libc", "log", - "nix 0.27.1", - "nom", + "nix", + "nom 8.0.0", "once_cell", "serde", "sha2 0.10.8", - "thiserror 1.0.69", + "thiserror 2.0.12", "uuid", ] @@ -5169,18 +5265,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.9.0", - "cfg-if", - "libc", - "memoffset", -] - [[package]] name = "nix" version = "0.29.0" @@ -5210,6 +5294,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5606,6 +5699,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "opentelemetry" version = "0.29.1" @@ -5959,6 +6058,29 @@ dependencies = [ "hmac 0.12.1", ] +[[package]] +name = "pear" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi", +] + +[[package]] +name = "pear_codegen" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.100", +] + [[package]] name = "pem" version = "3.0.5" @@ -6437,6 +6559,7 @@ dependencies = [ "quote", "syn 2.0.100", "version_check", + "yansi", ] [[package]] @@ -6531,7 +6654,7 @@ name = "protos" version = "0.0.1" dependencies = [ "common", - "flatbuffers", + "flatbuffers 25.2.10", "prost", "prost-build", "protobuf", @@ -6589,7 +6712,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.26", "socket2", "thiserror 2.0.12", "tokio", @@ -6608,7 +6731,7 @@ dependencies = [ "rand 0.9.0", "ring", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.26", "rustls-pki-types", "slab", "thiserror 2.0.12", @@ -6946,7 +7069,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.26", "rustls-pemfile", "rustls-pki-types", "serde", @@ -6955,7 +7078,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-util", "tower 0.5.2", "tower-service", @@ -7083,6 +7206,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rumqttc" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1568e15fab2d546f940ed3a21f48bbbd1c494c90c99c4481339364a497f94a9" +dependencies = [ + "bytes", + "flume", + "futures-util", + "log", + "rustls-native-certs", + "rustls-pemfile", + "rustls-webpki 0.102.8", + "thiserror 1.0.69", + "tokio", + "tokio-rustls 0.25.0", +] + [[package]] name = "rust-embed" version = "8.7.0" @@ -7176,7 +7317,7 @@ dependencies = [ "crypto", "datafusion", "ecstore", - "flatbuffers", + "flatbuffers 25.2.10", "futures", "futures-util", "http", @@ -7202,7 +7343,7 @@ dependencies = [ "rust-embed", "rustfs-event-notifier", "rustfs-obs", - "rustls", + "rustls 0.23.26", "rustls-pemfile", "rustls-pki-types", "s3s", @@ -7213,7 +7354,7 @@ dependencies = [ "tikv-jemallocator", "time", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-stream", "tokio-util", "tonic 0.13.0", @@ -7230,12 +7371,24 @@ name = "rustfs-event-notifier" version = "0.0.1" dependencies = [ "async-trait", - "parking_lot 0.12.3", + "axum", + "chrono", + "dotenv", + "figment", + "rdkafka", "reqwest", + "rumqttc", "serde", "serde_json", + "serde_with", + "smallvec", + "strum", "thiserror 2.0.12", "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "uuid", "wildmatch", ] @@ -7314,6 +7467,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.26" @@ -7325,11 +7492,24 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.1", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -7348,6 +7528,17 @@ dependencies = [ "web-time", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.1" @@ -7400,7 +7591,7 @@ dependencies = [ "md-5", "memchr", "mime", - "nom", + "nom 7.1.3", "nugine-rust-utils", "numeric_cast", "pin-project-lite", @@ -7442,6 +7633,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -7509,6 +7709,9 @@ name = "semver" version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" +dependencies = [ + "serde", +] [[package]] name = "send_wrapper" @@ -7621,6 +7824,49 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.9.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.9.0", + "itoa 1.0.15", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "server_fn" version = "0.6.15" @@ -7730,13 +7976,16 @@ dependencies = [ [[package]] name = "shadow-rs" -version = "0.38.1" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ec14cc798c29f4bf74a6c4299c657c04d4e9fba03875c1f0eec569af03aed89" +checksum = "6d5625ed609cf66d7e505e7d487aca815626dc4ebb6c0dd07637ca61a44651a6" dependencies = [ + "cargo_metadata", "const_format", "is_debug", + "serde_json", "time", + "tzdb", ] [[package]] @@ -7881,6 +8130,9 @@ name = "smallvec" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" +dependencies = [ + "serde", +] [[package]] name = "snafu" @@ -7951,6 +8203,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -8452,13 +8707,24 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.26", "tokio", ] @@ -8888,6 +9154,32 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "tz-rs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1450bf2b99397e72070e7935c89facaa80092ac812502200375f1f7d33c71a1" + +[[package]] +name = "tzdb" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2ea5956f295449f47c0b825c5e109022ff1a6a53bb4f77682a87c2341fbf5" +dependencies = [ + "iana-time-zone", + "tz-rs", + "tzdb_data", +] + +[[package]] +name = "tzdb_data" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c4c81d75033770e40fbd3643ce7472a1a9fd301f90b7139038228daf8af03ec" +dependencies = [ + "tz-rs", +] + [[package]] name = "ucd-trie" version = "0.1.7" @@ -8905,6 +9197,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.8.1" @@ -8945,6 +9246,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" @@ -9944,6 +10251,12 @@ dependencies = [ "hashlink", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yoke" version = "0.7.5" @@ -9984,7 +10297,7 @@ dependencies = [ "futures-sink", "futures-util", "hex", - "nix 0.29.0", + "nix", "ordered-stream", "rand 0.8.5", "serde", @@ -10015,7 +10328,7 @@ dependencies = [ "futures-core", "futures-lite", "hex", - "nix 0.29.0", + "nix", "ordered-stream", "serde", "serde_repr", diff --git a/Cargo.toml b/Cargo.toml index 99e5073f..52bd76da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,8 +51,10 @@ datafusion = "46.0.0" derive_builder = "0.20.2" dioxus = { version = "0.6.3", features = ["router"] } dirs = "6.0.0" +dotenv = "0.15.0" ecstore = { path = "./ecstore" } -flatbuffers = "24.12.23" +figment = { version = "0.10.19", features = ["toml", "yaml", "env"] } +flatbuffers = "25.2.10" futures = "0.3.31" futures-util = "0.3.31" common = { path = "./common/common" } @@ -71,7 +73,7 @@ jsonwebtoken = "9.3.1" keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] } lock = { path = "./common/lock" } lazy_static = "1.5.0" -libsystemd = { version = "0.7" } +libsystemd = { version = "0.7.1" } local-ip-address = "0.6.3" matchit = "0.8.4" md-5 = "0.10.6" @@ -79,13 +81,13 @@ mime = "0.3.17" netif = "0.1.6" opentelemetry = { version = "0.29.1" } opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] } -opentelemetry_sdk = { version = "0.29" } +opentelemetry_sdk = { version = "0.29.0" } opentelemetry-stdout = { version = "0.29.0" } -opentelemetry-otlp = { version = "0.29" } +opentelemetry-otlp = { version = "0.29.0" } opentelemetry-prometheus = { version = "0.29.1" } opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } parking_lot = "0.12.3" -pin-project-lite = "0.2" +pin-project-lite = "0.2.16" prometheus = "0.14.0" # pin-utils = "0.1.0" prost = "0.13.5" @@ -94,11 +96,12 @@ prost-types = "0.13.5" protobuf = "3.7" protos = { path = "./common/protos" } rand = "0.8.5" -rdkafka = { version = "0.37", features = ["tokio"] } +rdkafka = { version = "0.37.0", features = ["tokio"] } reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] } rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] } rmp = "0.8.14" rmp-serde = "1.3.0" +rumqttc = { version = "0.24" } rustfs-obs = { path = "crates/obs", version = "0.0.1" } rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" } rust-embed = "8.7.0" @@ -107,10 +110,13 @@ rustls-pki-types = "1.11.0" rustls-pemfile = "2.2.0" s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" } s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" } -shadow-rs = { version = "0.38.1", default-features = false } +shadow-rs = { version = "1.1.1", default-features = false, features = ["metadata"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_urlencoded = "0.7.1" +serde_with = "3.12.0" +smallvec = { version = "1.15.0", features = ["serde"] } +strum = { version = "0.27.1", features = ["derive"] } sha2 = "0.10.8" snafu = "0.8.5" tempfile = "3.19.1" @@ -126,7 +132,7 @@ time = { version = "0.3.41", features = [ tokio = { version = "1.44.2", features = ["fs", "rt-multi-thread"] } tonic = { version = "0.13.0", features = ["gzip"] } tonic-build = "0.13.0" -tokio-rustls = { version = "0.26", default-features = false } +tokio-rustls = { version = "0.26.2", default-features = false } tokio-stream = "0.1.17" tokio-util = { version = "0.7.14", features = ["io", "compat"] } tower = { version = "0.5.2", features = ["timeout"] } @@ -136,7 +142,7 @@ tracing-core = "0.1.33" tracing-error = "0.2.1" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] } tracing-appender = "0.2.3" -tracing-opentelemetry = "0.30" +tracing-opentelemetry = "0.30.0" transform-stream = "0.3.1" url = "2.5.4" uuid = { version = "1.16.0", features = [ @@ -144,7 +150,6 @@ uuid = { version = "1.16.0", features = [ "fast-rng", "macro-diagnostics", ] } -wildmatch = "2.4.0" workers = { path = "./common/workers" } [profile.wasm-dev] diff --git a/crates/event-notifier/Cargo.toml b/crates/event-notifier/Cargo.toml index 2e4afc7b..065cc698 100644 --- a/crates/event-notifier/Cargo.toml +++ b/crates/event-notifier/Cargo.toml @@ -6,16 +6,36 @@ repository.workspace = true rust-version.workspace = true version.workspace = true +[features] +default = ["webhook"] +webhook = ["dep:reqwest"] +kafka = ["rdkafka"] +mqtt = ["rumqttc"] +http-producer = ["dep:axum"] + [dependencies] async-trait = { workspace = true } -parking_lot = { workspace = true } -reqwest = { workspace = true } +axum = { workspace = true, optional = true } +chrono = { workspace = true, features = ["serde"] } +dotenv = { workspace = true } +figment = { workspace = true, features = ["toml", "yaml", "env"] } +rdkafka = { workspace = true, features = ["tokio"], optional = true } +reqwest = { workspace = true, optional = true } +rumqttc = { workspace = true, optional = true } serde = { workspace = true } -tokio = { workspace = true, features = ["full"] } -thiserror = { workspace = true } -wildmatch = { workspace = true } serde_json = { workspace = true } +serde_with = { workspace = true } +smallvec = { workspace = true, features = ["serde"] } +strum = { workspace = true, features = ["derive"] } +tracing = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync", "net", "macros", "signal", "rt-multi-thread"] } +tokio-util = { workspace = true } +uuid = { workspace = true, features = ["v4", "serde"] } +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } +tracing-subscriber = { workspace = true } [lints] workspace = true diff --git a/crates/event-notifier/examples/.env.example b/crates/event-notifier/examples/.env.example new file mode 100644 index 00000000..d42bc9fc --- /dev/null +++ b/crates/event-notifier/examples/.env.example @@ -0,0 +1,27 @@ +# basic configuration +EVENT_NOTIF_STORE_PATH=/var/log/event-notification +EVENT_NOTIF_CHANNEL_CAPACITY=5000 + +# webhook adapter configuration +EVENT_NOTIF_ADAPTERS__0__TYPE=Webhook +EVENT_NOTIF_ADAPTERS__0__ENDPOINT=https://api.example.com/webhook +EVENT_NOTIF_ADAPTERS__0__AUTH_TOKEN=your-secret-token +EVENT_NOTIF_ADAPTERS__0__MAX_RETRIES=3 +EVENT_NOTIF_ADAPTERS__0__TIMEOUT=5000 + +# kafka adapter configuration +EVENT_NOTIF_ADAPTERS__1__TYPE=Kafka +EVENT_NOTIF_ADAPTERS__1__BROKERS=localhost:9092 +EVENT_NOTIF_ADAPTERS__1__TOPIC=notifications +EVENT_NOTIF_ADAPTERS__1__MAX_RETRIES=3 +EVENT_NOTIF_ADAPTERS__1__TIMEOUT=5000 + +# mqtt adapter configuration +EVENT_NOTIF_ADAPTERS__2__TYPE=Mqtt +EVENT_NOTIF_ADAPTERS__2__BROKER=mqtt.example.com +EVENT_NOTIF_ADAPTERS__2__PORT=1883 +EVENT_NOTIF_ADAPTERS__2__CLIENT_ID=event-notifier +EVENT_NOTIF_ADAPTERS__2__TOPIC=events +EVENT_NOTIF_ADAPTERS__2__MAX_RETRIES=3 + +EVENT_NOTIF_HTTP__PORT=8080 \ No newline at end of file diff --git a/crates/event-notifier/examples/event.toml b/crates/event-notifier/examples/event.toml new file mode 100644 index 00000000..dcde5abf --- /dev/null +++ b/crates/event-notifier/examples/event.toml @@ -0,0 +1,28 @@ +# config.toml +store_path = "/var/log/event-notification" +channel_capacity = 5000 + +[[adapters]] +type = "Webhook" +endpoint = "https://api.example.com/webhook" +auth_token = "your-auth-token" +max_retries = 3 +timeout = 5000 + +[[adapters]] +type = "Kafka" +brokers = "localhost:9092" +topic = "notifications" +max_retries = 3 +timeout = 5000 + +[[adapters]] +type = "Mqtt" +broker = "mqtt.example.com" +port = 1883 +client_id = "event-notifier" +topic = "events" +max_retries = 3 + +[http] +port = 8080 \ No newline at end of file diff --git a/crates/event-notifier/examples/simple.rs b/crates/event-notifier/examples/simple.rs new file mode 100644 index 00000000..5324586a --- /dev/null +++ b/crates/event-notifier/examples/simple.rs @@ -0,0 +1,102 @@ +use rustfs_event_notifier::create_adapters; +use rustfs_event_notifier::NotificationSystem; +use rustfs_event_notifier::{AdapterConfig, NotificationConfig, WebhookConfig}; +use rustfs_event_notifier::{Bucket, Event, Identity, Metadata, Name, Object, Source}; +use std::collections::HashMap; +use std::error; +use std::sync::Arc; +use tokio::signal; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let mut config = NotificationConfig { + store_path: "./events".to_string(), + channel_capacity: 100, + adapters: vec![AdapterConfig::Webhook(WebhookConfig { + endpoint: "http://localhost:8080/webhook".to_string(), + auth_token: Some("secret-token".to_string()), + custom_headers: Some(HashMap::from([("X-Custom".to_string(), "value".to_string())])), + max_retries: 3, + timeout: 10, + })], + http: Default::default(), + }; + config.http.port = 8080; + + // loading configuration from specific env files + let _config = NotificationConfig::from_env_file(".env.example")?; + + // loading from a specific file + let _config = NotificationConfig::from_file("event.toml")?; + + // Automatically load from multiple sources (Priority: Environment Variables > YAML > TOML) + let _config = NotificationConfig::load()?; + + let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await?)); + let adapters = create_adapters(&config.adapters)?; + + // create an s3 metadata object + let metadata = Metadata { + schema_version: "1.0".to_string(), + configuration_id: "test-config".to_string(), + bucket: Bucket { + name: "my-bucket".to_string(), + owner_identity: Identity { + principal_id: "owner123".to_string(), + }, + arn: "arn:aws:s3:::my-bucket".to_string(), + }, + object: Object { + key: "test.txt".to_string(), + size: Some(1024), + etag: Some("abc123".to_string()), + content_type: Some("text/plain".to_string()), + user_metadata: None, + version_id: None, + sequencer: "1234567890".to_string(), + }, + }; + + // create source object + let source = Source { + host: "localhost".to_string(), + port: "80".to_string(), + user_agent: "curl/7.68.0".to_string(), + }; + + // create events using builder mode + let event = Event::builder() + .event_time("2023-10-01T12:00:00.000Z") + .event_name(Name::ObjectCreatedPut) + .user_identity(Identity { + principal_id: "user123".to_string(), + }) + .s3(metadata) + .source(source) + .channels(vec!["webhook".to_string()]) + .build() + .expect("failed to create event"); + + { + let system = system.lock().await; + system.send_event(event).await?; + } + + let system_clone = Arc::clone(&system); + let system_handle = tokio::spawn(async move { + let mut system = system_clone.lock().await; + system.start(adapters).await + }); + + signal::ctrl_c().await?; + tracing::info!("Received shutdown signal"); + { + let system = system.lock().await; + system.shutdown(); + } + + system_handle.await??; + Ok(()) +} diff --git a/crates/event-notifier/src/adapter/kafka.rs b/crates/event-notifier/src/adapter/kafka.rs new file mode 100644 index 00000000..cfb55e6d --- /dev/null +++ b/crates/event-notifier/src/adapter/kafka.rs @@ -0,0 +1,76 @@ +use crate::ChannelAdapter; +use crate::Error; +use crate::Event; +use crate::KafkaConfig; +use async_trait::async_trait; +use rdkafka::error::KafkaError; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::types::RDKafkaErrorCode; +use rdkafka::util::Timeout; +use std::time::Duration; +use tokio::time::sleep; + +/// Kafka adapter for sending events to a Kafka topic. +pub struct KafkaAdapter { + producer: FutureProducer, + topic: String, + max_retries: u32, +} + +impl KafkaAdapter { + /// Creates a new Kafka adapter. + pub fn new(config: &KafkaConfig) -> Result { + // Create a Kafka producer with the provided configuration. + let producer = rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", &config.brokers) + .set("message.timeout.ms", config.timeout.to_string()) + .create()?; + + Ok(Self { + producer, + topic: config.topic.clone(), + max_retries: config.max_retries, + }) + } + /// Sends an event to the Kafka topic with retry logic. + async fn send_with_retry(&self, event: &Event) -> Result<(), Error> { + let event_id = event.id.to_string(); + let payload = serde_json::to_string(&event)?; + + for attempt in 0..self.max_retries { + let record = FutureRecord::to(&self.topic) + .key(&event_id) + .payload(&payload); + + match self.producer.send(record, Timeout::Never).await { + Ok(_) => return Ok(()), + Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => { + tracing::warn!( + "Kafka attempt {} failed: Queue full. Retrying...", + attempt + 1 + ); + sleep(Duration::from_secs(2u64.pow(attempt))).await; + } + Err((e, _)) => { + tracing::error!("Kafka send error: {}", e); + return Err(Error::Kafka(e)); + } + } + } + + Err(Error::Custom( + "Exceeded maximum retry attempts for Kafka message".to_string(), + )) + } +} + +#[async_trait] +impl ChannelAdapter for KafkaAdapter { + fn name(&self) -> String { + "kafka".to_string() + } + + async fn send(&self, event: &Event) -> Result<(), Error> { + self.send_with_retry(event).await + } +} diff --git a/crates/event-notifier/src/adapter/mod.rs b/crates/event-notifier/src/adapter/mod.rs new file mode 100644 index 00000000..9b8cab9f --- /dev/null +++ b/crates/event-notifier/src/adapter/mod.rs @@ -0,0 +1,54 @@ +use crate::AdapterConfig; +use crate::Error; +use crate::Event; +use async_trait::async_trait; +use std::sync::Arc; + +#[cfg(feature = "kafka")] +pub(crate) mod kafka; +#[cfg(feature = "mqtt")] +pub(crate) mod mqtt; +#[cfg(feature = "webhook")] +pub(crate) mod webhook; + +/// The `ChannelAdapter` trait defines the interface for all channel adapters. +#[async_trait] +pub trait ChannelAdapter: Send + Sync + 'static { + /// Sends an event to the channel. + fn name(&self) -> String; + /// Sends an event to the channel. + async fn send(&self, event: &Event) -> Result<(), Error>; +} + +/// Creates channel adapters based on the provided configuration. +pub fn create_adapters(configs: &[AdapterConfig]) -> Result>, Box> { + let mut adapters: Vec> = Vec::new(); + + for config in configs { + match config { + #[cfg(feature = "webhook")] + AdapterConfig::Webhook(webhook_config) => { + webhook_config.validate().map_err(|e| Box::new(Error::ConfigError(e)))?; + adapters.push(Arc::new(webhook::WebhookAdapter::new(webhook_config.clone()))); + } + #[cfg(feature = "kafka")] + AdapterConfig::Kafka(kafka_config) => { + adapters.push(Arc::new(kafka::KafkaAdapter::new(kafka_config)?)); + } + #[cfg(feature = "mqtt")] + AdapterConfig::Mqtt(mqtt_config) => { + let (mqtt, mut event_loop) = mqtt::MqttAdapter::new(mqtt_config); + tokio::spawn(async move { while event_loop.poll().await.is_ok() {} }); + adapters.push(Arc::new(mqtt)); + } + #[cfg(not(feature = "webhook"))] + AdapterConfig::Webhook(_) => return Err(Box::new(Error::FeatureDisabled("webhook"))), + #[cfg(not(feature = "kafka"))] + AdapterConfig::Kafka(_) => return Err(Box::new(Error::FeatureDisabled("kafka"))), + #[cfg(not(feature = "mqtt"))] + AdapterConfig::Mqtt(_) => return Err(Box::new(Error::FeatureDisabled("mqtt"))), + } + } + + Ok(adapters) +} diff --git a/crates/event-notifier/src/adapter/mqtt.rs b/crates/event-notifier/src/adapter/mqtt.rs new file mode 100644 index 00000000..9aab61e8 --- /dev/null +++ b/crates/event-notifier/src/adapter/mqtt.rs @@ -0,0 +1,58 @@ +use crate::ChannelAdapter; +use crate::Error; +use crate::Event; +use crate::MqttConfig; +use async_trait::async_trait; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use std::time::Duration; +use tokio::time::sleep; + +/// MQTT adapter for sending events to an MQTT broker. +pub struct MqttAdapter { + client: AsyncClient, + topic: String, + max_retries: u32, +} + +impl MqttAdapter { + /// Creates a new MQTT adapter. + pub fn new(config: &MqttConfig) -> (Self, rumqttc::EventLoop) { + let mqtt_options = MqttOptions::new(&config.client_id, &config.broker, config.port); + let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_options, 10); + ( + Self { + client, + topic: config.topic.clone(), + max_retries: config.max_retries, + }, + event_loop, + ) + } +} + +#[async_trait] +impl ChannelAdapter for MqttAdapter { + fn name(&self) -> String { + "mqtt".to_string() + } + + async fn send(&self, event: &Event) -> Result<(), Error> { + let payload = serde_json::to_string(event).map_err(Error::Serde)?; + let mut attempt = 0; + loop { + match self + .client + .publish(&self.topic, QoS::AtLeastOnce, false, payload.clone()) + .await + { + Ok(()) => return Ok(()), + Err(e) if attempt < self.max_retries => { + attempt += 1; + tracing::warn!("MQTT attempt {} failed: {}. Retrying...", attempt, e); + sleep(Duration::from_secs(2u64.pow(attempt))).await; + } + Err(e) => return Err(Error::Mqtt(e)), + } + } + } +} diff --git a/crates/event-notifier/src/adapter/webhook.rs b/crates/event-notifier/src/adapter/webhook.rs new file mode 100644 index 00000000..80b8c9cb --- /dev/null +++ b/crates/event-notifier/src/adapter/webhook.rs @@ -0,0 +1,63 @@ +use crate::ChannelAdapter; +use crate::Error; +use crate::Event; +use crate::WebhookConfig; +use async_trait::async_trait; +use reqwest::{Client, RequestBuilder}; +use std::time::Duration; +use tokio::time::sleep; + +/// Webhook adapter for sending events to a webhook endpoint. +pub struct WebhookAdapter { + config: WebhookConfig, + client: Client, +} + +impl WebhookAdapter { + /// Creates a new Webhook adapter. + pub fn new(config: WebhookConfig) -> Self { + let client = Client::builder() + .timeout(Duration::from_secs(config.timeout)) + .build() + .expect("Failed to build reqwest client"); + Self { config, client } + } + /// Builds the request to send the event. + fn build_request(&self, event: &Event) -> RequestBuilder { + let mut request = self.client.post(&self.config.endpoint).json(event); + if let Some(token) = &self.config.auth_token { + request = request.header("Authorization", format!("Bearer {}", token)); + } + if let Some(headers) = &self.config.custom_headers { + for (key, value) in headers { + request = request.header(key, value); + } + } + request + } +} + +#[async_trait] +impl ChannelAdapter for WebhookAdapter { + fn name(&self) -> String { + "webhook".to_string() + } + + async fn send(&self, event: &Event) -> Result<(), Error> { + let mut attempt = 0; + loop { + match self.build_request(event).send().await { + Ok(response) => { + response.error_for_status().map_err(Error::Http)?; + return Ok(()); + } + Err(e) if attempt < self.config.max_retries => { + attempt += 1; + tracing::warn!("Webhook attempt {} failed: {}. Retrying...", attempt, e); + sleep(Duration::from_secs(2u64.pow(attempt))).await; + } + Err(e) => return Err(Error::Http(e)), + } + } + } +} diff --git a/crates/event-notifier/src/bus.rs b/crates/event-notifier/src/bus.rs new file mode 100644 index 00000000..0ef341ce --- /dev/null +++ b/crates/event-notifier/src/bus.rs @@ -0,0 +1,68 @@ +use crate::ChannelAdapter; +use crate::Error; +use crate::EventStore; +use crate::{Event, Log}; +use chrono::Utc; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +/// Handles incoming events from the producer. +/// +/// This function is responsible for receiving events from the producer and sending them to the appropriate adapters. +/// It also handles the shutdown process and saves any pending logs to the event store. +pub async fn event_bus( + mut rx: mpsc::Receiver, + adapters: Vec>, + store: Arc, + shutdown: CancellationToken, +) -> Result<(), Error> { + let mut pending_logs = Vec::new(); + let mut current_log = Log { + event_name: crate::event::Name::Everything, + key: Utc::now().timestamp().to_string(), + records: Vec::new(), + }; + + loop { + tokio::select! { + Some(event) = rx.recv() => { + current_log.records.push(event.clone()); + let mut send_tasks = Vec::new(); + for adapter in &adapters { + if event.channels.contains(&adapter.name()) { + let adapter = adapter.clone(); + let event = event.clone(); + send_tasks.push(tokio::spawn(async move { + if let Err(e) = adapter.send(&event).await { + tracing::error!("Failed to send event to {}: {}", adapter.name(), e); + Err(e) + } else { + Ok(()) + } + })); + } + } + for task in send_tasks { + if task.await?.is_err() { + current_log.records.retain(|e| e.id != event.id); + } + } + if !current_log.records.is_empty() { + pending_logs.push(current_log.clone()); + } + current_log.records.clear(); + } + _ = shutdown.cancelled() => { + tracing::info!("Shutting down event bus, saving pending logs..."); + if !current_log.records.is_empty() { + pending_logs.push(current_log); + } + store.save_logs(&pending_logs).await?; + break; + } + else => break, + } + } + Ok(()) +} diff --git a/crates/event-notifier/src/config.rs b/crates/event-notifier/src/config.rs new file mode 100644 index 00000000..823bf83d --- /dev/null +++ b/crates/event-notifier/src/config.rs @@ -0,0 +1,161 @@ +use crate::Error; +use figment::providers::Format; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookConfig { + pub endpoint: String, + pub auth_token: Option, + pub custom_headers: Option>, + pub max_retries: u32, + pub timeout: u64, +} + +impl WebhookConfig { + /// verify that the configuration is valid + pub fn validate(&self) -> Result<(), String> { + // verify that endpoint cannot be empty + if self.endpoint.trim().is_empty() { + return Err("Webhook endpoint cannot be empty".to_string()); + } + + // verification timeout must be reasonable + if self.timeout == 0 { + return Err("Webhook timeout must be greater than 0".to_string()); + } + + // Verify that the maximum number of retry is reasonable + if self.max_retries > 10 { + return Err("Maximum retry count cannot exceed 10".to_string()); + } + + Ok(()) + } +} + +/// Configuration for the Kafka adapter. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KafkaConfig { + pub brokers: String, + pub topic: String, + pub max_retries: u32, + pub timeout: u64, +} + +/// Configuration for the MQTT adapter. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MqttConfig { + pub broker: String, + pub port: u16, + pub client_id: String, + pub topic: String, + pub max_retries: u32, +} + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum AdapterConfig { + Webhook(WebhookConfig), + Kafka(KafkaConfig), + Mqtt(MqttConfig), +} + +/// http producer configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpProducerConfig { + #[serde(default = "default_http_port")] + pub port: u16, +} + +impl Default for HttpProducerConfig { + fn default() -> Self { + Self { + port: default_http_port(), + } + } +} + +fn default_http_port() -> u16 { + 3000 +} + +/// Configuration for the notification system. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NotificationConfig { + #[serde(default = "default_store_path")] + pub store_path: String, + #[serde(default = "default_channel_capacity")] + pub channel_capacity: usize, + pub adapters: Vec, + #[serde(default)] + pub http: HttpProducerConfig, +} + +impl Default for NotificationConfig { + fn default() -> Self { + Self { + store_path: default_store_path(), + channel_capacity: default_channel_capacity(), + adapters: Vec::new(), + http: HttpProducerConfig::default(), + } + } +} + +impl NotificationConfig { + /// create a new configuration with default values + pub fn new() -> Self { + Self::default() + } + + /// create a configuration from a configuration file + pub fn from_file(path: &str) -> Result { + let config = figment::Figment::new() + .merge(figment::providers::Toml::file(path)) + .extract()?; + + Ok(config) + } + + /// Read configuration from multiple sources (support TOML, YAML, .env) + pub fn load() -> Result { + let figment = figment::Figment::new() + // First try to read the config.toml of the current directory + .merge(figment::providers::Toml::file("event.toml")) + // Then try to read the config.yaml of the current directory + .merge(figment::providers::Yaml::file("event.yaml")) + // Finally read the environment variable and overwrite the previous value + .merge(figment::providers::Env::prefixed("EVENT_NOTIF_")); + + Ok(figment.extract()?) + } + + /// loading configuration from env file + pub fn from_env_file(path: &str) -> Result { + // loading env files + dotenv::from_path(path) + .map_err(|e| Error::ConfigError(format!("unable to load env file: {}", e)))?; + + // Extract configuration from environment variables using figurement + let figment = + figment::Figment::new().merge(figment::providers::Env::prefixed("EVENT_NOTIF_")); + + Ok(figment.extract()?) + } +} + +/// Provide temporary directories as default storage paths +fn default_store_path() -> String { + std::env::temp_dir() + .join("event-notification") + .to_string_lossy() + .to_string() +} + +/// Provides the recommended default channel capacity for high concurrency systems +fn default_channel_capacity() -> usize { + 10000 // Reasonable default values for high concurrency systems +} diff --git a/crates/event-notifier/src/error.rs b/crates/event-notifier/src/error.rs index 1945c351..9f1e41b4 100644 --- a/crates/event-notifier/src/error.rs +++ b/crates/event-notifier/src/error.rs @@ -1,25 +1,45 @@ -// src/error.rs use thiserror::Error; +use tokio::sync::mpsc::error; +use tokio::task::JoinError; +/// The `Error` enum represents all possible errors that can occur in the application. +/// It implements the `std::error::Error` trait and provides a way to convert various error types into a single error type. #[derive(Error, Debug)] pub enum Error { - #[error("target not found: {0}")] - TargetNotFound(String), - - #[error("send failed: {0}")] - SendError(String), - - #[error("target error: {0}")] - TargetError(String), - - #[error("invalid configuration: {0}")] + #[error("Join error: {0}")] + JoinError(#[from] JoinError), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Serialization error: {0}")] + Serde(#[from] serde_json::Error), + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + #[cfg(feature = "kafka")] + #[error("Kafka error: {0}")] + Kafka(#[from] rdkafka::error::KafkaError), + #[cfg(feature = "mqtt")] + #[error("MQTT error: {0}")] + Mqtt(#[from] rumqttc::ClientError), + #[error("Channel send error: {0}")] + ChannelSend(#[from] Box>), + #[error("Feature disabled: {0}")] + FeatureDisabled(&'static str), + #[error("Event bus already started")] + EventBusStarted, + #[error("necessary fields are missing:{0}")] + MissingField(&'static str), + #[error("field verification failed:{0}")] + ValidationError(&'static str), + #[error("{0}")] + Custom(String), + #[error("Configuration error: {0}")] ConfigError(String), - - #[error("store error: {0}")] - StoreError(String), - - #[error("invalid event name: {0}")] - InvalidEventName(String), // 添加此变体 + #[error("Configuration loading error: {0}")] + Figment(#[from] figment::Error), } -pub type Result = std::result::Result; +impl Error { + pub(crate) fn custom(msg: &str) -> Error { + Self::Custom(msg.to_string()) + } +} diff --git a/crates/event-notifier/src/event.rs b/crates/event-notifier/src/event.rs index 695f2c3d..00bc5983 100644 --- a/crates/event-notifier/src/event.rs +++ b/crates/event-notifier/src/event.rs @@ -1,53 +1,442 @@ +use crate::Error; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use serde_with::{DeserializeFromStr, SerializeDisplay}; +use smallvec::{smallvec, SmallVec}; +use std::borrow::Cow; use std::collections::HashMap; +use strum::{Display, EnumString}; +use uuid::Uuid; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Event { - pub event_version: String, - pub event_source: String, - pub aws_region: String, - pub event_time: String, - pub event_name: String, - pub user_identity: Identity, - pub request_parameters: HashMap, - pub response_elements: HashMap, - pub s3: Metadata, - pub source: Source, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Identity { + #[serde(rename = "principalId")] pub principal_id: String, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Bucket { + pub name: String, + #[serde(rename = "ownerIdentity")] + pub owner_identity: Identity, + pub arn: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Object { + pub key: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub size: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "eTag")] + pub etag: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "contentType")] + pub content_type: Option, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "userMetadata")] + pub user_metadata: Option>, + #[serde(default, skip_serializing_if = "Option::is_none", rename = "versionId")] + pub version_id: Option, + pub sequencer: String, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Metadata { + #[serde(rename = "s3SchemaVersion")] pub schema_version: String, + #[serde(rename = "configurationId")] pub configuration_id: String, pub bucket: Bucket, pub object: Object, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Bucket { - pub name: String, - pub owner_identity: Identity, - pub arn: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Object { - pub key: String, - pub version_id: Option, - pub sequencer: String, - pub size: Option, - pub etag: Option, - pub content_type: Option, - pub user_metadata: HashMap, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Source { pub host: String, + pub port: String, + #[serde(rename = "userAgent")] pub user_agent: String, } + +/// Builder for creating an Event. +/// +/// This struct is used to build an Event object with various parameters. +/// It provides methods to set each parameter and a build method to create the Event. +#[derive(Default, Clone)] +pub struct EventBuilder { + event_version: Option, + event_source: Option, + aws_region: Option, + event_time: Option, + event_name: Option, + user_identity: Option, + request_parameters: Option>, + response_elements: Option>, + s3: Option, + source: Option, + channels: Option>, +} + +impl EventBuilder { + /// create a builder that pre filled default values + pub fn new() -> Self { + Self { + event_version: Some(Cow::Borrowed("2.0").to_string()), + event_source: Some(Cow::Borrowed("aws:s3").to_string()), + aws_region: Some("us-east-1".to_string()), + event_time: Some(Utc::now().to_rfc3339()), + event_name: None, + user_identity: Some(Identity { + principal_id: "anonymous".to_string(), + }), + request_parameters: Some(HashMap::new()), + response_elements: Some(HashMap::new()), + s3: None, + source: None, + channels: Some(Vec::new().into()), + } + } + + /// verify and set the event version + pub fn event_version(mut self, event_version: impl Into) -> Self { + let event_version = event_version.into(); + if !event_version.is_empty() { + self.event_version = Some(event_version); + } + self + } + + /// verify and set the event source + pub fn event_source(mut self, event_source: impl Into) -> Self { + let event_source = event_source.into(); + if !event_source.is_empty() { + self.event_source = Some(event_source); + } + self + } + + /// set up aws regions + pub fn aws_region(mut self, aws_region: impl Into) -> Self { + self.aws_region = Some(aws_region.into()); + self + } + + /// set event time + pub fn event_time(mut self, event_time: impl Into) -> Self { + self.event_time = Some(event_time.into()); + self + } + + /// set event name + pub fn event_name(mut self, event_name: Name) -> Self { + self.event_name = Some(event_name); + self + } + + /// set user identity + pub fn user_identity(mut self, user_identity: Identity) -> Self { + self.user_identity = Some(user_identity); + self + } + + /// set request parameters + pub fn request_parameters(mut self, request_parameters: HashMap) -> Self { + self.request_parameters = Some(request_parameters); + self + } + + /// set response elements + pub fn response_elements(mut self, response_elements: HashMap) -> Self { + self.response_elements = Some(response_elements); + self + } + + /// setting up s3 metadata + pub fn s3(mut self, s3: Metadata) -> Self { + self.s3 = Some(s3); + self + } + + /// set event source information + pub fn source(mut self, source: Source) -> Self { + self.source = Some(source); + self + } + + /// set up the sending channel + pub fn channels(mut self, channels: Vec) -> Self { + self.channels = Some(channels.into()); + self + } + + /// Create a preconfigured builder for common object event scenarios + pub fn for_object_creation(s3: Metadata, source: Source) -> Self { + Self::new().event_name(Name::ObjectCreatedPut).s3(s3).source(source) + } + + /// Create a preconfigured builder for object deletion events + pub fn for_object_removal(s3: Metadata, source: Source) -> Self { + Self::new().event_name(Name::ObjectRemovedDelete).s3(s3).source(source) + } + + /// build event instance + /// + /// Verify the required fields and create a complete Event object + pub fn build(self) -> Result { + let event_version = self.event_version.ok_or(Error::MissingField("event_version"))?; + + let event_source = self.event_source.ok_or(Error::MissingField("event_source"))?; + + let aws_region = self.aws_region.ok_or(Error::MissingField("aws_region"))?; + + let event_time = self.event_time.ok_or(Error::MissingField("event_time"))?; + + let event_name = self.event_name.ok_or(Error::MissingField("event_name"))?; + + let user_identity = self.user_identity.ok_or(Error::MissingField("user_identity"))?; + + let request_parameters = self.request_parameters.unwrap_or_default(); + let response_elements = self.response_elements.unwrap_or_default(); + + let s3 = self.s3.ok_or(Error::MissingField("s3"))?; + + let source = self.source.ok_or(Error::MissingField("source"))?; + + let channels = self.channels.unwrap_or_else(|| smallvec![]); + + Ok(Event { + event_version, + event_source, + aws_region, + event_time, + event_name, + user_identity, + request_parameters, + response_elements, + s3, + source, + id: Uuid::new_v4(), + timestamp: Utc::now(), + channels, + }) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Event { + #[serde(rename = "eventVersion")] + pub event_version: String, + #[serde(rename = "eventSource")] + pub event_source: String, + #[serde(rename = "awsRegion")] + pub aws_region: String, + #[serde(rename = "eventTime")] + pub event_time: String, + #[serde(rename = "eventName")] + pub event_name: Name, + #[serde(rename = "userIdentity")] + pub user_identity: Identity, + #[serde(rename = "requestParameters")] + pub request_parameters: HashMap, + #[serde(rename = "responseElements")] + pub response_elements: HashMap, + pub s3: Metadata, + pub source: Source, + pub id: Uuid, + pub timestamp: DateTime, + pub channels: SmallVec<[String; 2]>, +} + +impl Event { + /// create a new event builder + /// + /// Returns an EventBuilder instance pre-filled with default values + pub fn builder() -> EventBuilder { + EventBuilder::new() + } + + /// Quickly create Event instances with necessary fields + /// + /// suitable for common s3 event scenarios + pub fn create(event_name: Name, s3: Metadata, source: Source, channels: Vec) -> Self { + Self::builder() + .event_name(event_name) + .s3(s3) + .source(source) + .channels(channels) + .build() + .expect("Failed to create event, missing necessary parameters") + } + + /// a convenient way to create a preconfigured builder + pub fn for_object_creation(s3: Metadata, source: Source) -> EventBuilder { + EventBuilder::for_object_creation(s3, source) + } + + /// a convenient way to create a preconfigured builder + pub fn for_object_removal(s3: Metadata, source: Source) -> EventBuilder { + EventBuilder::for_object_removal(s3, source) + } + + /// Determine whether an event belongs to a specific type + pub fn is_type(&self, event_type: Name) -> bool { + let mask = event_type.mask(); + (self.event_name.mask() & mask) != 0 + } + + /// Determine whether an event needs to be sent to a specific channel + pub fn is_for_channel(&self, channel: &str) -> bool { + self.channels.iter().any(|c| c == channel) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Log { + #[serde(rename = "eventName")] + pub event_name: Name, + pub key: String, + pub records: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, SerializeDisplay, DeserializeFromStr, Display, EnumString)] +#[strum(serialize_all = "SCREAMING_SNAKE_CASE")] +pub enum Name { + ObjectAccessedGet, + ObjectAccessedGetRetention, + ObjectAccessedGetLegalHold, + ObjectAccessedHead, + ObjectAccessedAttributes, + ObjectCreatedCompleteMultipartUpload, + ObjectCreatedCopy, + ObjectCreatedPost, + ObjectCreatedPut, + ObjectCreatedPutRetention, + ObjectCreatedPutLegalHold, + ObjectCreatedPutTagging, + ObjectCreatedDeleteTagging, + ObjectRemovedDelete, + ObjectRemovedDeleteMarkerCreated, + ObjectRemovedDeleteAllVersions, + ObjectRemovedNoOp, + BucketCreated, + BucketRemoved, + ObjectReplicationFailed, + ObjectReplicationComplete, + ObjectReplicationMissedThreshold, + ObjectReplicationReplicatedAfterThreshold, + ObjectReplicationNotTracked, + ObjectRestorePost, + ObjectRestoreCompleted, + ObjectTransitionFailed, + ObjectTransitionComplete, + ObjectManyVersions, + ObjectLargeVersions, + PrefixManyFolders, + IlmDelMarkerExpirationDelete, + ObjectAccessedAll, + ObjectCreatedAll, + ObjectRemovedAll, + ObjectReplicationAll, + ObjectRestoreAll, + ObjectTransitionAll, + ObjectScannerAll, + Everything, +} + +impl Name { + pub fn expand(&self) -> Vec { + match self { + Name::ObjectAccessedAll => vec![ + Name::ObjectAccessedGet, + Name::ObjectAccessedHead, + Name::ObjectAccessedGetRetention, + Name::ObjectAccessedGetLegalHold, + Name::ObjectAccessedAttributes, + ], + Name::ObjectCreatedAll => vec![ + Name::ObjectCreatedCompleteMultipartUpload, + Name::ObjectCreatedCopy, + Name::ObjectCreatedPost, + Name::ObjectCreatedPut, + Name::ObjectCreatedPutRetention, + Name::ObjectCreatedPutLegalHold, + Name::ObjectCreatedPutTagging, + Name::ObjectCreatedDeleteTagging, + ], + Name::ObjectRemovedAll => vec![ + Name::ObjectRemovedDelete, + Name::ObjectRemovedDeleteMarkerCreated, + Name::ObjectRemovedNoOp, + Name::ObjectRemovedDeleteAllVersions, + ], + Name::ObjectReplicationAll => vec![ + Name::ObjectReplicationFailed, + Name::ObjectReplicationComplete, + Name::ObjectReplicationNotTracked, + Name::ObjectReplicationMissedThreshold, + Name::ObjectReplicationReplicatedAfterThreshold, + ], + Name::ObjectRestoreAll => vec![Name::ObjectRestorePost, Name::ObjectRestoreCompleted], + Name::ObjectTransitionAll => { + vec![Name::ObjectTransitionFailed, Name::ObjectTransitionComplete] + } + Name::ObjectScannerAll => vec![Name::ObjectManyVersions, Name::ObjectLargeVersions, Name::PrefixManyFolders], + Name::Everything => (1..=Name::IlmDelMarkerExpirationDelete as u32) + .map(|i| Name::from_repr(i).unwrap()) + .collect(), + _ => vec![*self], + } + } + + pub fn mask(&self) -> u64 { + if (*self as u32) < Name::ObjectAccessedAll as u32 { + 1 << (*self as u32 - 1) + } else { + self.expand().iter().fold(0, |acc, n| acc | (1 << (*n as u32 - 1))) + } + } + + fn from_repr(discriminant: u32) -> Option { + match discriminant { + 1 => Some(Name::ObjectAccessedGet), + 2 => Some(Name::ObjectAccessedGetRetention), + 3 => Some(Name::ObjectAccessedGetLegalHold), + 4 => Some(Name::ObjectAccessedHead), + 5 => Some(Name::ObjectAccessedAttributes), + 6 => Some(Name::ObjectCreatedCompleteMultipartUpload), + 7 => Some(Name::ObjectCreatedCopy), + 8 => Some(Name::ObjectCreatedPost), + 9 => Some(Name::ObjectCreatedPut), + 10 => Some(Name::ObjectCreatedPutRetention), + 11 => Some(Name::ObjectCreatedPutLegalHold), + 12 => Some(Name::ObjectCreatedPutTagging), + 13 => Some(Name::ObjectCreatedDeleteTagging), + 14 => Some(Name::ObjectRemovedDelete), + 15 => Some(Name::ObjectRemovedDeleteMarkerCreated), + 16 => Some(Name::ObjectRemovedDeleteAllVersions), + 17 => Some(Name::ObjectRemovedNoOp), + 18 => Some(Name::BucketCreated), + 19 => Some(Name::BucketRemoved), + 20 => Some(Name::ObjectReplicationFailed), + 21 => Some(Name::ObjectReplicationComplete), + 22 => Some(Name::ObjectReplicationMissedThreshold), + 23 => Some(Name::ObjectReplicationReplicatedAfterThreshold), + 24 => Some(Name::ObjectReplicationNotTracked), + 25 => Some(Name::ObjectRestorePost), + 26 => Some(Name::ObjectRestoreCompleted), + 27 => Some(Name::ObjectTransitionFailed), + 28 => Some(Name::ObjectTransitionComplete), + 29 => Some(Name::ObjectManyVersions), + 30 => Some(Name::ObjectLargeVersions), + 31 => Some(Name::PrefixManyFolders), + 32 => Some(Name::IlmDelMarkerExpirationDelete), + 33 => Some(Name::ObjectAccessedAll), + 34 => Some(Name::ObjectCreatedAll), + 35 => Some(Name::ObjectRemovedAll), + 36 => Some(Name::ObjectReplicationAll), + 37 => Some(Name::ObjectRestoreAll), + 38 => Some(Name::ObjectTransitionAll), + 39 => Some(Name::ObjectScannerAll), + 40 => Some(Name::Everything), + _ => None, + } + } +} diff --git a/crates/event-notifier/src/event_name.rs b/crates/event-notifier/src/event_name.rs deleted file mode 100644 index c60cce25..00000000 --- a/crates/event-notifier/src/event_name.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::error::Error; -use serde::{Deserialize, Serialize}; -use std::str::FromStr; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[repr(u64)] -pub enum EventName { - // 对象操作事件 - ObjectCreatedAll = 1 << 0, - ObjectCreatedPut = 1 << 1, - ObjectCreatedPost = 1 << 2, - ObjectCreatedCopy = 1 << 3, - ObjectCreatedCompleteMultipartUpload = 1 << 4, - - ObjectRemovedAll = 1 << 5, - ObjectRemovedDelete = 1 << 6, - ObjectRemovedDeleteMarkerCreated = 1 << 7, - - ObjectAccessedAll = 1 << 8, - ObjectAccessedGet = 1 << 9, - ObjectAccessedHead = 1 << 10, - - ObjectRestoredAll = 1 << 11, - ObjectRestoredPost = 1 << 12, - ObjectRestoredCompleted = 1 << 13, - - ReplicationAll = 1 << 14, - ReplicationFailed = 1 << 15, - ReplicationComplete = 1 << 16, -} - -impl EventName { - pub fn mask(&self) -> u64 { - *self as u64 - } - - pub fn expand(&self) -> Vec { - match self { - EventName::ObjectCreatedAll => vec![ - EventName::ObjectCreatedPut, - EventName::ObjectCreatedPost, - EventName::ObjectCreatedCopy, - EventName::ObjectCreatedCompleteMultipartUpload, - ], - EventName::ObjectRemovedAll => vec![EventName::ObjectRemovedDelete, EventName::ObjectRemovedDeleteMarkerCreated], - EventName::ObjectAccessedAll => vec![EventName::ObjectAccessedGet, EventName::ObjectAccessedHead], - EventName::ObjectRestoredAll => vec![EventName::ObjectRestoredPost, EventName::ObjectRestoredCompleted], - EventName::ReplicationAll => vec![EventName::ReplicationFailed, EventName::ReplicationComplete], - _ => vec![*self], - } - } -} - -impl FromStr for EventName { - type Err = Error; - - fn from_str(s: &str) -> Result { - match s { - "s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll), - "s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut), - "s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost), - "s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy), - "s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload), - "s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll), - "s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete), - "s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated), - "s3:ObjectAccessed:*" => Ok(EventName::ObjectAccessedAll), - "s3:ObjectAccessed:Get" => Ok(EventName::ObjectAccessedGet), - "s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead), - "s3:ObjectRestored:*" => Ok(EventName::ObjectRestoredAll), - "s3:ObjectRestored:Post" => Ok(EventName::ObjectRestoredPost), - "s3:ObjectRestored:Completed" => Ok(EventName::ObjectRestoredCompleted), - "s3:Replication:*" => Ok(EventName::ReplicationAll), - "s3:Replication:Failed" => Ok(EventName::ReplicationFailed), - "s3:Replication:Complete" => Ok(EventName::ReplicationComplete), - _ => Err(Error::InvalidEventName(format!("Unrecognized event name: {}", s))), - } - } -} diff --git a/crates/event-notifier/src/global.rs b/crates/event-notifier/src/global.rs new file mode 100644 index 00000000..a14380b3 --- /dev/null +++ b/crates/event-notifier/src/global.rs @@ -0,0 +1,100 @@ +use crate::{ChannelAdapter, Error, Event, NotificationConfig, NotificationSystem}; +use std::sync::Arc; +use tokio::sync::{Mutex, OnceCell}; + +static GLOBAL_SYSTEM: OnceCell>> = OnceCell::const_new(); +static INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + +/// initialize the global notification system +pub async fn initialize(config: NotificationConfig) -> Result<(), Error> { + if INITIALIZED.swap(true, std::sync::atomic::Ordering::SeqCst) { + return Err(Error::custom("notify the system has been initialized")); + } + + let system = Arc::new(Mutex::new(NotificationSystem::new(config).await?)); + GLOBAL_SYSTEM + .set(system) + .map_err(|_| Error::custom("unable to set up global notification system"))?; + Ok(()) +} + +/// start the global notification system +pub async fn start(adapters: Vec>) -> Result<(), Error> { + let system = get_system().await?; + + // create a new task to run the system + let system_clone = Arc::clone(&system); + tokio::spawn(async move { + let mut system_guard = system_clone.lock().await; + if let Err(e) = system_guard.start(adapters).await { + tracing::error!("notify the system to start failed: {}", e); + } + }); + + Ok(()) +} + +/// Initialize and start the global notification system +/// +/// This method combines the functions of `initialize` and `start` to provide one-step setup: +/// - initialize system configuration +/// - create an adapter +/// - start event listening +/// +/// # Example +/// +/// ```rust +/// use rustfs_event_notifier::{initialize_and_start, NotificationConfig}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), rustfs_event_notifier::Error> { +/// let config = NotificationConfig { +/// store_path: "./events".to_string(), +/// channel_capacity: 100, +/// adapters: vec![/* 适配器配置 */], +/// http: Default::default(), +/// }; +/// +/// // complete initialization and startup in one step +/// initialize_and_start(config).await?; +/// Ok(()) +/// } +/// ``` +pub async fn initialize_and_start(config: NotificationConfig) -> Result<(), Error> { + // initialize the system first + initialize(config.clone()).await?; + + // create an adapter + let adapters = crate::create_adapters(&config.adapters).expect("failed to create adapters"); + + // start the system + start(adapters).await?; + + Ok(()) +} + +/// send events to notification system +pub async fn send_event(event: Event) -> Result<(), Error> { + let system = get_system().await?; + let system_guard = system.lock().await; + system_guard.send_event(event).await +} + +/// turn off the notification system +pub fn shutdown() -> Result<(), Error> { + if let Some(system) = GLOBAL_SYSTEM.get() { + let system_guard = system.blocking_lock(); + system_guard.shutdown(); + Ok(()) + } else { + Err(Error::custom("notification system not initialized")) + } +} + +/// get system instance +async fn get_system() -> Result>, Error> { + GLOBAL_SYSTEM + .get() + .cloned() + .ok_or_else(|| Error::custom("notification system not initialized")) +} diff --git a/crates/event-notifier/src/lib.rs b/crates/event-notifier/src/lib.rs index fbfc9f86..9ba226c6 100644 --- a/crates/event-notifier/src/lib.rs +++ b/crates/event-notifier/src/lib.rs @@ -1,80 +1,131 @@ -/// RustFS Event Notifier -/// This crate provides a simple event notification system for RustFS. -/// It allows for the registration of event handlers and the triggering of events. -/// +mod adapter; +mod bus; +mod config; mod error; mod event; -mod event_name; -mod notifier; -mod rules; -mod stats; -mod target; -mod target_entry; +mod global; +mod producer; +mod store; -pub fn add(left: u64, right: u64) -> u64 { - left + right +pub use adapter::create_adapters; +#[cfg(feature = "kafka")] +pub use adapter::kafka::KafkaAdapter; +#[cfg(feature = "mqtt")] +pub use adapter::mqtt::MqttAdapter; +#[cfg(feature = "webhook")] +pub use adapter::webhook::WebhookAdapter; +pub use adapter::ChannelAdapter; +pub use bus::event_bus; +#[cfg(feature = "http-producer")] +pub use config::HttpProducerConfig; +#[cfg(feature = "kafka")] +pub use config::KafkaConfig; +#[cfg(feature = "mqtt")] +pub use config::MqttConfig; +#[cfg(feature = "webhook")] +pub use config::WebhookConfig; +pub use config::{AdapterConfig, NotificationConfig}; +pub use error::Error; + +pub use event::{Bucket, Event, EventBuilder, Identity, Log, Metadata, Name, Object, Source}; +pub use global::{initialize, initialize_and_start, send_event, shutdown, start}; +pub use store::EventStore; + +#[cfg(feature = "http-producer")] +pub use producer::http::HttpProducer; +#[cfg(feature = "http-producer")] +pub use producer::EventProducer; + +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +/// The `NotificationSystem` struct represents the notification system. +/// It manages the event bus and the adapters. +/// It is responsible for sending and receiving events. +/// It also handles the shutdown process. +pub struct NotificationSystem { + tx: mpsc::Sender, + rx: Option>, + store: Arc, + shutdown: CancellationToken, + #[cfg(feature = "http-producer")] + http_config: HttpProducerConfig, } -#[cfg(test)] -mod tests { - use super::*; - use crate::event::{Bucket, Event, Identity, Metadata, Object, Source}; - use crate::target::{TargetID, TargetList}; - use std::collections::HashMap; +impl NotificationSystem { + /// Creates a new `NotificationSystem` instance. + pub async fn new(config: NotificationConfig) -> Result { + let (tx, rx) = mpsc::channel::(config.channel_capacity); + let store = Arc::new(EventStore::new(&config.store_path).await?); + let shutdown = CancellationToken::new(); - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); + let restored_logs = store.load_logs().await?; + for log in restored_logs { + for event in log.records { + // For example, where the send method may return a SendError when calling it + tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; + } + } + + Ok(Self { + tx, + rx: Some(rx), + store, + shutdown, + #[cfg(feature = "http-producer")] + http_config: config.http, + }) } - #[tokio::main] - async fn main() { - let target_list = TargetList::new(); - let event = Event { - event_version: "1.0".to_string(), - event_source: "aws:s3".to_string(), - aws_region: "us-west-2".to_string(), - event_time: "2023-10-01T12:00:00Z".to_string(), - event_name: "PutObject".to_string(), - user_identity: Identity { - principal_id: "user123".to_string(), + /// Starts the notification system. + /// It initializes the event bus and the producer. + pub async fn start(&mut self, adapters: Vec>) -> Result<(), Error> { + let rx = self.rx.take().ok_or_else(|| Error::EventBusStarted)?; + + let shutdown_clone = self.shutdown.clone(); + let store_clone = self.store.clone(); + let bus_handle = tokio::spawn(async move { + if let Err(e) = event_bus(rx, adapters, store_clone, shutdown_clone).await { + tracing::error!("Event bus failed: {}", e); + } + }); + + #[cfg(feature = "http-producer")] + { + let producer = HttpProducer::new(self.tx.clone(), self.http_config.port); + producer.start().await?; + } + + tokio::select! { + result = bus_handle => { + result.map_err(Error::JoinError)?; + Ok(()) }, - request_parameters: HashMap::new(), - response_elements: HashMap::new(), - s3: Metadata { - schema_version: "1.0".to_string(), - configuration_id: "config123".to_string(), - bucket: Bucket { - name: "my-bucket".to_string(), - owner_identity: Identity { - principal_id: "owner123".to_string(), - }, - arn: "arn:aws:s3:::my-bucket".to_string(), - }, - object: Object { - key: "my-object.txt".to_string(), - version_id: Some("version123".to_string()), - sequencer: "seq123".to_string(), - size: Some(1024), - etag: Some("etag123".to_string()), - content_type: Some("text/plain".to_string()), - user_metadata: HashMap::new(), - }, - }, - source: Source { - host: "localhost".to_string(), - user_agent: "RustFS/1.0".to_string(), - }, - }; - let target_ids: &[TargetID] = &["".to_string()]; - // 发送事件 - let results = target_list.send(event, &target_ids).await; - println!("result len:{:?}", results.len()); - // 获取统计信息 - let stats = target_list.get_stats(); - for (id, stat) in stats { - println!("Target {}: {} events, {} failed", id, stat.total_events, stat.failed_events); + _ = self.shutdown.cancelled() => { + tracing::info!("System shutdown triggered"); + Ok(()) + } } } + + /// Sends an event to the notification system. + /// This method is used to send events to the event bus. + pub async fn send_event(&self, event: Event) -> Result<(), Error> { + self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; + Ok(()) + } + + /// Shuts down the notification system. + /// This method is used to cancel the event bus and producer tasks. + pub fn shutdown(&self) { + self.shutdown.cancel(); + } + + /// Sets the HTTP port for the notification system. + /// This method is used to change the port for the HTTP producer. + #[cfg(feature = "http-producer")] + pub fn set_http_port(&mut self, port: u16) { + self.http_config.port = port; + } } diff --git a/crates/event-notifier/src/notifier.rs b/crates/event-notifier/src/notifier.rs deleted file mode 100644 index 7b81a24d..00000000 --- a/crates/event-notifier/src/notifier.rs +++ /dev/null @@ -1,47 +0,0 @@ -// src/notifier.rs -use crate::error::Error; -use crate::event::Event; -use crate::rules::{RulesMap, TargetIDSet}; -use crate::target::TargetList; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; -use tokio::sync::broadcast; - -pub struct EventNotifier { - target_list: Arc, - rules: RwLock>, - tx: broadcast::Sender, -} - -impl EventNotifier { - pub fn new(capacity: usize) -> Self { - let (tx, _) = broadcast::channel(capacity); - Self { - target_list: Arc::new(TargetList::new()), - rules: RwLock::new(HashMap::new()), - tx, - } - } - - pub async fn send(&self, event: Event) -> Result<(), Error> { - let rules = self - .rules - .read() - .map_err(|_| Error::StoreError("Failed to read rules".to_string()))?; - // 检查规则匹配 - let target_ids = if let Some(rules_map) = rules.get(&event.s3.bucket.name) { - rules_map.match_targets(event.s3.event_name, &event.s3.object.key) - } else { - TargetIDSet::new() - }; - - // 发送事件 - if !target_ids.is_empty() { - self.target_list.send(event.clone(), &target_ids).await; - } - - // 广播给监听者 - let _ = self.tx.send(event); - Ok(()) - } -} diff --git a/crates/event-notifier/src/producer.rs b/crates/event-notifier/src/producer.rs new file mode 100644 index 00000000..fc53b946 --- /dev/null +++ b/crates/event-notifier/src/producer.rs @@ -0,0 +1,83 @@ +use crate::Error; +use crate::Event; +use async_trait::async_trait; + +/// event producer characteristics +#[allow(dead_code)] +#[async_trait] +pub trait EventProducer: Send + Sync { + /// start producer services + async fn start(&self) -> Result<(), Error>; + /// stop producer services + async fn stop(&self) -> Result<(), Error>; + /// send a single event + async fn send_event(&self, event: Event) -> Result<(), Error>; +} + +#[cfg(feature = "http-producer")] +pub mod http { + use super::*; + use axum::{routing::post, Json, Router}; + use std::sync::Arc; + use tokio::sync::mpsc; + + #[derive(Clone)] + pub struct HttpProducer { + tx: mpsc::Sender, + port: u16, + shutdown: Arc, + } + + impl HttpProducer { + pub fn new(tx: mpsc::Sender, port: u16) -> Self { + Self { + tx, + port, + shutdown: Arc::new(tokio::sync::Notify::new()), + } + } + } + + #[async_trait] + impl EventProducer for HttpProducer { + async fn start(&self) -> Result<(), Error> { + let producer = self.clone(); + let app = Router::new().route( + "/event", + post(move |event| { + let prod = producer.clone(); + async move { handle_event(event, prod).await } + }), + ); + + let addr = format!("0.0.0.0:{}", self.port); + let listener = tokio::net::TcpListener::bind(&addr).await?; + + let shutdown = self.shutdown.clone(); + tokio::select! { + result = axum::serve(listener, app) => { + result?; + Ok(()) + } + _ = shutdown.notified() => Ok(()) + } + } + + async fn stop(&self) -> Result<(), Error> { + self.shutdown.notify_one(); + Ok(()) + } + + async fn send_event(&self, event: Event) -> Result<(), Error> { + self.tx.send(event).await.map_err(|e| Error::ChannelSend(Box::new(e)))?; + Ok(()) + } + } + + async fn handle_event(Json(event): Json, producer: HttpProducer) -> Result<(), axum::http::StatusCode> { + producer + .send_event(event) + .await + .map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR) + } +} diff --git a/crates/event-notifier/src/rules.rs b/crates/event-notifier/src/rules.rs deleted file mode 100644 index 671bcfb5..00000000 --- a/crates/event-notifier/src/rules.rs +++ /dev/null @@ -1,191 +0,0 @@ -// src/rules.rs -use crate::event_name::EventName; -use crate::target::TargetID; -use std::collections::{HashMap, HashSet}; -use wildmatch::WildMatch; - -/// TargetIDSet represents a set of target IDs -#[derive(Debug, Clone, Default)] -pub struct TargetIDSet(HashSet); - -impl TargetIDSet { - pub fn new() -> Self { - Self(HashSet::new()) - } - - pub fn insert(&mut self, target_id: TargetID) -> bool { - self.0.insert(target_id) - } - - pub fn contains(&self, target_id: &TargetID) -> bool { - self.0.contains(target_id) - } - - pub fn remove(&mut self, target_id: &TargetID) -> bool { - self.0.remove(target_id) - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - pub fn clear(&mut self) { - self.0.clear() - } - - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } - - pub fn extend>(&mut self, iter: I) { - self.0.extend(iter) - } - - pub fn union(&self, other: &TargetIDSet) -> TargetIDSet { - let mut result = self.clone(); - result.extend(other.0.iter().cloned()); - result - } - - pub fn difference(&self, other: &TargetIDSet) -> TargetIDSet { - let mut result = TargetIDSet::new(); - for id in self.0.iter() { - if !other.contains(id) { - result.insert(id.clone()); - } - } - result - } -} - -impl FromIterator for TargetIDSet { - fn from_iter>(iter: I) -> Self { - let mut set = TargetIDSet::new(); - set.extend(iter); - set - } -} - -#[derive(Debug, Clone)] -pub struct Rules { - patterns: HashMap, -} - -impl Rules { - pub fn new() -> Self { - Self { - patterns: HashMap::new(), - } - } - - pub fn add(&mut self, pattern: String, target_id: TargetID) { - let entry = self.patterns.entry(pattern).or_insert_with(TargetIDSet::new); - entry.insert(target_id); - } - - pub fn match_object(&self, object_name: &str) -> bool { - for pattern in self.patterns.keys() { - if WildMatch::new(pattern).matches(object_name) { - return true; - } - } - false - } - - pub fn match_targets(&self, object_name: &str) -> TargetIDSet { - let mut target_ids = TargetIDSet::new(); - - for (pattern, ids) in &self.patterns { - if WildMatch::new(pattern).matches(object_name) { - target_ids.extend(ids.iter().cloned()); - } - } - - target_ids - } -} - -#[derive(Debug, Default)] -pub struct RulesMap { - rules: HashMap, -} - -impl RulesMap { - pub fn new() -> Self { - Self { rules: HashMap::new() } - } - - pub fn add(&mut self, events: &[EventName], pattern: String, target_id: TargetID) { - for &event in events { - for expanded_event in event.expand() { - let rules = self.rules.entry(expanded_event).or_insert_with(Rules::new); - rules.add(pattern.clone(), target_id.clone()); - } - } - } - - pub fn match_simple(&self, event: EventName, object_name: &str) -> bool { - match self.rules.get(&event) { - Some(rules) => rules.match_object(object_name), - None => false, - } - } - - pub fn match_targets(&self, event: EventName, object_name: &str) -> TargetIDSet { - match self.rules.get(&event) { - Some(rules) => rules.match_targets(object_name), - None => TargetIDSet::new(), - } - } -} - -impl Clone for RulesMap { - fn clone(&self) -> Self { - Self { - rules: self.rules.clone(), - } - } -} - -impl Default for Rules { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::event_name::EventName; - - #[test] - fn test_rules() { - let mut rules = Rules::new(); - rules.add("*.txt".to_string(), "target1".to_string()); - rules.add("*.jpg".to_string(), "target2".to_string()); - - assert!(rules.match_object("file.txt")); - assert!(!rules.match_object("file.pdf")); - assert!(rules.match_object("image.jpg")); - assert!(!rules.match_object("image.png")); - - let target_ids = rules.match_targets("file.txt"); - assert_eq!(target_ids.len(), 1); - assert!(target_ids.iter().any(|id| id == "target1")); - - let mut rules_map = RulesMap::new(); - let target_id = "target1".to_string(); - // 添加规则 - rules_map.add(&[EventName::ObjectCreatedAll], String::from("images/*"), target_id); - - // 匹配对象 - let matches = rules_map.match_simple(EventName::ObjectCreatedPut, "images/photo.jpg"); // returns true - - // 获取匹配的目标 - let target_ids = rules_map.match_targets(EventName::ObjectCreatedPut, "images/photo.jpg"); - } -} diff --git a/crates/event-notifier/src/stats.rs b/crates/event-notifier/src/stats.rs deleted file mode 100644 index 570bbeca..00000000 --- a/crates/event-notifier/src/stats.rs +++ /dev/null @@ -1,81 +0,0 @@ -// src/stats.rs -use crate::target::TargetID; -use parking_lot::RwLock; -use std::collections::HashMap; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::Arc; - -#[derive(Debug, Default)] -pub struct TargetStat { - pub current_send_calls: AtomicI64, - pub total_events: AtomicI64, - pub failed_events: AtomicI64, - pub current_queue: AtomicI64, -} - -impl TargetStat { - pub fn inc_send_calls(&self) { - self.current_send_calls.fetch_add(1, Ordering::Relaxed); - } - - pub fn dec_send_calls(&self) { - self.current_send_calls.fetch_sub(1, Ordering::Relaxed); - } - - pub fn inc_total_events(&self) { - self.total_events.fetch_add(1, Ordering::Relaxed); - } - - pub fn inc_failed_events(&self) { - self.failed_events.fetch_add(1, Ordering::Relaxed); - } - - pub fn set_queue_size(&self, size: i64) { - self.current_queue.store(size, Ordering::Relaxed); - } -} - -#[derive(Default)] -pub struct TargetStats { - stats: RwLock>>, -} - -impl TargetStats { - pub fn new() -> Self { - Self::default() - } - - pub fn get_or_create(&self, id: &TargetID) -> Arc { - let mut stats = self.stats.write(); - stats - .entry(id.clone()) - .or_insert_with(|| Arc::new(TargetStat::default())) - .clone() - } - - pub fn get_stats(&self) -> HashMap { - self.stats - .read() - .iter() - .map(|(id, stat)| { - ( - id.clone(), - TargetSnapshot { - current_send_calls: stat.current_send_calls.load(Ordering::Relaxed), - total_events: stat.total_events.load(Ordering::Relaxed), - failed_events: stat.failed_events.load(Ordering::Relaxed), - current_queue: stat.current_queue.load(Ordering::Relaxed), - }, - ) - }) - .collect() - } -} - -#[derive(Debug, Clone)] -pub struct TargetSnapshot { - pub current_send_calls: i64, - pub total_events: i64, - pub failed_events: i64, - pub current_queue: i64, -} diff --git a/crates/event-notifier/src/store.rs b/crates/event-notifier/src/store.rs new file mode 100644 index 00000000..995cdc83 --- /dev/null +++ b/crates/event-notifier/src/store.rs @@ -0,0 +1,53 @@ +use crate::Error; +use crate::Log; +use chrono::Utc; +use std::sync::Arc; +use tokio::fs::{create_dir_all, File, OpenOptions}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::sync::RwLock; + +/// `EventStore` is a struct that manages the storage of event logs. +pub struct EventStore { + path: String, + lock: Arc>, +} + +impl EventStore { + pub async fn new(path: &str) -> Result { + create_dir_all(path).await?; + Ok(Self { + path: path.to_string(), + lock: Arc::new(RwLock::new(())), + }) + } + + pub async fn save_logs(&self, logs: &[Log]) -> Result<(), Error> { + let _guard = self.lock.write().await; + let file_path = format!("{}/events_{}.jsonl", self.path, Utc::now().timestamp()); + let file = OpenOptions::new().create(true).append(true).open(&file_path).await?; + let mut writer = BufWriter::new(file); + for log in logs { + let line = serde_json::to_string(log)?; + writer.write_all(line.as_bytes()).await?; + writer.write_all(b"\n").await?; + } + writer.flush().await?; + Ok(()) + } + + pub async fn load_logs(&self) -> Result, Error> { + let _guard = self.lock.read().await; + let mut logs = Vec::new(); + let mut entries = tokio::fs::read_dir(&self.path).await?; + while let Some(entry) = entries.next_entry().await? { + let file = File::open(entry.path()).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + let log: Log = serde_json::from_str(&line)?; + logs.push(log); + } + } + Ok(logs) + } +} diff --git a/crates/event-notifier/src/target.rs b/crates/event-notifier/src/target.rs deleted file mode 100644 index 68b575b7..00000000 --- a/crates/event-notifier/src/target.rs +++ /dev/null @@ -1,66 +0,0 @@ -// src/target.rs -use crate::error::{Error, Result}; -use crate::event::Event; -use crate::stats::{TargetSnapshot, TargetStats}; -use async_trait::async_trait; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -pub type TargetID = String; - -#[async_trait] -pub trait Target: Send + Sync { - fn id(&self) -> TargetID; - async fn send(&self, event: Event) -> Result<()>; - async fn is_active(&self) -> bool; - async fn close(&self) -> Result<()>; -} - -pub struct TargetList { - targets: Arc>>>, - stats: Arc, -} - -impl TargetList { - pub fn new() -> Self { - Self { - targets: Arc::new(RwLock::new(HashMap::new())), - stats: Arc::default(), - } - } - - pub async fn send(&self, event: Event, target_ids: &[TargetID]) -> Vec> { - let mut results = Vec::with_capacity(target_ids.len()); - let targets = self.targets.read().unwrap(); - - for id in target_ids { - let target = match targets.get(id) { - Some(t) => t, - None => { - results.push(Err(Error::TargetNotFound(id.to_string()))); - continue; - } - }; - - let stats = self.stats.get_or_create(id); - stats.inc_send_calls(); - - let result = target.send(event.clone()).await; - - stats.dec_send_calls(); - stats.inc_total_events(); - - if result.is_err() { - stats.inc_failed_events(); - } - - results.push(result); - } - - results - } - - pub fn get_stats(&self) -> HashMap { - self.stats.get_stats() - } -} diff --git a/crates/event-notifier/src/target_entry/mod.rs b/crates/event-notifier/src/target_entry/mod.rs deleted file mode 100644 index a2a1c275..00000000 --- a/crates/event-notifier/src/target_entry/mod.rs +++ /dev/null @@ -1 +0,0 @@ -mod webhook; diff --git a/crates/event-notifier/src/target_entry/webhook.rs b/crates/event-notifier/src/target_entry/webhook.rs deleted file mode 100644 index 1736ad36..00000000 --- a/crates/event-notifier/src/target_entry/webhook.rs +++ /dev/null @@ -1,161 +0,0 @@ -use crate::event::Event; -use crate::target::{Target, TargetID}; -use async_trait::async_trait; -use reqwest::{header, Client}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use thiserror::Error; -use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; - -#[derive(Error, Debug)] -pub enum WebhookError { - #[error("HTTP client error: {0}")] - HttpClient(#[from] reqwest::Error), - #[error("JSON serialization error: {0}")] - Json(#[from] serde_json::Error), - #[error("Store error: {0}")] - Store(#[from] Box), - #[error("Webhook request failed after retries")] - RequestFailed, -} - -type Result = std::result::Result; - -/// Webhook 目标配置 - -/// Webhook 目标配置 -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct WebhookConfig { - /// Webhook endpoint URL - pub endpoint: String, - /// 认证令牌 - pub auth_token: Option, - /// 自定义请求头 - pub custom_headers: Option>, - /// 重试次数 - pub max_retries: u32, - /// 连接超时时间 (秒) - pub timeout: u64, -} - -/// Webhook Target 实现 -pub struct Webhook { - /// 目标 ID - id: TargetID, - /// HTTP 客户端 - client: Client, - /// 配置信息 - config: WebhookConfig, - /// 事件存储 - store: Arc>, -} - -#[async_trait] -impl Target for Webhook { - /// 返回目标 ID - fn id(&self) -> &TargetID { - &self.id - } - - /// 检查 Webhook 是否可用 - async fn is_active(&self) -> Result { - // 发送测试请求验证连接 - let resp = self.client.get(&self.config.endpoint).send().await?; - - Ok(resp.status().is_success()) - } - - /// 保存事件到存储并异步发送 - async fn save(&self, event: Event) -> Result<()> { - // 序列化事件 - let event_json = serde_json::to_string(&event)?; - - // 存储事件 - self.store.lock().await.save(event_json)?; - - // 异步发送 - tokio::spawn(self.send_event(event)); - - Ok(()) - } - - /// 从存储发送事件 - async fn send(&self, key: StoreKey) -> Result<()> { - // 从存储获取事件 - let event_json = self.store.lock().await.get(&key)?; - let event: Event = serde_json::from_str(&event_json)?; - - // 发送事件到 webhook - self.send_event(event).await?; - - // 发送成功后删除 - self.store.lock().await.delete(&key)?; - - Ok(()) - } - - /// 返回事件存储 - fn store(&self) -> Arc> { - self.store.clone() - } - - /// 关闭 Target - async fn close(&self) -> Result<()> { - // 等待所有事件发送完成 - self.store.lock().await.flush()?; - Ok(()) - } -} - -impl Webhook { - /// 创建新的 Webhook Target - pub fn new(id: TargetID, config: WebhookConfig, store: Arc>) -> Result { - // 构建 HTTP 客户端 - let client = Client::builder().timeout(Duration::from_secs(config.timeout)).build()?; - - Ok(Self { - id, - client, - config, - store, - }) - } - - /// 发送事件到 Webhook endpoint - async fn send_event(&self, event: Event) -> Result<()> { - let mut retries = 0; - loop { - // 构建请求 - let mut req = self.client.post(&self.config.endpoint).json(&event); - - // 添加认证头 - if let Some(token) = &self.config.auth_token { - req = req.header(header::AUTHORIZATION, format!("Bearer {}", token)); - } - - // 添加自定义头 - if let Some(headers) = &self.config.custom_headers { - for (key, value) in headers { - req = req.header(key, value); - } - } - - // 发送请求 - match req.send().await { - Ok(resp) if resp.status().is_success() => { - return Ok(()); - } - _ if retries < self.config.max_retries => { - retries += 1; - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - Err(e) => return Err(WebhookError::HttpClient(e)), - _ => return Err(WebhookError::RequestFailed), - } - } - } -} diff --git a/crates/event-notifier/tests/integration.rs b/crates/event-notifier/tests/integration.rs new file mode 100644 index 00000000..2829b8b9 --- /dev/null +++ b/crates/event-notifier/tests/integration.rs @@ -0,0 +1,160 @@ +use rustfs_event_notifier::{AdapterConfig, NotificationSystem, WebhookConfig}; +use rustfs_event_notifier::{Bucket, Event, EventBuilder, Identity, Metadata, Name, Object, Source}; +use rustfs_event_notifier::{ChannelAdapter, WebhookAdapter}; +use std::collections::HashMap; +use std::sync::Arc; + +#[tokio::test] +async fn test_webhook_adapter() { + let adapter = WebhookAdapter::new(WebhookConfig { + endpoint: "http://localhost:8080/webhook".to_string(), + auth_token: None, + custom_headers: None, + max_retries: 1, + timeout: 5, + }); + + // create an s3 metadata object + let metadata = Metadata { + schema_version: "1.0".to_string(), + configuration_id: "test-config".to_string(), + bucket: Bucket { + name: "my-bucket".to_string(), + owner_identity: Identity { + principal_id: "owner123".to_string(), + }, + arn: "arn:aws:s3:::my-bucket".to_string(), + }, + object: Object { + key: "test.txt".to_string(), + size: Some(1024), + etag: Some("abc123".to_string()), + content_type: Some("text/plain".to_string()), + user_metadata: None, + version_id: None, + sequencer: "1234567890".to_string(), + }, + }; + + // create source object + let source = Source { + host: "localhost".to_string(), + port: "80".to_string(), + user_agent: "curl/7.68.0".to_string(), + }; + + // Create events using builder mode + let event = Event::builder() + .event_version("2.0") + .event_source("aws:s3") + .aws_region("us-east-1") + .event_time("2023-10-01T12:00:00.000Z") + .event_name(Name::ObjectCreatedPut) + .user_identity(Identity { + principal_id: "user123".to_string(), + }) + .request_parameters(HashMap::new()) + .response_elements(HashMap::new()) + .s3(metadata) + .source(source) + .channels(vec!["webhook".to_string()]) + .build() + .expect("failed to create event"); + + let result = adapter.send(&event).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_notification_system() { + let config = rustfs_event_notifier::NotificationConfig { + store_path: "./test_events".to_string(), + channel_capacity: 100, + adapters: vec![AdapterConfig::Webhook(WebhookConfig { + endpoint: "http://localhost:8080/webhook".to_string(), + auth_token: None, + custom_headers: None, + max_retries: 1, + timeout: 5, + })], + http: Default::default(), + }; + let system = Arc::new(tokio::sync::Mutex::new(NotificationSystem::new(config.clone()).await.unwrap())); + let adapters: Vec> = vec![Arc::new(WebhookAdapter::new(WebhookConfig { + endpoint: "http://localhost:8080/webhook".to_string(), + auth_token: None, + custom_headers: None, + max_retries: 1, + timeout: 5, + }))]; + + // create an s3 metadata object + let metadata = Metadata { + schema_version: "1.0".to_string(), + configuration_id: "test-config".to_string(), + bucket: Bucket { + name: "my-bucket".to_string(), + owner_identity: Identity { + principal_id: "owner123".to_string(), + }, + arn: "arn:aws:s3:::my-bucket".to_string(), + }, + object: Object { + key: "test.txt".to_string(), + size: Some(1024), + etag: Some("abc123".to_string()), + content_type: Some("text/plain".to_string()), + user_metadata: None, + version_id: None, + sequencer: "1234567890".to_string(), + }, + }; + + // create source object + let source = Source { + host: "localhost".to_string(), + port: "80".to_string(), + user_agent: "curl/7.68.0".to_string(), + }; + + // create a preconfigured builder with objects + let event = EventBuilder::for_object_creation(metadata, source) + .user_identity(Identity { + principal_id: "user123".to_string(), + }) + .event_time("2023-10-01T12:00:00.000Z") + .channels(vec!["webhook".to_string()]) + .build() + .expect("failed to create event"); + + { + let system_lock = system.lock().await; + system_lock.send_event(event).await.unwrap(); + } + + let system_clone = Arc::clone(&system); + let system_handle = tokio::spawn(async move { + let mut system = system_clone.lock().await; + system.start(adapters).await + }); + + // set 10 seconds timeout + match tokio::time::timeout(std::time::Duration::from_secs(10), system_handle).await { + Ok(result) => { + println!("System started successfully"); + assert!(result.is_ok()); + } + Err(_) => { + println!("System operation timed out, forcing shutdown"); + // create a new task to handle the timeout + let system = Arc::clone(&system); + tokio::spawn(async move { + if let Ok(guard) = system.try_lock() { + guard.shutdown(); + } + }); + // give the system some time to clean up resources + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } +} diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index e2d08815..57da6e9d 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -60,7 +60,7 @@ s3s.workspace = true serde.workspace = true serde_json.workspace = true serde_urlencoded = { workspace = true } -shadow-rs.workspace = true +shadow-rs = { workspace = true, features = ["build"] } tracing.workspace = true time = { workspace = true, features = ["parsing", "formatting", "serde"] } tokio-util.workspace = true @@ -103,5 +103,5 @@ hyper-util = { workspace = true, features = [ ] } transform-stream = { workspace = true } netif = "0.1.6" -shadow-rs.workspace = true +shadow-rs = { workspace = true, features = ["build"] } # pin-utils = "0.1.0" diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index 1efac71f..e877ebad 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -103,6 +103,10 @@ pub struct Opt { #[arg(long, env = "RUSTFS_LICENSE")] pub license: Option, + + /// event notifier config file + #[arg(long, env = "RUSTFS_EVENT_CONFIG")] + pub event_config: Option, } // lazy_static::lazy_static! {