From 4abfc9f554cb708c9fe1d909b1ee154f5662ec57 Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 19 Dec 2025 12:07:07 +0800 Subject: [PATCH] Fix/fix event 1216 (#1191) Signed-off-by: loverustfs Co-authored-by: loverustfs --- .github/workflows/ci.yml | 10 +- .github/workflows/e2e-mint.yml | 20 +- .github/workflows/e2e-s3tests.yml | 18 +- .gitignore | 3 + Cargo.lock | 163 ++++-- Cargo.toml | 6 +- crates/audit/Cargo.toml | 1 + crates/audit/src/factory.rs | 223 ++++++++ crates/audit/src/lib.rs | 1 + crates/audit/src/registry.rs | 506 ++++++++---------- crates/audit/src/system.rs | 91 +++- crates/audit/tests/integration_test.rs | 4 +- crates/audit/tests/performance_test.rs | 4 +- crates/audit/tests/system_integration_test.rs | 4 +- crates/common/src/globals.rs | 18 +- crates/config/src/audit/mod.rs | 2 +- crates/config/src/constants/env.rs | 3 +- crates/config/src/notify/mod.rs | 24 +- crates/config/src/notify/store.rs | 4 +- crates/ecstore/src/admin_server_info.rs | 4 +- crates/ecstore/src/config/audit.rs | 6 +- crates/ecstore/src/config/notify.rs | 6 +- crates/ecstore/src/metrics_realtime.rs | 6 +- crates/ecstore/src/sets.rs | 4 +- crates/ecstore/src/store.rs | 12 +- crates/notify/Cargo.toml | 1 + crates/notify/examples/webhook.rs | 13 +- crates/notify/src/factory.rs | 12 +- crates/notify/src/integration.rs | 14 +- crates/notify/src/registry.rs | 22 +- crates/protos/src/lib.rs | 6 +- crates/targets/src/event_name.rs | 2 +- crates/targets/src/target/mqtt.rs | 17 +- crates/targets/src/target/webhook.rs | 13 +- rustfs/src/main.rs | 3 +- rustfs/src/server/audit.rs | 5 +- rustfs/src/storage/ecfs.rs | 1 + rustfs/src/storage/tonic_service.rs | 14 +- scripts/run.sh | 53 +- 39 files changed, 828 insertions(+), 491 deletions(-) create mode 100644 crates/audit/src/factory.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9d36100c..3c7e7662 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ concurrency: env: 
CARGO_TERM_COLOR: always RUST_BACKTRACE: 1 - CARGO_BUILD_JOBS: 4 + CARGO_BUILD_JOBS: 8 jobs: @@ -78,7 +78,7 @@ jobs: permissions: actions: write contents: read - runs-on: ubicloud-standard-4 + runs-on: ubicloud-standard-4 outputs: should_skip: ${{ steps.skip_check.outputs.should_skip }} steps: @@ -95,7 +95,7 @@ jobs: name: Typos runs-on: ubicloud-standard-4 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - uses: dtolnay/rust-toolchain@stable - name: Typos check with custom config file uses: crate-ci/typos@master @@ -108,7 +108,7 @@ jobs: timeout-minutes: 60 steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Setup Rust environment uses: ./.github/actions/setup @@ -140,7 +140,7 @@ jobs: timeout-minutes: 30 steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Clean up previous test run run: | diff --git a/.github/workflows/e2e-mint.yml b/.github/workflows/e2e-mint.yml index 0baf7f49..5923cfde 100644 --- a/.github/workflows/e2e-mint.yml +++ b/.github/workflows/e2e-mint.yml @@ -1,8 +1,22 @@ +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ name: e2e-mint on: push: - branches: [main] + branches: [ main ] paths: - ".github/workflows/e2e-mint.yml" - "Dockerfile.source" @@ -27,7 +41,7 @@ jobs: timeout-minutes: 40 steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Enable buildx uses: docker/setup-buildx-action@v3 @@ -104,7 +118,7 @@ jobs: timeout-minutes: 60 steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v6 - name: Enable buildx uses: docker/setup-buildx-action@v3 diff --git a/.github/workflows/e2e-s3tests.yml b/.github/workflows/e2e-s3tests.yml index dcf99bf8..e29d13aa 100644 --- a/.github/workflows/e2e-s3tests.yml +++ b/.github/workflows/e2e-s3tests.yml @@ -1,3 +1,17 @@ +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ name: e2e-s3tests on: @@ -47,7 +61,7 @@ jobs: runs-on: ubicloud-standard-4 timeout-minutes: 120 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Enable buildx uses: docker/setup-buildx-action@v3 @@ -201,7 +215,7 @@ jobs: runs-on: ubicloud-standard-4 timeout-minutes: 150 steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Enable buildx uses: docker/setup-buildx-action@v3 diff --git a/.gitignore b/.gitignore index c5218d5f..d0139ca6 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ deploy/logs/*.log.* /s3-tests-local/ /s3tests.conf /s3tests.conf.* +*.events +*.audit +*.snappy \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c88576d7..7ada333c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -644,9 +644,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.15.1" +version = "1.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f" +checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" dependencies = [ "aws-lc-sys", "zeroize", @@ -654,9 +654,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.34.0" +version = "0.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6" +checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" dependencies = [ "cc", "cmake", @@ -914,9 +914,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.8" +version = "0.61.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6864c190cbb8e30cf4b77b2c8f3b6dfffa697a09b7218d2f7cd3d4c4065a9f7" +checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" dependencies = [ "aws-smithy-types", ] @@ -942,9 +942,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.5" +version = "1.9.6" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +checksum = "65fda37911905ea4d3141a01364bc5509a0f32ae3f3b22d6e330c0abfb62d247" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1337,9 +1337,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.0" +version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytemuck" @@ -1633,9 +1633,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cmake" -version = "0.1.56" +version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b042e5d8a74ae91bb0961acd039822472ec99f8ab0948cbf6d1369588f8be586" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" dependencies = [ "cc", ] @@ -2082,6 +2082,16 @@ dependencies = [ "darling_macro 0.21.3", ] +[[package]] +name = "darling" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" +dependencies = [ + "darling_core 0.23.0", + "darling_macro 0.23.0", +] + [[package]] name = "darling_core" version = "0.14.4" @@ -2124,6 +2134,19 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "darling_core" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0" +dependencies = [ + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.111", +] + [[package]] name = "darling_macro" version = "0.14.4" @@ -2157,6 +2180,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "darling_macro" +version = "0.23.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" +dependencies = [ + "darling_core 0.23.0", + "quote", + "syn 2.0.111", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -2997,7 +3031,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3267,7 +3301,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4292,7 +4326,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "system-configuration", "tokio", "tower-service", @@ -4312,7 +4346,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core", + "windows-core 0.62.2", ] [[package]] @@ -4572,7 +4606,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4797,13 +4831,13 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall", + "redox_syscall 0.6.0", ] [[package]] @@ -5260,7 +5294,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5698,7 +5732,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - 
"redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link 0.2.1", ] @@ -5758,9 +5792,9 @@ checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" [[package]] name = "pastey" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d6c094ee800037dff99e02cab0eaf3142826586742a270ab3d7a62656bd27a" +checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec" [[package]] name = "path-absolutize" @@ -6187,7 +6221,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" dependencies = [ - "toml_edit 0.23.9", + "toml_edit 0.23.10+spec-1.0.0", ] [[package]] @@ -6422,7 +6456,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.35", - "socket2 0.5.10", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tracing", @@ -6459,9 +6493,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.1", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -6614,6 +6648,15 @@ dependencies = [ "bitflags 2.10.0", ] +[[package]] +name = "redox_syscall" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" +dependencies = [ + "bitflags 2.10.0", +] + [[package]] name = "redox_users" version = "0.5.2" @@ -6791,9 +6834,9 @@ dependencies = [ [[package]] name = "rmcp" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df440eaa43f8573491ed4a5899719b6d29099500774abba12214a095a4083ed" +checksum = "528d42f8176e6e5e71ea69182b17d1d0a19a6b3b894b564678b74cd7cab13cfa" dependencies = [ "async-trait", "base64", @@ -6813,11 +6856,11 @@ dependencies = [ [[package]] name = "rmcp-macros" -version = "0.11.0" +version = "0.12.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef03779cccab8337dd8617c53fce5c98ec21794febc397531555472ca28f8c3" +checksum = "e3f81daaa494eb8e985c9462f7d6ce1ab05e5299f48aafd76cdd3d8b060e6f59" dependencies = [ - "darling 0.21.3", + "darling 0.23.0", "proc-macro2", "quote", "serde_json", @@ -7126,6 +7169,7 @@ dependencies = [ name = "rustfs-audit" version = "0.0.5" dependencies = [ + "async-trait", "chrono", "const-str", "futures", @@ -7732,7 +7776,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7786,9 +7830,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" +checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" dependencies = [ "web-time", "zeroize", @@ -8852,7 +8896,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -9165,9 +9209,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.3" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ "serde_core", ] @@ -9188,21 +9232,21 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.23.9" +version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d7cbc3b4b49633d57a0509303158ca50de80ae32c265093b24c414705807832" +checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ "indexmap 2.12.1", - "toml_datetime 0.7.3", + "toml_datetime 0.7.5+spec-1.1.0", "toml_parser", "winnow", ] [[package]] name = 
"toml_parser" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] @@ -9342,9 +9386,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -9377,9 +9421,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -9868,7 +9912,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -9884,7 +9928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" dependencies = [ "windows-collections", - "windows-core", + "windows-core 0.61.2", "windows-future", "windows-link 0.1.3", "windows-numerics", @@ -9896,7 +9940,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" dependencies = [ - "windows-core", + "windows-core 0.61.2", ] [[package]] @@ -9912,13 +9956,26 @@ dependencies = [ 
"windows-strings 0.4.2", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-future" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ - "windows-core", + "windows-core 0.61.2", "windows-link 0.1.3", "windows-threading", ] @@ -9963,7 +10020,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ - "windows-core", + "windows-core 0.61.2", "windows-link 0.1.3", ] diff --git a/Cargo.toml b/Cargo.toml index 33f24fe5..a93368d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,7 @@ flatbuffers = "25.9.23" form_urlencoded = "1.2.2" prost = "0.14.1" quick-xml = "0.38.4" -rmcp = { version = "0.11.0" } +rmcp = { version = "0.12.0" } rmp = { version = "0.8.14" } rmp-serde = { version = "1.3.0" } serde = { version = "1.0.228", features = ["derive"] } @@ -150,7 +150,7 @@ pbkdf2 = "0.13.0-rc.5" rsa = { version = "0.10.0-rc.10" } rustls = { version = "0.23.35", features = ["ring", "logging", "std", "tls12"], default-features = false } rustls-pemfile = "2.2.0" -rustls-pki-types = "1.13.1" +rustls-pki-types = "1.13.2" sha1 = "0.11.0-rc.3" sha2 = "0.11.0-rc.3" subtle = "2.6" @@ -238,7 +238,7 @@ temp-env = "0.3.6" tempfile = "3.23.0" test-case = "3.3.1" thiserror = "2.0.17" -tracing = { version = "0.1.43" } +tracing = { version = "0.1.44" } tracing-appender = "0.2.4" tracing-error = "0.2.1" tracing-opentelemetry = "0.32.0" diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index 414e05fc..ae97033e 
100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -29,6 +29,7 @@ categories = ["web-programming", "development-tools", "asynchronous", "api-bindi rustfs-targets = { workspace = true } rustfs-config = { workspace = true, features = ["audit", "constants"] } rustfs-ecstore = { workspace = true } +async-trait = { workspace = true } chrono = { workspace = true } const-str = { workspace = true } futures = { workspace = true } diff --git a/crates/audit/src/factory.rs b/crates/audit/src/factory.rs new file mode 100644 index 00000000..ea8cd9b9 --- /dev/null +++ b/crates/audit/src/factory.rs @@ -0,0 +1,223 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::AuditEntry; +use async_trait::async_trait; +use hashbrown::HashSet; +use rumqttc::QoS; +use rustfs_config::audit::{AUDIT_MQTT_KEYS, AUDIT_WEBHOOK_KEYS, ENV_AUDIT_MQTT_KEYS, ENV_AUDIT_WEBHOOK_KEYS}; +use rustfs_config::{ + AUDIT_DEFAULT_DIR, DEFAULT_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, + MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, + WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, +}; +use rustfs_ecstore::config::KVS; +use rustfs_targets::{ + Target, + error::TargetError, + target::{mqtt::MQTTArgs, webhook::WebhookArgs}, +}; +use std::time::Duration; +use tracing::{debug, warn}; +use url::Url; + +/// Trait for creating targets from configuration +#[async_trait] +pub trait TargetFactory: Send + Sync { + /// Creates a target from configuration + async fn create_target(&self, id: String, config: &KVS) -> Result + Send + Sync>, TargetError>; + + /// Validates target configuration + fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError>; + + /// Returns a set of valid configuration field names for this target type. + /// This is used to filter environment variables. + fn get_valid_fields(&self) -> HashSet; + + /// Returns a set of valid configuration env field names for this target type. + /// This is used to filter environment variables. + fn get_valid_env_fields(&self) -> HashSet; +} + +/// Factory for creating Webhook targets +pub struct WebhookTargetFactory; + +#[async_trait] +impl TargetFactory for WebhookTargetFactory { + async fn create_target(&self, id: String, config: &KVS) -> Result + Send + Sync>, TargetError> { + // All config values are now read directly from the merged `config` KVS. 
+ let endpoint = config + .lookup(WEBHOOK_ENDPOINT) + .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; + let endpoint_url = Url::parse(&endpoint) + .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{endpoint}')")))?; + + let args = WebhookArgs { + enable: true, // If we are here, it's already enabled. + endpoint: endpoint_url, + auth_token: config.lookup(WEBHOOK_AUTH_TOKEN).unwrap_or_default(), + queue_dir: config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(AUDIT_DEFAULT_DIR.to_string()), + queue_limit: config + .lookup(WEBHOOK_QUEUE_LIMIT) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_LIMIT), + client_cert: config.lookup(WEBHOOK_CLIENT_CERT).unwrap_or_default(), + client_key: config.lookup(WEBHOOK_CLIENT_KEY).unwrap_or_default(), + target_type: rustfs_targets::target::TargetType::AuditLog, + }; + + let target = rustfs_targets::target::webhook::WebhookTarget::new(id, args)?; + Ok(Box::new(target)) + } + + fn validate_config(&self, _id: &str, config: &KVS) -> Result<(), TargetError> { + // Validation also uses the merged `config` KVS directly. 
+ let endpoint = config + .lookup(WEBHOOK_ENDPOINT) + .ok_or_else(|| TargetError::Configuration("Missing webhook endpoint".to_string()))?; + debug!("endpoint: {}", endpoint); + let parsed_endpoint = endpoint.trim(); + Url::parse(parsed_endpoint) + .map_err(|e| TargetError::Configuration(format!("Invalid endpoint URL: {e} (value: '{parsed_endpoint}')")))?; + + let client_cert = config.lookup(WEBHOOK_CLIENT_CERT).unwrap_or_default(); + let client_key = config.lookup(WEBHOOK_CLIENT_KEY).unwrap_or_default(); + + if client_cert.is_empty() != client_key.is_empty() { + return Err(TargetError::Configuration( + "Both client_cert and client_key must be specified together".to_string(), + )); + } + + let queue_dir = config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(AUDIT_DEFAULT_DIR.to_string()); + if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() { + return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string())); + } + + Ok(()) + } + + fn get_valid_fields(&self) -> HashSet { + AUDIT_WEBHOOK_KEYS.iter().map(|s| s.to_string()).collect() + } + + fn get_valid_env_fields(&self) -> HashSet { + ENV_AUDIT_WEBHOOK_KEYS.iter().map(|s| s.to_string()).collect() + } +} + +/// Factory for creating MQTT targets +pub struct MQTTTargetFactory; + +#[async_trait] +impl TargetFactory for MQTTTargetFactory { + async fn create_target(&self, id: String, config: &KVS) -> Result + Send + Sync>, TargetError> { + let broker = config + .lookup(MQTT_BROKER) + .ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; + let broker_url = Url::parse(&broker) + .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {e} (value: '{broker}')")))?; + + let topic = config + .lookup(MQTT_TOPIC) + .ok_or_else(|| TargetError::Configuration("Missing MQTT topic".to_string()))?; + + let args = MQTTArgs { + enable: true, // Assumed enabled. 
+ broker: broker_url, + topic, + qos: config + .lookup(MQTT_QOS) + .and_then(|v| v.parse::().ok()) + .map(|q| match q { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtLeastOnce, + }) + .unwrap_or(QoS::AtLeastOnce), + username: config.lookup(MQTT_USERNAME).unwrap_or_default(), + password: config.lookup(MQTT_PASSWORD).unwrap_or_default(), + max_reconnect_interval: config + .lookup(MQTT_RECONNECT_INTERVAL) + .and_then(|v| v.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(5)), + keep_alive: config + .lookup(MQTT_KEEP_ALIVE_INTERVAL) + .and_then(|v| v.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(30)), + queue_dir: config.lookup(MQTT_QUEUE_DIR).unwrap_or(AUDIT_DEFAULT_DIR.to_string()), + queue_limit: config + .lookup(MQTT_QUEUE_LIMIT) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_LIMIT), + target_type: rustfs_targets::target::TargetType::AuditLog, + }; + + let target = rustfs_targets::target::mqtt::MQTTTarget::new(id, args)?; + Ok(Box::new(target)) + } + + fn validate_config(&self, _id: &str, config: &KVS) -> Result<(), TargetError> { + let broker = config + .lookup(MQTT_BROKER) + .ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?; + let url = Url::parse(&broker) + .map_err(|e| TargetError::Configuration(format!("Invalid broker URL: {e} (value: '{broker}')")))?; + + match url.scheme() { + "tcp" | "ssl" | "ws" | "wss" | "mqtt" | "mqtts" => {} + _ => { + return Err(TargetError::Configuration("Unsupported broker URL scheme".to_string())); + } + } + + if config.lookup(MQTT_TOPIC).is_none() { + return Err(TargetError::Configuration("Missing MQTT topic".to_string())); + } + + if let Some(qos_str) = config.lookup(MQTT_QOS) { + let qos = qos_str + .parse::() + .map_err(|_| TargetError::Configuration("Invalid QoS value".to_string()))?; + if qos > 2 { + return Err(TargetError::Configuration("QoS must be 0, 1, or 
2".to_string())); + } + } + + let queue_dir = config.lookup(MQTT_QUEUE_DIR).unwrap_or_default(); + if !queue_dir.is_empty() { + if !std::path::Path::new(&queue_dir).is_absolute() { + return Err(TargetError::Configuration("MQTT queue directory must be an absolute path".to_string())); + } + if let Some(qos_str) = config.lookup(MQTT_QOS) { + if qos_str == "0" { + warn!("Using queue_dir with QoS 0 may result in event loss"); + } + } + } + + Ok(()) + } + + fn get_valid_fields(&self) -> HashSet { + AUDIT_MQTT_KEYS.iter().map(|s| s.to_string()).collect() + } + + fn get_valid_env_fields(&self) -> HashSet { + ENV_AUDIT_MQTT_KEYS.iter().map(|s| s.to_string()).collect() + } +} diff --git a/crates/audit/src/lib.rs b/crates/audit/src/lib.rs index 8207bc23..7cca0063 100644 --- a/crates/audit/src/lib.rs +++ b/crates/audit/src/lib.rs @@ -20,6 +20,7 @@ pub mod entity; pub mod error; +pub mod factory; pub mod global; pub mod observability; pub mod registry; diff --git a/crates/audit/src/registry.rs b/crates/audit/src/registry.rs index 30aa325a..c73b300a 100644 --- a/crates/audit/src/registry.rs +++ b/crates/audit/src/registry.rs @@ -12,29 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::{AuditEntry, AuditError, AuditResult}; -use futures::{StreamExt, stream::FuturesUnordered}; +use crate::{ + AuditEntry, AuditError, AuditResult, + factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory}, +}; +use futures::StreamExt; +use futures::stream::FuturesUnordered; use hashbrown::{HashMap, HashSet}; -use rustfs_config::{ - DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, - MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, - WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT, WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, - WEBHOOK_QUEUE_LIMIT, WEBHOOK_RETRY_INTERVAL, audit::AUDIT_ROUTE_PREFIX, -}; +use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX}; use rustfs_ecstore::config::{Config, KVS}; -use rustfs_targets::{ - Target, TargetError, - target::{ChannelTargetType, TargetType, mqtt::MQTTArgs, webhook::WebhookArgs}, -}; +use rustfs_targets::{Target, TargetError, target::ChannelTargetType}; +use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; use tracing::{debug, error, info, warn}; -use url::Url; /// Registry for managing audit targets pub struct AuditRegistry { /// Storage for created targets targets: HashMap + Send + Sync>>, + /// Factories for creating targets + factories: HashMap>, } impl Default for AuditRegistry { @@ -46,162 +43,207 @@ impl Default for AuditRegistry { impl AuditRegistry { /// Creates a new AuditRegistry pub fn new() -> Self { - Self { targets: HashMap::new() } + let mut registry = AuditRegistry { + factories: HashMap::new(), + targets: HashMap::new(), + }; + + // Register built-in factories + registry.register(ChannelTargetType::Webhook.as_str(), Box::new(WebhookTargetFactory)); + registry.register(ChannelTargetType::Mqtt.as_str(), Box::new(MQTTTargetFactory)); + + registry } - /// Creates all 
audit targets from system configuration and environment variables. + /// Registers a new factory for a target type + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `factory` - The factory instance to create targets of this type. + pub fn register(&mut self, target_type: &str, factory: Box) { + self.factories.insert(target_type.to_string(), factory); + } + + /// Creates a target of the specified type with the given ID and configuration + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `id` - The identifier for the target instance. + /// * `config` - The configuration key-value store for the target. + /// + /// # Returns + /// * `Result + Send + Sync>, TargetError>` - The created target or an error. + pub async fn create_target( + &self, + target_type: &str, + id: String, + config: &KVS, + ) -> Result + Send + Sync>, TargetError> { + let factory = self + .factories + .get(target_type) + .ok_or_else(|| TargetError::Configuration(format!("Unknown target type: {target_type}")))?; + + // Validate configuration before creating target + factory.validate_config(&id, config)?; + + // Create target + factory.create_target(id, config).await + } + + /// Creates all targets from a configuration + /// Create all notification targets from system configuration and environment variables. /// This method processes the creation of each target concurrently as follows: - /// 1. Iterate through supported target types (webhook, mqtt). - /// 2. For each type, resolve its configuration from file and environment variables. + /// 1. Iterate through all registered target types (e.g. webhooks, mqtt). + /// 2. For each type, resolve its configuration in the configuration file and environment variables. /// 3. Identify all target instance IDs that need to be created. - /// 4. Merge configurations with precedence: ENV > file instance > file default. - /// 5. 
Create async tasks for enabled instances. - /// 6. Execute tasks concurrently and collect successful targets. - /// 7. Persist successful configurations back to system storage. - pub async fn create_targets_from_config( - &mut self, + /// 4. Combine the default configuration, file configuration, and environment variable configuration for each instance. + /// 5. If the instance is enabled, create an asynchronous task for it to instantiate. + /// 6. Concurrently executes all creation tasks and collects results. + pub async fn create_audit_targets_from_config( + &self, config: &Config, ) -> AuditResult + Send + Sync>>> { // Collect only environment variables with the relevant prefix to reduce memory usage let all_env: Vec<(String, String)> = std::env::vars().filter(|(key, _)| key.starts_with(ENV_PREFIX)).collect(); - // A collection of asynchronous tasks for concurrently executing target creation let mut tasks = FuturesUnordered::new(); - // let final_config = config.clone(); - + // let final_config = config.clone(); // Clone a configuration for aggregating the final result // Record the defaults for each segment so that the segment can eventually be rebuilt let mut section_defaults: HashMap = HashMap::new(); - - // Supported target types for audit - let target_types = vec![ChannelTargetType::Webhook.as_str(), ChannelTargetType::Mqtt.as_str()]; - - // 1. Traverse all target types and process them - for target_type in target_types { - let span = tracing::Span::current(); - span.record("target_type", target_type); - info!(target_type = %target_type, "Starting audit target type processing"); + // 1. Traverse all registered factories and process them by target type + for (target_type, factory) in &self.factories { + tracing::Span::current().record("target_type", target_type.as_str()); + info!("Start working on target types..."); // 2. Prepare the configuration source + // 2.1. Get the configuration segment in the file, e.g. 
'audit_webhook' let section_name = format!("{AUDIT_ROUTE_PREFIX}{target_type}").to_lowercase(); let file_configs = config.0.get(§ion_name).cloned().unwrap_or_default(); + // 2.2. Get the default configuration for that type let default_cfg = file_configs.get(DEFAULT_DELIMITER).cloned().unwrap_or_default(); - debug!(?default_cfg, "Retrieved default configuration"); + debug!(?default_cfg, "Get the default configuration"); // Save defaults for eventual write back section_defaults.insert(section_name.clone(), default_cfg.clone()); - // Get valid fields for the target type - let valid_fields = match target_type { - "webhook" => get_webhook_valid_fields(), - "mqtt" => get_mqtt_valid_fields(), - _ => { - warn!(target_type = %target_type, "Unknown target type, skipping"); - continue; - } - }; - debug!(?valid_fields, "Retrieved valid configuration fields"); + // *** Optimization point 1: Get all legitimate fields of the current target type *** + let valid_fields = factory.get_valid_fields(); + debug!(?valid_fields, "Get the legitimate configuration fields"); // 3. 
Resolve instance IDs and configuration overrides from environment variables let mut instance_ids_from_env = HashSet::new(); - let mut env_overrides: HashMap> = HashMap::new(); - - for (env_key, env_value) in &all_env { - let audit_prefix = format!("{ENV_PREFIX}{AUDIT_ROUTE_PREFIX}{target_type}").to_uppercase(); - if !env_key.starts_with(&audit_prefix) { - continue; - } - - let suffix = &env_key[audit_prefix.len()..]; - if suffix.is_empty() { - continue; - } - - // Parse field and instance from suffix (FIELD_INSTANCE or FIELD) - let (field_name, instance_id) = if let Some(last_underscore) = suffix.rfind('_') { - let potential_field = &suffix[1..last_underscore]; // Skip leading _ - let potential_instance = &suffix[last_underscore + 1..]; - - // Check if the part before the last underscore is a valid field - if valid_fields.contains(&potential_field.to_lowercase()) { - (potential_field.to_lowercase(), potential_instance.to_lowercase()) - } else { - // Treat the entire suffix as field name with default instance - (suffix[1..].to_lowercase(), DEFAULT_DELIMITER.to_string()) + // 3.1. 
Instance discovery: Based on the '..._ENABLE_INSTANCEID' format + let enable_prefix = + format!("{ENV_PREFIX}{AUDIT_ROUTE_PREFIX}{target_type}{DEFAULT_DELIMITER}{ENABLE_KEY}{DEFAULT_DELIMITER}") + .to_uppercase(); + for (key, value) in &all_env { + if EnableState::from_str(value).ok().map(|s| s.is_enabled()).unwrap_or(false) { + if let Some(id) = key.strip_prefix(&enable_prefix) { + if !id.is_empty() { + instance_ids_from_env.insert(id.to_lowercase()); + } } - } else { - // No underscore, treat as field with default instance - (suffix[1..].to_lowercase(), DEFAULT_DELIMITER.to_string()) - }; - - if valid_fields.contains(&field_name) { - if instance_id != DEFAULT_DELIMITER { - instance_ids_from_env.insert(instance_id.clone()); - } - env_overrides - .entry(instance_id) - .or_default() - .insert(field_name, env_value.clone()); - } else { - debug!( - env_key = %env_key, - field_name = %field_name, - "Ignoring environment variable field not found in valid fields for target type {}", - target_type - ); } } - debug!(?env_overrides, "Completed environment variable analysis"); + + // 3.2. Parse all relevant environment variable configurations + // 3.2.1. Build environment variable prefixes such as 'RUSTFS_AUDIT_WEBHOOK_' + let env_prefix = format!("{ENV_PREFIX}{AUDIT_ROUTE_PREFIX}{target_type}{DEFAULT_DELIMITER}").to_uppercase(); + // 3.2.2. 
'env_overrides' is used to store configurations parsed from environment variables in the format: {instance id -> {field -> value}} + let mut env_overrides: HashMap> = HashMap::new(); + for (key, value) in &all_env { + if let Some(rest) = key.strip_prefix(&env_prefix) { + // Use rsplitn to split from the right side to properly extract the INSTANCE_ID at the end + // Format: _ or + let mut parts = rest.rsplitn(2, DEFAULT_DELIMITER); + + // The first part from the right is INSTANCE_ID + let instance_id_part = parts.next().unwrap_or(DEFAULT_DELIMITER); + // The remaining part is FIELD_NAME + let field_name_part = parts.next(); + + let (field_name, instance_id) = match field_name_part { + // Case 1: The format is _ + // e.g., rest = "ENDPOINT_PRIMARY" -> field_name="ENDPOINT", instance_id="PRIMARY" + Some(field) => (field.to_lowercase(), instance_id_part.to_lowercase()), + // Case 2: The format is (without INSTANCE_ID) + // e.g., rest = "ENABLE" -> field_name="ENABLE", instance_id="" (Universal configuration `_ DEFAULT_DELIMITER`) + None => (instance_id_part.to_lowercase(), DEFAULT_DELIMITER.to_string()), + }; + + // *** Optimization point 2: Verify whether the parsed field_name is legal *** + if !field_name.is_empty() && valid_fields.contains(&field_name) { + debug!( + instance_id = %if instance_id.is_empty() { DEFAULT_DELIMITER } else { &instance_id }, + %field_name, + %value, + "Parsing to environment variables" + ); + env_overrides + .entry(instance_id) + .or_default() + .insert(field_name, value.clone()); + } else { + // Ignore illegal field names + warn!( + field_name = %field_name, + "Ignore environment variable fields, not found in the list of valid fields for target type {}", + target_type + ); + } + } + } + debug!(?env_overrides, "Complete the environment variable analysis"); // 4. 
Determine all instance IDs that need to be processed let mut all_instance_ids: HashSet = file_configs.keys().filter(|k| *k != DEFAULT_DELIMITER).cloned().collect(); all_instance_ids.extend(instance_ids_from_env); - debug!(?all_instance_ids, "Determined all instance IDs"); + debug!(?all_instance_ids, "Determine all instance IDs"); // 5. Merge configurations and create tasks for each instance for id in all_instance_ids { - // 5.1. Merge configuration, priority: Environment variables > File instance > File default + // 5.1. Merge configuration, priority: Environment variables > File instance configuration > File default configuration let mut merged_config = default_cfg.clone(); - - // Apply file instance configuration if available + // Instance-specific configuration in application files if let Some(file_instance_cfg) = file_configs.get(&id) { merged_config.extend(file_instance_cfg.clone()); } - - // Apply environment variable overrides + // Application instance-specific environment variable configuration if let Some(env_instance_cfg) = env_overrides.get(&id) { + // Convert HashMap to KVS let mut kvs_from_env = KVS::new(); for (k, v) in env_instance_cfg { kvs_from_env.insert(k.clone(), v.clone()); } merged_config.extend(kvs_from_env); } - debug!(instance_id = %id, ?merged_config, "Completed configuration merge"); + debug!(instance_id = %id, ?merged_config, "Complete configuration merge"); // 5.2. 
Check if the instance is enabled let enabled = merged_config .lookup(ENABLE_KEY) - .map(|v| parse_enable_value(&v)) + .map(|v| { + EnableState::from_str(v.as_str()) + .ok() + .map(|s| s.is_enabled()) + .unwrap_or(false) + }) .unwrap_or(false); if enabled { - info!(instance_id = %id, "Creating audit target"); - - // Create task for concurrent execution - let target_type_clone = target_type.to_string(); - let id_clone = id.clone(); - let merged_config_arc = Arc::new(merged_config.clone()); - let task = tokio::spawn(async move { - let result = create_audit_target(&target_type_clone, &id_clone, &merged_config_arc).await; - (target_type_clone, id_clone, result, merged_config_arc) + info!(instance_id = %id, "Target is enabled, ready to create a task"); + // 5.3. Create asynchronous tasks for enabled instances + let target_type_clone = target_type.clone(); + let tid = id.clone(); + let merged_config_arc = Arc::new(merged_config); + tasks.push(async move { + let result = factory.create_target(tid.clone(), &merged_config_arc).await; + (target_type_clone, tid, result, Arc::clone(&merged_config_arc)) }); - - tasks.push(task); - - // Update final config with successful instance - // final_config.0.entry(section_name.clone()).or_default().insert(id, merged_config); } else { - info!(instance_id = %id, "Skipping disabled audit target, will be removed from final configuration"); + info!(instance_id = %id, "Skip the disabled target and will be removed from the final configuration"); // Remove disabled target from final configuration // final_config.0.entry(section_name.clone()).or_default().remove(&id); } @@ -211,30 +253,28 @@ impl AuditRegistry { // 6. 
Concurrently execute all creation tasks and collect results let mut successful_targets = Vec::new(); let mut successful_configs = Vec::new(); - while let Some(task_result) = tasks.next().await { - match task_result { - Ok((target_type, id, result, kvs_arc)) => match result { - Ok(target) => { - info!(target_type = %target_type, instance_id = %id, "Created audit target successfully"); - successful_targets.push(target); - successful_configs.push((target_type, id, kvs_arc)); - } - Err(e) => { - error!(target_type = %target_type, instance_id = %id, error = %e, "Failed to create audit target"); - } - }, + while let Some((target_type, id, result, final_config)) = tasks.next().await { + match result { + Ok(target) => { + info!(target_type = %target_type, instance_id = %id, "Create a target successfully"); + successful_targets.push(target); + successful_configs.push((target_type, id, final_config)); + } Err(e) => { - error!(error = %e, "Task execution failed"); + error!(target_type = %target_type, instance_id = %id, error = %e, "Failed to create a target"); } } } - // Rebuild in pieces based on "default items + successful instances" and overwrite writeback to ensure that deleted/disabled instances will not be "resurrected" + // 7. 
Aggregate new configuration and write back to system configuration if !successful_configs.is_empty() || !section_defaults.is_empty() { - info!("Prepare to rebuild and save target configurations to the system configuration..."); + info!( + "Prepare to update {} successfully created target configurations to the system configuration...", + successful_configs.len() + ); - // Aggregate successful instances into segments let mut successes_by_section: HashMap> = HashMap::new(); + for (target_type, id, kvs) in successful_configs { let section_name = format!("{AUDIT_ROUTE_PREFIX}{target_type}").to_lowercase(); successes_by_section @@ -244,76 +284,99 @@ impl AuditRegistry { } let mut new_config = config.clone(); - // Collection of segments that need to be processed: Collect all segments where default items exist or where successful instances exist let mut sections: HashSet = HashSet::new(); sections.extend(section_defaults.keys().cloned()); sections.extend(successes_by_section.keys().cloned()); - for section_name in sections { + for section in sections { let mut section_map: std::collections::HashMap = std::collections::HashMap::new(); - - // The default entry (if present) is written back to `_` - if let Some(default_cfg) = section_defaults.get(&section_name) { - if !default_cfg.is_empty() { - section_map.insert(DEFAULT_DELIMITER.to_string(), default_cfg.clone()); + // Add default item + if let Some(default_kvs) = section_defaults.get(&section) { + if !default_kvs.is_empty() { + section_map.insert(DEFAULT_DELIMITER.to_string(), default_kvs.clone()); } } - // Successful instance write back - if let Some(instances) = successes_by_section.get(&section_name) { + // Add successful instance item + if let Some(instances) = successes_by_section.get(&section) { for (id, kvs) in instances { section_map.insert(id.clone(), kvs.clone()); } } - // Empty segments are removed and non-empty segments are replaced as a whole. + // Empty sections are removed and non-empty sections are replaced entirely. 
if section_map.is_empty() { - new_config.0.remove(§ion_name); + new_config.0.remove(§ion); } else { - new_config.0.insert(section_name, section_map); + new_config.0.insert(section, section_map); } } - // 7. Save the new configuration to the system - let Some(store) = rustfs_ecstore::new_object_layer_fn() else { + let Some(store) = rustfs_ecstore::global::new_object_layer_fn() else { return Err(AuditError::StorageNotAvailable( "Failed to save target configuration: server storage not initialized".to_string(), )); }; match rustfs_ecstore::config::com::save_server_config(store, &new_config).await { - Ok(_) => info!("New audit configuration saved to system successfully"), + Ok(_) => { + info!("The new configuration was saved to the system successfully.") + } Err(e) => { - error!(error = %e, "Failed to save new audit configuration"); + error!("Failed to save the new configuration: {}", e); return Err(AuditError::SaveConfig(Box::new(e))); } } } + + info!(count = successful_targets.len(), "All target processing completed"); Ok(successful_targets) } /// Adds a target to the registry + /// + /// # Arguments + /// * `id` - The identifier for the target. + /// * `target` - The target instance to be added. pub fn add_target(&mut self, id: String, target: Box + Send + Sync>) { self.targets.insert(id, target); } /// Removes a target from the registry + /// + /// # Arguments + /// * `id` - The identifier for the target to be removed. + /// + /// # Returns + /// * `Option + Send + Sync>>` - The removed target if it existed. pub fn remove_target(&mut self, id: &str) -> Option + Send + Sync>> { self.targets.remove(id) } /// Gets a target from the registry + /// + /// # Arguments + /// * `id` - The identifier for the target to be retrieved. + /// + /// # Returns + /// * `Option<&(dyn Target + Send + Sync)>` - The target if it exists. 
pub fn get_target(&self, id: &str) -> Option<&(dyn Target + Send + Sync)> { self.targets.get(id).map(|t| t.as_ref()) } /// Lists all target IDs + /// + /// # Returns + /// * `Vec` - A vector of all target IDs in the registry. pub fn list_targets(&self) -> Vec { self.targets.keys().cloned().collect() } /// Closes all targets and clears the registry + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure. pub async fn close_all(&mut self) -> AuditResult<()> { let mut errors = Vec::new(); @@ -331,152 +394,3 @@ impl AuditRegistry { Ok(()) } } - -/// Creates an audit target based on type and configuration -async fn create_audit_target( - target_type: &str, - id: &str, - config: &KVS, -) -> Result + Send + Sync>, TargetError> { - match target_type { - val if val == ChannelTargetType::Webhook.as_str() => { - let args = parse_webhook_args(id, config)?; - let target = rustfs_targets::target::webhook::WebhookTarget::new(id.to_string(), args)?; - Ok(Box::new(target)) - } - val if val == ChannelTargetType::Mqtt.as_str() => { - let args = parse_mqtt_args(id, config)?; - let target = rustfs_targets::target::mqtt::MQTTTarget::new(id.to_string(), args)?; - Ok(Box::new(target)) - } - _ => Err(TargetError::Configuration(format!("Unknown target type: {target_type}"))), - } -} - -/// Gets valid field names for webhook configuration -fn get_webhook_valid_fields() -> HashSet { - vec![ - ENABLE_KEY.to_string(), - WEBHOOK_ENDPOINT.to_string(), - WEBHOOK_AUTH_TOKEN.to_string(), - WEBHOOK_CLIENT_CERT.to_string(), - WEBHOOK_CLIENT_KEY.to_string(), - WEBHOOK_BATCH_SIZE.to_string(), - WEBHOOK_QUEUE_LIMIT.to_string(), - WEBHOOK_QUEUE_DIR.to_string(), - WEBHOOK_MAX_RETRY.to_string(), - WEBHOOK_RETRY_INTERVAL.to_string(), - WEBHOOK_HTTP_TIMEOUT.to_string(), - ] - .into_iter() - .collect() -} - -/// Gets valid field names for MQTT configuration -fn get_mqtt_valid_fields() -> HashSet { - vec![ - ENABLE_KEY.to_string(), - MQTT_BROKER.to_string(), - 
MQTT_TOPIC.to_string(), - MQTT_USERNAME.to_string(), - MQTT_PASSWORD.to_string(), - MQTT_QOS.to_string(), - MQTT_KEEP_ALIVE_INTERVAL.to_string(), - MQTT_RECONNECT_INTERVAL.to_string(), - MQTT_QUEUE_DIR.to_string(), - MQTT_QUEUE_LIMIT.to_string(), - ] - .into_iter() - .collect() -} - -/// Parses webhook arguments from KVS configuration -fn parse_webhook_args(_id: &str, config: &KVS) -> Result { - let endpoint = config - .lookup(WEBHOOK_ENDPOINT) - .filter(|s| !s.is_empty()) - .ok_or_else(|| TargetError::Configuration("webhook endpoint is required".to_string()))?; - - let endpoint_url = - Url::parse(&endpoint).map_err(|e| TargetError::Configuration(format!("invalid webhook endpoint URL: {e}")))?; - - let args = WebhookArgs { - enable: true, // Already validated as enabled - endpoint: endpoint_url, - auth_token: config.lookup(WEBHOOK_AUTH_TOKEN).unwrap_or_default(), - queue_dir: config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or_default(), - queue_limit: config - .lookup(WEBHOOK_QUEUE_LIMIT) - .and_then(|s| s.parse().ok()) - .unwrap_or(100000), - client_cert: config.lookup(WEBHOOK_CLIENT_CERT).unwrap_or_default(), - client_key: config.lookup(WEBHOOK_CLIENT_KEY).unwrap_or_default(), - target_type: TargetType::AuditLog, - }; - - args.validate()?; - Ok(args) -} - -/// Parses MQTT arguments from KVS configuration -fn parse_mqtt_args(_id: &str, config: &KVS) -> Result { - let broker = config - .lookup(MQTT_BROKER) - .filter(|s| !s.is_empty()) - .ok_or_else(|| TargetError::Configuration("MQTT broker is required".to_string()))?; - - let broker_url = Url::parse(&broker).map_err(|e| TargetError::Configuration(format!("invalid MQTT broker URL: {e}")))?; - - let topic = config - .lookup(MQTT_TOPIC) - .filter(|s| !s.is_empty()) - .ok_or_else(|| TargetError::Configuration("MQTT topic is required".to_string()))?; - - let qos = config - .lookup(MQTT_QOS) - .and_then(|s| s.parse::().ok()) - .and_then(|q| match q { - 0 => Some(rumqttc::QoS::AtMostOnce), - 1 => Some(rumqttc::QoS::AtLeastOnce), 
- 2 => Some(rumqttc::QoS::ExactlyOnce), - _ => None, - }) - .unwrap_or(rumqttc::QoS::AtLeastOnce); - - let args = MQTTArgs { - enable: true, // Already validated as enabled - broker: broker_url, - topic, - qos, - username: config.lookup(MQTT_USERNAME).unwrap_or_default(), - password: config.lookup(MQTT_PASSWORD).unwrap_or_default(), - max_reconnect_interval: parse_duration(&config.lookup(MQTT_RECONNECT_INTERVAL).unwrap_or_else(|| "5s".to_string())) - .unwrap_or(Duration::from_secs(5)), - keep_alive: parse_duration(&config.lookup(MQTT_KEEP_ALIVE_INTERVAL).unwrap_or_else(|| "60s".to_string())) - .unwrap_or(Duration::from_secs(60)), - queue_dir: config.lookup(MQTT_QUEUE_DIR).unwrap_or_default(), - queue_limit: config.lookup(MQTT_QUEUE_LIMIT).and_then(|s| s.parse().ok()).unwrap_or(100000), - target_type: TargetType::AuditLog, - }; - - args.validate()?; - Ok(args) -} - -/// Parses enable value from string -fn parse_enable_value(value: &str) -> bool { - matches!(value.to_lowercase().as_str(), "1" | "on" | "true" | "yes") -} - -/// Parses duration from string (e.g., "3s", "5m") -fn parse_duration(s: &str) -> Option { - if let Some(stripped) = s.strip_suffix('s') { - stripped.parse::().ok().map(Duration::from_secs) - } else if let Some(stripped) = s.strip_suffix('m') { - stripped.parse::().ok().map(|m| Duration::from_secs(m * 60)) - } else if let Some(stripped) = s.strip_suffix("ms") { - stripped.parse::().ok().map(Duration::from_millis) - } else { - s.parse::().ok().map(Duration::from_secs) - } -} diff --git a/crates/audit/src/system.rs b/crates/audit/src/system.rs index cbfd2d51..ad80ffe9 100644 --- a/crates/audit/src/system.rs +++ b/crates/audit/src/system.rs @@ -58,6 +58,12 @@ impl AuditSystem { } /// Starts the audit system with the given configuration + /// + /// # Arguments + /// * `config` - The configuration to use for starting the audit system + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn start(&self, config: 
Config) -> AuditResult<()> { let state = self.state.write().await; @@ -87,7 +93,7 @@ impl AuditSystem { // Create targets from configuration let mut registry = self.registry.lock().await; - match registry.create_targets_from_config(&config).await { + match registry.create_audit_targets_from_config(&config).await { Ok(targets) => { if targets.is_empty() { info!("No enabled audit targets found, keeping audit system stopped"); @@ -143,6 +149,9 @@ impl AuditSystem { } /// Pauses the audit system + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn pause(&self) -> AuditResult<()> { let mut state = self.state.write().await; @@ -161,6 +170,9 @@ impl AuditSystem { } /// Resumes the audit system + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn resume(&self) -> AuditResult<()> { let mut state = self.state.write().await; @@ -179,6 +191,9 @@ impl AuditSystem { } /// Stops the audit system and closes all targets + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn close(&self) -> AuditResult<()> { let mut state = self.state.write().await; @@ -223,11 +238,20 @@ impl AuditSystem { } /// Checks if the audit system is running + /// + /// # Returns + /// * `bool` - True if running, false otherwise pub async fn is_running(&self) -> bool { matches!(*self.state.read().await, AuditSystemState::Running) } /// Dispatches an audit log entry to all active targets + /// + /// # Arguments + /// * `entry` - The audit log entry to dispatch + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn dispatch(&self, entry: Arc) -> AuditResult<()> { let start_time = std::time::Instant::now(); @@ -319,6 +343,13 @@ impl AuditSystem { Ok(()) } + /// Dispatches a batch of audit log entries to all active targets + /// + /// # Arguments + /// * `entries` - A vector of audit log entries to dispatch + /// + /// # Returns + 
/// * `AuditResult<()>` - Result indicating success or failure pub async fn dispatch_batch(&self, entries: Vec>) -> AuditResult<()> { let start_time = std::time::Instant::now(); @@ -386,7 +417,13 @@ impl AuditSystem { Ok(()) } - // New: Audit flow background tasks, based on send_from_store, including retries and exponential backoffs + /// Starts the audit stream processing for a target with batching and retry logic + /// # Arguments + /// * `store` - The store from which to read audit entries + /// * `target` - The target to which audit entries will be sent + /// + /// This function spawns a background task that continuously reads audit entries from the provided store + /// and attempts to send them to the specified target. It implements retry logic with exponential backoff fn start_audit_stream_with_batching( &self, store: Box, Error = StoreError, Key = Key> + Send>, @@ -462,6 +499,12 @@ impl AuditSystem { } /// Enables a specific target + /// + /// # Arguments + /// * `target_id` - The ID of the target to enable + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn enable_target(&self, target_id: &str) -> AuditResult<()> { // This would require storing enabled/disabled state per target // For now, just check if target exists @@ -475,6 +518,12 @@ impl AuditSystem { } /// Disables a specific target + /// + /// # Arguments + /// * `target_id` - The ID of the target to disable + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn disable_target(&self, target_id: &str) -> AuditResult<()> { // This would require storing enabled/disabled state per target // For now, just check if target exists @@ -488,6 +537,12 @@ impl AuditSystem { } /// Removes a target from the system + /// + /// # Arguments + /// * `target_id` - The ID of the target to remove + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn remove_target(&self, target_id: 
&str) -> AuditResult<()> { let mut registry = self.registry.lock().await; if let Some(target) = registry.remove_target(target_id) { @@ -502,6 +557,13 @@ impl AuditSystem { } /// Updates or inserts a target + /// + /// # Arguments + /// * `target_id` - The ID of the target to upsert + /// * `target` - The target instance to insert or update + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn upsert_target(&self, target_id: String, target: Box + Send + Sync>) -> AuditResult<()> { let mut registry = self.registry.lock().await; @@ -523,18 +585,33 @@ impl AuditSystem { } /// Lists all targets + /// + /// # Returns + /// * `Vec` - List of target IDs pub async fn list_targets(&self) -> Vec { let registry = self.registry.lock().await; registry.list_targets() } /// Gets information about a specific target + /// + /// # Arguments + /// * `target_id` - The ID of the target to retrieve + /// + /// # Returns + /// * `Option` - Target ID if found pub async fn get_target(&self, target_id: &str) -> Option { let registry = self.registry.lock().await; registry.get_target(target_id).map(|target| target.id().to_string()) } /// Reloads configuration and updates targets + /// + /// # Arguments + /// * `new_config` - The new configuration to load + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure pub async fn reload_config(&self, new_config: Config) -> AuditResult<()> { info!("Reloading audit system configuration"); @@ -554,7 +631,7 @@ impl AuditSystem { } // Create new targets from updated configuration - match registry.create_targets_from_config(&new_config).await { + match registry.create_audit_targets_from_config(&new_config).await { Ok(targets) => { info!(target_count = targets.len(), "Reloaded audit targets successfully"); @@ -594,16 +671,22 @@ impl AuditSystem { } /// Gets current audit system metrics + /// + /// # Returns + /// * `AuditMetricsReport` - Current metrics report pub async fn 
get_metrics(&self) -> observability::AuditMetricsReport { observability::get_metrics_report().await } /// Validates system performance against requirements + /// + /// # Returns + /// * `PerformanceValidation` - Performance validation results pub async fn validate_performance(&self) -> observability::PerformanceValidation { observability::validate_performance().await } - /// Resets all metrics + /// Resets all metrics to initial state pub async fn reset_metrics(&self) { observability::reset_metrics().await; } diff --git a/crates/audit/tests/integration_test.rs b/crates/audit/tests/integration_test.rs index d889c84e..f2ef342e 100644 --- a/crates/audit/tests/integration_test.rs +++ b/crates/audit/tests/integration_test.rs @@ -43,11 +43,11 @@ async fn test_config_parsing_webhook() { audit_webhook_section.insert("_".to_string(), default_kvs); config.0.insert("audit_webhook".to_string(), audit_webhook_section); - let mut registry = AuditRegistry::new(); + let registry = AuditRegistry::new(); // This should not fail even if server storage is not initialized // as it's an integration test - let result = registry.create_targets_from_config(&config).await; + let result = registry.create_audit_targets_from_config(&config).await; // We expect this to fail due to server storage not being initialized // but the parsing should work correctly diff --git a/crates/audit/tests/performance_test.rs b/crates/audit/tests/performance_test.rs index 4080c47b..b96e92eb 100644 --- a/crates/audit/tests/performance_test.rs +++ b/crates/audit/tests/performance_test.rs @@ -44,7 +44,7 @@ async fn test_audit_system_startup_performance() { #[tokio::test] async fn test_concurrent_target_creation() { // Test that multiple targets can be created concurrently - let mut registry = AuditRegistry::new(); + let registry = AuditRegistry::new(); // Create config with multiple webhook instances let mut config = rustfs_ecstore::config::Config(std::collections::HashMap::new()); @@ -63,7 +63,7 @@ async fn 
test_concurrent_target_creation() { let start = Instant::now(); // This will fail due to server storage not being initialized, but we can measure timing - let result = registry.create_targets_from_config(&config).await; + let result = registry.create_audit_targets_from_config(&config).await; let elapsed = start.elapsed(); println!("Concurrent target creation took: {elapsed:?}"); diff --git a/crates/audit/tests/system_integration_test.rs b/crates/audit/tests/system_integration_test.rs index 267a9fc1..d60c6f18 100644 --- a/crates/audit/tests/system_integration_test.rs +++ b/crates/audit/tests/system_integration_test.rs @@ -135,7 +135,7 @@ async fn test_global_audit_functions() { #[tokio::test] async fn test_config_parsing_with_multiple_instances() { - let mut registry = AuditRegistry::new(); + let registry = AuditRegistry::new(); // Create config with multiple webhook instances let mut config = Config(HashMap::new()); @@ -164,7 +164,7 @@ async fn test_config_parsing_with_multiple_instances() { config.0.insert("audit_webhook".to_string(), webhook_section); // Try to create targets from config - let result = registry.create_targets_from_config(&config).await; + let result = registry.create_audit_targets_from_config(&config).await; // Should fail due to server storage not initialized, but parsing should work match result { diff --git a/crates/common/src/globals.rs b/crates/common/src/globals.rs index 141003a2..6bcc7e29 100644 --- a/crates/common/src/globals.rs +++ b/crates/common/src/globals.rs @@ -19,21 +19,21 @@ use std::sync::LazyLock; use tokio::sync::RwLock; use tonic::transport::Channel; -pub static GLOBAL_Local_Node_Name: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); -pub static GLOBAL_Rustfs_Host: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); -pub static GLOBAL_Rustfs_Port: LazyLock> = LazyLock::new(|| RwLock::new("9000".to_string())); -pub static GLOBAL_Rustfs_Addr: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); -pub static 
GLOBAL_Conn_Map: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); +pub static GLOBAL_LOCAL_NODE_NAME: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); +pub static GLOBAL_RUSTFS_HOST: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); +pub static GLOBAL_RUSTFS_PORT: LazyLock> = LazyLock::new(|| RwLock::new("9000".to_string())); +pub static GLOBAL_RUSTFS_ADDR: LazyLock> = LazyLock::new(|| RwLock::new("".to_string())); +pub static GLOBAL_CONN_MAP: LazyLock>> = LazyLock::new(|| RwLock::new(HashMap::new())); pub async fn set_global_addr(addr: &str) { - *GLOBAL_Rustfs_Addr.write().await = addr.to_string(); + *GLOBAL_RUSTFS_ADDR.write().await = addr.to_string(); } /// Evict a stale/dead connection from the global connection cache. /// This is critical for cluster recovery when a node dies unexpectedly (e.g., power-off). /// By removing the cached connection, subsequent requests will establish a fresh connection. pub async fn evict_connection(addr: &str) { - let removed = GLOBAL_Conn_Map.write().await.remove(addr); + let removed = GLOBAL_CONN_MAP.write().await.remove(addr); if removed.is_some() { tracing::warn!("Evicted stale connection from cache: {}", addr); } @@ -41,12 +41,12 @@ pub async fn evict_connection(addr: &str) { /// Check if a connection exists in the cache for the given address. pub async fn has_cached_connection(addr: &str) -> bool { - GLOBAL_Conn_Map.read().await.contains_key(addr) + GLOBAL_CONN_MAP.read().await.contains_key(addr) } /// Clear all cached connections. Useful for full cluster reset/recovery. 
pub async fn clear_all_connections() { - let mut map = GLOBAL_Conn_Map.write().await; + let mut map = GLOBAL_CONN_MAP.write().await; let count = map.len(); map.clear(); if count > 0 { diff --git a/crates/config/src/audit/mod.rs b/crates/config/src/audit/mod.rs index 92a57212..793845ff 100644 --- a/crates/config/src/audit/mod.rs +++ b/crates/config/src/audit/mod.rs @@ -29,7 +29,7 @@ pub const AUDIT_PREFIX: &str = "audit"; pub const AUDIT_ROUTE_PREFIX: &str = const_str::concat!(AUDIT_PREFIX, DEFAULT_DELIMITER); pub const AUDIT_WEBHOOK_SUB_SYS: &str = "audit_webhook"; -pub const AUDIT_MQTT_SUB_SYS: &str = "mqtt_webhook"; +pub const AUDIT_MQTT_SUB_SYS: &str = "audit_mqtt"; pub const AUDIT_STORE_EXTENSION: &str = ".audit"; #[allow(dead_code)] diff --git a/crates/config/src/constants/env.rs b/crates/config/src/constants/env.rs index e78c2b90..84116ba5 100644 --- a/crates/config/src/constants/env.rs +++ b/crates/config/src/constants/env.rs @@ -16,7 +16,8 @@ pub const DEFAULT_DELIMITER: &str = "_"; pub const ENV_PREFIX: &str = "RUSTFS_"; pub const ENV_WORD_DELIMITER: &str = "_"; -pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store +pub const EVENT_DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store +pub const AUDIT_DEFAULT_DIR: &str = "/opt/rustfs/audit"; // Default directory for audit store pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit /// Standard config keys and values. diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs index 91a78de4..6abb2bf8 100644 --- a/crates/config/src/notify/mod.rs +++ b/crates/config/src/notify/mod.rs @@ -24,13 +24,33 @@ pub use webhook::*; use crate::DEFAULT_DELIMITER; -// --- Configuration Constants --- +/// Default target identifier for notifications, +/// Used in notification system when no specific target is provided, +/// Represents the default target stream or endpoint for notifications when no specific target is provided. 
pub const DEFAULT_TARGET: &str = "1"; - +/// Notification prefix for routing and identification, +/// Used in notification system, +/// This prefix is utilized in constructing routes and identifiers related to notifications within the system. pub const NOTIFY_PREFIX: &str = "notify"; +/// Notification route prefix combining the notification prefix and default delimiter +/// Combines the notification prefix with the default delimiter +/// Used in notification system for defining routes related to notifications. +/// Example: "notify:/" pub const NOTIFY_ROUTE_PREFIX: &str = const_str::concat!(NOTIFY_PREFIX, DEFAULT_DELIMITER); +/// Name of the environment variable that configures target stream concurrency. +/// Controls how many target streams are processed in parallel by the notification system. +/// Defaults to [`DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY`] if not set. +/// Example: `RUSTFS_NOTIFY_TARGET_STREAM_CONCURRENCY=20`. +pub const ENV_NOTIFY_TARGET_STREAM_CONCURRENCY: &str = "RUSTFS_NOTIFY_TARGET_STREAM_CONCURRENCY"; + +/// Default concurrency for target stream processing in the notification system +/// This value is used if the environment variable `RUSTFS_NOTIFY_TARGET_STREAM_CONCURRENCY` is not set. +/// It defines how many target streams can be processed in parallel by the notification system at any given time. +/// Adjust this value based on your system's capabilities and expected load. 
+pub const DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY: usize = 20; + #[allow(dead_code)] pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS]; diff --git a/crates/config/src/notify/store.rs b/crates/config/src/notify/store.rs index ed838b05..3dab3de2 100644 --- a/crates/config/src/notify/store.rs +++ b/crates/config/src/notify/store.rs @@ -15,5 +15,5 @@ pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files -/// STORE_EXTENSION - file extension of an event file in store -pub const STORE_EXTENSION: &str = ".event"; +/// NOTIFY_STORE_EXTENSION - file extension of an event file in store +pub const NOTIFY_STORE_EXTENSION: &str = ".event"; diff --git a/crates/ecstore/src/admin_server_info.rs b/crates/ecstore/src/admin_server_info.rs index 7917004c..9117f8c0 100644 --- a/crates/ecstore/src/admin_server_info.rs +++ b/crates/ecstore/src/admin_server_info.rs @@ -23,7 +23,7 @@ use crate::{ }; use crate::data_usage::load_data_usage_cache; -use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::DriveState}; +use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::DriveState}; use rustfs_madmin::{ BackendDisks, Disk, ErasureSetInfo, ITEM_INITIALIZING, ITEM_OFFLINE, ITEM_ONLINE, InfoMessage, ServerProperties, }; @@ -128,7 +128,7 @@ async fn is_server_resolvable(endpoint: &Endpoint) -> Result<()> { } pub async fn get_local_server_property() -> ServerProperties { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let mut pool_numbers = HashSet::new(); let mut network = HashMap::new(); diff --git a/crates/ecstore/src/config/audit.rs b/crates/ecstore/src/config/audit.rs index afbab13b..f0c86403 100644 --- a/crates/ecstore/src/config/audit.rs +++ b/crates/ecstore/src/config/audit.rs @@ -14,7 +14,7 @@ use crate::config::{KV, KVS}; use rustfs_config::{ - COMMENT_KEY, DEFAULT_DIR, 
DEFAULT_LIMIT, ENABLE_KEY, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, + COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT, WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, WEBHOOK_RETRY_INTERVAL, @@ -63,7 +63,7 @@ pub static DEFAULT_AUDIT_WEBHOOK_KVS: LazyLock = LazyLock::new(|| { }, KV { key: WEBHOOK_QUEUE_DIR.to_owned(), - value: DEFAULT_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), hidden_if_empty: false, }, KV { @@ -131,7 +131,7 @@ pub static DEFAULT_AUDIT_MQTT_KVS: LazyLock = LazyLock::new(|| { }, KV { key: MQTT_QUEUE_DIR.to_owned(), - value: DEFAULT_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), hidden_if_empty: false, }, KV { diff --git a/crates/ecstore/src/config/notify.rs b/crates/ecstore/src/config/notify.rs index 74157f52..c9ebf3ba 100644 --- a/crates/ecstore/src/config/notify.rs +++ b/crates/ecstore/src/config/notify.rs @@ -14,7 +14,7 @@ use crate::config::{KV, KVS}; use rustfs_config::{ - COMMENT_KEY, DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, + COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; @@ -47,7 +47,7 @@ pub static DEFAULT_NOTIFY_WEBHOOK_KVS: LazyLock = LazyLock::new(|| { }, KV { key: WEBHOOK_QUEUE_DIR.to_owned(), - value: DEFAULT_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), hidden_if_empty: false, }, KV { @@ -114,7 +114,7 @@ pub static 
DEFAULT_NOTIFY_MQTT_KVS: LazyLock = LazyLock::new(|| { }, KV { key: MQTT_QUEUE_DIR.to_owned(), - value: DEFAULT_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), hidden_if_empty: false, }, KV { diff --git a/crates/ecstore/src/metrics_realtime.rs b/crates/ecstore/src/metrics_realtime.rs index a0f711e1..4d938a48 100644 --- a/crates/ecstore/src/metrics_realtime.rs +++ b/crates/ecstore/src/metrics_realtime.rs @@ -20,7 +20,7 @@ use crate::{ }; use chrono::Utc; use rustfs_common::{ - globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Addr}, + globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_ADDR}, heal_channel::DriveState, metrics::global_metrics, }; @@ -86,7 +86,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) return real_time_metrics; } - let mut by_host_name = GLOBAL_Rustfs_Addr.read().await.clone(); + let mut by_host_name = GLOBAL_RUSTFS_ADDR.read().await.clone(); if !opts.hosts.is_empty() { let server = get_local_server_property().await; if opts.hosts.contains(&server.endpoint) { @@ -95,7 +95,7 @@ pub async fn collect_local_metrics(types: MetricType, opts: &CollectMetricsOpts) return real_time_metrics; } } - let local_node_name = GLOBAL_Local_Node_Name.read().await.clone(); + let local_node_name = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); if by_host_name.starts_with(":") && !local_node_name.starts_with(":") { by_host_name = local_node_name; } diff --git a/crates/ecstore/src/sets.rs b/crates/ecstore/src/sets.rs index 976fcd56..d96e8aa4 100644 --- a/crates/ecstore/src/sets.rs +++ b/crates/ecstore/src/sets.rs @@ -40,7 +40,7 @@ use futures::future::join_all; use http::HeaderMap; use rustfs_common::heal_channel::HealOpts; use rustfs_common::{ - globals::GLOBAL_Local_Node_Name, + globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::{DriveState, HealItemType}, }; use rustfs_filemeta::FileInfo; @@ -170,7 +170,7 @@ impl Sets { let set_disks = SetDisks::new( fast_lock_manager.clone(), - GLOBAL_Local_Node_Name.read().await.to_string(), + 
GLOBAL_LOCAL_NODE_NAME.read().await.to_string(), Arc::new(RwLock::new(set_drive)), set_drive_count, parity_count, diff --git a/crates/ecstore/src/store.rs b/crates/ecstore/src/store.rs index 3097a9e2..2259e5b5 100644 --- a/crates/ecstore/src/store.rs +++ b/crates/ecstore/src/store.rs @@ -55,7 +55,7 @@ use futures::future::join_all; use http::HeaderMap; use lazy_static::lazy_static; use rand::Rng as _; -use rustfs_common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port}; +use rustfs_common::globals::{GLOBAL_LOCAL_NODE_NAME, GLOBAL_RUSTFS_HOST, GLOBAL_RUSTFS_PORT}; use rustfs_common::heal_channel::{HealItemType, HealOpts}; use rustfs_filemeta::FileInfo; use rustfs_madmin::heal_commands::HealResultItem; @@ -127,11 +127,11 @@ impl ECStore { info!("ECStore new address: {}", address.to_string()); let mut host = address.ip().to_string(); if host.is_empty() { - host = GLOBAL_Rustfs_Host.read().await.to_string() + host = GLOBAL_RUSTFS_HOST.read().await.to_string() } let mut port = address.port().to_string(); if port.is_empty() { - port = GLOBAL_Rustfs_Port.read().await.to_string() + port = GLOBAL_RUSTFS_PORT.read().await.to_string() } info!("ECStore new host: {}, port: {}", host, port); init_local_peer(&endpoint_pools, &host, &port).await; @@ -2329,15 +2329,15 @@ async fn init_local_peer(endpoint_pools: &EndpointServerPools, host: &String, po if peer_set.is_empty() { if !host.is_empty() { - *GLOBAL_Local_Node_Name.write().await = format!("{host}:{port}"); + *GLOBAL_LOCAL_NODE_NAME.write().await = format!("{host}:{port}"); return; } - *GLOBAL_Local_Node_Name.write().await = format!("127.0.0.1:{port}"); + *GLOBAL_LOCAL_NODE_NAME.write().await = format!("127.0.0.1:{port}"); return; } - *GLOBAL_Local_Node_Name.write().await = peer_set[0].clone(); + *GLOBAL_LOCAL_NODE_NAME.write().await = peer_set[0].clone(); } pub fn is_valid_object_prefix(_object: &str) -> bool { diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 707c5bd2..0f02b70a 
100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -29,6 +29,7 @@ documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/" rustfs-config = { workspace = true, features = ["notify", "constants"] } rustfs-ecstore = { workspace = true } rustfs-targets = { workspace = true } +rustfs-utils = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true, features = ["serde"] } futures = { workspace = true } diff --git a/crates/notify/examples/webhook.rs b/crates/notify/examples/webhook.rs index b0f47dc9..e7d81c94 100644 --- a/crates/notify/examples/webhook.rs +++ b/crates/notify/examples/webhook.rs @@ -110,20 +110,21 @@ async fn reset_webhook_count(Query(params): Query, headers: HeaderM let reason = params.reason.unwrap_or_else(|| "Reason not provided".to_string()); println!("Reset webhook count, reason: {reason}"); - + let time_now = chrono::offset::Utc::now().to_string(); for header in headers { let (key, value) = header; - println!("Header: {key:?}: {value:?}"); + println!("Header: {key:?}: {value:?}, time: {time_now}"); } println!("Reset webhook count printed headers"); // Reset the counter to 0 WEBHOOK_COUNT.store(0, Ordering::SeqCst); println!("Webhook count has been reset to 0."); + let time_now = chrono::offset::Utc::now().to_string(); Response::builder() .header("Foo", "Bar") .status(StatusCode::OK) - .body(format!("Webhook count reset successfully current_count:{current_count}")) + .body(format!("Webhook count reset successfully current_count:{current_count},time: {time_now}")) .unwrap() } @@ -167,7 +168,11 @@ async fn receive_webhook(Json(payload): Json) -> StatusCode { serde_json::to_string_pretty(&payload).unwrap() ); WEBHOOK_COUNT.fetch_add(1, Ordering::SeqCst); - println!("Total webhook requests received: {}", WEBHOOK_COUNT.load(Ordering::SeqCst)); + println!( + "Total webhook requests received: {} , Time: {}", + WEBHOOK_COUNT.load(Ordering::SeqCst), + chrono::offset::Utc::now() + ); StatusCode::OK } 
diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index 84cf1be6..e15f5c5d 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -18,9 +18,9 @@ use hashbrown::HashSet; use rumqttc::QoS; use rustfs_config::notify::{ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS}; use rustfs_config::{ - DEFAULT_DIR, DEFAULT_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, - MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, - WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, + DEFAULT_LIMIT, EVENT_DEFAULT_DIR, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, + MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, + WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; use rustfs_ecstore::config::KVS; use rustfs_targets::{ @@ -67,7 +67,7 @@ impl TargetFactory for WebhookTargetFactory { enable: true, // If we are here, it's already enabled. 
endpoint: endpoint_url, auth_token: config.lookup(WEBHOOK_AUTH_TOKEN).unwrap_or_default(), - queue_dir: config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(DEFAULT_DIR.to_string()), + queue_dir: config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(EVENT_DEFAULT_DIR.to_string()), queue_limit: config .lookup(WEBHOOK_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) @@ -100,7 +100,7 @@ impl TargetFactory for WebhookTargetFactory { )); } - let queue_dir = config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(DEFAULT_DIR.to_string()); + let queue_dir = config.lookup(WEBHOOK_QUEUE_DIR).unwrap_or(EVENT_DEFAULT_DIR.to_string()); if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() { return Err(TargetError::Configuration("Webhook queue directory must be an absolute path".to_string())); } @@ -159,7 +159,7 @@ impl TargetFactory for MQTTTargetFactory { .and_then(|v| v.parse::().ok()) .map(Duration::from_secs) .unwrap_or_else(|| Duration::from_secs(30)), - queue_dir: config.lookup(MQTT_QUEUE_DIR).unwrap_or(DEFAULT_DIR.to_string()), + queue_dir: config.lookup(MQTT_QUEUE_DIR).unwrap_or(EVENT_DEFAULT_DIR.to_string()), queue_limit: config .lookup(MQTT_QUEUE_LIMIT) .and_then(|v| v.parse::().ok()) diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index 4afa0145..dc50857d 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -16,6 +16,7 @@ use crate::{ Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream, }; use hashbrown::HashMap; +use rustfs_config::notify::{DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY, ENV_NOTIFY_TARGET_STREAM_CONCURRENCY}; use rustfs_ecstore::config::{Config, KVS}; use rustfs_targets::EventName; use rustfs_targets::arn::TargetID; @@ -108,17 +109,14 @@ pub struct NotificationSystem { impl NotificationSystem { /// Creates a new NotificationSystem pub fn new(config: Config) -> Self { + let concurrency_limiter = + 
rustfs_utils::get_env_usize(ENV_NOTIFY_TARGET_STREAM_CONCURRENCY, DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY); NotificationSystem { notifier: Arc::new(EventNotifier::new()), registry: Arc::new(TargetRegistry::new()), config: Arc::new(RwLock::new(config)), stream_cancellers: Arc::new(RwLock::new(HashMap::new())), - concurrency_limiter: Arc::new(Semaphore::new( - std::env::var("RUSTFS_TARGET_STREAM_CONCURRENCY") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(20), - )), // Limit the maximum number of concurrent processing events to 20 + concurrency_limiter: Arc::new(Semaphore::new(concurrency_limiter)), // Limit the maximum number of concurrent processing events to 20 metrics: Arc::new(NotificationMetrics::new()), } } @@ -269,9 +267,9 @@ impl NotificationSystem { self.update_config_and_reload(|config| { config .0 - .entry(target_type.to_string()) + .entry(target_type.to_lowercase()) .or_default() - .insert(target_name.to_string(), kvs.clone()); + .insert(target_name.to_lowercase(), kvs.clone()); true // The configuration is always modified }) .await diff --git a/crates/notify/src/registry.rs b/crates/notify/src/registry.rs index 9d649793..cdf3aa11 100644 --- a/crates/notify/src/registry.rs +++ b/crates/notify/src/registry.rs @@ -16,9 +16,11 @@ use crate::Event; use crate::factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory}; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; -use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, notify::NOTIFY_ROUTE_PREFIX}; +use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, notify::NOTIFY_ROUTE_PREFIX}; use rustfs_ecstore::config::{Config, KVS}; use rustfs_targets::{Target, TargetError, target::ChannelTargetType}; +use std::str::FromStr; +use std::sync::Arc; use tracing::{debug, error, info, warn}; /// Registry for managing target factories @@ -117,11 +119,7 @@ impl TargetRegistry { 
format!("{ENV_PREFIX}{NOTIFY_ROUTE_PREFIX}{target_type}{DEFAULT_DELIMITER}{ENABLE_KEY}{DEFAULT_DELIMITER}") .to_uppercase(); for (key, value) in &all_env { - if value.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str()) - || value.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str()) - || value.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str()) - || value.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str()) - { + if EnableState::from_str(value).ok().map(|s| s.is_enabled()).unwrap_or(false) { if let Some(id) = key.strip_prefix(&enable_prefix) { if !id.is_empty() { instance_ids_from_env.insert(id.to_lowercase()); @@ -208,10 +206,10 @@ impl TargetRegistry { let enabled = merged_config .lookup(ENABLE_KEY) .map(|v| { - v.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str()) - || v.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str()) + EnableState::from_str(v.as_str()) + .ok() + .map(|s| s.is_enabled()) + .unwrap_or(false) }) .unwrap_or(false); @@ -220,10 +218,10 @@ impl TargetRegistry { // 5.3. 
Create asynchronous tasks for enabled instances let target_type_clone = target_type.clone(); let tid = id.clone(); - let merged_config_arc = std::sync::Arc::new(merged_config); + let merged_config_arc = Arc::new(merged_config); tasks.push(async move { let result = factory.create_target(tid.clone(), &merged_config_arc).await; - (target_type_clone, tid, result, std::sync::Arc::clone(&merged_config_arc)) + (target_type_clone, tid, result, Arc::clone(&merged_config_arc)) }); } else { info!(instance_id = %id, "Skip the disabled target and will be removed from the final configuration"); diff --git a/crates/protos/src/lib.rs b/crates/protos/src/lib.rs index 4242a76f..305d67a5 100644 --- a/crates/protos/src/lib.rs +++ b/crates/protos/src/lib.rs @@ -19,7 +19,7 @@ use std::{error::Error, time::Duration}; pub use generated::*; use proto_gen::node_service::node_service_client::NodeServiceClient; -use rustfs_common::globals::{GLOBAL_Conn_Map, evict_connection}; +use rustfs_common::globals::{GLOBAL_CONN_MAP, evict_connection}; use tonic::{ Request, Status, metadata::MetadataValue, @@ -74,7 +74,7 @@ async fn create_new_channel(addr: &str) -> Result> { // Cache the new connection { - GLOBAL_Conn_Map.write().await.insert(addr.to_string(), channel.clone()); + GLOBAL_CONN_MAP.write().await.insert(addr.to_string(), channel.clone()); } debug!("Successfully created and cached gRPC channel to: {}", addr); @@ -111,7 +111,7 @@ pub async fn node_service_time_out_client( let token: MetadataValue<_> = "rustfs rpc".parse()?; // Try to get cached channel - let cached_channel = { GLOBAL_Conn_Map.read().await.get(addr).cloned() }; + let cached_channel = { GLOBAL_CONN_MAP.read().await.get(addr).cloned() }; let channel = match cached_channel { Some(channel) => { diff --git a/crates/targets/src/event_name.rs b/crates/targets/src/event_name.rs index 49df020f..6df8d3f8 100644 --- a/crates/targets/src/event_name.rs +++ b/crates/targets/src/event_name.rs @@ -353,7 +353,7 @@ mod tests { let deserialized 
= serde_json::from_str::(invalid_str); assert!(deserialized.is_err(), "Deserialization should fail for invalid event name"); - // empty string should be successful only serialization + // Serializing EventName::Everything produces an empty string, but deserializing an empty string should fail. let event_name = EventName::Everything; let serialized_str = "\"\""; let serialized = serde_json::to_string(&event_name); diff --git a/crates/targets/src/target/mqtt.rs b/crates/targets/src/target/mqtt.rs index 45b73e5e..61cb93c0 100644 --- a/crates/targets/src/target/mqtt.rs +++ b/crates/targets/src/target/mqtt.rs @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::store::Key; -use crate::target::{ChannelTargetType, EntityTarget, TargetType}; -use crate::{StoreError, Target, TargetLog, arn::TargetID, error::TargetError, store::Store}; +use crate::{ + StoreError, Target, TargetLog, + arn::TargetID, + error::TargetError, + store::{Key, QueueStore, Store}, + target::{ChannelTargetType, EntityTarget, TargetType}, +}; use async_trait::async_trait; -use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS}; -use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError}; +use rumqttc::{AsyncClient, ConnectionError, EventLoop, MqttOptions, Outgoing, Packet, QoS, mqttbytes::Error as MqttBytesError}; use serde::Serialize; use serde::de::DeserializeOwned; use std::sync::Arc; @@ -130,10 +133,10 @@ where debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target"); let extension = match args.target_type { TargetType::AuditLog => rustfs_config::audit::AUDIT_STORE_EXTENSION, - TargetType::NotifyEvent => rustfs_config::notify::STORE_EXTENSION, + TargetType::NotifyEvent => rustfs_config::notify::NOTIFY_STORE_EXTENSION, }; - let store = crate::store::QueueStore::>::new(specific_queue_path, args.queue_limit, extension); + let store = 
QueueStore::>::new(specific_queue_path, args.queue_limit, extension); if let Err(e) = store.open() { error!( target_id = %target_id, diff --git a/crates/targets/src/target/webhook.rs b/crates/targets/src/target/webhook.rs index d2de20e9..c9564274 100644 --- a/crates/targets/src/target/webhook.rs +++ b/crates/targets/src/target/webhook.rs @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::target::{ChannelTargetType, EntityTarget, TargetType}; use crate::{ StoreError, Target, TargetLog, arn::TargetID, error::TargetError, - store::{Key, Store}, + store::{Key, QueueStore, Store}, + target::{ChannelTargetType, EntityTarget, TargetType}, }; use async_trait::async_trait; use reqwest::{Client, StatusCode, Url}; -use rustfs_config::notify::STORE_EXTENSION; +use rustfs_config::audit::AUDIT_STORE_EXTENSION; +use rustfs_config::notify::NOTIFY_STORE_EXTENSION; use serde::Serialize; use serde::de::DeserializeOwned; use std::{ @@ -155,11 +156,11 @@ where PathBuf::from(&args.queue_dir).join(format!("rustfs-{}-{}", ChannelTargetType::Webhook.as_str(), target_id.id)); let extension = match args.target_type { - TargetType::AuditLog => rustfs_config::audit::AUDIT_STORE_EXTENSION, - TargetType::NotifyEvent => STORE_EXTENSION, + TargetType::AuditLog => AUDIT_STORE_EXTENSION, + TargetType::NotifyEvent => NOTIFY_STORE_EXTENSION, }; - let store = crate::store::QueueStore::>::new(queue_dir, args.queue_limit, extension); + let store = QueueStore::>::new(queue_dir, args.queue_limit, extension); if let Err(e) = store.open() { error!("Failed to open store for Webhook target {}: {}", target_id.id, e); diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index bdc93286..d62777bb 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -16,9 +16,8 @@ mod admin; mod auth; mod config; mod error; -// mod grpc; mod init; -pub mod license; +mod license; mod profiling; mod server; mod storage; diff --git 
a/rustfs/src/server/audit.rs b/rustfs/src/server/audit.rs index 2a81af15..144f7446 100644 --- a/rustfs/src/server/audit.rs +++ b/rustfs/src/server/audit.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rustfs_audit::system::AuditSystemState; -use rustfs_audit::{AuditError, AuditResult, audit_system, init_audit_system}; +use rustfs_audit::{AuditError, AuditResult, audit_system, init_audit_system, system::AuditSystemState}; use rustfs_config::DEFAULT_DELIMITER; use rustfs_ecstore::config::GLOBAL_SERVER_CONFIG; use tracing::{info, warn}; @@ -69,7 +68,9 @@ pub(crate) async fn start_audit_system() -> AuditResult<()> { mqtt_config.is_some(), webhook_config.is_some() ); + // 3. Initialize and start the audit system let system = init_audit_system(); + // Check if the audit system is already running let state = system.get_state().await; if state == AuditSystemState::Running { warn!( diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 42ce4a01..e12eb958 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -5122,6 +5122,7 @@ impl S3 for FS { let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules); clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?; + warn!("notify event rules: {:?}", &event_rules); // Add a new notification rule notifier_global::add_event_specific_rules(&bucket, ®ion, &event_rules) diff --git a/rustfs/src/storage/tonic_service.rs b/rustfs/src/storage/tonic_service.rs index eebe1c74..5ca8ab22 100644 --- a/rustfs/src/storage/tonic_service.rs +++ b/rustfs/src/storage/tonic_service.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use futures::Stream; use futures_util::future::join_all; use rmp_serde::{Deserializer, Serializer}; -use rustfs_common::{globals::GLOBAL_Local_Node_Name, heal_channel::HealOpts}; +use rustfs_common::{globals::GLOBAL_LOCAL_NODE_NAME, heal_channel::HealOpts}; use 
rustfs_ecstore::{ admin_server_info::get_local_server_property, bucket::{metadata::load_bucket_metadata, metadata_sys}, @@ -1646,7 +1646,7 @@ impl Node for NodeService { } async fn get_net_info(&self, _request: Request) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_net_info(&addr, ""); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { @@ -1701,7 +1701,7 @@ impl Node for NodeService { &self, _request: Request, ) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_sys_services(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { @@ -1719,7 +1719,7 @@ impl Node for NodeService { } async fn get_sys_config(&self, _request: Request) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_sys_config(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { @@ -1737,7 +1737,7 @@ impl Node for NodeService { } async fn get_sys_errors(&self, _request: Request) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_sys_errors(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { @@ -1755,7 +1755,7 @@ impl Node for NodeService { } async fn get_mem_info(&self, _request: Request) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_mem_info(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { @@ -1798,7 +1798,7 @@ impl Node for NodeService { } async fn 
get_proc_info(&self, _request: Request) -> Result, Status> { - let addr = GLOBAL_Local_Node_Name.read().await.clone(); + let addr = GLOBAL_LOCAL_NODE_NAME.read().await.clone(); let info = get_proc_info(&addr); let mut buf = Vec::new(); if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { diff --git a/scripts/run.sh b/scripts/run.sh index d3e99945..762215c6 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -36,7 +36,7 @@ mkdir -p ./target/volume/test{1..4} if [ -z "$RUST_LOG" ]; then export RUST_BACKTRACE=1 - export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug,iam=info" + export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug,iam=info,notify=info" fi # export RUSTFS_ERASURE_SET_DRIVE_COUNT=5 @@ -90,30 +90,30 @@ export OTEL_INSTRUMENTATION_VERSION="0.1.1" export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0" export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production" -# notify -export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification -export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook notification address -export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify" - -export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification -export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook notification address -export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify" - -export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification -export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook notification address -export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify" - -export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit -export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://[::]:3020/webhook" # Webhook audit address -export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit" - -export 
RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit -export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://[::]:3020/webhook" # Webhook audit address -export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit" - -export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit -export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://[::]:3020/webhook" # Webhook audit address -export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit" +## notify +#export RUSTFS_NOTIFY_WEBHOOK_ENABLE="on" # Whether to enable webhook notification +#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook notification address +#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/notify" +# +#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook notification +#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook notification address +#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/notify" +# +#export RUSTFS_NOTIFY_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook notification +#export RUSTFS_NOTIFY_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook notification address +#export RUSTFS_NOTIFY_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/notify" +# +#export RUSTFS_AUDIT_WEBHOOK_ENABLE="on" # Whether to enable webhook audit +#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT="http://127.0.0.1:3020/webhook" # Webhook audit address +#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR="$current_dir/deploy/logs/audit" +# +#export RUSTFS_AUDIT_WEBHOOK_ENABLE_PRIMARY="on" # Whether to enable webhook audit +#export RUSTFS_AUDIT_WEBHOOK_ENDPOINT_PRIMARY="http://127.0.0.1:3020/webhook" # Webhook audit address +#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_PRIMARY="$current_dir/deploy/logs/audit" +# +#export RUSTFS_AUDIT_WEBHOOK_ENABLE_MASTER="on" # Whether to enable webhook audit +#export 
RUSTFS_AUDIT_WEBHOOK_ENDPOINT_MASTER="http://127.0.0.1:3020/webhook" # Webhook audit address +#export RUSTFS_AUDIT_WEBHOOK_QUEUE_DIR_MASTER="$current_dir/deploy/logs/audit" # export RUSTFS_POLICY_PLUGIN_URL="http://localhost:8181/v1/data/rustfs/authz/allow" # The URL of the OPA system # export RUSTFS_POLICY_PLUGIN_AUTH_TOKEN="your-opa-token" # The authentication token for the OPA system is optional @@ -211,5 +211,4 @@ fi # To run in release mode, use the following line #cargo run --profile release --bin rustfs # To run in debug mode, use the following line -cargo run --bin rustfs - +cargo run --bin rustfs \ No newline at end of file