From 98be7df0f5bbfcb9dd1a144d2f4aaedb0fca3bf8 Mon Sep 17 00:00:00 2001 From: houseme Date: Mon, 10 Nov 2025 17:30:50 +0800 Subject: [PATCH] feat(storage): refactor audit and notification with OperationHelper (#825) * improve code for audit * improve code ecfs.rs * improve code * improve code for ecfs.rs * feat(storage): refactor audit and notification with OperationHelper This commit introduces a significant refactoring of the audit logging and event notification mechanisms within `ecfs.rs`. The core of this change is the new `OperationHelper` struct, which encapsulates and simplifies the logic for both concerns. It replaces the previous `AuditHelper` and manual event dispatching. Key improvements include: - **Unified Handling**: `OperationHelper` manages both audit and notification builders, providing a single, consistent entry point for S3 operations. - **RAII for Automation**: By leveraging the `Drop` trait, the helper automatically dispatches logs and notifications when it goes out of scope. This simplifies S3 method implementations and ensures cleanup even on early returns. - **Fluent API**: A builder-like pattern with methods such as `.object()`, `.version_id()`, and `.suppress_event()` makes the code more readable and expressive. - **Context-Aware Logic**: The helper's `.complete()` method intelligently populates log details based on the operation's `S3Result` and only triggers notifications on success. - **Modular Design**: All helper logic is now isolated in `rustfs/src/storage/helper.rs`, improving separation of concerns and making `ecfs.rs` cleaner. This refactoring significantly enhances code clarity, reduces boilerplate, and improves the robustness of logging and notification handling across the storage layer. * fix * fix * fix * fix * fix * fix * fix * improve code for audit and notify * fix * fix * fix --- Cargo.lock | 137 +++-- Cargo.toml | 5 +- crates/audit/Cargo.toml | 1 + crates/audit/src/entity.rs | 423 ++++++-------- crates/audit/src/error.rs | 12 +- crates/audit/src/global.rs | 52 +- crates/audit/src/lib.rs | 2 +- crates/audit/src/registry.rs | 8 +- crates/audit/src/system.rs | 10 +- crates/audit/tests/integration_test.rs | 2 +- crates/audit/tests/performance_test.rs | 10 +- crates/audit/tests/system_integration_test.rs | 55 +- crates/notify/Cargo.toml | 1 - crates/notify/src/error.rs | 29 +- crates/notify/src/event.rs | 117 ++++ crates/notify/src/global.rs | 43 +- crates/notify/src/integration.rs | 4 +- crates/notify/src/lib.rs | 14 +- rustfs/Cargo.toml | 1 - rustfs/src/admin/handlers/event.rs | 8 +- rustfs/src/main.rs | 5 +- rustfs/src/storage/ecfs.rs | 526 ++++++------------ rustfs/src/storage/entity.rs | 63 +++ rustfs/src/storage/helper.rs | 209 +++++++ rustfs/src/storage/mod.rs | 3 +- 25 files changed, 905 insertions(+), 835 deletions(-) create mode 100644 rustfs/src/storage/entity.rs create mode 100644 rustfs/src/storage/helper.rs diff --git a/Cargo.lock b/Cargo.lock index 764283d0..0d1d2591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -505,7 +505,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -527,7 +527,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -538,7 +538,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -564,7 +564,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1193,7 +1193,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1273,7 +1273,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1357,9 +1357,9 @@ dependencies = [ [[package]] name = "bytesize" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5c434ae3cf0089ca203e9019ebe529c47ff45cefe8af7c85ecb734ef541822f" +checksum = "c99fa31e08a43eaa5913ef68d7e01c37a2bdce6ed648168239ad33b7d30a9cd8" [[package]] name = "bytestring" @@ -1614,7 +1614,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -1707,7 +1707,7 @@ checksum = "a08a8aee16926ee1c4ad18868b8c3dfe5106359053f91e035861ec2a17116988" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2024,7 +2024,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2082,7 +2082,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2096,7 +2096,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2118,7 +2118,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core 0.20.11", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2129,7 +2129,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core 0.21.3", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2604,7 +2604,7 @@ checksum = "ec6f637bce95efac05cdfb9b6c19579ed4aa5f6b94d951cfa5bb054b7bb4f730" dependencies = [ "datafusion-expr", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2843,7 +2843,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2885,7 +2885,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2905,7 +2905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core 0.20.2", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -2925,7 +2925,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "unicode-xid", ] @@ -2988,7 +2988,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3159,7 +3159,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3180,7 +3180,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3219,7 +3219,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -3501,7 +3501,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5841,7 +5841,7 @@ dependencies = [ "phf_shared 0.11.3", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -5879,7 +5879,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6074,7 +6074,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6133,7 +6133,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn 2.0.109", + "syn 2.0.110", "tempfile", ] @@ -6147,7 +6147,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6160,7 +6160,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6246,9 +6246,9 @@ dependencies = [ [[package]] name = "pulldown-cmark-to-cmark" -version = "21.0.0" +version = "21.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +checksum = "8246feae3db61428fd0bb94285c690b460e4517d83152377543ca802357785f1" dependencies = [ "pulldown-cmark", ] @@ -6455,7 +6455,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6507,7 +6507,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6666,7 +6666,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -6753,7 +6753,7 @@ dependencies = [ "quote", "rust-embed-utils", "shellexpand", - "syn 2.0.109", + "syn 2.0.110", "walkdir", ] @@ -6849,7 +6849,6 @@ dependencies = [ "rustfs-zip", "rustls 0.23.35", "s3s", - "scopeguard", "serde", "serde_json", "serde_urlencoded", @@ -6922,6 +6921,7 @@ dependencies = [ "chrono", "const-str", "futures", + "hashbrown 0.16.0", "metrics", "rumqttc", "rustfs-config", @@ -7210,7 +7210,6 @@ dependencies = [ "form_urlencoded", "futures", "hashbrown 0.16.0", - "once_cell", "quick-xml 0.38.3", "rayon", "rumqttc", @@ -7765,7 +7764,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -7919,7 +7918,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -7930,7 +7929,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8015,7 +8014,7 @@ dependencies = [ "darling 0.21.3", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8040,7 +8039,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8229,7 +8228,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8315,7 +8314,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8409,7 +8408,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8421,7 +8420,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8544,9 +8543,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.109" +version = "2.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" dependencies = [ "proc-macro2", "quote", @@ -8591,7 +8590,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8690,7 +8689,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8701,7 +8700,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "test-case-core", ] @@ -8731,7 +8730,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8742,7 +8741,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -8898,7 +8897,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -9041,7 +9040,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -9066,7 +9065,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.109", + "syn 2.0.110", "tempfile", "tonic-build", ] @@ -9159,7 +9158,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -9401,7 +9400,7 @@ checksum = "d9384a660318abfbd7f8932c34d67e4d1ec511095f95972ddc01e19d7ba8413f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -9557,7 +9556,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "wasm-bindgen-shared", ] @@ -9731,7 +9730,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -9742,7 +9741,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -10012,7 +10011,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -10077,7 +10076,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure 0.13.2", ] @@ -10098,7 +10097,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -10118,7 +10117,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", "synstructure 0.13.2", ] @@ -10139,7 +10138,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] @@ -10172,7 +10171,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.109", + "syn 2.0.110", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ca06dbd6..aa843b9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,7 +123,7 @@ tower-http = { version = "0.6.6", features = ["cors"] } # Serialization and Data Formats bytes = { version = "1.10.1", features = ["serde"] } -bytesize = "2.1.0" +bytesize = "2.2.0" byteorder = "1.5.0" flatbuffers = "25.9.23" form_urlencoded = "1.2.2" @@ -226,7 +226,6 @@ rumqttc = { version = "0.25.0" } rust-embed = { version = "8.9.0" } rustc-hash = { version = "2.1.1" } s3s = { version = "0.12.0-rc.3", features = ["minio"] } -scopeguard = "1.2.0" serial_test = "3.2.0" shadow-rs = { version = "1.4.0", default-features = false } siphasher = "1.0.1" @@ -280,7 +279,7 @@ mimalloc = "0.1" [workspace.metadata.cargo-shear] -ignored = ["rustfs", "rustfs-mcp", "tokio-test", "scopeguard"] +ignored = ["rustfs", "rustfs-mcp", "tokio-test"] [profile.release] opt-level = 3 diff --git a/crates/audit/Cargo.toml b/crates/audit/Cargo.toml index ea430d5b..414e05fc 100644 --- a/crates/audit/Cargo.toml +++ b/crates/audit/Cargo.toml @@ -32,6 +32,7 @@ rustfs-ecstore = { workspace = true } chrono = { workspace = true } const-str = { workspace = true } futures = { workspace = true } +hashbrown = { workspace = true } metrics = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/audit/src/entity.rs b/crates/audit/src/entity.rs index 5b8c95a7..33f4ce39 100644 --- a/crates/audit/src/entity.rs +++ b/crates/audit/src/entity.rs @@ -13,18 +13,10 @@ // limitations under the License. use chrono::{DateTime, Utc}; +use hashbrown::HashMap; use rustfs_targets::EventName; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::collections::HashMap; - -/// Trait for types that can be serialized to JSON and have a timestamp -pub trait LogRecord { - /// Serialize the record to a JSON string - fn to_json(&self) -> String; - /// Get the timestamp of the record - fn get_timestamp(&self) -> chrono::DateTime; -} /// ObjectVersion represents an object version with key and versionId #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] @@ -36,19 +28,12 @@ pub struct ObjectVersion { } impl ObjectVersion { - /// Set the object name (chainable) - pub fn set_object_name(&mut self, name: String) -> &mut Self { - self.object_name = name; - self - } - /// Set the version ID (chainable) - pub fn set_version_id(&mut self, version_id: Option) -> &mut Self { - self.version_id = version_id; - self + pub fn new(object_name: String, version_id: Option) -> Self { + Self { object_name, version_id } } } -/// ApiDetails contains API information for the audit entry +/// `ApiDetails` contains API information for the audit entry. #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ApiDetails { #[serde(skip_serializing_if = "Option::is_none")] @@ -79,75 +64,86 @@ pub struct ApiDetails { pub time_to_response_in_ns: Option, } -impl ApiDetails { - /// Set API name (chainable) - pub fn set_name(&mut self, name: Option) -> &mut Self { - self.name = name; +/// Builder for `ApiDetails`. +#[derive(Default, Clone)] +pub struct ApiDetailsBuilder(pub ApiDetails); + +impl ApiDetailsBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn name(mut self, name: impl Into) -> Self { + self.0.name = Some(name.into()); self } - /// Set bucket name (chainable) - pub fn set_bucket(&mut self, bucket: Option) -> &mut Self { - self.bucket = bucket; + + pub fn bucket(mut self, bucket: impl Into) -> Self { + self.0.bucket = Some(bucket.into()); self } - /// Set object name (chainable) - pub fn set_object(&mut self, object: Option) -> &mut Self { - self.object = object; + + pub fn object(mut self, object: impl Into) -> Self { + self.0.object = Some(object.into()); self } - /// Set objects list (chainable) - pub fn set_objects(&mut self, objects: Option>) -> &mut Self { - self.objects = objects; + + pub fn objects(mut self, objects: Vec) -> Self { + self.0.objects = Some(objects); self } - /// Set status (chainable) - pub fn set_status(&mut self, status: Option) -> &mut Self { - self.status = status; + + pub fn status(mut self, status: impl Into) -> Self { + self.0.status = Some(status.into()); self } - /// Set status code (chainable) - pub fn set_status_code(&mut self, code: Option) -> &mut Self { - self.status_code = code; + + pub fn status_code(mut self, code: i32) -> Self { + self.0.status_code = Some(code); self } - /// Set input bytes (chainable) - pub fn set_input_bytes(&mut self, bytes: Option) -> &mut Self { - self.input_bytes = bytes; + + pub fn input_bytes(mut self, bytes: i64) -> Self { + self.0.input_bytes = Some(bytes); self } - /// Set output bytes (chainable) - pub fn set_output_bytes(&mut self, bytes: Option) -> &mut Self { - self.output_bytes = bytes; + + pub fn output_bytes(mut self, bytes: i64) -> Self { + self.0.output_bytes = Some(bytes); self } - /// Set header bytes (chainable) - pub fn set_header_bytes(&mut self, bytes: Option) -> &mut Self { - self.header_bytes = bytes; + + pub fn header_bytes(mut self, bytes: i64) -> Self { + self.0.header_bytes = Some(bytes); self } - /// Set time to first byte (chainable) - pub fn set_time_to_first_byte(&mut self, t: Option) -> &mut Self { - self.time_to_first_byte = t; + + pub fn time_to_first_byte(mut self, t: impl Into) -> Self { + self.0.time_to_first_byte = Some(t.into()); self } - /// Set time to first byte in nanoseconds (chainable) - pub fn set_time_to_first_byte_in_ns(&mut self, t: Option) -> &mut Self { - self.time_to_first_byte_in_ns = t; + + pub fn time_to_first_byte_in_ns(mut self, t: impl Into) -> Self { + self.0.time_to_first_byte_in_ns = Some(t.into()); self } - /// Set time to response (chainable) - pub fn set_time_to_response(&mut self, t: Option) -> &mut Self { - self.time_to_response = t; + + pub fn time_to_response(mut self, t: impl Into) -> Self { + self.0.time_to_response = Some(t.into()); self } - /// Set time to response in nanoseconds (chainable) - pub fn set_time_to_response_in_ns(&mut self, t: Option) -> &mut Self { - self.time_to_response_in_ns = t; + + pub fn time_to_response_in_ns(mut self, t: impl Into) -> Self { + self.0.time_to_response_in_ns = Some(t.into()); self } + + pub fn build(self) -> ApiDetails { + self.0 + } } -/// AuditEntry represents an audit log entry +/// `AuditEntry` represents an audit log entry. #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct AuditEntry { pub version: String, @@ -155,6 +151,7 @@ pub struct AuditEntry { pub deployment_id: Option, #[serde(rename = "siteName", skip_serializing_if = "Option::is_none")] pub site_name: Option, + #[serde(with = "chrono::serde::ts_milliseconds")] pub time: DateTime, pub event: EventName, #[serde(rename = "type", skip_serializing_if = "Option::is_none")] @@ -191,200 +188,130 @@ pub struct AuditEntry { pub error: Option, } -impl AuditEntry { - /// Create a new AuditEntry with required fields - #[allow(clippy::too_many_arguments)] - pub fn new( - version: String, - deployment_id: Option, - site_name: Option, - time: DateTime, - event: EventName, - entry_type: Option, - trigger: String, - api: ApiDetails, - ) -> Self { - AuditEntry { - version, - deployment_id, - site_name, - time, +/// Constructor for `AuditEntry`. +pub struct AuditEntryBuilder(AuditEntry); + +impl AuditEntryBuilder { + /// Create a new builder with all required fields. + pub fn new(version: impl Into, event: EventName, trigger: impl Into, api: ApiDetails) -> Self { + Self(AuditEntry { + version: version.into(), + time: Utc::now(), event, - entry_type, - trigger, + trigger: trigger.into(), api, - remote_host: None, - request_id: None, - user_agent: None, - req_path: None, - req_host: None, - req_node: None, - req_claims: None, - req_query: None, - req_header: None, - resp_header: None, - tags: None, - access_key: None, - parent_user: None, - error: None, - } + ..Default::default() + }) } - /// Set version (chainable) - pub fn set_version(&mut self, version: String) -> &mut Self { - self.version = version; - self - } - /// Set deployment ID (chainable) - pub fn set_deployment_id(&mut self, id: Option) -> &mut Self { - self.deployment_id = id; - self - } - /// Set site name (chainable) - pub fn set_site_name(&mut self, name: Option) -> &mut Self { - self.site_name = name; - self - } - /// Set time (chainable) - pub fn set_time(&mut self, time: DateTime) -> &mut Self { - self.time = time; - self - } - /// Set event (chainable) - pub fn set_event(&mut self, event: EventName) -> &mut Self { - self.event = event; - self - } - /// Set entry type (chainable) - pub fn set_entry_type(&mut self, entry_type: Option) -> &mut Self { - self.entry_type = entry_type; - self - } - /// Set trigger (chainable) - pub fn set_trigger(&mut self, trigger: String) -> &mut Self { - self.trigger = trigger; - self - } - /// Set API details (chainable) - pub fn set_api(&mut self, api: ApiDetails) -> &mut Self { - self.api = api; - self - } - /// Set remote host (chainable) - pub fn set_remote_host(&mut self, host: Option) -> &mut Self { - self.remote_host = host; - self - } - /// Set request ID (chainable) - pub fn set_request_id(&mut self, id: Option) -> &mut Self { - self.request_id = id; - self - } - /// Set user agent (chainable) - pub fn set_user_agent(&mut self, agent: Option) -> &mut Self { - self.user_agent = agent; - self - } - /// Set request path (chainable) - pub fn set_req_path(&mut self, path: Option) -> &mut Self { - self.req_path = path; - self - } - /// Set request host (chainable) - pub fn set_req_host(&mut self, host: Option) -> &mut Self { - self.req_host = host; - self - } - /// Set request node (chainable) - pub fn set_req_node(&mut self, node: Option) -> &mut Self { - self.req_node = node; - self - } - /// Set request claims (chainable) - pub fn set_req_claims(&mut self, claims: Option>) -> &mut Self { - self.req_claims = claims; - self - } - /// Set request query (chainable) - pub fn set_req_query(&mut self, query: Option>) -> &mut Self { - self.req_query = query; - self - } - /// Set request header (chainable) - pub fn set_req_header(&mut self, header: Option>) -> &mut Self { - self.req_header = header; - self - } - /// Set response header (chainable) - pub fn set_resp_header(&mut self, header: Option>) -> &mut Self { - self.resp_header = header; - self - } - /// Set tags (chainable) - pub fn set_tags(&mut self, tags: Option>) -> &mut Self { - self.tags = tags; - self - } - /// Set access key (chainable) - pub fn set_access_key(&mut self, key: Option) -> &mut Self { - self.access_key = key; - self - } - /// Set parent user (chainable) - pub fn set_parent_user(&mut self, user: Option) -> &mut Self { - self.parent_user = user; - self - } - /// Set error message (chainable) - pub fn set_error(&mut self, error: Option) -> &mut Self { - self.error = error; + // event + pub fn version(mut self, version: impl Into) -> Self { + self.0.version = version.into(); self } - /// Build AuditEntry from context or parameters (example, can be extended) - pub fn from_context( - version: String, - deployment_id: Option, - time: DateTime, - event: EventName, - trigger: String, - api: ApiDetails, - tags: Option>, - ) -> Self { - AuditEntry { - version, - deployment_id, - site_name: None, - time, - event, - entry_type: None, - trigger, - api, - remote_host: None, - request_id: None, - user_agent: None, - req_path: None, - req_host: None, - req_node: None, - req_claims: None, - req_query: None, - req_header: None, - resp_header: None, - tags, - access_key: None, - parent_user: None, - error: None, - } - } -} - -impl LogRecord for AuditEntry { - /// Serialize AuditEntry to JSON string - fn to_json(&self) -> String { - serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) - } - /// Get the timestamp of the audit entry - fn get_timestamp(&self) -> DateTime { - self.time + pub fn event(mut self, event: EventName) -> Self { + self.0.event = event; + self + } + + pub fn api(mut self, api_details: ApiDetails) -> Self { + self.0.api = api_details; + self + } + + pub fn deployment_id(mut self, id: impl Into) -> Self { + self.0.deployment_id = Some(id.into()); + self + } + + pub fn site_name(mut self, name: impl Into) -> Self { + self.0.site_name = Some(name.into()); + self + } + + pub fn time(mut self, time: DateTime) -> Self { + self.0.time = time; + self + } + + pub fn entry_type(mut self, entry_type: impl Into) -> Self { + self.0.entry_type = Some(entry_type.into()); + self + } + + pub fn remote_host(mut self, host: impl Into) -> Self { + self.0.remote_host = Some(host.into()); + self + } + + pub fn request_id(mut self, id: impl Into) -> Self { + self.0.request_id = Some(id.into()); + self + } + + pub fn user_agent(mut self, agent: impl Into) -> Self { + self.0.user_agent = Some(agent.into()); + self + } + + pub fn req_path(mut self, path: impl Into) -> Self { + self.0.req_path = Some(path.into()); + self + } + + pub fn req_host(mut self, host: impl Into) -> Self { + self.0.req_host = Some(host.into()); + self + } + + pub fn req_node(mut self, node: impl Into) -> Self { + self.0.req_node = Some(node.into()); + self + } + + pub fn req_claims(mut self, claims: HashMap) -> Self { + self.0.req_claims = Some(claims); + self + } + + pub fn req_query(mut self, query: HashMap) -> Self { + self.0.req_query = Some(query); + self + } + + pub fn req_header(mut self, header: HashMap) -> Self { + self.0.req_header = Some(header); + self + } + + pub fn resp_header(mut self, header: HashMap) -> Self { + self.0.resp_header = Some(header); + self + } + + pub fn tags(mut self, tags: HashMap) -> Self { + self.0.tags = Some(tags); + self + } + + pub fn access_key(mut self, key: impl Into) -> Self { + self.0.access_key = Some(key.into()); + self + } + + pub fn parent_user(mut self, user: impl Into) -> Self { + self.0.parent_user = Some(user.into()); + self + } + + pub fn error(mut self, error: impl Into) -> Self { + self.0.error = Some(error.into()); + self + } + + /// Construct the final `AuditEntry`. + pub fn build(self) -> AuditEntry { + self.0 } } diff --git a/crates/audit/src/error.rs b/crates/audit/src/error.rs index 8f85f047..3734437e 100644 --- a/crates/audit/src/error.rs +++ b/crates/audit/src/error.rs @@ -21,7 +21,7 @@ pub type AuditResult = Result; #[derive(Error, Debug)] pub enum AuditError { #[error("Configuration error: {0}")] - Configuration(String), + Configuration(String, #[source] Option>), #[error("config not loaded")] ConfigNotLoaded, @@ -35,11 +35,14 @@ pub enum AuditError { #[error("System already initialized")] AlreadyInitialized, + #[error("Storage not available: {0}")] + StorageNotAvailable(String), + #[error("Failed to save configuration: {0}")] - SaveConfig(String), + SaveConfig(#[source] Box), #[error("Failed to load configuration: {0}")] - LoadConfig(String), + LoadConfig(#[source] Box), #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), @@ -49,7 +52,4 @@ pub enum AuditError { #[error("Join error: {0}")] Join(#[from] tokio::task::JoinError), - - #[error("Server storage not initialized: {0}")] - ServerNotInitialized(String), } diff --git a/crates/audit/src/global.rs b/crates/audit/src/global.rs index e9b3176d..b95c510e 100644 --- a/crates/audit/src/global.rs +++ b/crates/audit/src/global.rs @@ -15,7 +15,7 @@ use crate::{AuditEntry, AuditResult, AuditSystem}; use rustfs_ecstore::config::Config; use std::sync::{Arc, OnceLock}; -use tracing::{error, warn}; +use tracing::{error, trace, warn}; /// Global audit system instance static AUDIT_SYSTEM: OnceLock> = OnceLock::new(); @@ -30,6 +30,19 @@ pub fn audit_system() -> Option> { AUDIT_SYSTEM.get().cloned() } +/// A helper macro for executing closures if the global audit system is initialized. +/// If not initialized, log a warning and return `Ok(())`. +macro_rules! with_audit_system { + ($async_closure:expr) => { + if let Some(system) = audit_system() { + (async move { $async_closure(system).await }).await + } else { + warn!("Audit system not initialized, operation skipped."); + Ok(()) + } + }; +} + /// Start the global audit system with configuration pub async fn start_audit_system(config: Config) -> AuditResult<()> { let system = init_audit_system(); @@ -38,32 +51,17 @@ pub async fn start_audit_system(config: Config) -> AuditResult<()> { /// Stop the global audit system pub async fn stop_audit_system() -> AuditResult<()> { - if let Some(system) = audit_system() { - system.close().await - } else { - warn!("Audit system not initialized, cannot stop"); - Ok(()) - } + with_audit_system!(|system: Arc| async move { system.close().await }) } /// Pause the global audit system pub async fn pause_audit_system() -> AuditResult<()> { - if let Some(system) = audit_system() { - system.pause().await - } else { - warn!("Audit system not initialized, cannot pause"); - Ok(()) - } + with_audit_system!(|system: Arc| async move { system.pause().await }) } /// Resume the global audit system pub async fn resume_audit_system() -> AuditResult<()> { - if let Some(system) = audit_system() { - system.resume().await - } else { - warn!("Audit system not initialized, cannot resume"); - Ok(()) - } + with_audit_system!(|system: Arc| async move { system.resume().await }) } /// Dispatch an audit log entry to all targets @@ -72,23 +70,23 @@ pub async fn dispatch_audit_log(entry: Arc) -> AuditResult<()> { if system.is_running().await { system.dispatch(entry).await } else { - // System not running, just drop the log entry without error + // The system is initialized but not running (for example, it is suspended). Silently discard log entries based on original logic. + // For debugging purposes, it can be useful to add a trace log here. + trace!("Audit system is not running, dropping audit entry."); Ok(()) } } else { - // System not initialized, just drop the log entry without error + // The system is not initialized at all. This is a more important state. + // It might be better to return an error or log a warning. + warn!("Audit system not initialized, dropping audit entry."); + // If this should be a hard failure, you can return Err(AuditError::NotInitialized("...")) Ok(()) } } /// Reload the global audit system configuration pub async fn reload_audit_config(config: Config) -> AuditResult<()> { - if let Some(system) = audit_system() { - system.reload_config(config).await - } else { - warn!("Audit system not initialized, cannot reload config"); - Ok(()) - } + with_audit_system!(|system: Arc| async move { system.reload_config(config).await }) } /// Check if the global audit system is running diff --git a/crates/audit/src/lib.rs b/crates/audit/src/lib.rs index c724e43f..8207bc23 100644 --- a/crates/audit/src/lib.rs +++ b/crates/audit/src/lib.rs @@ -25,7 +25,7 @@ pub mod observability; pub mod registry; pub mod system; -pub use entity::{ApiDetails, AuditEntry, LogRecord, ObjectVersion}; +pub use entity::{ApiDetails, AuditEntry, ObjectVersion}; pub use error::{AuditError, AuditResult}; pub use global::*; pub use observability::{AuditMetrics, AuditMetricsReport, PerformanceValidation}; diff --git a/crates/audit/src/registry.rs b/crates/audit/src/registry.rs index 59126c67..30aa325a 100644 --- a/crates/audit/src/registry.rs +++ b/crates/audit/src/registry.rs @@ -14,6 +14,7 @@ use crate::{AuditEntry, AuditError, AuditResult}; use futures::{StreamExt, stream::FuturesUnordered}; +use hashbrown::{HashMap, HashSet}; use rustfs_config::{ DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, @@ -25,7 +26,6 @@ use rustfs_targets::{ Target, TargetError, target::{ChannelTargetType, TargetType, mqtt::MQTTArgs, webhook::WebhookArgs}, }; -use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tracing::{debug, error, info, warn}; @@ -251,7 +251,7 @@ impl AuditRegistry { sections.extend(successes_by_section.keys().cloned()); for section_name in sections { - let mut section_map: HashMap = HashMap::new(); + let mut section_map: std::collections::HashMap = std::collections::HashMap::new(); // The default entry (if present) is written back to `_` if let Some(default_cfg) = section_defaults.get(§ion_name) { @@ -277,7 +277,7 @@ impl AuditRegistry { // 7. Save the new configuration to the system let Some(store) = rustfs_ecstore::new_object_layer_fn() else { - return Err(AuditError::ServerNotInitialized( + return Err(AuditError::StorageNotAvailable( "Failed to save target configuration: server storage not initialized".to_string(), )); }; @@ -286,7 +286,7 @@ impl AuditRegistry { Ok(_) => info!("New audit configuration saved to system successfully"), Err(e) => { error!(error = %e, "Failed to save new audit configuration"); - return Err(AuditError::SaveConfig(e.to_string())); + return Err(AuditError::SaveConfig(Box::new(e))); } } } diff --git a/crates/audit/src/system.rs b/crates/audit/src/system.rs index a45b7eea..bcbd37e7 100644 --- a/crates/audit/src/system.rs +++ b/crates/audit/src/system.rs @@ -146,7 +146,7 @@ impl AuditSystem { warn!("Audit system is already paused"); Ok(()) } - _ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string())), + _ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string(), None)), } } @@ -164,7 +164,7 @@ impl AuditSystem { warn!("Audit system is already running"); Ok(()) } - _ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string())), + _ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string(), None)), } } @@ -460,7 +460,7 @@ impl AuditSystem { info!(target_id = %target_id, "Target enabled"); Ok(()) } else { - Err(AuditError::Configuration(format!("Target not found: {target_id}"))) + Err(AuditError::Configuration(format!("Target not found: {target_id}"), None)) } } @@ -473,7 +473,7 @@ impl AuditSystem { info!(target_id = %target_id, "Target disabled"); Ok(()) } else { - Err(AuditError::Configuration(format!("Target not found: {target_id}"))) + Err(AuditError::Configuration(format!("Target not found: {target_id}"), None)) } } @@ -487,7 +487,7 @@ impl AuditSystem { info!(target_id = %target_id, "Target removed"); Ok(()) } else { - Err(AuditError::Configuration(format!("Target not found: {target_id}"))) + Err(AuditError::Configuration(format!("Target not found: {target_id}"), None)) } } diff --git a/crates/audit/tests/integration_test.rs b/crates/audit/tests/integration_test.rs index c0e2384a..d889c84e 100644 --- a/crates/audit/tests/integration_test.rs +++ b/crates/audit/tests/integration_test.rs @@ -52,7 +52,7 @@ async fn test_config_parsing_webhook() { // We expect this to fail due to server storage not being initialized // but the parsing should work correctly match result { - Err(AuditError::ServerNotInitialized(_)) => { + Err(AuditError::StorageNotAvailable(_)) => { // This is expected in test environment } Err(e) => { diff --git a/crates/audit/tests/performance_test.rs b/crates/audit/tests/performance_test.rs index 32dc87e0..4080c47b 100644 --- a/crates/audit/tests/performance_test.rs +++ b/crates/audit/tests/performance_test.rs @@ -73,7 +73,7 @@ async fn test_concurrent_target_creation() { // Verify it fails with expected error (server not initialized) match result { - Err(AuditError::ServerNotInitialized(_)) => { + Err(AuditError::StorageNotAvailable(_)) => { // Expected in test environment } Err(e) => { @@ -103,17 +103,17 @@ async fn test_audit_log_dispatch_performance() { use std::collections::HashMap; let id = 1; - let mut req_header = HashMap::new(); + let mut req_header = hashbrown::HashMap::new(); req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}")); req_header.insert("content-type".to_string(), "application/octet-stream".to_string()); - let mut resp_header = HashMap::new(); + let mut resp_header = hashbrown::HashMap::new(); resp_header.insert("x-response".to_string(), "ok".to_string()); - let mut tags = HashMap::new(); + let mut tags = hashbrown::HashMap::new(); tags.insert(format!("tag-{id}"), json!("sample")); - let mut req_query = HashMap::new(); + let mut req_query = hashbrown::HashMap::new(); req_query.insert("id".to_string(), id.to_string()); let api_details = ApiDetails { diff --git a/crates/audit/tests/system_integration_test.rs b/crates/audit/tests/system_integration_test.rs index 9948d898..267a9fc1 100644 --- a/crates/audit/tests/system_integration_test.rs +++ b/crates/audit/tests/system_integration_test.rs @@ -35,7 +35,7 @@ async fn test_complete_audit_system_lifecycle() { // Should fail in test environment but state handling should work match start_result { - Err(AuditError::ServerNotInitialized(_)) => { + Err(AuditError::StorageNotAvailable(_)) => { // Expected in test environment assert_eq!(system.get_state().await, system::AuditSystemState::Stopped); } @@ -168,7 +168,7 @@ async fn test_config_parsing_with_multiple_instances() { // Should fail due to server storage not initialized, but parsing should work match result { - Err(AuditError::ServerNotInitialized(_)) => { + Err(AuditError::StorageNotAvailable(_)) => { // Expected - parsing worked but save failed } Err(e) => { @@ -182,48 +182,6 @@ async fn test_config_parsing_with_multiple_instances() { } } -// #[tokio::test] -// async fn test_environment_variable_precedence() { -// // Test that environment variables override config file settings -// // This test validates the ENV > file instance > file default precedence -// // Set some test environment variables -// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST", "on"); -// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST", "http://env.example.com/audit"); -// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST", "env-token"); -// let mut registry = AuditRegistry::new(); -// -// // Create config that should be overridden by env vars -// let mut config = Config(HashMap::new()); -// let mut webhook_section = HashMap::new(); -// -// let mut test_kvs = KVS::new(); -// test_kvs.insert("enable".to_string(), "off".to_string()); // Should be overridden -// test_kvs.insert("endpoint".to_string(), "http://file.example.com/audit".to_string()); // Should be overridden -// test_kvs.insert("batch_size".to_string(), "10".to_string()); // Should remain from file -// webhook_section.insert("test".to_string(), test_kvs); -// -// config.0.insert("audit_webhook".to_string(), webhook_section); -// -// // Try to create targets - should use env vars for endpoint/enable, file for batch_size -// let result = registry.create_targets_from_config(&config).await; -// // Clean up env vars -// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST"); -// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST"); -// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST"); -// // Should fail due to server storage, but precedence logic should work -// match result { -// Err(AuditError::ServerNotInitialized(_)) => { -// // Expected - precedence parsing worked but save failed -// } -// Err(e) => { -// println!("Environment precedence test error: {}", e); -// } -// Ok(_) => { -// println!("Unexpected success in environment precedence test"); -// } -// } -// } - #[test] fn test_target_type_validation() { use rustfs_targets::target::TargetType; @@ -315,19 +273,18 @@ fn create_sample_audit_entry_with_id(id: u32) -> AuditEntry { use chrono::Utc; use rustfs_targets::EventName; use serde_json::json; - use std::collections::HashMap; - let mut req_header = HashMap::new(); + let mut req_header = hashbrown::HashMap::new(); req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}")); req_header.insert("content-type".to_string(), "application/octet-stream".to_string()); - let mut resp_header = HashMap::new(); + let mut resp_header = hashbrown::HashMap::new(); resp_header.insert("x-response".to_string(), "ok".to_string()); - let mut tags = HashMap::new(); + let mut tags = hashbrown::HashMap::new(); tags.insert(format!("tag-{id}"), json!("sample")); - let mut req_query = HashMap::new(); + let mut req_query = hashbrown::HashMap::new(); req_query.insert("id".to_string(), id.to_string()); let api_details = ApiDetails { diff --git a/crates/notify/Cargo.toml b/crates/notify/Cargo.toml index 5f2ca8ac..bdccac6c 100644 --- a/crates/notify/Cargo.toml +++ b/crates/notify/Cargo.toml @@ -35,7 +35,6 @@ chrono = { workspace = true, features = ["serde"] } futures = { workspace = true } form_urlencoded = { workspace = true } hashbrown = { workspace = true } -once_cell = { workspace = true } quick-xml = { workspace = true, features = ["serialize", "async-tokio"] } rayon = { workspace = true } rumqttc = { workspace = true } diff --git a/crates/notify/src/error.rs b/crates/notify/src/error.rs index b06c6451..317ef7f9 100644 --- a/crates/notify/src/error.rs +++ b/crates/notify/src/error.rs @@ -12,11 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rustfs_targets::TargetError; -use rustfs_targets::arn::TargetID; +use rustfs_targets::{TargetError, arn::TargetID}; use std::io; use thiserror::Error; +/// Errors related to the notification system's lifecycle. +#[derive(Debug, Error)] +pub enum LifecycleError { + /// Error indicating the system has already been initialized. + #[error("System has already been initialized")] + AlreadyInitialized, + + /// Error indicating the system has not been initialized yet. + #[error("System has not been initialized")] + NotInitialized, +} + /// Error types for the notification system #[derive(Debug, Error)] pub enum NotificationError { @@ -38,11 +49,8 @@ pub enum NotificationError { #[error("Rule configuration error: {0}")] RuleConfiguration(String), - #[error("System initialization error: {0}")] - Initialization(String), - - #[error("Notification system has already been initialized")] - AlreadyInitialized, + #[error("System lifecycle error: {0}")] + Lifecycle(#[from] LifecycleError), #[error("I/O error: {0}")] Io(io::Error), @@ -56,6 +64,9 @@ pub enum NotificationError { #[error("Target '{0}' not found")] TargetNotFound(TargetID), - #[error("Server not initialized")] - ServerNotInitialized, + #[error("System initialization error: {0}")] + Initialization(String), + + #[error("Storage not available: {0}")] + StorageNotAvailable(String), } diff --git a/crates/notify/src/event.rs b/crates/notify/src/event.rs index 786771f9..97958e30 100644 --- a/crates/notify/src/event.rs +++ b/crates/notify/src/event.rs @@ -276,3 +276,120 @@ impl EventArgs { self.req_params.contains_key("x-rustfs-source-replication-request") } } + +/// Builder for [`EventArgs`]. +/// +/// This builder provides a fluent API to construct an `EventArgs` instance, +/// ensuring that all required fields are provided. +/// +/// # Example +/// +/// ```ignore +/// let args = EventArgsBuilder::new( +/// EventName::ObjectCreatedPut, +/// "my-bucket", +/// object_info, +/// ) +/// .host("localhost:9000") +/// .user_agent("my-app/1.0") +/// .build(); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct EventArgsBuilder { + event_name: EventName, + bucket_name: String, + object: rustfs_ecstore::store_api::ObjectInfo, + req_params: HashMap, + resp_elements: HashMap, + version_id: String, + host: String, + user_agent: String, +} + +impl EventArgsBuilder { + /// Creates a new builder with the required fields. + pub fn new(event_name: EventName, bucket_name: impl Into, object: rustfs_ecstore::store_api::ObjectInfo) -> Self { + Self { + event_name, + bucket_name: bucket_name.into(), + object, + ..Default::default() + } + } + + /// Sets the event name. + pub fn event_name(mut self, event_name: EventName) -> Self { + self.event_name = event_name; + self + } + + /// Sets the bucket name. + pub fn bucket_name(mut self, bucket_name: impl Into) -> Self { + self.bucket_name = bucket_name.into(); + self + } + + /// Sets the object information. + pub fn object(mut self, object: rustfs_ecstore::store_api::ObjectInfo) -> Self { + self.object = object; + self + } + + /// Sets the request parameters. + pub fn req_params(mut self, req_params: HashMap) -> Self { + self.req_params = req_params; + self + } + + /// Adds a single request parameter. + pub fn req_param(mut self, key: impl Into, value: impl Into) -> Self { + self.req_params.insert(key.into(), value.into()); + self + } + + /// Sets the response elements. + pub fn resp_elements(mut self, resp_elements: HashMap) -> Self { + self.resp_elements = resp_elements; + self + } + + /// Adds a single response element. + pub fn resp_element(mut self, key: impl Into, value: impl Into) -> Self { + self.resp_elements.insert(key.into(), value.into()); + self + } + + /// Sets the version ID. + pub fn version_id(mut self, version_id: impl Into) -> Self { + self.version_id = version_id.into(); + self + } + + /// Sets the host. + pub fn host(mut self, host: impl Into) -> Self { + self.host = host.into(); + self + } + + /// Sets the user agent. + pub fn user_agent(mut self, user_agent: impl Into) -> Self { + self.user_agent = user_agent.into(); + self + } + + /// Builds the final `EventArgs` instance. + /// + /// This method consumes the builder and returns the constructed `EventArgs`. + pub fn build(self) -> EventArgs { + EventArgs { + event_name: self.event_name, + bucket_name: self.bucket_name, + object: self.object, + req_params: self.req_params, + resp_elements: self.resp_elements, + version_id: self.version_id, + host: self.host, + user_agent: self.user_agent, + } + } +} diff --git a/crates/notify/src/global.rs b/crates/notify/src/global.rs index 99e759db..58c25277 100644 --- a/crates/notify/src/global.rs +++ b/crates/notify/src/global.rs @@ -12,17 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{BucketNotificationConfig, Event, EventArgs, NotificationError, NotificationSystem}; -use once_cell::sync::Lazy; +use crate::{BucketNotificationConfig, Event, EventArgs, LifecycleError, NotificationError, NotificationSystem}; use rustfs_ecstore::config::Config; -use rustfs_targets::EventName; -use rustfs_targets::arn::TargetID; +use rustfs_targets::{EventName, arn::TargetID}; use std::sync::{Arc, OnceLock}; -use tracing::{error, instrument}; +use tracing::error; static NOTIFICATION_SYSTEM: OnceLock> = OnceLock::new(); -// Create a globally unique Notifier instance -static GLOBAL_NOTIFIER: Lazy = Lazy::new(|| Notifier {}); /// Initialize the global notification system with the given configuration. /// This function should only be called once throughout the application life cycle. @@ -34,7 +30,7 @@ pub async fn initialize(config: Config) -> Result<(), NotificationError> { match NOTIFICATION_SYSTEM.set(Arc::new(system)) { Ok(_) => Ok(()), - Err(_) => Err(NotificationError::AlreadyInitialized), + Err(_) => Err(NotificationError::Lifecycle(LifecycleError::AlreadyInitialized)), } } @@ -49,14 +45,11 @@ pub fn is_notification_system_initialized() -> bool { NOTIFICATION_SYSTEM.get().is_some() } -/// Returns a reference to the global Notifier instance. -pub fn notifier_instance() -> &'static Notifier { - &GLOBAL_NOTIFIER -} +/// A module providing the public API for event notification. +pub mod notifier_global { + use super::*; + use tracing::instrument; -pub struct Notifier {} - -impl Notifier { /// Notify an event asynchronously. /// This is the only entry point for all event notifications in the system. /// # Parameter @@ -67,8 +60,8 @@ impl Notifier { /// /// # Using /// This function is used to notify events in the system, such as object creation, deletion, or updates. - #[instrument(skip(self, args))] - pub async fn notify(&self, args: EventArgs) { + #[instrument(skip(args))] + pub async fn notify(args: EventArgs) { // Dependency injection or service positioning mode obtain NotificationSystem instance let notification_sys = match notification_system() { // If the notification system itself cannot be retrieved, it will be returned directly @@ -110,7 +103,6 @@ impl Notifier { /// # Using /// This function allows you to dynamically add notification rules for a specific bucket. pub async fn add_bucket_notification_rule( - &self, bucket_name: &str, region: &str, event_names: &[EventName], @@ -137,7 +129,7 @@ impl Notifier { // Get global NotificationSystem let notification_sys = match notification_system() { Some(sys) => sys, - None => return Err(NotificationError::ServerNotInitialized), + None => return Err(NotificationError::Lifecycle(LifecycleError::NotInitialized)), }; // Loading configuration @@ -159,7 +151,6 @@ impl Notifier { /// # Using /// Supports notification rules for adding multiple event types, prefixes, suffixes, and targets to the same bucket in batches. pub async fn add_event_specific_rules( - &self, bucket_name: &str, region: &str, event_rules: &[(Vec, String, String, Vec)], @@ -176,10 +167,7 @@ impl Notifier { } // Get global NotificationSystem instance - let notification_sys = match notification_system() { - Some(sys) => sys, - None => return Err(NotificationError::ServerNotInitialized), - }; + let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?; // Loading configuration notification_sys @@ -196,12 +184,9 @@ impl Notifier { /// This function allows you to clear all notification rules for a specific bucket. /// This is useful when you want to reset the notification configuration for a bucket. /// - pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> { + pub async fn clear_bucket_notification_rules(bucket_name: &str) -> Result<(), NotificationError> { // Get global NotificationSystem instance - let notification_sys = match notification_system() { - Some(sys) => sys, - None => return Err(NotificationError::ServerNotInitialized), - }; + let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?; // Clear configuration notification_sys.remove_bucket_notification_config(bucket_name).await; diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index d4b82336..ef661706 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -199,7 +199,9 @@ impl NotificationSystem { F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed { let Some(store) = rustfs_ecstore::global::new_object_layer_fn() else { - return Err(NotificationError::ServerNotInitialized); + return Err(NotificationError::StorageNotAvailable( + "Failed to save target configuration: server storage not initialized".to_string(), + )); }; let mut new_config = rustfs_ecstore::config::com::read_config_without_migrate(store.clone()) diff --git a/crates/notify/src/lib.rs b/crates/notify/src/lib.rs index 5da8d58a..cc514dbe 100644 --- a/crates/notify/src/lib.rs +++ b/crates/notify/src/lib.rs @@ -18,18 +18,18 @@ //! It supports sending events to various targets //! (like Webhook and MQTT) and includes features like event persistence and retry on failure. -pub mod error; -pub mod event; +mod error; +mod event; pub mod factory; -pub mod global; +mod global; pub mod integration; pub mod notifier; pub mod registry; pub mod rules; pub mod stream; -// Re-exports -pub use error::NotificationError; -pub use event::{Event, EventArgs}; -pub use global::{initialize, is_notification_system_initialized, notification_system}; + +pub use error::{LifecycleError, NotificationError}; +pub use event::{Event, EventArgs, EventArgsBuilder}; +pub use global::{initialize, is_notification_system_initialized, notification_system, notifier_global}; pub use integration::NotificationSystem; pub use rules::BucketNotificationConfig; diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 62dceacc..39e7abc6 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -112,7 +112,6 @@ mime_guess = { workspace = true } pin-project-lite.workspace = true rust-embed = { workspace = true, features = ["interpolate-folder-path"] } s3s.workspace = true -scopeguard.workspace = true shadow-rs = { workspace = true, features = ["build", "metadata"] } sysinfo = { workspace = true, features = ["multithread"] } thiserror = { workspace = true } diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index c7887b0d..8aabbf5f 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -134,7 +134,7 @@ impl Operation for NotificationTarget { check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; // 3. Get notification system instance - let Some(ns) = rustfs_notify::global::notification_system() else { + let Some(ns) = rustfs_notify::notification_system() else { return Err(s3_error!(InternalError, "notification system not initialized")); }; @@ -300,7 +300,7 @@ impl Operation for ListNotificationTargets { check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; // 2. Get notification system instance - let Some(ns) = rustfs_notify::global::notification_system() else { + let Some(ns) = rustfs_notify::notification_system() else { return Err(s3_error!(InternalError, "notification system not initialized")); }; @@ -351,7 +351,7 @@ impl Operation for ListTargetsArns { check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; // 2. Get notification system instance - let Some(ns) = rustfs_notify::global::notification_system() else { + let Some(ns) = rustfs_notify::notification_system() else { return Err(s3_error!(InternalError, "notification system not initialized")); }; @@ -401,7 +401,7 @@ impl Operation for RemoveNotificationTarget { check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?; // 3. Get notification system instance - let Some(ns) = rustfs_notify::global::notification_system() else { + let Some(ns) = rustfs_notify::notification_system() else { return Err(s3_error!(InternalError, "notification system not initialized")); }; diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index edcc61a9..989f40d0 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -58,7 +58,7 @@ use rustfs_ecstore::{ update_erasure_type, }; use rustfs_iam::init_iam_sys; -use rustfs_notify::global::notifier_instance; +use rustfs_notify::notifier_global; use rustfs_obs::{init_obs, set_global_guard}; use rustfs_targets::arn::TargetID; use rustfs_utils::net::parse_and_resolve_address; @@ -517,8 +517,7 @@ async fn add_bucket_notification_configuration(buckets: Vec) { process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str); process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str); - if let Err(e) = notifier_instance() - .add_event_specific_rules(bucket, region, &event_rules) + if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules) .await .map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}")) { diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 4a631536..4942308a 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -14,6 +14,8 @@ use crate::auth::get_condition_values; use crate::error::ApiError; +use crate::storage::entity; +use crate::storage::helper::OperationHelper; use crate::storage::options::{filter_object_metadata, get_content_sha256}; use crate::storage::{ access::{ReqInfo, authorize_request}, @@ -84,7 +86,7 @@ use rustfs_kms::{ service_manager::get_global_encryption_service, types::{EncryptionMetadata, ObjectEncryptionContext}, }; -use rustfs_notify::global::notifier_instance; +use rustfs_notify::{EventArgsBuilder, notifier_global}; use rustfs_policy::{ auth, policy::{ @@ -102,11 +104,10 @@ use rustfs_targets::{ EventName, arn::{TargetID, TargetIDError}, }; -use rustfs_utils::http::{AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE}; use rustfs_utils::{ - CompressionAlgorithm, + CompressionAlgorithm, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent, http::{ - AMZ_BUCKET_REPLICATION_STATUS, + AMZ_BUCKET_REPLICATION_STATUS, AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE, headers::{ AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, AMZ_RESTORE_EXPIRY_DAYS, AMZ_RESTORE_REQUEST_DATE, RESERVED_METADATA_PREFIX_LOWER, @@ -353,6 +354,7 @@ impl FS { } async fn put_object_extract(&self, req: S3Request) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject").suppress_event(); let input = req.input; let PutObjectInput { @@ -495,20 +497,20 @@ impl FS { ..Default::default() }; - let event_args = rustfs_notify::event::EventArgs { + let event_args = rustfs_notify::EventArgs { event_name: EventName::ObjectCreatedPut, bucket_name: bucket.clone(), object: _obj_info.clone(), - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), + req_params: extract_req_params_header(&req.headers), + resp_elements: extract_resp_elements(&S3Response::new(output.clone())), version_id: version_id.clone(), - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), + host: get_request_host(&req.headers), + user_agent: get_request_user_agent(&req.headers), }; // Asynchronous call will not block the response of the current request tokio::spawn(async move { - notifier_instance().notify(event_args).await; + notifier_global::notify(event_args).await; }); } } @@ -575,7 +577,9 @@ impl FS { checksum_crc64nvme, ..Default::default() }; - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } } @@ -602,6 +606,7 @@ impl S3 for FS { fields(start_time=?time::OffsetDateTime::now_utc()) )] async fn create_bucket(&self, req: S3Request) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket"); let CreateBucketInput { bucket, object_lock_enabled_for_bucket, @@ -628,28 +633,15 @@ impl S3 for FS { let output = CreateBucketOutput::default(); - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::BucketCreated, - bucket_name: bucket.clone(), - object: ObjectInfo { ..Default::default() }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id: String::new(), - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } /// Copy an object from one location to another #[instrument(level = "debug", skip(self, req))] async fn copy_object(&self, req: S3Request) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedCopy, "s3:CopyObject"); let CopyObjectInput { copy_source, bucket, @@ -830,28 +822,12 @@ impl S3 for FS { ..Default::default() }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(object_info).version_id(version_id); - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedCopy, - bucket_name: bucket.clone(), - object: object_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } async fn restore_object(&self, req: S3Request) -> S3Result> { @@ -902,7 +878,7 @@ impl S3 for FS { } //let mut api_err; - let mut _status_code = http::StatusCode::OK; + let mut _status_code = StatusCode::OK; let mut already_restored = false; if let Err(_err) = rreq.validate(store.clone()) { //api_err = to_api_err(ErrMalformedXML); @@ -919,7 +895,7 @@ impl S3 for FS { )); } if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 { - _status_code = http::StatusCode::ACCEPTED; + _status_code = StatusCode::ACCEPTED; already_restored = true; } } @@ -1086,12 +1062,13 @@ impl S3 for FS { restore_output_path: None, }; - return Ok(S3Response::with_headers(output, header)); + Ok(S3Response::with_headers(output, header)) } /// Delete a bucket #[instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket"); let input = req.input; // TODO: DeleteBucketInput doesn't have force parameter? let Some(store) = new_object_layer_fn() else { @@ -1109,28 +1086,15 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::BucketRemoved, - bucket_name: input.bucket, - object: ObjectInfo { ..Default::default() }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})), - version_id: String::new(), - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(DeleteBucketOutput {})) + let result = Ok(S3Response::new(DeleteBucketOutput {})); + let _ = helper.complete(&result); + result } /// Delete an object #[instrument(level = "debug", skip(self, req))] async fn delete_object(&self, mut req: S3Request) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObject"); let DeleteObjectInput { bucket, key, version_id, .. } = req.input.clone(); @@ -1233,32 +1197,20 @@ impl S3 for FS { EventName::ObjectRemovedDelete }; - let event_args = rustfs_notify::event::EventArgs { - event_name, - bucket_name: bucket.clone(), - object: ObjectInfo { - name: key.clone(), - bucket: bucket.clone(), - ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})), - version_id: version_id.map(|v| v.to_string()).unwrap_or_default(), - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + helper = helper.event_name(event_name); + helper = helper + .object(obj_info) + .version_id(version_id.map(|v| v.to_string()).unwrap_or_default()); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } /// Delete multiple objects #[instrument(level = "debug", skip(self, req))] async fn delete_objects(&self, req: S3Request) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event(); let DeleteObjectsInput { bucket, delete, .. } = req.input; if delete.objects.is_empty() || delete.objects.len() > 1000 { @@ -1393,10 +1345,12 @@ impl S3 for FS { .map(|v| v.as_ref().map(|v| v.clone().into())) .collect::>>() as &[Option], ) { - return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string())); + let result = Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string())); + let _ = helper.complete(&result); + return result; } - for (i, err) in errs.into_iter().enumerate() { + for (i, err) in errs.iter().enumerate() { let obj = dobjs[i].clone(); // let replication_state = obj.replication_state.clone().unwrap_or_default(); @@ -1426,7 +1380,7 @@ impl S3 for FS { continue; } - if let Some(err) = err { + if let Some(err) = err.clone() { delete_results[*didx].error = Some(Error { code: Some(err.to_string()), key: Some(object_to_delete[i].object_name.clone()), @@ -1481,39 +1435,39 @@ impl S3 for FS { } } - // Asynchronous call will not block the response of the current request + let req_headers = req.headers.clone(); tokio::spawn(async move { - for dobj in dobjs { - let version_id = match dobj.version_id { - None => String::new(), - Some(v) => v.to_string(), - }; - let mut event_name = EventName::ObjectRemovedDelete; - if dobj.delete_marker { - event_name = EventName::ObjectRemovedDeleteMarkerCreated; - } + for res in delete_results { + if let Some(dobj) = res.delete_object { + let event_name = if dobj.delete_marker { + EventName::ObjectRemovedDeleteMarkerCreated + } else { + EventName::ObjectRemovedDelete + }; + let event_args = EventArgsBuilder::new( + event_name, + bucket.clone(), + ObjectInfo { + name: dobj.object_name.clone(), + bucket: bucket.clone(), + ..Default::default() + }, + ) + .version_id(dobj.version_id.map(|v| v.to_string()).unwrap_or_default()) + .req_params(extract_req_params_header(&req_headers)) + .resp_elements(extract_resp_elements(&S3Response::new(DeleteObjectsOutput::default()))) + .host(get_request_host(&req_headers)) + .user_agent(get_request_user_agent(&req_headers)) + .build(); - let event_args = rustfs_notify::event::EventArgs { - event_name, - bucket_name: bucket.clone(), - object: ObjectInfo { - name: dobj.object_name, - bucket: bucket.clone(), - ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectsOutput { - ..Default::default() - })), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - notifier_instance().notify(event_args).await; + notifier_global::notify(event_args).await; + } } }); - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } /// Get bucket location @@ -1548,6 +1502,7 @@ impl S3 for FS { fields(start_time=?time::OffsetDateTime::now_utc()) )] async fn get_object(&self, req: S3Request) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject"); // mc get 3 let GetObjectInput { @@ -1879,27 +1834,12 @@ impl S3 for FS { ..Default::default() }; - let version_id = match req.input.version_id { - None => String::new(), - Some(v) => v.to_string(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectAccessedGet, - bucket_name: bucket.clone(), - object: event_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(GetObjectOutput { ..Default::default() })), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(event_info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self, req))] @@ -1921,6 +1861,7 @@ impl S3 for FS { #[instrument(level = "debug", skip(self, req))] async fn head_object(&self, req: S3Request) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedHead, "s3:HeadObject"); // mc get 2 let HeadObjectInput { bucket, @@ -2055,27 +1996,13 @@ impl S3 for FS { ..Default::default() }; - let version_id = match req.input.version_id { - None => String::new(), - Some(v) => v.to_string(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectAccessedGet, - bucket_name: bucket, - object: event_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(event_info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); - Ok(S3Response::new(output)) + result } #[instrument(level = "debug", skip(self))] @@ -2344,6 +2271,7 @@ impl S3 for FS { // #[instrument(level = "debug", skip(self, req))] async fn put_object(&self, req: S3Request) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject"); if req .headers .get("X-Amz-Meta-Snowball-Auto-Extract") @@ -2360,7 +2288,6 @@ impl S3 for FS { return Err(s3_error!(InvalidStorageClass)); } } - let event_version_id = input.version_id.as_ref().map(|v| v.to_string()).unwrap_or_default(); let PutObjectInput { body, bucket, @@ -2612,7 +2539,6 @@ impl S3 for FS { .put_object(&bucket, &key, &mut reader, &opts) .await .map_err(ApiError::from)?; - let event_info = obj_info.clone(); let e_tag = obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)); let repoptions = @@ -2671,23 +2597,9 @@ impl S3 for FS { ..Default::default() }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedPut, - bucket_name: bucket.clone(), - object: event_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id: event_version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self, req))] @@ -2695,6 +2607,7 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:CreateMultipartUpload"); let CreateMultipartUploadInput { bucket, key, @@ -2826,8 +2739,6 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let object_name = key.clone(); - let bucket_name = bucket.clone(); let output = CreateMultipartUploadOutput { bucket: Some(bucket), key: Some(key), @@ -2840,31 +2751,9 @@ impl S3 for FS { ..Default::default() }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedCompleteMultipartUpload, - bucket_name: bucket_name.clone(), - object: ObjectInfo { - name: object_name, - bucket: bucket_name, - ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; - - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self, req))] @@ -3430,6 +3319,7 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { + let helper = OperationHelper::new(&req, EventName::ObjectCreatedCompleteMultipartUpload, "s3:CompleteMultipartUpload"); let input = req.input; let CompleteMultipartUploadInput { multipart_upload, @@ -3536,6 +3426,26 @@ impl S3 for FS { } let output = CompleteMultipartUploadOutput { + bucket: Some(bucket.clone()), + key: Some(key.clone()), + e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)), + location: Some("us-east-1".to_string()), + server_side_encryption: server_side_encryption.clone(), // TDD: Return encryption info + ssekms_key_id: ssekms_key_id.clone(), // TDD: Return KMS key ID if present + checksum_crc32: checksum_crc32.clone(), + checksum_crc32c: checksum_crc32c.clone(), + checksum_sha1: checksum_sha1.clone(), + checksum_sha256: checksum_sha256.clone(), + checksum_crc64nvme: checksum_crc64nvme.clone(), + checksum_type: checksum_type.clone(), + ..Default::default() + }; + info!( + "TDD: Created output: SSE={:?}, KMS={:?}", + output.server_side_encryption, output.ssekms_key_id + ); + + let helper_output = entity::CompleteMultipartUploadOutput { bucket: Some(bucket.clone()), key: Some(key.clone()), e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)), @@ -3550,10 +3460,6 @@ impl S3 for FS { checksum_type, ..Default::default() }; - info!( - "TDD: Created output: SSE={:?}, KMS={:?}", - output.server_side_encryption, output.ssekms_key_id - ); let mt2 = HashMap::new(); let repoptions = @@ -3569,6 +3475,8 @@ impl S3 for FS { "TDD: About to return S3Response with output: SSE={:?}, KMS={:?}", output.server_side_encryption, output.ssekms_key_id ); + let helper_result = Ok(S3Response::new(helper_output)); + let _ = helper.complete(&helper_result); Ok(S3Response::new(output)) } @@ -3655,6 +3563,7 @@ impl S3 for FS { #[instrument(level = "debug", skip(self, req))] async fn put_object_tagging(&self, req: S3Request) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, "s3:PutObjectTagging"); let PutObjectTaggingInput { bucket, key: object, @@ -3706,31 +3615,12 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedPutTagging, - bucket_name: bucket.clone(), - object: ObjectInfo { - name: object.clone(), - bucket, - ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(PutObjectTaggingOutput { version_id: None })), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(PutObjectTaggingOutput { version_id: None })) + let result = Ok(S3Response::new(PutObjectTaggingOutput { version_id: None })); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self))] @@ -3760,6 +3650,7 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, "s3:DeleteObjectTagging"); let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone(); let Some(store) = new_object_layer_fn() else { @@ -3773,31 +3664,12 @@ impl S3 for FS { .await .map_err(ApiError::from)?; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => Uuid::new_v4().to_string(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedDeleteTagging, - bucket_name: bucket.clone(), - object: ObjectInfo { - name: object.clone(), - bucket, - ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectTaggingOutput { version_id: None })), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()); + helper = helper.version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None })) + let result = Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None })); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self))] @@ -4391,7 +4263,7 @@ impl S3 for FS { let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default()); // Purge old rules and resolve new rules in parallel - let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket); + let clear_rules = notifier_global::clear_bucket_notification_rules(&bucket); let parse_rules = async { let mut event_rules = Vec::new(); @@ -4419,8 +4291,7 @@ impl S3 for FS { clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?; // Add a new notification rule - notifier_instance() - .add_event_specific_rules(&bucket, ®ion, &event_rules) + notifier_global::add_event_specific_rules(&bucket, ®ion, &event_rules) .await .map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?; @@ -4532,6 +4403,7 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, "s3:GetObjectAttributes"); let GetObjectAttributesInput { bucket, key, .. } = req.input.clone(); let Some(store) = new_object_layer_fn() else { @@ -4550,31 +4422,19 @@ impl S3 for FS { object_parts: None, ..Default::default() }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectAccessedAttributes, - bucket_name: bucket.clone(), - object: ObjectInfo { + + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper + .object(ObjectInfo { name: key.clone(), bucket, ..Default::default() - }, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + }) + .version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } async fn put_object_acl(&self, req: S3Request) -> S3Result> { @@ -4690,6 +4550,7 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetLegalHold, "s3:GetObjectLegalHold"); let GetObjectLegalHoldInput { bucket, key, version_id, .. } = req.input.clone(); @@ -4732,33 +4593,19 @@ impl S3 for FS { }), }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => Uuid::new_v4().to_string(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectAccessedGetLegalHold, - bucket_name: bucket.clone(), - object: object_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()); + helper = helper.object(object_info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } async fn put_object_legal_hold( &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, "s3:PutObjectLegalHold"); let PutObjectLegalHoldInput { bucket, key, @@ -4811,33 +4658,19 @@ impl S3 for FS { let output = PutObjectLegalHoldOutput { request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedPutLegalHold, - bucket_name: bucket.clone(), - object: info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } async fn get_object_retention( &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, "s3:GetObjectRetention"); let GetObjectRetentionInput { bucket, key, version_id, .. } = req.input.clone(); @@ -4872,33 +4705,19 @@ impl S3 for FS { let output = GetObjectRetentionOutput { retention: Some(ObjectLockRetention { mode, retain_until_date }), }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => String::new(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectAccessedGetRetention, - bucket_name: bucket.clone(), - object: object_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(object_info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } async fn put_object_retention( &self, req: S3Request, ) -> S3Result> { + let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, "s3:PutObjectRetention"); let PutObjectRetentionInput { bucket, key, @@ -4947,27 +4766,12 @@ impl S3 for FS { request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), }; - let version_id = match req.input.version_id { - Some(v) => v.to_string(), - None => Uuid::new_v4().to_string(), - }; - let event_args = rustfs_notify::event::EventArgs { - event_name: EventName::ObjectCreatedPutRetention, - bucket_name: bucket.clone(), - object: object_info, - req_params: rustfs_utils::extract_req_params_header(&req.headers), - resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())), - version_id, - host: rustfs_utils::get_request_host(&req.headers), - user_agent: rustfs_utils::get_request_user_agent(&req.headers), - }; + let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()); + helper = helper.object(object_info).version_id(version_id); - // Asynchronous call will not block the response of the current request - tokio::spawn(async move { - notifier_instance().notify(event_args).await; - }); - - Ok(S3Response::new(output)) + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } } diff --git a/rustfs/src/storage/entity.rs b/rustfs/src/storage/entity.rs new file mode 100644 index 00000000..7273100d --- /dev/null +++ b/rustfs/src/storage/entity.rs @@ -0,0 +1,63 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +use s3s::dto::{ + BucketKeyEnabled, BucketName, ChecksumCRC32, ChecksumCRC32C, ChecksumCRC64NVME, ChecksumSHA1, ChecksumSHA256, ChecksumType, + ETag, Expiration, Location, ObjectKey, ObjectVersionId, RequestCharged, SSEKMSKeyId, ServerSideEncryption, +}; + +#[derive(Debug, Clone, Default)] +pub struct CompleteMultipartUploadOutput { + pub bucket: Option, + pub bucket_key_enabled: Option, + pub checksum_crc32: Option, + pub checksum_crc32c: Option, + pub checksum_crc64nvme: Option, + pub checksum_sha1: Option, + pub checksum_sha256: Option, + pub checksum_type: Option, + pub e_tag: Option, + pub expiration: Option, + pub key: Option, + pub location: Option, + pub request_charged: Option, + pub ssekms_key_id: Option, + pub server_side_encryption: Option, + pub version_id: Option, +} + +impl From for CompleteMultipartUploadOutput { + fn from(output: s3s::dto::CompleteMultipartUploadOutput) -> Self { + Self { + bucket: output.bucket, + bucket_key_enabled: output.bucket_key_enabled, + checksum_crc32: output.checksum_crc32, + checksum_crc32c: output.checksum_crc32c, + checksum_crc64nvme: output.checksum_crc64nvme, + checksum_sha1: output.checksum_sha1, + checksum_sha256: output.checksum_sha256, + checksum_type: output.checksum_type, + e_tag: output.e_tag, + expiration: output.expiration, + key: output.key, + location: output.location, + request_charged: output.request_charged, + ssekms_key_id: output.ssekms_key_id, + server_side_encryption: output.server_side_encryption, + version_id: output.version_id, + } + } +} diff --git a/rustfs/src/storage/helper.rs b/rustfs/src/storage/helper.rs new file mode 100644 index 00000000..f72f1b1a --- /dev/null +++ b/rustfs/src/storage/helper.rs @@ -0,0 +1,209 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use http::StatusCode; +use rustfs_audit::{ + entity::{ApiDetails, ApiDetailsBuilder, AuditEntryBuilder}, + global::AuditLogger, +}; +use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_notify::{EventArgsBuilder, notifier_global}; +use rustfs_targets::EventName; +use rustfs_utils::{ + extract_req_params, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent, +}; +use s3s::{S3Request, S3Response, S3Result}; +use std::future::Future; +use tokio::runtime::{Builder, Handle}; + +/// Schedules an asynchronous task on the current runtime; +/// if there is no runtime, creates a minimal runtime execution on a new thread. +fn spawn_background(fut: F) +where + F: Future + Send + 'static, +{ + if let Ok(handle) = Handle::try_current() { + drop(handle.spawn(fut)); + } else { + std::thread::spawn(|| { + if let Ok(rt) = Builder::new_current_thread().enable_all().build() { + rt.block_on(fut); + } + }); + } +} + +/// A unified helper structure for building and distributing audit logs and event notifications via RAII mode at the end of an S3 operation scope. +pub struct OperationHelper { + audit_builder: Option, + api_builder: ApiDetailsBuilder, + event_builder: Option, + start_time: std::time::Instant, +} + +impl OperationHelper { + /// Create a new OperationHelper for S3 requests. + pub fn new(req: &S3Request, event: EventName, trigger: &'static str) -> Self { + // Parse path -> bucket/object + let path = req.uri.path().trim_start_matches('/'); + let mut segs = path.splitn(2, '/'); + let bucket = segs.next().unwrap_or("").to_string(); + let object_key = segs.next().unwrap_or("").to_string(); + + // Infer remote address + let remote_host = req + .headers + .get("x-forwarded-for") + .and_then(|v| v.to_str().ok()) + .or_else(|| req.headers.get("x-real-ip").and_then(|v| v.to_str().ok())) + .unwrap_or("") + .to_string(); + + // Initialize audit builder + let mut api_builder = ApiDetailsBuilder::new().name(trigger); + if !bucket.is_empty() { + api_builder = api_builder.bucket(&bucket); + } + if !object_key.is_empty() { + api_builder = api_builder.object(&object_key); + } + // Audit builder + let mut audit_builder = AuditEntryBuilder::new("1.0", event, trigger, ApiDetails::default()) + .remote_host(remote_host) + .user_agent(get_request_user_agent(&req.headers)) + .req_host(get_request_host(&req.headers)) + .req_path(req.uri.path().to_string()) + .req_query(extract_req_params(req)); + + if let Some(req_id) = req.headers.get("x-amz-request-id") { + if let Ok(id_str) = req_id.to_str() { + audit_builder = audit_builder.request_id(id_str); + } + } + + // initialize event builder + // object is a placeholder that must be set later using the `object()` method. + let event_builder = EventArgsBuilder::new(event, bucket, ObjectInfo::default()) + .host(get_request_host(&req.headers)) + .user_agent(get_request_user_agent(&req.headers)) + .req_params(extract_req_params_header(&req.headers)); + + Self { + audit_builder: Some(audit_builder), + api_builder, + event_builder: Some(event_builder), + start_time: std::time::Instant::now(), + } + } + + /// Sets the ObjectInfo for event notification. + pub fn object(mut self, object_info: ObjectInfo) -> Self { + if let Some(builder) = self.event_builder.take() { + self.event_builder = Some(builder.object(object_info)); + } + self + } + + /// Set the version ID for event notifications. + pub fn version_id(mut self, version_id: impl Into) -> Self { + if let Some(builder) = self.event_builder.take() { + self.event_builder = Some(builder.version_id(version_id)); + } + self + } + + /// Set the event name for event notifications. + pub fn event_name(mut self, event_name: EventName) -> Self { + if let Some(builder) = self.event_builder.take() { + self.event_builder = Some(builder.event_name(event_name)); + } + + if let Some(builder) = self.audit_builder.take() { + self.audit_builder = Some(builder.event(event_name)); + } + + self + } + + /// Complete operational details from S3 results. + /// This method should be called immediately before the function returns. + /// It consumes and prepares auxiliary structures for use during `drop`. + pub fn complete(mut self, result: &S3Result>) -> Self { + // Complete audit log + if let Some(builder) = self.audit_builder.take() { + let (status, status_code, error_msg) = match result { + Ok(res) => ("success".to_string(), res.status.unwrap_or(StatusCode::OK).as_u16() as i32, None), + Err(e) => ( + "failure".to_string(), + e.status_code().unwrap_or(StatusCode::BAD_REQUEST).as_u16() as i32, + e.message().map(|s| s.to_string()), + ), + }; + + let ttr = self.start_time.elapsed(); + let api_details = self + .api_builder + .clone() + .status(status) + .status_code(status_code) + .time_to_response(format!("{:.2?}", ttr)) + .time_to_response_in_ns(ttr.as_nanos().to_string()) + .build(); + + let mut final_builder = builder.api(api_details.clone()); + if let Some(err) = error_msg { + final_builder = final_builder.error(err); + } + self.audit_builder = Some(final_builder); + self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use + } + + // Completion event notification (only on success) + if let (Some(builder), Ok(res)) = (self.event_builder.take(), result) { + self.event_builder = Some(builder.resp_elements(extract_resp_elements(res))); + } + + self + } + + /// Suppresses the automatic event notification on drop. + pub fn suppress_event(mut self) -> Self { + self.event_builder = None; + self + } +} + +impl Drop for OperationHelper { + fn drop(&mut self) { + // Distribute audit logs + if let Some(builder) = self.audit_builder.take() { + spawn_background(async move { + AuditLogger::log(builder.build()).await; + }); + } + + // Distribute event notification (only on success) + if self.api_builder.0.status.as_deref() == Some("success") { + if let Some(builder) = self.event_builder.take() { + let event_args = builder.build(); + // Avoid generating notifications for copy requests + if !event_args.is_replication_request() { + spawn_background(async move { + notifier_global::notify(event_args).await; + }); + } + } + } + } +} diff --git a/rustfs/src/storage/mod.rs b/rustfs/src/storage/mod.rs index 86c5b977..754b4abe 100644 --- a/rustfs/src/storage/mod.rs +++ b/rustfs/src/storage/mod.rs @@ -14,6 +14,7 @@ pub mod access; pub mod ecfs; -// pub mod error; +pub(crate) mod entity; +pub(crate) mod helper; pub mod options; pub mod tonic_service;