feat(storage): refactor audit and notification with OperationHelper (#825)

* improve code for audit

* improve code ecfs.rs

* improve code

* improve code for ecfs.rs

* feat(storage): refactor audit and notification with OperationHelper

This commit introduces a significant refactoring of the audit logging and event notification mechanisms within `ecfs.rs`.

The core of this change is the new `OperationHelper` struct, which encapsulates and simplifies the logic for both concerns. It replaces the previous `AuditHelper` and manual event dispatching.

Key improvements include:

- **Unified Handling**: `OperationHelper` manages both audit and notification builders, providing a single, consistent entry point for S3 operations.
- **RAII for Automation**: By leveraging the `Drop` trait, the helper automatically dispatches logs and notifications when it goes out of scope. This simplifies S3 method implementations and ensures cleanup even on early returns.
- **Fluent API**: A builder-like pattern with methods such as `.object()`, `.version_id()`, and `.suppress_event()` makes the code more readable and expressive.
- **Context-Aware Logic**: The helper's `.complete()` method intelligently populates log details based on the operation's `S3Result` and only triggers notifications on success.
- **Modular Design**: All helper logic is now isolated in `rustfs/src/storage/helper.rs`, improving separation of concerns and making `ecfs.rs` cleaner.

This refactoring significantly enhances code clarity, reduces boilerplate, and improves the robustness of logging and notification handling across the storage layer.

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* improve code for audit and notify

* fix

* fix

* fix
This commit is contained in:
houseme
2025-11-10 17:30:50 +08:00
committed by GitHub
parent b26aad4129
commit 98be7df0f5
25 changed files with 905 additions and 835 deletions

137
Cargo.lock generated
View File

@@ -505,7 +505,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -527,7 +527,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -538,7 +538,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -564,7 +564,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1193,7 +1193,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1273,7 +1273,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1357,9 +1357,9 @@ dependencies = [
[[package]]
name = "bytesize"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5c434ae3cf0089ca203e9019ebe529c47ff45cefe8af7c85ecb734ef541822f"
checksum = "c99fa31e08a43eaa5913ef68d7e01c37a2bdce6ed648168239ad33b7d30a9cd8"
[[package]]
name = "bytestring"
@@ -1614,7 +1614,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -1707,7 +1707,7 @@ checksum = "a08a8aee16926ee1c4ad18868b8c3dfe5106359053f91e035861ec2a17116988"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2024,7 +2024,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2082,7 +2082,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2096,7 +2096,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2118,7 +2118,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2129,7 +2129,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core 0.21.3",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2604,7 +2604,7 @@ checksum = "ec6f637bce95efac05cdfb9b6c19579ed4aa5f6b94d951cfa5bb054b7bb4f730"
dependencies = [
"datafusion-expr",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2843,7 +2843,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2885,7 +2885,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2905,7 +2905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core 0.20.2",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -2925,7 +2925,7 @@ checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"unicode-xid",
]
@@ -2988,7 +2988,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3159,7 +3159,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3180,7 +3180,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3219,7 +3219,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -3501,7 +3501,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5841,7 +5841,7 @@ dependencies = [
"phf_shared 0.11.3",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -5879,7 +5879,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6074,7 +6074,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6133,7 +6133,7 @@ dependencies = [
"pulldown-cmark",
"pulldown-cmark-to-cmark",
"regex",
"syn 2.0.109",
"syn 2.0.110",
"tempfile",
]
@@ -6147,7 +6147,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6160,7 +6160,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6246,9 +6246,9 @@ dependencies = [
[[package]]
name = "pulldown-cmark-to-cmark"
version = "21.0.0"
version = "21.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75"
checksum = "8246feae3db61428fd0bb94285c690b460e4517d83152377543ca802357785f1"
dependencies = [
"pulldown-cmark",
]
@@ -6455,7 +6455,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b"
dependencies = [
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6507,7 +6507,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6666,7 +6666,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -6753,7 +6753,7 @@ dependencies = [
"quote",
"rust-embed-utils",
"shellexpand",
"syn 2.0.109",
"syn 2.0.110",
"walkdir",
]
@@ -6849,7 +6849,6 @@ dependencies = [
"rustfs-zip",
"rustls 0.23.35",
"s3s",
"scopeguard",
"serde",
"serde_json",
"serde_urlencoded",
@@ -6922,6 +6921,7 @@ dependencies = [
"chrono",
"const-str",
"futures",
"hashbrown 0.16.0",
"metrics",
"rumqttc",
"rustfs-config",
@@ -7210,7 +7210,6 @@ dependencies = [
"form_urlencoded",
"futures",
"hashbrown 0.16.0",
"once_cell",
"quick-xml 0.38.3",
"rayon",
"rumqttc",
@@ -7765,7 +7764,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -7919,7 +7918,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -7930,7 +7929,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8015,7 +8014,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8040,7 +8039,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8229,7 +8228,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8315,7 +8314,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8409,7 +8408,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8421,7 +8420,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8544,9 +8543,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.109"
version = "2.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f"
checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea"
dependencies = [
"proc-macro2",
"quote",
@@ -8591,7 +8590,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8690,7 +8689,7 @@ dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8701,7 +8700,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"test-case-core",
]
@@ -8731,7 +8730,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8742,7 +8741,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -8898,7 +8897,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -9041,7 +9040,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -9066,7 +9065,7 @@ dependencies = [
"prost-build",
"prost-types",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"tempfile",
"tonic-build",
]
@@ -9159,7 +9158,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -9401,7 +9400,7 @@ checksum = "d9384a660318abfbd7f8932c34d67e4d1ec511095f95972ddc01e19d7ba8413f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -9557,7 +9556,7 @@ dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"wasm-bindgen-shared",
]
@@ -9731,7 +9730,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -9742,7 +9741,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -10012,7 +10011,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -10077,7 +10076,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"synstructure 0.13.2",
]
@@ -10098,7 +10097,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -10118,7 +10117,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
"synstructure 0.13.2",
]
@@ -10139,7 +10138,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]
@@ -10172,7 +10171,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.109",
"syn 2.0.110",
]
[[package]]

View File

@@ -123,7 +123,7 @@ tower-http = { version = "0.6.6", features = ["cors"] }
# Serialization and Data Formats
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.1.0"
bytesize = "2.2.0"
byteorder = "1.5.0"
flatbuffers = "25.9.23"
form_urlencoded = "1.2.2"
@@ -226,7 +226,6 @@ rumqttc = { version = "0.25.0" }
rust-embed = { version = "8.9.0" }
rustc-hash = { version = "2.1.1" }
s3s = { version = "0.12.0-rc.3", features = ["minio"] }
scopeguard = "1.2.0"
serial_test = "3.2.0"
shadow-rs = { version = "1.4.0", default-features = false }
siphasher = "1.0.1"
@@ -280,7 +279,7 @@ mimalloc = "0.1"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rustfs-mcp", "tokio-test", "scopeguard"]
ignored = ["rustfs", "rustfs-mcp", "tokio-test"]
[profile.release]
opt-level = 3

View File

@@ -32,6 +32,7 @@ rustfs-ecstore = { workspace = true }
chrono = { workspace = true }
const-str = { workspace = true }
futures = { workspace = true }
hashbrown = { workspace = true }
metrics = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -13,18 +13,10 @@
// limitations under the License.
use chrono::{DateTime, Utc};
use hashbrown::HashMap;
use rustfs_targets::EventName;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Trait for types that can be serialized to JSON and have a timestamp
pub trait LogRecord {
/// Serialize the record to a JSON string
fn to_json(&self) -> String;
/// Get the timestamp of the record
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// ObjectVersion represents an object version with key and versionId
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
@@ -36,19 +28,12 @@ pub struct ObjectVersion {
}
impl ObjectVersion {
/// Set the object name (chainable)
pub fn set_object_name(&mut self, name: String) -> &mut Self {
self.object_name = name;
self
}
/// Set the version ID (chainable)
pub fn set_version_id(&mut self, version_id: Option<String>) -> &mut Self {
self.version_id = version_id;
self
pub fn new(object_name: String, version_id: Option<String>) -> Self {
Self { object_name, version_id }
}
}
/// ApiDetails contains API information for the audit entry
/// `ApiDetails` contains API information for the audit entry.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ApiDetails {
#[serde(skip_serializing_if = "Option::is_none")]
@@ -79,75 +64,86 @@ pub struct ApiDetails {
pub time_to_response_in_ns: Option<String>,
}
impl ApiDetails {
/// Set API name (chainable)
pub fn set_name(&mut self, name: Option<String>) -> &mut Self {
self.name = name;
/// Builder for `ApiDetails`.
#[derive(Default, Clone)]
pub struct ApiDetailsBuilder(pub ApiDetails);
impl ApiDetailsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn name(mut self, name: impl Into<String>) -> Self {
self.0.name = Some(name.into());
self
}
/// Set bucket name (chainable)
pub fn set_bucket(&mut self, bucket: Option<String>) -> &mut Self {
self.bucket = bucket;
pub fn bucket(mut self, bucket: impl Into<String>) -> Self {
self.0.bucket = Some(bucket.into());
self
}
/// Set object name (chainable)
pub fn set_object(&mut self, object: Option<String>) -> &mut Self {
self.object = object;
pub fn object(mut self, object: impl Into<String>) -> Self {
self.0.object = Some(object.into());
self
}
/// Set objects list (chainable)
pub fn set_objects(&mut self, objects: Option<Vec<ObjectVersion>>) -> &mut Self {
self.objects = objects;
pub fn objects(mut self, objects: Vec<ObjectVersion>) -> Self {
self.0.objects = Some(objects);
self
}
/// Set status (chainable)
pub fn set_status(&mut self, status: Option<String>) -> &mut Self {
self.status = status;
pub fn status(mut self, status: impl Into<String>) -> Self {
self.0.status = Some(status.into());
self
}
/// Set status code (chainable)
pub fn set_status_code(&mut self, code: Option<i32>) -> &mut Self {
self.status_code = code;
pub fn status_code(mut self, code: i32) -> Self {
self.0.status_code = Some(code);
self
}
/// Set input bytes (chainable)
pub fn set_input_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.input_bytes = bytes;
pub fn input_bytes(mut self, bytes: i64) -> Self {
self.0.input_bytes = Some(bytes);
self
}
/// Set output bytes (chainable)
pub fn set_output_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.output_bytes = bytes;
pub fn output_bytes(mut self, bytes: i64) -> Self {
self.0.output_bytes = Some(bytes);
self
}
/// Set header bytes (chainable)
pub fn set_header_bytes(&mut self, bytes: Option<i64>) -> &mut Self {
self.header_bytes = bytes;
pub fn header_bytes(mut self, bytes: i64) -> Self {
self.0.header_bytes = Some(bytes);
self
}
/// Set time to first byte (chainable)
pub fn set_time_to_first_byte(&mut self, t: Option<String>) -> &mut Self {
self.time_to_first_byte = t;
pub fn time_to_first_byte(mut self, t: impl Into<String>) -> Self {
self.0.time_to_first_byte = Some(t.into());
self
}
/// Set time to first byte in nanoseconds (chainable)
pub fn set_time_to_first_byte_in_ns(&mut self, t: Option<String>) -> &mut Self {
self.time_to_first_byte_in_ns = t;
pub fn time_to_first_byte_in_ns(mut self, t: impl Into<String>) -> Self {
self.0.time_to_first_byte_in_ns = Some(t.into());
self
}
/// Set time to response (chainable)
pub fn set_time_to_response(&mut self, t: Option<String>) -> &mut Self {
self.time_to_response = t;
pub fn time_to_response(mut self, t: impl Into<String>) -> Self {
self.0.time_to_response = Some(t.into());
self
}
/// Set time to response in nanoseconds (chainable)
pub fn set_time_to_response_in_ns(&mut self, t: Option<String>) -> &mut Self {
self.time_to_response_in_ns = t;
pub fn time_to_response_in_ns(mut self, t: impl Into<String>) -> Self {
self.0.time_to_response_in_ns = Some(t.into());
self
}
pub fn build(self) -> ApiDetails {
self.0
}
}
/// AuditEntry represents an audit log entry
/// `AuditEntry` represents an audit log entry.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AuditEntry {
pub version: String,
@@ -155,6 +151,7 @@ pub struct AuditEntry {
pub deployment_id: Option<String>,
#[serde(rename = "siteName", skip_serializing_if = "Option::is_none")]
pub site_name: Option<String>,
#[serde(with = "chrono::serde::ts_milliseconds")]
pub time: DateTime<Utc>,
pub event: EventName,
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
@@ -191,200 +188,130 @@ pub struct AuditEntry {
pub error: Option<String>,
}
impl AuditEntry {
/// Create a new AuditEntry with required fields
#[allow(clippy::too_many_arguments)]
pub fn new(
version: String,
deployment_id: Option<String>,
site_name: Option<String>,
time: DateTime<Utc>,
event: EventName,
entry_type: Option<String>,
trigger: String,
api: ApiDetails,
) -> Self {
AuditEntry {
version,
deployment_id,
site_name,
time,
/// Constructor for `AuditEntry`.
pub struct AuditEntryBuilder(AuditEntry);
impl AuditEntryBuilder {
/// Create a new builder with all required fields.
pub fn new(version: impl Into<String>, event: EventName, trigger: impl Into<String>, api: ApiDetails) -> Self {
Self(AuditEntry {
version: version.into(),
time: Utc::now(),
event,
entry_type,
trigger,
trigger: trigger.into(),
api,
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
req_node: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
tags: None,
access_key: None,
parent_user: None,
error: None,
}
..Default::default()
})
}
/// Set version (chainable)
pub fn set_version(&mut self, version: String) -> &mut Self {
self.version = version;
self
}
/// Set deployment ID (chainable)
pub fn set_deployment_id(&mut self, id: Option<String>) -> &mut Self {
self.deployment_id = id;
self
}
/// Set site name (chainable)
pub fn set_site_name(&mut self, name: Option<String>) -> &mut Self {
self.site_name = name;
self
}
/// Set time (chainable)
pub fn set_time(&mut self, time: DateTime<Utc>) -> &mut Self {
self.time = time;
self
}
/// Set event (chainable)
pub fn set_event(&mut self, event: EventName) -> &mut Self {
self.event = event;
self
}
/// Set entry type (chainable)
pub fn set_entry_type(&mut self, entry_type: Option<String>) -> &mut Self {
self.entry_type = entry_type;
self
}
/// Set trigger (chainable)
pub fn set_trigger(&mut self, trigger: String) -> &mut Self {
self.trigger = trigger;
self
}
/// Set API details (chainable)
pub fn set_api(&mut self, api: ApiDetails) -> &mut Self {
self.api = api;
self
}
/// Set remote host (chainable)
pub fn set_remote_host(&mut self, host: Option<String>) -> &mut Self {
self.remote_host = host;
self
}
/// Set request ID (chainable)
pub fn set_request_id(&mut self, id: Option<String>) -> &mut Self {
self.request_id = id;
self
}
/// Set user agent (chainable)
pub fn set_user_agent(&mut self, agent: Option<String>) -> &mut Self {
self.user_agent = agent;
self
}
/// Set request path (chainable)
pub fn set_req_path(&mut self, path: Option<String>) -> &mut Self {
self.req_path = path;
self
}
/// Set request host (chainable)
pub fn set_req_host(&mut self, host: Option<String>) -> &mut Self {
self.req_host = host;
self
}
/// Set request node (chainable)
pub fn set_req_node(&mut self, node: Option<String>) -> &mut Self {
self.req_node = node;
self
}
/// Set request claims (chainable)
pub fn set_req_claims(&mut self, claims: Option<HashMap<String, Value>>) -> &mut Self {
self.req_claims = claims;
self
}
/// Set request query (chainable)
pub fn set_req_query(&mut self, query: Option<HashMap<String, String>>) -> &mut Self {
self.req_query = query;
self
}
/// Set request header (chainable)
pub fn set_req_header(&mut self, header: Option<HashMap<String, String>>) -> &mut Self {
self.req_header = header;
self
}
/// Set response header (chainable)
pub fn set_resp_header(&mut self, header: Option<HashMap<String, String>>) -> &mut Self {
self.resp_header = header;
self
}
/// Set tags (chainable)
pub fn set_tags(&mut self, tags: Option<HashMap<String, Value>>) -> &mut Self {
self.tags = tags;
self
}
/// Set access key (chainable)
pub fn set_access_key(&mut self, key: Option<String>) -> &mut Self {
self.access_key = key;
self
}
/// Set parent user (chainable)
pub fn set_parent_user(&mut self, user: Option<String>) -> &mut Self {
self.parent_user = user;
self
}
/// Set error message (chainable)
pub fn set_error(&mut self, error: Option<String>) -> &mut Self {
self.error = error;
// event
pub fn version(mut self, version: impl Into<String>) -> Self {
self.0.version = version.into();
self
}
/// Build AuditEntry from context or parameters (example, can be extended)
pub fn from_context(
version: String,
deployment_id: Option<String>,
time: DateTime<Utc>,
event: EventName,
trigger: String,
api: ApiDetails,
tags: Option<HashMap<String, Value>>,
) -> Self {
AuditEntry {
version,
deployment_id,
site_name: None,
time,
event,
entry_type: None,
trigger,
api,
remote_host: None,
request_id: None,
user_agent: None,
req_path: None,
req_host: None,
req_node: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
tags,
access_key: None,
parent_user: None,
error: None,
}
}
}
impl LogRecord for AuditEntry {
/// Serialize AuditEntry to JSON string
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
/// Get the timestamp of the audit entry
fn get_timestamp(&self) -> DateTime<Utc> {
self.time
pub fn event(mut self, event: EventName) -> Self {
self.0.event = event;
self
}
pub fn api(mut self, api_details: ApiDetails) -> Self {
self.0.api = api_details;
self
}
pub fn deployment_id(mut self, id: impl Into<String>) -> Self {
self.0.deployment_id = Some(id.into());
self
}
pub fn site_name(mut self, name: impl Into<String>) -> Self {
self.0.site_name = Some(name.into());
self
}
pub fn time(mut self, time: DateTime<Utc>) -> Self {
self.0.time = time;
self
}
pub fn entry_type(mut self, entry_type: impl Into<String>) -> Self {
self.0.entry_type = Some(entry_type.into());
self
}
pub fn remote_host(mut self, host: impl Into<String>) -> Self {
self.0.remote_host = Some(host.into());
self
}
pub fn request_id(mut self, id: impl Into<String>) -> Self {
self.0.request_id = Some(id.into());
self
}
pub fn user_agent(mut self, agent: impl Into<String>) -> Self {
self.0.user_agent = Some(agent.into());
self
}
pub fn req_path(mut self, path: impl Into<String>) -> Self {
self.0.req_path = Some(path.into());
self
}
pub fn req_host(mut self, host: impl Into<String>) -> Self {
self.0.req_host = Some(host.into());
self
}
pub fn req_node(mut self, node: impl Into<String>) -> Self {
self.0.req_node = Some(node.into());
self
}
pub fn req_claims(mut self, claims: HashMap<String, Value>) -> Self {
self.0.req_claims = Some(claims);
self
}
pub fn req_query(mut self, query: HashMap<String, String>) -> Self {
self.0.req_query = Some(query);
self
}
pub fn req_header(mut self, header: HashMap<String, String>) -> Self {
self.0.req_header = Some(header);
self
}
pub fn resp_header(mut self, header: HashMap<String, String>) -> Self {
self.0.resp_header = Some(header);
self
}
pub fn tags(mut self, tags: HashMap<String, Value>) -> Self {
self.0.tags = Some(tags);
self
}
pub fn access_key(mut self, key: impl Into<String>) -> Self {
self.0.access_key = Some(key.into());
self
}
pub fn parent_user(mut self, user: impl Into<String>) -> Self {
self.0.parent_user = Some(user.into());
self
}
pub fn error(mut self, error: impl Into<String>) -> Self {
self.0.error = Some(error.into());
self
}
/// Construct the final `AuditEntry`.
pub fn build(self) -> AuditEntry {
self.0
}
}

View File

@@ -21,7 +21,7 @@ pub type AuditResult<T> = Result<T, AuditError>;
#[derive(Error, Debug)]
pub enum AuditError {
#[error("Configuration error: {0}")]
Configuration(String),
Configuration(String, #[source] Option<Box<dyn std::error::Error + Send + Sync>>),
#[error("config not loaded")]
ConfigNotLoaded,
@@ -35,11 +35,14 @@ pub enum AuditError {
#[error("System already initialized")]
AlreadyInitialized,
#[error("Storage not available: {0}")]
StorageNotAvailable(String),
#[error("Failed to save configuration: {0}")]
SaveConfig(String),
SaveConfig(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("Failed to load configuration: {0}")]
LoadConfig(String),
LoadConfig(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
@@ -49,7 +52,4 @@ pub enum AuditError {
#[error("Join error: {0}")]
Join(#[from] tokio::task::JoinError),
#[error("Server storage not initialized: {0}")]
ServerNotInitialized(String),
}

View File

@@ -15,7 +15,7 @@
use crate::{AuditEntry, AuditResult, AuditSystem};
use rustfs_ecstore::config::Config;
use std::sync::{Arc, OnceLock};
use tracing::{error, warn};
use tracing::{error, trace, warn};
/// Global audit system instance
static AUDIT_SYSTEM: OnceLock<Arc<AuditSystem>> = OnceLock::new();
@@ -30,6 +30,19 @@ pub fn audit_system() -> Option<Arc<AuditSystem>> {
AUDIT_SYSTEM.get().cloned()
}
/// A helper macro for executing closures if the global audit system is initialized.
/// If not initialized, log a warning and return `Ok(())`.
macro_rules! with_audit_system {
($async_closure:expr) => {
if let Some(system) = audit_system() {
(async move { $async_closure(system).await }).await
} else {
warn!("Audit system not initialized, operation skipped.");
Ok(())
}
};
}
/// Start the global audit system with configuration
pub async fn start_audit_system(config: Config) -> AuditResult<()> {
let system = init_audit_system();
@@ -38,32 +51,17 @@ pub async fn start_audit_system(config: Config) -> AuditResult<()> {
/// Stop the global audit system
pub async fn stop_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.close().await
} else {
warn!("Audit system not initialized, cannot stop");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.close().await })
}
/// Pause the global audit system
pub async fn pause_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.pause().await
} else {
warn!("Audit system not initialized, cannot pause");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.pause().await })
}
/// Resume the global audit system
pub async fn resume_audit_system() -> AuditResult<()> {
if let Some(system) = audit_system() {
system.resume().await
} else {
warn!("Audit system not initialized, cannot resume");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.resume().await })
}
/// Dispatch an audit log entry to all targets
@@ -72,23 +70,23 @@ pub async fn dispatch_audit_log(entry: Arc<AuditEntry>) -> AuditResult<()> {
if system.is_running().await {
system.dispatch(entry).await
} else {
// System not running, just drop the log entry without error
// The system is initialized but not running (for example, it is suspended). Silently discard log entries based on original logic.
// For debugging purposes, it can be useful to add a trace log here.
trace!("Audit system is not running, dropping audit entry.");
Ok(())
}
} else {
// System not initialized, just drop the log entry without error
// The system is not initialized at all. This is a more important state.
// It might be better to return an error or log a warning.
warn!("Audit system not initialized, dropping audit entry.");
// If this should be a hard failure, you can return Err(AuditError::NotInitialized("..."))
Ok(())
}
}
/// Reload the global audit system configuration
pub async fn reload_audit_config(config: Config) -> AuditResult<()> {
if let Some(system) = audit_system() {
system.reload_config(config).await
} else {
warn!("Audit system not initialized, cannot reload config");
Ok(())
}
with_audit_system!(|system: Arc<AuditSystem>| async move { system.reload_config(config).await })
}
/// Check if the global audit system is running

View File

@@ -25,7 +25,7 @@ pub mod observability;
pub mod registry;
pub mod system;
pub use entity::{ApiDetails, AuditEntry, LogRecord, ObjectVersion};
pub use entity::{ApiDetails, AuditEntry, ObjectVersion};
pub use error::{AuditError, AuditResult};
pub use global::*;
pub use observability::{AuditMetrics, AuditMetricsReport, PerformanceValidation};

View File

@@ -14,6 +14,7 @@
use crate::{AuditEntry, AuditError, AuditResult};
use futures::{StreamExt, stream::FuturesUnordered};
use hashbrown::{HashMap, HashSet};
use rustfs_config::{
DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR,
MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE,
@@ -25,7 +26,6 @@ use rustfs_targets::{
Target, TargetError,
target::{ChannelTargetType, TargetType, mqtt::MQTTArgs, webhook::WebhookArgs},
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, warn};
@@ -251,7 +251,7 @@ impl AuditRegistry {
sections.extend(successes_by_section.keys().cloned());
for section_name in sections {
let mut section_map: HashMap<String, KVS> = HashMap::new();
let mut section_map: std::collections::HashMap<String, KVS> = std::collections::HashMap::new();
// The default entry (if present) is written back to `_`
if let Some(default_cfg) = section_defaults.get(&section_name) {
@@ -277,7 +277,7 @@ impl AuditRegistry {
// 7. Save the new configuration to the system
let Some(store) = rustfs_ecstore::new_object_layer_fn() else {
return Err(AuditError::ServerNotInitialized(
return Err(AuditError::StorageNotAvailable(
"Failed to save target configuration: server storage not initialized".to_string(),
));
};
@@ -286,7 +286,7 @@ impl AuditRegistry {
Ok(_) => info!("New audit configuration saved to system successfully"),
Err(e) => {
error!(error = %e, "Failed to save new audit configuration");
return Err(AuditError::SaveConfig(e.to_string()));
return Err(AuditError::SaveConfig(Box::new(e)));
}
}
}

View File

@@ -146,7 +146,7 @@ impl AuditSystem {
warn!("Audit system is already paused");
Ok(())
}
_ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string())),
_ => Err(AuditError::Configuration("Cannot pause audit system in current state".to_string(), None)),
}
}
@@ -164,7 +164,7 @@ impl AuditSystem {
warn!("Audit system is already running");
Ok(())
}
_ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string())),
_ => Err(AuditError::Configuration("Cannot resume audit system in current state".to_string(), None)),
}
}
@@ -460,7 +460,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target enabled");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}
@@ -473,7 +473,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target disabled");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}
@@ -487,7 +487,7 @@ impl AuditSystem {
info!(target_id = %target_id, "Target removed");
Ok(())
} else {
Err(AuditError::Configuration(format!("Target not found: {target_id}")))
Err(AuditError::Configuration(format!("Target not found: {target_id}"), None))
}
}

View File

@@ -52,7 +52,7 @@ async fn test_config_parsing_webhook() {
// We expect this to fail due to server storage not being initialized
// but the parsing should work correctly
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// This is expected in test environment
}
Err(e) => {

View File

@@ -73,7 +73,7 @@ async fn test_concurrent_target_creation() {
// Verify it fails with expected error (server not initialized)
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected in test environment
}
Err(e) => {
@@ -103,17 +103,17 @@ async fn test_audit_log_dispatch_performance() {
use std::collections::HashMap;
let id = 1;
let mut req_header = HashMap::new();
let mut req_header = hashbrown::HashMap::new();
req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}"));
req_header.insert("content-type".to_string(), "application/octet-stream".to_string());
let mut resp_header = HashMap::new();
let mut resp_header = hashbrown::HashMap::new();
resp_header.insert("x-response".to_string(), "ok".to_string());
let mut tags = HashMap::new();
let mut tags = hashbrown::HashMap::new();
tags.insert(format!("tag-{id}"), json!("sample"));
let mut req_query = HashMap::new();
let mut req_query = hashbrown::HashMap::new();
req_query.insert("id".to_string(), id.to_string());
let api_details = ApiDetails {

View File

@@ -35,7 +35,7 @@ async fn test_complete_audit_system_lifecycle() {
// Should fail in test environment but state handling should work
match start_result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected in test environment
assert_eq!(system.get_state().await, system::AuditSystemState::Stopped);
}
@@ -168,7 +168,7 @@ async fn test_config_parsing_with_multiple_instances() {
// Should fail due to server storage not initialized, but parsing should work
match result {
Err(AuditError::ServerNotInitialized(_)) => {
Err(AuditError::StorageNotAvailable(_)) => {
// Expected - parsing worked but save failed
}
Err(e) => {
@@ -182,48 +182,6 @@ async fn test_config_parsing_with_multiple_instances() {
}
}
// #[tokio::test]
// async fn test_environment_variable_precedence() {
// // Test that environment variables override config file settings
// // This test validates the ENV > file instance > file default precedence
// // Set some test environment variables
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST", "on");
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST", "http://env.example.com/audit");
// std::env::set_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST", "env-token");
// let mut registry = AuditRegistry::new();
//
// // Create config that should be overridden by env vars
// let mut config = Config(HashMap::new());
// let mut webhook_section = HashMap::new();
//
// let mut test_kvs = KVS::new();
// test_kvs.insert("enable".to_string(), "off".to_string()); // Should be overridden
// test_kvs.insert("endpoint".to_string(), "http://file.example.com/audit".to_string()); // Should be overridden
// test_kvs.insert("batch_size".to_string(), "10".to_string()); // Should remain from file
// webhook_section.insert("test".to_string(), test_kvs);
//
// config.0.insert("audit_webhook".to_string(), webhook_section);
//
// // Try to create targets - should use env vars for endpoint/enable, file for batch_size
// let result = registry.create_targets_from_config(&config).await;
// // Clean up env vars
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENABLE_TEST");
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_ENDPOINT_TEST");
// std::env::remove_var("RUSTFS_AUDIT_WEBHOOK_AUTH_TOKEN_TEST");
// // Should fail due to server storage, but precedence logic should work
// match result {
// Err(AuditError::ServerNotInitialized(_)) => {
// // Expected - precedence parsing worked but save failed
// }
// Err(e) => {
// println!("Environment precedence test error: {}", e);
// }
// Ok(_) => {
// println!("Unexpected success in environment precedence test");
// }
// }
// }
#[test]
fn test_target_type_validation() {
use rustfs_targets::target::TargetType;
@@ -315,19 +273,18 @@ fn create_sample_audit_entry_with_id(id: u32) -> AuditEntry {
use chrono::Utc;
use rustfs_targets::EventName;
use serde_json::json;
use std::collections::HashMap;
let mut req_header = HashMap::new();
let mut req_header = hashbrown::HashMap::new();
req_header.insert("authorization".to_string(), format!("Bearer test-token-{id}"));
req_header.insert("content-type".to_string(), "application/octet-stream".to_string());
let mut resp_header = HashMap::new();
let mut resp_header = hashbrown::HashMap::new();
resp_header.insert("x-response".to_string(), "ok".to_string());
let mut tags = HashMap::new();
let mut tags = hashbrown::HashMap::new();
tags.insert(format!("tag-{id}"), json!("sample"));
let mut req_query = HashMap::new();
let mut req_query = hashbrown::HashMap::new();
req_query.insert("id".to_string(), id.to_string());
let api_details = ApiDetails {

View File

@@ -35,7 +35,6 @@ chrono = { workspace = true, features = ["serde"] }
futures = { workspace = true }
form_urlencoded = { workspace = true }
hashbrown = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
rayon = { workspace = true }
rumqttc = { workspace = true }

View File

@@ -12,11 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_targets::TargetError;
use rustfs_targets::arn::TargetID;
use rustfs_targets::{TargetError, arn::TargetID};
use std::io;
use thiserror::Error;
/// Errors related to the notification system's lifecycle.
#[derive(Debug, Error)]
pub enum LifecycleError {
/// Error indicating the system has already been initialized.
#[error("System has already been initialized")]
AlreadyInitialized,
/// Error indicating the system has not been initialized yet.
#[error("System has not been initialized")]
NotInitialized,
}
/// Error types for the notification system
#[derive(Debug, Error)]
pub enum NotificationError {
@@ -38,11 +49,8 @@ pub enum NotificationError {
#[error("Rule configuration error: {0}")]
RuleConfiguration(String),
#[error("System initialization error: {0}")]
Initialization(String),
#[error("Notification system has already been initialized")]
AlreadyInitialized,
#[error("System lifecycle error: {0}")]
Lifecycle(#[from] LifecycleError),
#[error("I/O error: {0}")]
Io(io::Error),
@@ -56,6 +64,9 @@ pub enum NotificationError {
#[error("Target '{0}' not found")]
TargetNotFound(TargetID),
#[error("Server not initialized")]
ServerNotInitialized,
#[error("System initialization error: {0}")]
Initialization(String),
#[error("Storage not available: {0}")]
StorageNotAvailable(String),
}

View File

@@ -276,3 +276,120 @@ impl EventArgs {
self.req_params.contains_key("x-rustfs-source-replication-request")
}
}
/// Builder for [`EventArgs`].
///
/// This builder provides a fluent API to construct an `EventArgs` instance,
/// ensuring that all required fields are provided.
///
/// # Example
///
/// ```ignore
/// let args = EventArgsBuilder::new(
/// EventName::ObjectCreatedPut,
/// "my-bucket",
/// object_info,
/// )
/// .host("localhost:9000")
/// .user_agent("my-app/1.0")
/// .build();
/// ```
#[derive(Debug, Clone, Default)]
pub struct EventArgsBuilder {
event_name: EventName,
bucket_name: String,
object: rustfs_ecstore::store_api::ObjectInfo,
req_params: HashMap<String, String>,
resp_elements: HashMap<String, String>,
version_id: String,
host: String,
user_agent: String,
}
impl EventArgsBuilder {
/// Creates a new builder with the required fields.
pub fn new(event_name: EventName, bucket_name: impl Into<String>, object: rustfs_ecstore::store_api::ObjectInfo) -> Self {
Self {
event_name,
bucket_name: bucket_name.into(),
object,
..Default::default()
}
}
/// Sets the event name.
pub fn event_name(mut self, event_name: EventName) -> Self {
self.event_name = event_name;
self
}
/// Sets the bucket name.
pub fn bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
self.bucket_name = bucket_name.into();
self
}
/// Sets the object information.
pub fn object(mut self, object: rustfs_ecstore::store_api::ObjectInfo) -> Self {
self.object = object;
self
}
/// Sets the request parameters.
pub fn req_params(mut self, req_params: HashMap<String, String>) -> Self {
self.req_params = req_params;
self
}
/// Adds a single request parameter.
pub fn req_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.req_params.insert(key.into(), value.into());
self
}
/// Sets the response elements.
pub fn resp_elements(mut self, resp_elements: HashMap<String, String>) -> Self {
self.resp_elements = resp_elements;
self
}
/// Adds a single response element.
pub fn resp_element(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.resp_elements.insert(key.into(), value.into());
self
}
/// Sets the version ID.
pub fn version_id(mut self, version_id: impl Into<String>) -> Self {
self.version_id = version_id.into();
self
}
/// Sets the host.
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = host.into();
self
}
/// Sets the user agent.
pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = user_agent.into();
self
}
/// Builds the final `EventArgs` instance.
///
/// This method consumes the builder and returns the constructed `EventArgs`.
pub fn build(self) -> EventArgs {
EventArgs {
event_name: self.event_name,
bucket_name: self.bucket_name,
object: self.object,
req_params: self.req_params,
resp_elements: self.resp_elements,
version_id: self.version_id,
host: self.host,
user_agent: self.user_agent,
}
}
}

View File

@@ -12,17 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{BucketNotificationConfig, Event, EventArgs, NotificationError, NotificationSystem};
use once_cell::sync::Lazy;
use crate::{BucketNotificationConfig, Event, EventArgs, LifecycleError, NotificationError, NotificationSystem};
use rustfs_ecstore::config::Config;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::{EventName, arn::TargetID};
use std::sync::{Arc, OnceLock};
use tracing::{error, instrument};
use tracing::error;
static NOTIFICATION_SYSTEM: OnceLock<Arc<NotificationSystem>> = OnceLock::new();
// Create a globally unique Notifier instance
static GLOBAL_NOTIFIER: Lazy<Notifier> = Lazy::new(|| Notifier {});
/// Initialize the global notification system with the given configuration.
/// This function should only be called once throughout the application life cycle.
@@ -34,7 +30,7 @@ pub async fn initialize(config: Config) -> Result<(), NotificationError> {
match NOTIFICATION_SYSTEM.set(Arc::new(system)) {
Ok(_) => Ok(()),
Err(_) => Err(NotificationError::AlreadyInitialized),
Err(_) => Err(NotificationError::Lifecycle(LifecycleError::AlreadyInitialized)),
}
}
@@ -49,14 +45,11 @@ pub fn is_notification_system_initialized() -> bool {
NOTIFICATION_SYSTEM.get().is_some()
}
/// Returns a reference to the global Notifier instance.
pub fn notifier_instance() -> &'static Notifier {
&GLOBAL_NOTIFIER
}
/// A module providing the public API for event notification.
pub mod notifier_global {
use super::*;
use tracing::instrument;
pub struct Notifier {}
impl Notifier {
/// Notify an event asynchronously.
/// This is the only entry point for all event notifications in the system.
/// # Parameter
@@ -67,8 +60,8 @@ impl Notifier {
///
/// # Using
/// This function is used to notify events in the system, such as object creation, deletion, or updates.
#[instrument(skip(self, args))]
pub async fn notify(&self, args: EventArgs) {
#[instrument(skip(args))]
pub async fn notify(args: EventArgs) {
// Dependency injection or service positioning mode obtain NotificationSystem instance
let notification_sys = match notification_system() {
// If the notification system itself cannot be retrieved, it will be returned directly
@@ -110,7 +103,6 @@ impl Notifier {
/// # Using
/// This function allows you to dynamically add notification rules for a specific bucket.
pub async fn add_bucket_notification_rule(
&self,
bucket_name: &str,
region: &str,
event_names: &[EventName],
@@ -137,7 +129,7 @@ impl Notifier {
// Get global NotificationSystem
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
None => return Err(NotificationError::Lifecycle(LifecycleError::NotInitialized)),
};
// Loading configuration
@@ -159,7 +151,6 @@ impl Notifier {
/// # Using
/// Supports notification rules for adding multiple event types, prefixes, suffixes, and targets to the same bucket in batches.
pub async fn add_event_specific_rules(
&self,
bucket_name: &str,
region: &str,
event_rules: &[(Vec<EventName>, String, String, Vec<TargetID>)],
@@ -176,10 +167,7 @@ impl Notifier {
}
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?;
// Loading configuration
notification_sys
@@ -196,12 +184,9 @@ impl Notifier {
/// This function allows you to clear all notification rules for a specific bucket.
/// This is useful when you want to reset the notification configuration for a bucket.
///
pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> {
pub async fn clear_bucket_notification_rules(bucket_name: &str) -> Result<(), NotificationError> {
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
let notification_sys = notification_system().ok_or(NotificationError::Lifecycle(LifecycleError::NotInitialized))?;
// Clear configuration
notification_sys.remove_bucket_notification_config(bucket_name).await;

View File

@@ -199,7 +199,9 @@ impl NotificationSystem {
F: FnMut(&mut Config) -> bool, // The closure returns a boolean value indicating whether the configuration has been changed
{
let Some(store) = rustfs_ecstore::global::new_object_layer_fn() else {
return Err(NotificationError::ServerNotInitialized);
return Err(NotificationError::StorageNotAvailable(
"Failed to save target configuration: server storage not initialized".to_string(),
));
};
let mut new_config = rustfs_ecstore::config::com::read_config_without_migrate(store.clone())

View File

@@ -18,18 +18,18 @@
//! It supports sending events to various targets
//! (like Webhook and MQTT) and includes features like event persistence and retry on failure.
pub mod error;
pub mod event;
mod error;
mod event;
pub mod factory;
pub mod global;
mod global;
pub mod integration;
pub mod notifier;
pub mod registry;
pub mod rules;
pub mod stream;
// Re-exports
pub use error::NotificationError;
pub use event::{Event, EventArgs};
pub use global::{initialize, is_notification_system_initialized, notification_system};
pub use error::{LifecycleError, NotificationError};
pub use event::{Event, EventArgs, EventArgsBuilder};
pub use global::{initialize, is_notification_system_initialized, notification_system, notifier_global};
pub use integration::NotificationSystem;
pub use rules::BucketNotificationConfig;

View File

@@ -112,7 +112,6 @@ mime_guess = { workspace = true }
pin-project-lite.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
scopeguard.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }
sysinfo = { workspace = true, features = ["multithread"] }
thiserror = { workspace = true }

View File

@@ -134,7 +134,7 @@ impl Operation for NotificationTarget {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -300,7 +300,7 @@ impl Operation for ListNotificationTargets {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -351,7 +351,7 @@ impl Operation for ListTargetsArns {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 2. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};
@@ -401,7 +401,7 @@ impl Operation for RemoveNotificationTarget {
check_key_valid(get_session_token(&req.uri, &req.headers).unwrap_or_default(), &input_cred.access_key).await?;
// 3. Get notification system instance
let Some(ns) = rustfs_notify::global::notification_system() else {
let Some(ns) = rustfs_notify::notification_system() else {
return Err(s3_error!(InternalError, "notification system not initialized"));
};

View File

@@ -58,7 +58,7 @@ use rustfs_ecstore::{
update_erasure_type,
};
use rustfs_iam::init_iam_sys;
use rustfs_notify::global::notifier_instance;
use rustfs_notify::notifier_global;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_targets::arn::TargetID;
use rustfs_utils::net::parse_and_resolve_address;
@@ -517,8 +517,7 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
process_topic_configurations(&mut event_rules, cfg.topic_configurations.clone(), TargetID::from_str);
process_lambda_configurations(&mut event_rules, cfg.lambda_function_configurations.clone(), TargetID::from_str);
if let Err(e) = notifier_instance()
.add_event_specific_rules(bucket, region, &event_rules)
if let Err(e) = notifier_global::add_event_specific_rules(bucket, region, &event_rules)
.await
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))
{

View File

@@ -14,6 +14,8 @@
use crate::auth::get_condition_values;
use crate::error::ApiError;
use crate::storage::entity;
use crate::storage::helper::OperationHelper;
use crate::storage::options::{filter_object_metadata, get_content_sha256};
use crate::storage::{
access::{ReqInfo, authorize_request},
@@ -84,7 +86,7 @@ use rustfs_kms::{
service_manager::get_global_encryption_service,
types::{EncryptionMetadata, ObjectEncryptionContext},
};
use rustfs_notify::global::notifier_instance;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_policy::{
auth,
policy::{
@@ -102,11 +104,10 @@ use rustfs_targets::{
EventName,
arn::{TargetID, TargetIDError},
};
use rustfs_utils::http::{AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE};
use rustfs_utils::{
CompressionAlgorithm,
CompressionAlgorithm, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
http::{
AMZ_BUCKET_REPLICATION_STATUS,
AMZ_BUCKET_REPLICATION_STATUS, AMZ_CHECKSUM_MODE, AMZ_CHECKSUM_TYPE,
headers::{
AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING, AMZ_RESTORE_EXPIRY_DAYS, AMZ_RESTORE_REQUEST_DATE,
RESERVED_METADATA_PREFIX_LOWER,
@@ -353,6 +354,7 @@ impl FS {
}
async fn put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject").suppress_event();
let input = req.input;
let PutObjectInput {
@@ -495,20 +497,20 @@ impl FS {
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
let event_args = rustfs_notify::EventArgs {
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket.clone(),
object: _obj_info.clone(),
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
req_params: extract_req_params_header(&req.headers),
resp_elements: extract_resp_elements(&S3Response::new(output.clone())),
version_id: version_id.clone(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
host: get_request_host(&req.headers),
user_agent: get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
notifier_global::notify(event_args).await;
});
}
}
@@ -575,7 +577,9 @@ impl FS {
checksum_crc64nvme,
..Default::default()
};
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
}
@@ -602,6 +606,7 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketCreated, "s3:CreateBucket");
let CreateBucketInput {
bucket,
object_lock_enabled_for_bucket,
@@ -628,28 +633,15 @@ impl S3 for FS {
let output = CreateBucketOutput::default();
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketCreated,
bucket_name: bucket.clone(),
object: ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Copy an object from one location to another
#[instrument(level = "debug", skip(self, req))]
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedCopy, "s3:CopyObject");
let CopyObjectInput {
copy_source,
bucket,
@@ -830,28 +822,12 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(object_info).version_id(version_id);
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCopy,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn restore_object(&self, req: S3Request<RestoreObjectInput>) -> S3Result<S3Response<RestoreObjectOutput>> {
@@ -902,7 +878,7 @@ impl S3 for FS {
}
//let mut api_err;
let mut _status_code = http::StatusCode::OK;
let mut _status_code = StatusCode::OK;
let mut already_restored = false;
if let Err(_err) = rreq.validate(store.clone()) {
//api_err = to_api_err(ErrMalformedXML);
@@ -919,7 +895,7 @@ impl S3 for FS {
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
_status_code = http::StatusCode::ACCEPTED;
_status_code = StatusCode::ACCEPTED;
already_restored = true;
}
}
@@ -1086,12 +1062,13 @@ impl S3 for FS {
restore_output_path: None,
};
return Ok(S3Response::with_headers(output, header));
Ok(S3Response::with_headers(output, header))
}
/// Delete a bucket
#[instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let helper = OperationHelper::new(&req, EventName::BucketRemoved, "s3:DeleteBucket");
let input = req.input;
// TODO: DeleteBucketInput doesn't have force parameter?
let Some(store) = new_object_layer_fn() else {
@@ -1109,28 +1086,15 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::BucketRemoved,
bucket_name: input.bucket,
object: ObjectInfo { ..Default::default() },
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: String::new(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteBucketOutput {}))
let result = Ok(S3Response::new(DeleteBucketOutput {}));
let _ = helper.complete(&result);
result
}
/// Delete an object
#[instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, mut req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObject");
let DeleteObjectInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -1233,32 +1197,20 @@ impl S3 for FS {
EventName::ObjectRemovedDelete
};
let event_args = rustfs_notify::event::EventArgs {
event_name,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: key.clone(),
bucket: bucket.clone(),
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteBucketOutput {})),
version_id: version_id.map(|v| v.to_string()).unwrap_or_default(),
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
helper = helper.event_name(event_name);
helper = helper
.object(obj_info)
.version_id(version_id.map(|v| v.to_string()).unwrap_or_default());
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Delete multiple objects
#[instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectRemovedDelete, "s3:DeleteObjects").suppress_event();
let DeleteObjectsInput { bucket, delete, .. } = req.input;
if delete.objects.is_empty() || delete.objects.len() > 1000 {
@@ -1393,10 +1345,12 @@ impl S3 for FS {
.map(|v| v.as_ref().map(|v| v.clone().into()))
.collect::<Vec<Option<DiskError>>>() as &[Option<DiskError>],
) {
return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let result = Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "Bucket not found".to_string()));
let _ = helper.complete(&result);
return result;
}
for (i, err) in errs.into_iter().enumerate() {
for (i, err) in errs.iter().enumerate() {
let obj = dobjs[i].clone();
// let replication_state = obj.replication_state.clone().unwrap_or_default();
@@ -1426,7 +1380,7 @@ impl S3 for FS {
continue;
}
if let Some(err) = err {
if let Some(err) = err.clone() {
delete_results[*didx].error = Some(Error {
code: Some(err.to_string()),
key: Some(object_to_delete[i].object_name.clone()),
@@ -1481,39 +1435,39 @@ impl S3 for FS {
}
}
// Asynchronous call will not block the response of the current request
let req_headers = req.headers.clone();
tokio::spawn(async move {
for dobj in dobjs {
let version_id = match dobj.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let mut event_name = EventName::ObjectRemovedDelete;
if dobj.delete_marker {
event_name = EventName::ObjectRemovedDeleteMarkerCreated;
}
for res in delete_results {
if let Some(dobj) = res.delete_object {
let event_name = if dobj.delete_marker {
EventName::ObjectRemovedDeleteMarkerCreated
} else {
EventName::ObjectRemovedDelete
};
let event_args = EventArgsBuilder::new(
event_name,
bucket.clone(),
ObjectInfo {
name: dobj.object_name.clone(),
bucket: bucket.clone(),
..Default::default()
},
)
.version_id(dobj.version_id.map(|v| v.to_string()).unwrap_or_default())
.req_params(extract_req_params_header(&req_headers))
.resp_elements(extract_resp_elements(&S3Response::new(DeleteObjectsOutput::default())))
.host(get_request_host(&req_headers))
.user_agent(get_request_user_agent(&req_headers))
.build();
let event_args = rustfs_notify::event::EventArgs {
event_name,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: dobj.object_name,
bucket: bucket.clone(),
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectsOutput {
..Default::default()
})),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
notifier_instance().notify(event_args).await;
notifier_global::notify(event_args).await;
}
}
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
/// Get bucket location
@@ -1548,6 +1502,7 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, "s3:GetObject");
// mc get 3
let GetObjectInput {
@@ -1879,27 +1834,12 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket.clone(),
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(GetObjectOutput { ..Default::default() })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(event_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -1921,6 +1861,7 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedHead, "s3:HeadObject");
// mc get 2
let HeadObjectInput {
bucket,
@@ -2055,27 +1996,13 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
None => String::new(),
Some(v) => v.to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGet,
bucket_name: bucket,
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(event_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
Ok(S3Response::new(output))
result
}
#[instrument(level = "debug", skip(self))]
@@ -2344,6 +2271,7 @@ impl S3 for FS {
// #[instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:PutObject");
if req
.headers
.get("X-Amz-Meta-Snowball-Auto-Extract")
@@ -2360,7 +2288,6 @@ impl S3 for FS {
return Err(s3_error!(InvalidStorageClass));
}
}
let event_version_id = input.version_id.as_ref().map(|v| v.to_string()).unwrap_or_default();
let PutObjectInput {
body,
bucket,
@@ -2612,7 +2539,6 @@ impl S3 for FS {
.put_object(&bucket, &key, &mut reader, &opts)
.await
.map_err(ApiError::from)?;
let event_info = obj_info.clone();
let e_tag = obj_info.etag.clone().map(|etag| to_s3s_etag(&etag));
let repoptions =
@@ -2671,23 +2597,9 @@ impl S3 for FS {
..Default::default()
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPut,
bucket_name: bucket.clone(),
object: event_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id: event_version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -2695,6 +2607,7 @@ impl S3 for FS {
&self,
req: S3Request<CreateMultipartUploadInput>,
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedPut, "s3:CreateMultipartUpload");
let CreateMultipartUploadInput {
bucket,
key,
@@ -2826,8 +2739,6 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let object_name = key.clone();
let bucket_name = bucket.clone();
let output = CreateMultipartUploadOutput {
bucket: Some(bucket),
key: Some(key),
@@ -2840,31 +2751,9 @@ impl S3 for FS {
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedCompleteMultipartUpload,
bucket_name: bucket_name.clone(),
object: ObjectInfo {
name: object_name,
bucket: bucket_name,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self, req))]
@@ -3430,6 +3319,7 @@ impl S3 for FS {
&self,
req: S3Request<CompleteMultipartUploadInput>,
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
let helper = OperationHelper::new(&req, EventName::ObjectCreatedCompleteMultipartUpload, "s3:CompleteMultipartUpload");
let input = req.input;
let CompleteMultipartUploadInput {
multipart_upload,
@@ -3536,6 +3426,26 @@ impl S3 for FS {
}
let output = CompleteMultipartUploadOutput {
bucket: Some(bucket.clone()),
key: Some(key.clone()),
e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)),
location: Some("us-east-1".to_string()),
server_side_encryption: server_side_encryption.clone(), // TDD: Return encryption info
ssekms_key_id: ssekms_key_id.clone(), // TDD: Return KMS key ID if present
checksum_crc32: checksum_crc32.clone(),
checksum_crc32c: checksum_crc32c.clone(),
checksum_sha1: checksum_sha1.clone(),
checksum_sha256: checksum_sha256.clone(),
checksum_crc64nvme: checksum_crc64nvme.clone(),
checksum_type: checksum_type.clone(),
..Default::default()
};
info!(
"TDD: Created output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let helper_output = entity::CompleteMultipartUploadOutput {
bucket: Some(bucket.clone()),
key: Some(key.clone()),
e_tag: obj_info.etag.clone().map(|etag| to_s3s_etag(&etag)),
@@ -3550,10 +3460,6 @@ impl S3 for FS {
checksum_type,
..Default::default()
};
info!(
"TDD: Created output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let mt2 = HashMap::new();
let repoptions =
@@ -3569,6 +3475,8 @@ impl S3 for FS {
"TDD: About to return S3Response with output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let helper_result = Ok(S3Response::new(helper_output));
let _ = helper.complete(&helper_result);
Ok(S3Response::new(output))
}
@@ -3655,6 +3563,7 @@ impl S3 for FS {
#[instrument(level = "debug", skip(self, req))]
async fn put_object_tagging(&self, req: S3Request<PutObjectTaggingInput>) -> S3Result<S3Response<PutObjectTaggingOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutTagging, "s3:PutObjectTagging");
let PutObjectTaggingInput {
bucket,
key: object,
@@ -3706,31 +3615,12 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutTagging,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(PutObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
let result = Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self))]
@@ -3760,6 +3650,7 @@ impl S3 for FS {
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedDeleteTagging, "s3:DeleteObjectTagging");
let DeleteObjectTaggingInput { bucket, key: object, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
@@ -3773,31 +3664,12 @@ impl S3 for FS {
.await
.map_err(ApiError::from)?;
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedDeleteTagging,
bucket_name: bucket.clone(),
object: ObjectInfo {
name: object.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(DeleteObjectTaggingOutput { version_id: None })),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
let result = Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }));
let _ = helper.complete(&result);
result
}
#[instrument(level = "debug", skip(self))]
@@ -4391,7 +4263,7 @@ impl S3 for FS {
let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default());
// Purge old rules and resolve new rules in parallel
let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket);
let clear_rules = notifier_global::clear_bucket_notification_rules(&bucket);
let parse_rules = async {
let mut event_rules = Vec::new();
@@ -4419,8 +4291,7 @@ impl S3 for FS {
clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
// Add a new notification rule
notifier_instance()
.add_event_specific_rules(&bucket, &region, &event_rules)
notifier_global::add_event_specific_rules(&bucket, &region, &event_rules)
.await
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?;
@@ -4532,6 +4403,7 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, "s3:GetObjectAttributes");
let GetObjectAttributesInput { bucket, key, .. } = req.input.clone();
let Some(store) = new_object_layer_fn() else {
@@ -4550,31 +4422,19 @@ impl S3 for FS {
object_parts: None,
..Default::default()
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedAttributes,
bucket_name: bucket.clone(),
object: ObjectInfo {
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper
.object(ObjectInfo {
name: key.clone(),
bucket,
..Default::default()
},
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
})
.version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_acl(&self, req: S3Request<PutObjectAclInput>) -> S3Result<S3Response<PutObjectAclOutput>> {
@@ -4690,6 +4550,7 @@ impl S3 for FS {
&self,
req: S3Request<GetObjectLegalHoldInput>,
) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetLegalHold, "s3:GetObjectLegalHold");
let GetObjectLegalHoldInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -4732,33 +4593,19 @@ impl S3 for FS {
}),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetLegalHold,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_legal_hold(
&self,
req: S3Request<PutObjectLegalHoldInput>,
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, "s3:PutObjectLegalHold");
let PutObjectLegalHoldInput {
bucket,
key,
@@ -4811,33 +4658,19 @@ impl S3 for FS {
let output = PutObjectLegalHoldOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutLegalHold,
bucket_name: bucket.clone(),
object: info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn get_object_retention(
&self,
req: S3Request<GetObjectRetentionInput>,
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedGetRetention, "s3:GetObjectRetention");
let GetObjectRetentionInput {
bucket, key, version_id, ..
} = req.input.clone();
@@ -4872,33 +4705,19 @@ impl S3 for FS {
let output = GetObjectRetentionOutput {
retention: Some(ObjectLockRetention { mode, retain_until_date }),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => String::new(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectAccessedGetRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_default();
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
async fn put_object_retention(
&self,
req: S3Request<PutObjectRetentionInput>,
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
let mut helper = OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, "s3:PutObjectRetention");
let PutObjectRetentionInput {
bucket,
key,
@@ -4947,27 +4766,12 @@ impl S3 for FS {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
};
let version_id = match req.input.version_id {
Some(v) => v.to_string(),
None => Uuid::new_v4().to_string(),
};
let event_args = rustfs_notify::event::EventArgs {
event_name: EventName::ObjectCreatedPutRetention,
bucket_name: bucket.clone(),
object: object_info,
req_params: rustfs_utils::extract_req_params_header(&req.headers),
resp_elements: rustfs_utils::extract_resp_elements(&S3Response::new(output.clone())),
version_id,
host: rustfs_utils::get_request_host(&req.headers),
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
};
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
helper = helper.object(object_info).version_id(version_id);
// Asynchronous call will not block the response of the current request
tokio::spawn(async move {
notifier_instance().notify(event_args).await;
});
Ok(S3Response::new(output))
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
result
}
}

View File

@@ -0,0 +1,63 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use s3s::dto::{
BucketKeyEnabled, BucketName, ChecksumCRC32, ChecksumCRC32C, ChecksumCRC64NVME, ChecksumSHA1, ChecksumSHA256, ChecksumType,
ETag, Expiration, Location, ObjectKey, ObjectVersionId, RequestCharged, SSEKMSKeyId, ServerSideEncryption,
};
#[derive(Debug, Clone, Default)]
pub struct CompleteMultipartUploadOutput {
pub bucket: Option<BucketName>,
pub bucket_key_enabled: Option<BucketKeyEnabled>,
pub checksum_crc32: Option<ChecksumCRC32>,
pub checksum_crc32c: Option<ChecksumCRC32C>,
pub checksum_crc64nvme: Option<ChecksumCRC64NVME>,
pub checksum_sha1: Option<ChecksumSHA1>,
pub checksum_sha256: Option<ChecksumSHA256>,
pub checksum_type: Option<ChecksumType>,
pub e_tag: Option<ETag>,
pub expiration: Option<Expiration>,
pub key: Option<ObjectKey>,
pub location: Option<Location>,
pub request_charged: Option<RequestCharged>,
pub ssekms_key_id: Option<SSEKMSKeyId>,
pub server_side_encryption: Option<ServerSideEncryption>,
pub version_id: Option<ObjectVersionId>,
}
impl From<s3s::dto::CompleteMultipartUploadOutput> for CompleteMultipartUploadOutput {
fn from(output: s3s::dto::CompleteMultipartUploadOutput) -> Self {
Self {
bucket: output.bucket,
bucket_key_enabled: output.bucket_key_enabled,
checksum_crc32: output.checksum_crc32,
checksum_crc32c: output.checksum_crc32c,
checksum_crc64nvme: output.checksum_crc64nvme,
checksum_sha1: output.checksum_sha1,
checksum_sha256: output.checksum_sha256,
checksum_type: output.checksum_type,
e_tag: output.e_tag,
expiration: output.expiration,
key: output.key,
location: output.location,
request_charged: output.request_charged,
ssekms_key_id: output.ssekms_key_id,
server_side_encryption: output.server_side_encryption,
version_id: output.version_id,
}
}
}

View File

@@ -0,0 +1,209 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::StatusCode;
use rustfs_audit::{
entity::{ApiDetails, ApiDetailsBuilder, AuditEntryBuilder},
global::AuditLogger,
};
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_targets::EventName;
use rustfs_utils::{
extract_req_params, extract_req_params_header, extract_resp_elements, get_request_host, get_request_user_agent,
};
use s3s::{S3Request, S3Response, S3Result};
use std::future::Future;
use tokio::runtime::{Builder, Handle};
/// Schedules an asynchronous task on the current runtime;
/// if there is no runtime, creates a minimal runtime execution on a new thread.
fn spawn_background<F>(fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
if let Ok(handle) = Handle::try_current() {
drop(handle.spawn(fut));
} else {
std::thread::spawn(|| {
if let Ok(rt) = Builder::new_current_thread().enable_all().build() {
rt.block_on(fut);
}
});
}
}
/// A unified helper structure for building and distributing audit logs and event notifications via RAII mode at the end of an S3 operation scope.
pub struct OperationHelper {
audit_builder: Option<AuditEntryBuilder>,
api_builder: ApiDetailsBuilder,
event_builder: Option<EventArgsBuilder>,
start_time: std::time::Instant,
}
impl OperationHelper {
/// Create a new OperationHelper for S3 requests.
pub fn new(req: &S3Request<impl Send + Sync>, event: EventName, trigger: &'static str) -> Self {
// Parse path -> bucket/object
let path = req.uri.path().trim_start_matches('/');
let mut segs = path.splitn(2, '/');
let bucket = segs.next().unwrap_or("").to_string();
let object_key = segs.next().unwrap_or("").to_string();
// Infer remote address
let remote_host = req
.headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.or_else(|| req.headers.get("x-real-ip").and_then(|v| v.to_str().ok()))
.unwrap_or("")
.to_string();
// Initialize audit builder
let mut api_builder = ApiDetailsBuilder::new().name(trigger);
if !bucket.is_empty() {
api_builder = api_builder.bucket(&bucket);
}
if !object_key.is_empty() {
api_builder = api_builder.object(&object_key);
}
// Audit builder
let mut audit_builder = AuditEntryBuilder::new("1.0", event, trigger, ApiDetails::default())
.remote_host(remote_host)
.user_agent(get_request_user_agent(&req.headers))
.req_host(get_request_host(&req.headers))
.req_path(req.uri.path().to_string())
.req_query(extract_req_params(req));
if let Some(req_id) = req.headers.get("x-amz-request-id") {
if let Ok(id_str) = req_id.to_str() {
audit_builder = audit_builder.request_id(id_str);
}
}
// initialize event builder
// object is a placeholder that must be set later using the `object()` method.
let event_builder = EventArgsBuilder::new(event, bucket, ObjectInfo::default())
.host(get_request_host(&req.headers))
.user_agent(get_request_user_agent(&req.headers))
.req_params(extract_req_params_header(&req.headers));
Self {
audit_builder: Some(audit_builder),
api_builder,
event_builder: Some(event_builder),
start_time: std::time::Instant::now(),
}
}
/// Sets the ObjectInfo for event notification.
pub fn object(mut self, object_info: ObjectInfo) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.object(object_info));
}
self
}
/// Set the version ID for event notifications.
pub fn version_id(mut self, version_id: impl Into<String>) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.version_id(version_id));
}
self
}
/// Set the event name for event notifications.
pub fn event_name(mut self, event_name: EventName) -> Self {
if let Some(builder) = self.event_builder.take() {
self.event_builder = Some(builder.event_name(event_name));
}
if let Some(builder) = self.audit_builder.take() {
self.audit_builder = Some(builder.event(event_name));
}
self
}
/// Complete operational details from S3 results.
/// This method should be called immediately before the function returns.
/// It consumes and prepares auxiliary structures for use during `drop`.
pub fn complete(mut self, result: &S3Result<S3Response<impl Send + Sync>>) -> Self {
// Complete audit log
if let Some(builder) = self.audit_builder.take() {
let (status, status_code, error_msg) = match result {
Ok(res) => ("success".to_string(), res.status.unwrap_or(StatusCode::OK).as_u16() as i32, None),
Err(e) => (
"failure".to_string(),
e.status_code().unwrap_or(StatusCode::BAD_REQUEST).as_u16() as i32,
e.message().map(|s| s.to_string()),
),
};
let ttr = self.start_time.elapsed();
let api_details = self
.api_builder
.clone()
.status(status)
.status_code(status_code)
.time_to_response(format!("{:.2?}", ttr))
.time_to_response_in_ns(ttr.as_nanos().to_string())
.build();
let mut final_builder = builder.api(api_details.clone());
if let Some(err) = error_msg {
final_builder = final_builder.error(err);
}
self.audit_builder = Some(final_builder);
self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use
}
// Completion event notification (only on success)
if let (Some(builder), Ok(res)) = (self.event_builder.take(), result) {
self.event_builder = Some(builder.resp_elements(extract_resp_elements(res)));
}
self
}
/// Suppresses the automatic event notification on drop.
pub fn suppress_event(mut self) -> Self {
self.event_builder = None;
self
}
}
impl Drop for OperationHelper {
fn drop(&mut self) {
// Distribute audit logs
if let Some(builder) = self.audit_builder.take() {
spawn_background(async move {
AuditLogger::log(builder.build()).await;
});
}
// Distribute event notification (only on success)
if self.api_builder.0.status.as_deref() == Some("success") {
if let Some(builder) = self.event_builder.take() {
let event_args = builder.build();
// Avoid generating notifications for copy requests
if !event_args.is_replication_request() {
spawn_background(async move {
notifier_global::notify(event_args).await;
});
}
}
}
}
}

View File

@@ -14,6 +14,7 @@
pub mod access;
pub mod ecfs;
// pub mod error;
pub(crate) mod entity;
pub(crate) mod helper;
pub mod options;
pub mod tonic_service;