feat(targets): extract targets module into a standalone crate (#441)

* init audit logger module

* add audit webhook default config kvs

* feat: Add comprehensive tests for authentication module (#309)

* feat: add comprehensive tests for authentication module

- Add 33 unit tests covering all public functions in auth.rs
- Test IAMAuth struct creation and secret key validation
- Test check_claims_from_token with various credential types and scenarios
- Test session token extraction from headers and query parameters
- Test condition values generation for different user types
- Test query parameter parsing with edge cases
- Test Credentials helper methods (is_expired, is_temp, is_service_account)
- Ensure tests handle global state dependencies gracefully
- All tests pass successfully with 100% coverage of testable functions

* style: fix code formatting issues

* Add verification script for checking PR branch statuses and tests

Co-authored-by: anzhengchao <anzhengchao@gmail.com>

* fix: resolve clippy uninlined format args warning

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>

* feat: add basic tests for core storage module (#313)

* feat: add basic tests for core storage module

- Add 6 unit tests for FS struct and basic functionality
- Test FS creation, Debug and Clone trait implementations
- Test RUSTFS_OWNER constant definition and values
- Test S3 error code creation and handling
- Test compression format detection for common file types
- Include comprehensive documentation about integration test needs

Note: Full S3 API testing requires a complex setup with a storage backend,
global configuration, and network infrastructure, so it is better suited to
integration tests than unit tests.

* style: fix code formatting issues

* fix: resolve clippy warnings in storage tests

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>

* feat: add tests for admin handlers module (#314)

* feat: add tests for admin handlers module

- Add 5 new unit tests for admin handler functionality
- Test AccountInfo struct creation, serialization and default values
- Test creation of all admin handler structs (13 handlers)
- Test HealOpts JSON serialization and deserialization
- Test HealOpts URL encoding/decoding with proper field types
- Maintain existing test while adding comprehensive coverage
- Include documentation about integration test requirements

All tests pass successfully with proper error handling for complex dependencies.

* style: fix code formatting issues

* fix: resolve clippy warnings in admin handlers tests

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>

* build(deps): bump the dependencies group with 3 updates (#326)

* perf: avoid transmitting parity shards when the object is good (#322)
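
A minimal sketch of the idea, under assumed names (`Shard`, `fetch_data_shards`, and `fetch_parity_shards` are illustrative, not the real ecstore API): verify the data shards first, and fetch parity only when reconstruction is actually needed.

```rust
// Hypothetical sketch of the read-path optimization; not ecstore's real API.
struct Shard {
    data: Vec<u8>,
    checksum_ok: bool,
}

// Stub: in RustFS these would be read from the erasure-set disks.
fn fetch_data_shards() -> Vec<Shard> {
    vec![Shard { data: b"hello".to_vec(), checksum_ok: true }]
}

// Stub: only reached on the degraded path.
fn fetch_parity_shards() -> Vec<Shard> {
    Vec::new()
}

fn read_object() -> Vec<Shard> {
    let data_shards = fetch_data_shards();
    if data_shards.iter().all(|s| s.checksum_ok) {
        // Object is good: skip transmitting parity shards entirely.
        return data_shards;
    }
    // Degraded: fetch parity and reconstruct the damaged shards (elided).
    let _parity = fetch_parity_shards();
    data_shards
}
```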

* upgrade version

* Fix: fix data integrity check

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Fix: Separate Clippy's fix and check into two commands.

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix: miss inline metadata (#345)

* Update dependabot.yml

* fix: Fixed an issue where the list_objects_v2 API did not return dire… (#352)

* fix: Fixed an issue where the list_objects_v2 API did not return directory names when they conflicted with file names in the same bucket (e.g., test/ vs. test.txt, aaa/ vs. aaa.csv) (#335)

* fix: adjusted the order of directory listings
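
For illustration, a minimal sketch (using the aws-sdk-s3 crate already in the workspace) of the scenario the fix covers; the bucket and key names are examples, not values from the codebase:

```rust
use aws_sdk_s3::Client;

#[tokio::main]
async fn main() -> Result<(), aws_sdk_s3::Error> {
    let config = aws_config::load_from_env().await;
    let client = Client::new(&config);

    let resp = client
        .list_objects_v2()
        .bucket("my-bucket")
        .delimiter("/")
        .send()
        .await?;

    // Expected after the fix: "test/" shows up as a common prefix...
    for prefix in resp.common_prefixes() {
        println!("dir:  {:?}", prefix.prefix());
    }
    // ...and "test.txt" still shows up as a key, with neither shadowed.
    for object in resp.contents() {
        println!("file: {:?}", object.key());
    }
    Ok(())
}
```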

* init

* fix

* fix

* feat: add docker usage for rustfs mcp (#365)

* feat: enhance metadata extraction with object name for MIME type detection

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Feature: lock supports auto-release

Signed-off-by: junxiang Mu <1948535941@qq.com>

* improve lock

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Fix: fix scanner detection

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Fix: clippy && fmt

Signed-off-by: junxiang Mu <1948535941@qq.com>

* refactor(ecstore): Optimize memory usage for object integrity verification

Change object integrity verification from reading all data into memory to streaming processing, avoiding memory exhaustion on large objects; a minimal sketch of the streaming approach follows below.

Modify the TLS key log check to use environment variables directly instead of configuration constants.

Add memory limits for object data reading in the AHM module.

Signed-off-by: junxiang Mu <1948535941@qq.com>
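
A minimal sketch of the streaming approach, assuming crc32fast (a workspace dependency) as the checksum and an illustrative 8 MiB chunk size; the real ecstore verification uses its own checksum types:

```rust
use std::io::Read;

// Hash the object in bounded chunks instead of buffering it whole.
fn verify_streaming<R: Read>(mut reader: R, expected: u32) -> std::io::Result<bool> {
    let mut hasher = crc32fast::Hasher::new();
    let mut buf = vec![0u8; 8 * 1024 * 1024]; // fixed memory per object
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[..n]);
    }
    Ok(hasher.finalize() == expected)
}
```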

* Chore: reduce PR template checklist

Signed-off-by: junxiang Mu <1948535941@qq.com>

* Chore: remove commented-out code (#376)

Signed-off-by: junxiang Mu <1948535941@qq.com>

* chore: upgrade actions/checkout from v4 to v5 (#381)

* chore: upgrade actions/checkout from v4 to v5

- Update GitHub Actions checkout action version
- Ensure compatibility with latest workflow features
- Maintain existing checkout behavior and configuration

* upgrade version

* fix

* add and improve code for notify

* feat: extend rustfs mcp with bucket creation and deletion (#416)

* feat: extend rustfs mcp with bucket creation and deletion

* update file to fix pipeline error

* change variable name to fix pipeline error

* fix(ecstore): add async-recursion to resolve nightly trait solver reg… (#415)

* fix(ecstore): add async-recursion to resolve nightly trait solver regression

The newest nightly compiler switched to the new trait solver, which
currently rejects async recursive functions that were previously accepted.
This causes the following compilation failures:

- `LocalDisk::delete_file()`
- `LocalDisk::scan_dir()`

Add `async-recursion` as a workspace dependency and annotate both functions with `#[async_recursion]` so that the crate compiles cleanly with the latest nightly and will continue to build once the new solver lands in stable.

Signed-off-by: reigadegr <2722688642@qq.com>

* fix: resolve duplicate bound error in scan_dir function

Replaced inline trait bounds with where clause to avoid duplication caused by macro expansion.

Signed-off-by: reigadegr <2722688642@qq.com>
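
A minimal sketch of the workaround, with an illustrative recursive scan rather than LocalDisk's actual implementation: the attribute boxes the recursive future, which the new trait solver accepts.

```rust
use async_recursion::async_recursion;

// Count files under `path`; the recursive async call is what previously
// tripped the new trait solver without the boxing attribute.
#[async_recursion]
async fn scan_dir(path: std::path::PathBuf) -> std::io::Result<u64> {
    let mut count = 0;
    let mut entries = tokio::fs::read_dir(&path).await?;
    while let Some(entry) = entries.next_entry().await? {
        if entry.file_type().await?.is_dir() {
            count += scan_dir(entry.path()).await?;
        } else {
            count += 1;
        }
    }
    Ok(count)
}
```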

---------

Signed-off-by: reigadegr <2722688642@qq.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>

* fix: make bucket exists (#428)

* feat: include user-defined metadata in S3 response (#431)

* fix: simplify Docker entrypoint following efficient user switching pattern (#421)

* fix: simplify Docker entrypoint following efficient user switching pattern

- Remove ALL file permission modifications (no chown at all)
- Use chroot --userspec or gosu to switch user context
- Extremely simple and fast implementation
- Zero filesystem modifications for permissions

Fixes #388

* Update entrypoint.sh

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update entrypoint.sh

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update entrypoint.sh

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* wip

* wip

* wip

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* docs: update doc/docker-data-dir README.md (#432)

* add targets crates

* feat(targets): extract targets module into a standalone crate

- Move all target-related code (MQTT, Webhook, etc.) into a new `targets` crate
- Update imports and dependencies to reference the new crate
- Refactor interfaces to ensure compatibility with the new crate structure
- Adjust Cargo.toml and workspace configuration accordingly

* fix

* fix

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
Signed-off-by: reigadegr <2722688642@qq.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: zzhpro <56196563+zzhpro@users.noreply.github.com>
Co-authored-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: weisd <im@weisd.in>
Co-authored-by: shiro.lee <69624924+shiroleeee@users.noreply.github.com>
Co-authored-by: majinghe <42570491+majinghe@users.noreply.github.com>
Co-authored-by: guojidan <63799833+guojidan@users.noreply.github.com>
Co-authored-by: reigadegr <103645642+reigadegr@users.noreply.github.com>
Co-authored-by: 0xdx2 <xuedamon2@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
houseme, 2025-08-21 22:33:07 +08:00 (committed by GitHub)
parent 357cced49c, commit adc07e5209
63 changed files with 2837 additions and 882 deletions

Cargo.lock (generated; 631 lines changed; diff suppressed because it is too large)


@@ -17,6 +17,7 @@ members = [
"rustfs", # Core file system implementation
"cli/rustfs-gui", # Graphical user interface client
"crates/appauth", # Application authentication and authorization
"crates/audit-logger", # Audit logging system for file operations
"crates/common", # Shared utilities and data structures
"crates/config", # Configuration management
"crates/crypto", # Cryptography and security features
@@ -30,6 +31,7 @@ members = [
"crates/obs", # Observability utilities
"crates/protos", # Protocol buffer definitions
"crates/rio", # Rust I/O utilities and abstractions
"crates/targets", # Target-specific configurations and utilities
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/signer", # client signer
@@ -37,7 +39,7 @@ members = [
"crates/utils", # Utility functions and helpers
"crates/workers", # Worker thread pools and task scheduling
"crates/zip", # ZIP file handling and compression
"crates/ahm",
"crates/ahm", # Asynchronous Hash Map for concurrent data structures
"crates/mcp", # MCP server for S3 operations
]
resolver = "2"
@@ -59,15 +61,11 @@ unsafe_code = "deny"
[workspace.lints.clippy]
all = "warn"
[patch.crates-io]
rustfs-utils = { path = "crates/utils" }
rustfs-filemeta = { path = "crates/filemeta" }
rustfs-rio = { path = "crates/rio" }
[workspace.dependencies]
rustfs-ahm = { path = "crates/ahm", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-appauth = { path = "crates/appauth", version = "0.0.5" }
rustfs-audit-logger = { path = "crates/audit-logger", version = "0.0.5" }
rustfs-common = { path = "crates/common", version = "0.0.5" }
rustfs-crypto = { path = "crates/crypto", version = "0.0.5" }
rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" }
@@ -89,6 +87,7 @@ rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
rustfs-mcp = { path = "crates/mcp", version = "0.0.5" }
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
aes-gcm = { version = "0.10.3", features = ["std"] }
anyhow = "1.0.99"
arc-swap = "1.7.1"
@@ -96,15 +95,15 @@ argon2 = { version = "0.5.3", features = ["std"] }
atoi = "2.0.0"
async-channel = "2.5.0"
async-recursion = "1.1.1"
async-trait = "0.1.88"
async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.4" }
aws-config = { version = "1.8.5" }
aws-sdk-s3 = "1.101.0"
axum = "0.8.4"
base64-simd = "0.8.0"
base64 = "0.22.1"
brotli = "8.0.1"
brotli = "8.0.2"
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.0.1"
byteorder = "1.5.0"
@@ -112,7 +111,7 @@ cfg-if = "1.0.1"
crc-fast = "1.4.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.44", features = ["derive", "env"] }
clap = { version = "4.5.45", features = ["derive", "env"] }
const-str = { version = "0.6.4", features = ["std", "proc"] }
crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
@@ -121,7 +120,7 @@ datafusion = "46.0.1"
derive_builder = "0.20.2"
dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
enumset = "1.1.7"
enumset = "1.1.9"
flatbuffers = "25.2.10"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
@@ -134,7 +133,7 @@ hex = "0.4.3"
hex-simd = "0.8.0"
highway = { version = "1.3.0" }
hmac = "0.12.1"
hyper = "1.6.0"
hyper = "1.7.0"
hyper-util = { version = "0.1.16", features = [
"tokio",
"server-auto",
@@ -193,7 +192,7 @@ rand = "0.9.2"
rdkafka = { version = "0.38.0", features = ["tokio"] }
reed-solomon-simd = { version = "3.0.1" }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.22", default-features = false, features = [
reqwest = { version = "0.12.23", default-features = false, features = [
"rustls-tls",
"charset",
"http2",
@@ -220,7 +219,7 @@ rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-minio-preview.3" }
schemars = "1.0.4"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.142", features = ["raw_value"] }
serde_json = { version = "1.0.143", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
sha1 = "0.10.6"
@@ -237,7 +236,7 @@ sysctl = "0.6.0"
tempfile = "3.20.0"
temp-env = "0.3.6"
test-case = "3.3.1"
thiserror = "2.0.14"
thiserror = "2.0.15"
time = { version = "0.3.41", features = [
"std",
"parsing",
@@ -278,7 +277,7 @@ zstd = "0.13.3"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp"]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "rustfs-audit-logger", "tokio-test"]
[profile.wasm-dev]
inherits = "dev"


@@ -0,0 +1,44 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-audit-logger"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Audit logging system for RustFS, providing detailed logging of file operations and system events."
documentation = "https://docs.rs/rustfs-audit-logger/latest/rustfs_audit_logger/"
keywords = ["audit", "logging", "file-operations", "system-events", "RustFS"]
categories = ["web-programming", "development-tools::profiling", "asynchronous", "api-bindings", "development-tools::debugging"]
[dependencies]
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
url = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
figment = { version = "0.10", features = ["json", "env"] }
[lints]
workspace = true


@@ -0,0 +1,34 @@
{
"console": {
"enabled": true
},
"logger_webhook": {
"default": {
"enabled": true,
"endpoint": "http://localhost:3000/logs",
"auth_token": "secret-token-for-logs",
"batch_size": 5,
"queue_size": 1000,
"max_retry": 3,
"retry_interval": "2s"
}
},
"audit_webhook": {
"splunk": {
"enabled": true,
"endpoint": "http://localhost:3000/audit",
"auth_token": "secret-token-for-audit",
"batch_size": 10
}
},
"audit_kafka": {
"default": {
"enabled": false,
"brokers": [
"kafka1:9092",
"kafka2:9092"
],
"topic": "minio-audit-events"
}
}
}


@@ -0,0 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
println!("Audit Logger Example");
}


@@ -0,0 +1,90 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::entry::ObjectVersion;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Args - defines the arguments for API operations
/// Args is used to define the arguments for API operations.
///
/// # Example
/// ```
/// use rustfs_audit_logger::Args;
/// use std::collections::HashMap;
///
/// let args = Args::new()
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_version_id(Some("123".to_string()))
/// .set_metadata(Some(HashMap::new()));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct Args {
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Option::is_none")]
pub objects: Option<Vec<ObjectVersion>>,
#[serde(rename = "metadata", skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
impl Args {
/// Create a new Args object
pub fn new() -> Self {
Args {
bucket: None,
object: None,
version_id: None,
objects: None,
metadata: None,
}
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Option<Vec<ObjectVersion>>) -> Self {
self.objects = objects;
self
}
/// Set the metadata
pub fn set_metadata(mut self, metadata: Option<HashMap<String, String>>) -> Self {
self.metadata = metadata;
self
}
}


@@ -0,0 +1,469 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{BaseLogEntry, LogRecord, ObjectVersion};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// API details structure
/// ApiDetails is used to define the details of an API operation
///
/// The `ApiDetails` structure contains the following fields:
/// - `name` - the name of the API operation
/// - `bucket` - the bucket name
/// - `object` - the object name
/// - `objects` - the list of objects
/// - `status` - the status of the API operation
/// - `status_code` - the status code of the API operation
/// - `input_bytes` - the input bytes
/// - `output_bytes` - the output bytes
/// - `header_bytes` - the header bytes
/// - `time_to_first_byte` - the time to first byte
/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds
/// - `time_to_response` - the time to response
/// - `time_to_response_in_ns` - the time to response in nanoseconds
///
/// The `ApiDetails` structure contains the following methods:
/// - `new` - create a new `ApiDetails` with default values
/// - `set_name` - set the name
/// - `set_bucket` - set the bucket
/// - `set_object` - set the object
/// - `set_objects` - set the objects
/// - `set_status` - set the status
/// - `set_status_code` - set the status code
/// - `set_input_bytes` - set the input bytes
/// - `set_output_bytes` - set the output bytes
/// - `set_header_bytes` - set the header bytes
/// - `set_time_to_first_byte` - set the time to first byte
/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds
/// - `set_time_to_response` - set the time to response
/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds
///
/// # Example
/// ```
/// use rustfs_audit_logger::ApiDetails;
/// use rustfs_audit_logger::ObjectVersion;
///
/// let api = ApiDetails::new()
/// .set_name(Some("GET".to_string()))
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())])
/// .set_status(Some("OK".to_string()))
/// .set_status_code(Some(200))
/// .set_input_bytes(100)
/// .set_output_bytes(200)
/// .set_header_bytes(Some(50))
/// .set_time_to_first_byte(Some("100ms".to_string()))
/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string()))
/// .set_time_to_response(Some("200ms".to_string()))
/// .set_time_to_response_in_ns(Some("200000000ns".to_string()));
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ApiDetails {
#[serde(rename = "name", skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Vec::is_empty", default)]
pub objects: Vec<ObjectVersion>,
#[serde(rename = "status", skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(rename = "statusCode", skip_serializing_if = "Option::is_none")]
pub status_code: Option<i32>,
#[serde(rename = "rx")]
pub input_bytes: i64,
#[serde(rename = "tx")]
pub output_bytes: i64,
#[serde(rename = "txHeaders", skip_serializing_if = "Option::is_none")]
pub header_bytes: Option<i64>,
#[serde(rename = "timeToFirstByte", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte: Option<String>,
#[serde(rename = "timeToFirstByteInNS", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte_in_ns: Option<String>,
#[serde(rename = "timeToResponse", skip_serializing_if = "Option::is_none")]
pub time_to_response: Option<String>,
#[serde(rename = "timeToResponseInNS", skip_serializing_if = "Option::is_none")]
pub time_to_response_in_ns: Option<String>,
}
impl ApiDetails {
/// Create a new `ApiDetails` with default values
pub fn new() -> Self {
ApiDetails {
name: None,
bucket: None,
object: None,
objects: Vec::new(),
status: None,
status_code: None,
input_bytes: 0,
output_bytes: 0,
header_bytes: None,
time_to_first_byte: None,
time_to_first_byte_in_ns: None,
time_to_response: None,
time_to_response_in_ns: None,
}
}
/// Set the name
pub fn set_name(mut self, name: Option<String>) -> Self {
self.name = name;
self
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Vec<ObjectVersion>) -> Self {
self.objects = objects;
self
}
/// Set the status
pub fn set_status(mut self, status: Option<String>) -> Self {
self.status = status;
self
}
/// Set the status code
pub fn set_status_code(mut self, status_code: Option<i32>) -> Self {
self.status_code = status_code;
self
}
/// Set the input bytes
pub fn set_input_bytes(mut self, input_bytes: i64) -> Self {
self.input_bytes = input_bytes;
self
}
/// Set the output bytes
pub fn set_output_bytes(mut self, output_bytes: i64) -> Self {
self.output_bytes = output_bytes;
self
}
/// Set the header bytes
pub fn set_header_bytes(mut self, header_bytes: Option<i64>) -> Self {
self.header_bytes = header_bytes;
self
}
/// Set the time to first byte
pub fn set_time_to_first_byte(mut self, time_to_first_byte: Option<String>) -> Self {
self.time_to_first_byte = time_to_first_byte;
self
}
/// Set the time to first byte in nanoseconds
pub fn set_time_to_first_byte_in_ns(mut self, time_to_first_byte_in_ns: Option<String>) -> Self {
self.time_to_first_byte_in_ns = time_to_first_byte_in_ns;
self
}
/// Set the time to response
pub fn set_time_to_response(mut self, time_to_response: Option<String>) -> Self {
self.time_to_response = time_to_response;
self
}
/// Set the time to response in nanoseconds
pub fn set_time_to_response_in_ns(mut self, time_to_response_in_ns: Option<String>) -> Self {
self.time_to_response_in_ns = time_to_response_in_ns;
self
}
}
/// Entry - audit entry logs
/// AuditLogEntry is used to define the structure of an audit log entry
///
/// The `AuditLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `version` - the version of the audit log entry
/// - `deployment_id` - the deployment ID
/// - `event` - the event
/// - `entry_type` - the type of audit message
/// - `api` - the API details
/// - `remote_host` - the remote host
/// - `user_agent` - the user agent
/// - `req_path` - the request path
/// - `req_host` - the request host
/// - `req_claims` - the request claims
/// - `req_query` - the request query
/// - `req_header` - the request header
/// - `resp_header` - the response header
/// - `access_key` - the access key
/// - `parent_user` - the parent user
/// - `error` - the error
///
/// The `AuditLogEntry` structure contains the following methods:
/// - `new` - create a new `AuditEntry` with default values
/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details
/// - `with_base` - set the base log entry
/// - `set_version` - set the version
/// - `set_deployment_id` - set the deployment ID
/// - `set_event` - set the event
/// - `set_entry_type` - set the entry type
/// - `set_api` - set the API details
/// - `set_remote_host` - set the remote host
/// - `set_user_agent` - set the user agent
/// - `set_req_path` - set the request path
/// - `set_req_host` - set the request host
/// - `set_req_claims` - set the request claims
/// - `set_req_query` - set the request query
/// - `set_req_header` - set the request header
/// - `set_resp_header` - set the response header
/// - `set_access_key` - set the access key
/// - `set_parent_user` - set the parent user
/// - `set_error` - set the error
///
/// # Example
/// ```
/// use rustfs_audit_logger::AuditLogEntry;
/// use rustfs_audit_logger::ApiDetails;
/// use std::collections::HashMap;
///
/// let entry = AuditLogEntry::new()
/// .set_version("1.0".to_string())
/// .set_deployment_id(Some("123".to_string()))
/// .set_event("event".to_string())
/// .set_entry_type(Some("type".to_string()))
/// .set_api(ApiDetails::new())
/// .set_remote_host(Some("remote-host".to_string()))
/// .set_user_agent(Some("user-agent".to_string()))
/// .set_req_path(Some("req-path".to_string()))
/// .set_req_host(Some("req-host".to_string()))
/// .set_req_claims(Some(HashMap::new()))
/// .set_req_query(Some(HashMap::new()))
/// .set_req_header(Some(HashMap::new()))
/// .set_resp_header(Some(HashMap::new()))
/// .set_access_key(Some("access-key".to_string()))
/// .set_parent_user(Some("parent-user".to_string()))
/// .set_error(Some("error".to_string()));
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct AuditLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub version: String,
#[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")]
pub deployment_id: Option<String>,
pub event: String,
// Class of audit message - S3, admin ops, bucket management
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub entry_type: Option<String>,
pub api: ApiDetails,
#[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")]
pub req_path: Option<String>,
#[serde(rename = "requestHost", skip_serializing_if = "Option::is_none")]
pub req_host: Option<String>,
#[serde(rename = "requestClaims", skip_serializing_if = "Option::is_none")]
pub req_claims: Option<HashMap<String, Value>>,
#[serde(rename = "requestQuery", skip_serializing_if = "Option::is_none")]
pub req_query: Option<HashMap<String, String>>,
#[serde(rename = "requestHeader", skip_serializing_if = "Option::is_none")]
pub req_header: Option<HashMap<String, String>>,
#[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")]
pub resp_header: Option<HashMap<String, String>>,
#[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")]
pub access_key: Option<String>,
#[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")]
pub parent_user: Option<String>,
#[serde(rename = "error", skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl AuditLogEntry {
/// Create a new `AuditEntry` with default values
pub fn new() -> Self {
AuditLogEntry {
base: BaseLogEntry::new(),
version: String::new(),
deployment_id: None,
event: String::new(),
entry_type: None,
api: ApiDetails::new(),
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Create a new `AuditEntry` with version, time, event and api details
pub fn new_with_values(version: String, time: DateTime<Utc>, event: String, api: ApiDetails) -> Self {
let mut base = BaseLogEntry::new();
base.timestamp = time;
AuditLogEntry {
base,
version,
deployment_id: None,
event,
entry_type: None,
api,
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the version
pub fn set_version(mut self, version: String) -> Self {
self.version = version;
self
}
/// Set the deployment ID
pub fn set_deployment_id(mut self, deployment_id: Option<String>) -> Self {
self.deployment_id = deployment_id;
self
}
/// Set the event
pub fn set_event(mut self, event: String) -> Self {
self.event = event;
self
}
/// Set the entry type
pub fn set_entry_type(mut self, entry_type: Option<String>) -> Self {
self.entry_type = entry_type;
self
}
/// Set the API details
pub fn set_api(mut self, api: ApiDetails) -> Self {
self.api = api;
self
}
/// Set the remote host
pub fn set_remote_host(mut self, remote_host: Option<String>) -> Self {
self.remote_host = remote_host;
self
}
/// Set the user agent
pub fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
self.user_agent = user_agent;
self
}
/// Set the request path
pub fn set_req_path(mut self, req_path: Option<String>) -> Self {
self.req_path = req_path;
self
}
/// Set the request host
pub fn set_req_host(mut self, req_host: Option<String>) -> Self {
self.req_host = req_host;
self
}
/// Set the request claims
pub fn set_req_claims(mut self, req_claims: Option<HashMap<String, Value>>) -> Self {
self.req_claims = req_claims;
self
}
/// Set the request query
pub fn set_req_query(mut self, req_query: Option<HashMap<String, String>>) -> Self {
self.req_query = req_query;
self
}
/// Set the request header
pub fn set_req_header(mut self, req_header: Option<HashMap<String, String>>) -> Self {
self.req_header = req_header;
self
}
/// Set the response header
pub fn set_resp_header(mut self, resp_header: Option<HashMap<String, String>>) -> Self {
self.resp_header = resp_header;
self
}
/// Set the access key
pub fn set_access_key(mut self, access_key: Option<String>) -> Self {
self.access_key = access_key;
self
}
/// Set the parent user
pub fn set_parent_user(mut self, parent_user: Option<String>) -> Self {
self.parent_user = parent_user;
self
}
/// Set the error
pub fn set_error(mut self, error: Option<String>) -> Self {
self.error = error;
self
}
}
impl LogRecord for AuditLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}


@@ -0,0 +1,108 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Base log entry structure shared by all log types
/// This structure is used to serialize log entries to JSON and send them to
/// the log sinks, to deserialize log entries from JSON, and to store and
/// query log entries in the database.
///
/// The `BaseLogEntry` structure contains the following fields:
/// - `timestamp` - the timestamp of the log entry
/// - `request_id` - the request ID of the log entry
/// - `message` - the message of the log entry
/// - `tags` - the tags of the log entry
///
/// The `BaseLogEntry` structure contains the following methods:
/// - `new` - create a new `BaseLogEntry` with default values
/// - `message` - set the message
/// - `request_id` - set the request ID
/// - `tags` - set the tags
/// - `timestamp` - set the timestamp
///
/// # Example
/// ```
/// use rustfs_audit_logger::BaseLogEntry;
/// use chrono::{DateTime, Utc};
/// use std::collections::HashMap;
///
/// let timestamp = Utc::now();
/// let request = Some("req-123".to_string());
/// let message = Some("This is a log message".to_string());
/// let tags = Some(HashMap::new());
///
/// let entry = BaseLogEntry::new()
/// .timestamp(timestamp)
/// .request_id(request)
/// .message(message)
/// .tags(tags);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct BaseLogEntry {
#[serde(rename = "time")]
pub timestamp: DateTime<Utc>,
#[serde(rename = "requestID", skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(rename = "message", skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "tags", skip_serializing_if = "Option::is_none")]
pub tags: Option<HashMap<String, Value>>,
}
impl BaseLogEntry {
/// Create a new BaseLogEntry with default values
pub fn new() -> Self {
BaseLogEntry {
timestamp: Utc::now(),
request_id: None,
message: None,
tags: None,
}
}
/// Set the message
pub fn message(mut self, message: Option<String>) -> Self {
self.message = message;
self
}
/// Set the request ID
pub fn request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the tags
pub fn tags(mut self, tags: Option<HashMap<String, Value>>) -> Self {
self.tags = tags;
self
}
/// Set the timestamp
pub fn timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.timestamp = timestamp;
self
}
}


@@ -0,0 +1,159 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub(crate) mod args;
pub(crate) mod audit;
pub(crate) mod base;
pub(crate) mod unified;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// ObjectVersion is used across multiple modules
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectVersion {
#[serde(rename = "name")]
pub object_name: String,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl ObjectVersion {
/// Create a new ObjectVersion object
pub fn new() -> Self {
ObjectVersion {
object_name: String::new(),
version_id: None,
}
}
/// Create a new ObjectVersion with object name
pub fn new_with_object_name(object_name: String) -> Self {
ObjectVersion {
object_name,
version_id: None,
}
}
/// Set the object name
pub fn set_object_name(mut self, object_name: String) -> Self {
self.object_name = object_name;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
}
impl Default for ObjectVersion {
fn default() -> Self {
Self::new()
}
}
/// Log kind/level enum
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum LogKind {
#[serde(rename = "INFO")]
#[default]
Info,
#[serde(rename = "WARNING")]
Warning,
#[serde(rename = "ERROR")]
Error,
#[serde(rename = "FATAL")]
Fatal,
}
/// Trait for types that can be serialized to JSON and have a timestamp
/// This trait is used by `ServerLogEntry` to convert the log entry to JSON
/// and get the timestamp of the log entry
/// This trait is implemented by `ServerLogEntry`
///
/// # Example
/// ```
/// use rustfs_audit_logger::LogRecord;
/// use chrono::{DateTime, Utc};
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string());
/// let json = log_entry.to_json();
/// let timestamp = log_entry.get_timestamp();
/// ```
pub trait LogRecord {
fn to_json(&self) -> String;
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
/// for `ServerLogEntry`
/// This is necessary because `tracing_core::Level` does not implement `Serialize`
/// and `Deserialize`
/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized
/// using `serde`
///
/// # Example
/// ```
/// use rustfs_audit_logger::SerializableLevel;
/// use tracing_core::Level;
///
/// let level = Level::INFO;
/// let serializable_level = SerializableLevel::from(level);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}
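A quick round-trip sketch of the wrapper: levels serialize as their uppercase string form and deserialize back.

```rust
fn demo() -> Result<(), serde_json::Error> {
    let level = SerializableLevel(tracing_core::Level::WARN);
    let json = serde_json::to_string(&level)?; // "\"WARN\""
    let back: SerializableLevel = serde_json::from_str(&json)?;
    assert_eq!(level, back);
    Ok(())
}
```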


@@ -0,0 +1,266 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing_core::Level;
/// Server log entry with structured fields
/// ServerLogEntry is used to log structured log entries from the server
///
/// The `ServerLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `source` - the source of the log entry
/// - `user_id` - the user ID
/// - `fields` - the structured fields of the log entry
///
/// The `ServerLogEntry` structure contains the following methods:
/// - `new` - create a new `ServerLogEntry` with specified level and source
/// - `with_base` - set the base log entry
/// - `user_id` - set the user ID
/// - `fields` - set the fields
/// - `add_field` - add a field
///
/// # Example
/// ```
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string())
/// .user_id(Some("user-456".to_string()))
/// .add_field("operation".to_string(), "login".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: SerializableLevel,
pub source: String,
#[serde(rename = "userId", skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub fields: Vec<(String, String)>,
}
impl ServerLogEntry {
/// Create a new ServerLogEntry with specified level and source
pub fn new(level: Level, source: String) -> Self {
ServerLogEntry {
base: BaseLogEntry::new(),
level: SerializableLevel(level),
source,
user_id: None,
fields: Vec::new(),
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the user ID
pub fn user_id(mut self, user_id: Option<String>) -> Self {
self.user_id = user_id;
self
}
/// Set fields
pub fn fields(mut self, fields: Vec<(String, String)>) -> Self {
self.fields = fields;
self
}
/// Add a field
pub fn add_field(mut self, key: String, value: String) -> Self {
self.fields.push((key, value));
self
}
}
impl LogRecord for ServerLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Console log entry structure
/// ConsoleLogEntry is used to log console log entries
/// The `ConsoleLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `console_msg` - the console message
/// - `node_name` - the node name
/// - `err` - the error message
///
/// The `ConsoleLogEntry` structure contains the following methods:
/// - `new` - create a new `ConsoleLogEntry`
/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name
/// - `with_base` - set the base log entry
/// - `set_level` - set the log level
/// - `set_node_name` - set the node name
/// - `set_console_msg` - set the console message
/// - `set_err` - set the error message
///
/// # Example
/// ```
/// use rustfs_audit_logger::ConsoleLogEntry;
///
/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: LogKind,
pub console_msg: String,
pub node_name: String,
#[serde(skip)]
pub err: Option<String>,
}
impl ConsoleLogEntry {
/// Create a new ConsoleLogEntry
pub fn new() -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg: String::new(),
node_name: String::new(),
err: None,
}
}
/// Create a new ConsoleLogEntry with console message and node name
pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg,
node_name,
err: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the log level
pub fn set_level(mut self, level: LogKind) -> Self {
self.level = level;
self
}
/// Set the node name
pub fn set_node_name(mut self, node_name: String) -> Self {
self.node_name = node_name;
self
}
/// Set the console message
pub fn set_console_msg(mut self, console_msg: String) -> Self {
self.console_msg = console_msg;
self
}
/// Set the error message
pub fn set_err(mut self, err: Option<String>) -> Self {
self.err = err;
self
}
}
impl Default for ConsoleLogEntry {
fn default() -> Self {
Self::new()
}
}
impl LogRecord for ConsoleLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Unified log entry type
/// UnifiedLogEntry is used to log different types of log entries
///
/// The `UnifiedLogEntry` enum contains the following variants:
/// - `Server` - a server log entry
/// - `Audit` - an audit log entry
/// - `Console` - a console log entry
///
/// The `UnifiedLogEntry` enum contains the following methods:
/// - `to_json` - convert the log entry to JSON
/// - `get_timestamp` - get the timestamp of the log entry
///
/// # Example
/// ```
/// use rustfs_audit_logger::{UnifiedLogEntry, ServerLogEntry};
/// use tracing_core::Level;
///
/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string());
/// let unified = UnifiedLogEntry::Server(server_entry);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UnifiedLogEntry {
#[serde(rename = "server")]
Server(ServerLogEntry),
#[serde(rename = "audit")]
Audit(Box<AuditLogEntry>),
#[serde(rename = "console")]
Console(ConsoleLogEntry),
}
impl LogRecord for UnifiedLogEntry {
fn to_json(&self) -> String {
match self {
UnifiedLogEntry::Server(entry) => entry.to_json(),
UnifiedLogEntry::Audit(entry) => entry.to_json(),
UnifiedLogEntry::Console(entry) => entry.to_json(),
}
}
fn get_timestamp(&self) -> DateTime<Utc> {
match self {
UnifiedLogEntry::Server(entry) => entry.get_timestamp(),
UnifiedLogEntry::Audit(entry) => entry.get_timestamp(),
UnifiedLogEntry::Console(entry) => entry.get_timestamp(),
}
}
}


@@ -0,0 +1,8 @@
mod entry;
mod logger;
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};


@@ -0,0 +1,29 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
// Default value functions
fn default_batch_size() -> usize {
10
}
fn default_queue_size() -> usize {
10000
}
fn default_max_retry() -> u32 {
5
}
fn default_retry_interval() -> std::time::Duration {
std::time::Duration::from_secs(3)
}
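
These functions are the usual targets of serde's `default = "..."` attribute; a hedged sketch of how they would be wired into a webhook config struct (the field set is an assumption based on the sample JSON, not the final schema):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct WebhookConfig {
    enabled: bool,
    endpoint: String,
    // The default_* functions are the ones defined above.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
    #[serde(default = "default_queue_size")]
    queue_size: usize,
    #[serde(default = "default_max_retry")]
    max_retry: u32,
}
```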


@@ -0,0 +1,13 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


@@ -0,0 +1,108 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::collections::HashMap;
use uuid::Uuid;
/// A trait for a log entry that can be serialized and sent
pub trait Loggable: Serialize + Send + Sync + 'static {
fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
/// Standard log entries
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
pub deployment_id: String,
pub level: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace: Option<Trace>,
pub time: DateTime<Utc>,
pub request_id: String,
}
impl Loggable for LogEntry {}
/// Audit log entry
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct AuditEntry {
pub version: String,
pub deployment_id: String,
pub time: DateTime<Utc>,
pub trigger: String,
pub api: ApiDetails,
pub remote_host: String,
pub request_id: String,
pub user_agent: String,
pub access_key: String,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub tags: HashMap<String, String>,
}
impl Loggable for AuditEntry {}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
pub message: String,
pub source: Vec<String>,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub variables: HashMap<String, String>,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ApiDetails {
pub name: String,
pub bucket: String,
pub object: String,
pub status: String,
pub status_code: u16,
pub time_to_first_byte: String,
pub time_to_response: String,
}
// Helper functions to create entries
impl AuditEntry {
pub fn new(api_name: &str, bucket: &str, object: &str) -> Self {
AuditEntry {
version: "1".to_string(),
deployment_id: "global-deployment-id".to_string(),
time: Utc::now(),
trigger: "incoming".to_string(),
api: ApiDetails {
name: api_name.to_string(),
bucket: bucket.to_string(),
object: object.to_string(),
status: "OK".to_string(),
status_code: 200,
time_to_first_byte: "10ms".to_string(),
time_to_response: "50ms".to_string(),
},
remote_host: "127.0.0.1".to_string(),
request_id: Uuid::new_v4().to_string(),
user_agent: "Rust-Client/1.0".to_string(),
access_key: "minioadmin".to_string(),
tags: HashMap::new(),
}
}
}
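
A usage sketch for the helper above (bucket and object names are illustrative); `to_json` is the `Loggable` default method:

```rust
fn demo() -> Result<(), serde_json::Error> {
    let entry = AuditEntry::new("PutObject", "my-bucket", "photos/cat.png");
    let json = entry.to_json()?; // Loggable's default serialization
    println!("{json}");
    Ok(())
}
```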


@@ -0,0 +1,13 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


@@ -0,0 +1,36 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub mod config;
pub mod dispatch;
pub mod entry;
pub mod factory;
use async_trait::async_trait;
use std::error::Error;
/// General Log Target Trait
#[async_trait]
pub trait Target: Send + Sync {
/// Send a single loggable entry
async fn send(&self, entry: Box<Self>) -> Result<(), Box<dyn Error + Send>>;
/// Returns the unique name of the target
fn name(&self) -> &str;
/// Close target gracefully, ensuring all buffered logs are processed
async fn shutdown(&self);
}
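
A hedged sketch of a trivial implementor of the trait above; `ConsoleTarget` is illustrative, not one of the crate's real targets.

```rust
use async_trait::async_trait;
use std::error::Error;

struct ConsoleTarget {
    name: String,
}

#[async_trait]
impl Target for ConsoleTarget {
    async fn send(&self, entry: Box<Self>) -> Result<(), Box<dyn Error + Send>> {
        // A real target would serialize and deliver the entry here.
        println!("[{}] delivering entry from {}", self.name, entry.name);
        Ok(())
    }

    fn name(&self) -> &str {
        &self.name
    }

    async fn shutdown(&self) {
        // Nothing buffered in this toy target.
    }
}
```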


@@ -31,8 +31,9 @@ const-str = { workspace = true, optional = true }
workspace = true
[features]
default = []
default = ["constants"]
audit = ["dep:const-str", "constants"]
constants = ["dep:const-str"]
notify = ["dep:const-str"]
observability = []
notify = ["dep:const-str", "constants"]
observability = ["constants"]


@@ -0,0 +1,31 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Audit configuration module
//!
//! This module defines the configuration for audit systems, including
//! webhook and other audit-related settings.
pub const AUDIT_WEBHOOK_SUB_SYS: &str = "audit_webhook";
pub const AUDIT_STORE_EXTENSION: &str = ".audit";
pub const WEBHOOK_ENDPOINT: &str = "endpoint";
pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token";
pub const WEBHOOK_CLIENT_CERT: &str = "client_cert";
pub const WEBHOOK_CLIENT_KEY: &str = "client_key";
pub const WEBHOOK_BATCH_SIZE: &str = "batch_size";
pub const WEBHOOK_QUEUE_SIZE: &str = "queue_size";
pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir";
pub const WEBHOOK_MAX_RETRY: &str = "max_retry";
pub const WEBHOOK_RETRY_INTERVAL: &str = "retry_interval";
pub const WEBHOOK_HTTP_TIMEOUT: &str = "http_timeout";


@@ -16,6 +16,13 @@ pub const DEFAULT_DELIMITER: &str = "_";
pub const ENV_PREFIX: &str = "RUSTFS_";
pub const ENV_WORD_DELIMITER: &str = "_";
pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store
pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit
/// Standard config keys and values.
pub const ENABLE_KEY: &str = "enable";
pub const COMMENT_KEY: &str = "comment";
/// Dash separator.
/// This is used to separate words in environment variable names.
pub const ENV_WORD_DELIMITER_DASH: &str = "-";


@@ -20,6 +20,8 @@ pub use constants::app::*;
pub use constants::env::*;
#[cfg(feature = "constants")]
pub use constants::tls::*;
#[cfg(feature = "audit")]
pub mod audit;
#[cfg(feature = "notify")]
pub mod notify;
#[cfg(feature = "observability")]


@@ -29,14 +29,6 @@ pub const NOTIFY_PREFIX: &str = "notify";
pub const NOTIFY_ROUTE_PREFIX: &str = const_str::concat!(NOTIFY_PREFIX, "_");
/// Standard config keys and values.
pub const ENABLE_KEY: &str = "enable";
pub const COMMENT_KEY: &str = "comment";
/// Enable values
pub const ENABLE_ON: &str = "on";
pub const ENABLE_OFF: &str = "off";
#[allow(dead_code)]
pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS];

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::notify::{COMMENT_KEY, ENABLE_KEY};
use crate::{COMMENT_KEY, ENABLE_KEY};
// MQTT Keys
pub const MQTT_BROKER: &str = "broker";

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store
pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit
pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension
pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::notify::{COMMENT_KEY, ENABLE_KEY};
use crate::{COMMENT_KEY, ENABLE_KEY};
// Webhook Keys
pub const WEBHOOK_ENDPOINT: &str = "endpoint";

View File

@@ -34,7 +34,7 @@ workspace = true
default = []
[dependencies]
rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-config = { workspace = true, features = ["constants", "notify", "audit"] }
async-trait.workspace = true
bytes.workspace = true
byteorder = { workspace = true }

View File

@@ -0,0 +1,84 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::config::{KV, KVS};
use rustfs_config::audit::{
WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT,
WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_SIZE, WEBHOOK_RETRY_INTERVAL,
};
use rustfs_config::{DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, EnableState};
use std::sync::LazyLock;
#[allow(dead_code)]
/// Default KVS for audit webhook settings.
pub static DEFAULT_AUDIT_WEBHOOK_KVS: LazyLock<KVS> = LazyLock::new(|| {
KVS(vec![
KV {
key: ENABLE_KEY.to_owned(),
value: EnableState::Off.to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_ENDPOINT.to_owned(),
value: "".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_AUTH_TOKEN.to_owned(),
value: "".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_CLIENT_CERT.to_owned(),
value: "".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_CLIENT_KEY.to_owned(),
value: "".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_BATCH_SIZE.to_owned(),
value: "1".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_SIZE.to_owned(),
value: DEFAULT_LIMIT.to_string(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_QUEUE_DIR.to_owned(),
value: DEFAULT_DIR.to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_MAX_RETRY.to_owned(),
value: "0".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_RETRY_INTERVAL.to_owned(),
value: "3s".to_owned(),
hidden_if_empty: false,
},
KV {
key: WEBHOOK_HTTP_TIMEOUT.to_owned(),
value: "5s".to_owned(),
hidden_if_empty: false,
},
])
});
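
A rough usage sketch of these defaults (assuming `KVS::lookup` returns `Option<String>`, as the factory code later in this PR suggests):

// Illustration only: read default values back out of the audit webhook KVS.
let defaults = &*DEFAULT_AUDIT_WEBHOOK_KVS;
assert_eq!(defaults.lookup(WEBHOOK_BATCH_SIZE).as_deref(), Some("1"));
assert_eq!(defaults.lookup(WEBHOOK_RETRY_INTERVAL).as_deref(), Some("3s"));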

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod audit;
pub mod com;
#[allow(dead_code)]
pub mod heal;
@@ -21,8 +22,9 @@ pub mod storageclass;
use crate::error::Result;
use crate::store::ECStore;
use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate};
use rustfs_config::COMMENT_KEY;
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_config::notify::{COMMENT_KEY, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::LazyLock;

View File

@@ -14,10 +14,11 @@
use crate::config::{KV, KVS};
use rustfs_config::notify::{
COMMENT_KEY, DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, ENABLE_OFF, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD,
MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN,
WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL,
MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR,
WEBHOOK_QUEUE_LIMIT,
};
use rustfs_config::{COMMENT_KEY, DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, EnableState};
use std::sync::LazyLock;
/// The default configuration collection for webhooks
@@ -26,7 +27,7 @@ pub static DEFAULT_WEBHOOK_KVS: LazyLock<KVS> = LazyLock::new(|| {
KVS(vec![
KV {
key: ENABLE_KEY.to_owned(),
value: ENABLE_OFF.to_owned(),
value: EnableState::Off.to_string(),
hidden_if_empty: false,
},
KV {
@@ -73,7 +74,7 @@ pub static DEFAULT_MQTT_KVS: LazyLock<KVS> = LazyLock::new(|| {
KVS(vec![
KV {
key: ENABLE_KEY.to_owned(),
value: ENABLE_OFF.to_owned(),
value: EnableState::Off.to_string(),
hidden_if_empty: false,
},
KV {

View File

@@ -29,6 +29,7 @@ documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/"
rustfs-config = { workspace = true, features = ["notify", "constants"] }
rustfs-ecstore = { workspace = true }
rustfs-utils = { workspace = true, features = ["path", "sys"] }
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
dashmap = { workspace = true }
@@ -36,23 +37,18 @@ futures = { workspace = true }
form_urlencoded = { workspace = true }
once_cell = { workspace = true }
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
reqwest = { workspace = true }
rumqttc = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snap = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
uuid = { workspace = true, features = ["v4", "serde"] }
url = { workspace = true }
urlencoding = { workspace = true }
wildmatch = { workspace = true, features = ["serde"] }
[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
reqwest = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
axum = { workspace = true }
[lints]

View File

@@ -0,0 +1,59 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::IsTerminal;
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
#[allow(dead_code)]
fn main() {
init_logger(LogLevel::Info);
tracing::info!("Tracing logger initialized with Info level");
}
/// Initialize the tracing log system
pub fn init_logger(level: LogLevel) {
let filter = EnvFilter::default().add_directive(level.into());
tracing_subscriber::registry()
.with(filter)
.with(
fmt::layer()
.with_target(true)
.with_ansi(std::io::stdout().is_terminal())
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true),
)
.init();
}
/// Log level definition
pub enum LogLevel {
Debug,
Info,
Warn,
Error,
}
impl From<LogLevel> for tracing_subscriber::filter::Directive {
fn from(level: LogLevel) -> Self {
match level {
LogLevel::Debug => "debug".parse().unwrap(),
LogLevel::Info => "info".parse().unwrap(),
LogLevel::Warn => "warn".parse().unwrap(),
LogLevel::Error => "error".parse().unwrap(),
}
}
}

View File

@@ -12,15 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod base;
use base::{LogLevel, init_logger};
use rustfs_config::EnableState::On;
use rustfs_config::notify::{
DEFAULT_LIMIT, DEFAULT_TARGET, ENABLE_KEY, ENABLE_ON, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT,
MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT,
WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_config::{DEFAULT_LIMIT, ENABLE_KEY};
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::arn::TargetID;
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
@@ -46,7 +51,7 @@ async fn main() -> Result<(), NotificationError> {
let webhook_kvs_vec = vec![
KV {
key: ENABLE_KEY.to_string(),
value: ENABLE_ON.to_string(),
value: On.to_string(),
hidden_if_empty: false,
},
KV {
@@ -85,7 +90,7 @@ async fn main() -> Result<(), NotificationError> {
let mqtt_kvs_vec = vec![
KV {
key: ENABLE_KEY.to_string(),
value: ENABLE_ON.to_string(),
value: On.to_string(),
hidden_if_empty: false,
},
KV {
@@ -136,6 +141,10 @@ async fn main() -> Result<(), NotificationError> {
// Load the configuration and initialize the system
*system.config.write().await = config;
info!("---> Initializing notification system with Webhook and MQTT targets...");
info!("Webhook Endpoint: {}", WEBHOOK_ENDPOINT);
info!("MQTT Broker: {}", MQTT_BROKER);
info!("system.init config: {:?}", system.config.read().await);
system.init().await?;
info!("✅ System initialized with Webhook and MQTT targets.");

View File

@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Using global accessors
mod base;
use base::{LogLevel, init_logger};
use rustfs_config::EnableState::On;
use rustfs_config::notify::{
DEFAULT_LIMIT, DEFAULT_TARGET, ENABLE_KEY, ENABLE_ON, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT,
MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT,
WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use rustfs_config::{DEFAULT_LIMIT, ENABLE_KEY};
use rustfs_ecstore::config::{Config, KV, KVS};
use rustfs_notify::arn::TargetID;
use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger};
use rustfs_notify::{BucketNotificationConfig, Event, NotificationError};
use rustfs_notify::{initialize, notification_system};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
@@ -47,7 +51,7 @@ async fn main() -> Result<(), NotificationError> {
let webhook_kvs_vec = vec![
KV {
key: ENABLE_KEY.to_string(),
value: ENABLE_ON.to_string(),
value: On.to_string(),
hidden_if_empty: false,
},
KV {
@@ -95,7 +99,7 @@ async fn main() -> Result<(), NotificationError> {
let mqtt_kvs_vec = vec![
KV {
key: ENABLE_KEY.to_string(),
value: ENABLE_ON.to_string(),
value: On.to_string(),
hidden_if_empty: false,
},
KV {

View File

@@ -1,98 +1,22 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::arn::TargetID;
use rustfs_targets::TargetError;
use rustfs_targets::arn::TargetID;
use std::io;
use thiserror::Error;
/// Error types for the store
#[derive(Debug, Error)]
pub enum StoreError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Deserialization error: {0}")]
Deserialization(String),
#[error("Compression error: {0}")]
Compression(String),
#[error("Entry limit exceeded")]
LimitExceeded,
#[error("Entry not found")]
NotFound,
#[error("Invalid entry: {0}")]
Internal(String), // Added internal error type
}
/// Error types for targets
#[derive(Debug, Error)]
pub enum TargetError {
#[error("Storage error: {0}")]
Storage(String),
#[error("Network error: {0}")]
Network(String),
#[error("Request error: {0}")]
Request(String),
#[error("Timeout error: {0}")]
Timeout(String),
#[error("Authentication error: {0}")]
Authentication(String),
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Encoding error: {0}")]
Encoding(String),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Target not connected")]
NotConnected,
#[error("Target initialization failed: {0}")]
Initialization(String),
#[error("Invalid ARN: {0}")]
InvalidARN(String),
#[error("Unknown error: {0}")]
Unknown(String),
#[error("Target is disabled")]
Disabled,
#[error("Configuration parsing error: {0}")]
ParseError(String),
#[error("Failed to save configuration: {0}")]
SaveConfig(String),
#[error("Server not initialized: {0}")]
ServerNotInitialized(String),
}
/// Error types for the notification system
#[derive(Debug, Error)]
pub enum NotificationError {
@@ -135,9 +59,3 @@ pub enum NotificationError {
#[error("Server not initialized")]
ServerNotInitialized,
}
impl From<url::ParseError> for TargetError {
fn from(err: url::ParseError) -> Self {
TargetError::Configuration(format!("URL parse error: {err}"))
}
}

View File

@@ -13,285 +13,11 @@
// limitations under the License.
use chrono::{DateTime, Utc};
use rustfs_targets::EventName;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use url::form_urlencoded;
/// Error returned when parsing an event name string fails.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseEventNameError(String);
impl fmt::Display for ParseEventNameError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Invalid event name: {}", self.0)
}
}
impl std::error::Error for ParseEventNameError {}
/// Represents the type of event that occurs on the object.
/// Based on AWS S3 event type and includes RustFS extension.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EventName {
// Single event type (values are 1-32 for compatible mask logic)
ObjectAccessedGet = 1,
ObjectAccessedGetRetention = 2,
ObjectAccessedGetLegalHold = 3,
ObjectAccessedHead = 4,
ObjectAccessedAttributes = 5,
ObjectCreatedCompleteMultipartUpload = 6,
ObjectCreatedCopy = 7,
ObjectCreatedPost = 8,
ObjectCreatedPut = 9,
ObjectCreatedPutRetention = 10,
ObjectCreatedPutLegalHold = 11,
ObjectCreatedPutTagging = 12,
ObjectCreatedDeleteTagging = 13,
ObjectRemovedDelete = 14,
ObjectRemovedDeleteMarkerCreated = 15,
ObjectRemovedDeleteAllVersions = 16,
ObjectRemovedNoOP = 17,
BucketCreated = 18,
BucketRemoved = 19,
ObjectReplicationFailed = 20,
ObjectReplicationComplete = 21,
ObjectReplicationMissedThreshold = 22,
ObjectReplicationReplicatedAfterThreshold = 23,
ObjectReplicationNotTracked = 24,
ObjectRestorePost = 25,
ObjectRestoreCompleted = 26,
ObjectTransitionFailed = 27,
ObjectTransitionComplete = 28,
ScannerManyVersions = 29, // ObjectManyVersions corresponding to Go
ScannerLargeVersions = 30, // ObjectLargeVersions corresponding to Go
ScannerBigPrefix = 31, // PrefixManyFolders corresponding to Go
LifecycleDelMarkerExpirationDelete = 32, // ILMDelMarkerExpirationDelete corresponding to Go
// Compound "All" event type (no sequential value for mask)
ObjectAccessedAll,
ObjectCreatedAll,
ObjectRemovedAll,
ObjectReplicationAll,
ObjectRestoreAll,
ObjectTransitionAll,
ObjectScannerAll, // New, from Go
Everything, // New, from Go
}
// Single event type sequential array for Everything.expand()
const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [
EventName::ObjectAccessedGet,
EventName::ObjectAccessedGetRetention,
EventName::ObjectAccessedGetLegalHold,
EventName::ObjectAccessedHead,
EventName::ObjectAccessedAttributes,
EventName::ObjectCreatedCompleteMultipartUpload,
EventName::ObjectCreatedCopy,
EventName::ObjectCreatedPost,
EventName::ObjectCreatedPut,
EventName::ObjectCreatedPutRetention,
EventName::ObjectCreatedPutLegalHold,
EventName::ObjectCreatedPutTagging,
EventName::ObjectCreatedDeleteTagging,
EventName::ObjectRemovedDelete,
EventName::ObjectRemovedDeleteMarkerCreated,
EventName::ObjectRemovedDeleteAllVersions,
EventName::ObjectRemovedNoOP,
EventName::BucketCreated,
EventName::BucketRemoved,
EventName::ObjectReplicationFailed,
EventName::ObjectReplicationComplete,
EventName::ObjectReplicationMissedThreshold,
EventName::ObjectReplicationReplicatedAfterThreshold,
EventName::ObjectReplicationNotTracked,
EventName::ObjectRestorePost,
EventName::ObjectRestoreCompleted,
EventName::ObjectTransitionFailed,
EventName::ObjectTransitionComplete,
EventName::ScannerManyVersions,
EventName::ScannerLargeVersions,
EventName::ScannerBigPrefix,
EventName::LifecycleDelMarkerExpirationDelete,
];
const LAST_SINGLE_TYPE_VALUE: u32 = EventName::LifecycleDelMarkerExpirationDelete as u32;
impl EventName {
/// Parses a string into an EventName.
pub fn parse(s: &str) -> Result<Self, ParseEventNameError> {
match s {
"s3:BucketCreated:*" => Ok(EventName::BucketCreated),
"s3:BucketRemoved:*" => Ok(EventName::BucketRemoved),
"s3:ObjectAccessed:*" => Ok(EventName::ObjectAccessedAll),
"s3:ObjectAccessed:Get" => Ok(EventName::ObjectAccessedGet),
"s3:ObjectAccessed:GetRetention" => Ok(EventName::ObjectAccessedGetRetention),
"s3:ObjectAccessed:GetLegalHold" => Ok(EventName::ObjectAccessedGetLegalHold),
"s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead),
"s3:ObjectAccessed:Attributes" => Ok(EventName::ObjectAccessedAttributes),
"s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll),
"s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload),
"s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy),
"s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost),
"s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut),
"s3:ObjectCreated:PutRetention" => Ok(EventName::ObjectCreatedPutRetention),
"s3:ObjectCreated:PutLegalHold" => Ok(EventName::ObjectCreatedPutLegalHold),
"s3:ObjectCreated:PutTagging" => Ok(EventName::ObjectCreatedPutTagging),
"s3:ObjectCreated:DeleteTagging" => Ok(EventName::ObjectCreatedDeleteTagging),
"s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll),
"s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete),
"s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated),
"s3:ObjectRemoved:NoOP" => Ok(EventName::ObjectRemovedNoOP),
"s3:ObjectRemoved:DeleteAllVersions" => Ok(EventName::ObjectRemovedDeleteAllVersions),
"s3:LifecycleDelMarkerExpiration:Delete" => Ok(EventName::LifecycleDelMarkerExpirationDelete),
"s3:Replication:*" => Ok(EventName::ObjectReplicationAll),
"s3:Replication:OperationFailedReplication" => Ok(EventName::ObjectReplicationFailed),
"s3:Replication:OperationCompletedReplication" => Ok(EventName::ObjectReplicationComplete),
"s3:Replication:OperationMissedThreshold" => Ok(EventName::ObjectReplicationMissedThreshold),
"s3:Replication:OperationReplicatedAfterThreshold" => Ok(EventName::ObjectReplicationReplicatedAfterThreshold),
"s3:Replication:OperationNotTracked" => Ok(EventName::ObjectReplicationNotTracked),
"s3:ObjectRestore:*" => Ok(EventName::ObjectRestoreAll),
"s3:ObjectRestore:Post" => Ok(EventName::ObjectRestorePost),
"s3:ObjectRestore:Completed" => Ok(EventName::ObjectRestoreCompleted),
"s3:ObjectTransition:Failed" => Ok(EventName::ObjectTransitionFailed),
"s3:ObjectTransition:Complete" => Ok(EventName::ObjectTransitionComplete),
"s3:ObjectTransition:*" => Ok(EventName::ObjectTransitionAll),
"s3:Scanner:ManyVersions" => Ok(EventName::ScannerManyVersions),
"s3:Scanner:LargeVersions" => Ok(EventName::ScannerLargeVersions),
"s3:Scanner:BigPrefix" => Ok(EventName::ScannerBigPrefix),
// ObjectScannerAll and Everything cannot be parsed from strings, because the Go version also does not define their string representation.
_ => Err(ParseEventNameError(s.to_string())),
}
}
/// Returns a string representation of the event type.
pub fn as_str(&self) -> &'static str {
match self {
EventName::BucketCreated => "s3:BucketCreated:*",
EventName::BucketRemoved => "s3:BucketRemoved:*",
EventName::ObjectAccessedAll => "s3:ObjectAccessed:*",
EventName::ObjectAccessedGet => "s3:ObjectAccessed:Get",
EventName::ObjectAccessedGetRetention => "s3:ObjectAccessed:GetRetention",
EventName::ObjectAccessedGetLegalHold => "s3:ObjectAccessed:GetLegalHold",
EventName::ObjectAccessedHead => "s3:ObjectAccessed:Head",
EventName::ObjectAccessedAttributes => "s3:ObjectAccessed:Attributes",
EventName::ObjectCreatedAll => "s3:ObjectCreated:*",
EventName::ObjectCreatedCompleteMultipartUpload => "s3:ObjectCreated:CompleteMultipartUpload",
EventName::ObjectCreatedCopy => "s3:ObjectCreated:Copy",
EventName::ObjectCreatedPost => "s3:ObjectCreated:Post",
EventName::ObjectCreatedPut => "s3:ObjectCreated:Put",
EventName::ObjectCreatedPutTagging => "s3:ObjectCreated:PutTagging",
EventName::ObjectCreatedDeleteTagging => "s3:ObjectCreated:DeleteTagging",
EventName::ObjectCreatedPutRetention => "s3:ObjectCreated:PutRetention",
EventName::ObjectCreatedPutLegalHold => "s3:ObjectCreated:PutLegalHold",
EventName::ObjectRemovedAll => "s3:ObjectRemoved:*",
EventName::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
EventName::ObjectRemovedDeleteMarkerCreated => "s3:ObjectRemoved:DeleteMarkerCreated",
EventName::ObjectRemovedNoOP => "s3:ObjectRemoved:NoOP",
EventName::ObjectRemovedDeleteAllVersions => "s3:ObjectRemoved:DeleteAllVersions",
EventName::LifecycleDelMarkerExpirationDelete => "s3:LifecycleDelMarkerExpiration:Delete",
EventName::ObjectReplicationAll => "s3:Replication:*",
EventName::ObjectReplicationFailed => "s3:Replication:OperationFailedReplication",
EventName::ObjectReplicationComplete => "s3:Replication:OperationCompletedReplication",
EventName::ObjectReplicationNotTracked => "s3:Replication:OperationNotTracked",
EventName::ObjectReplicationMissedThreshold => "s3:Replication:OperationMissedThreshold",
EventName::ObjectReplicationReplicatedAfterThreshold => "s3:Replication:OperationReplicatedAfterThreshold",
EventName::ObjectRestoreAll => "s3:ObjectRestore:*",
EventName::ObjectRestorePost => "s3:ObjectRestore:Post",
EventName::ObjectRestoreCompleted => "s3:ObjectRestore:Completed",
EventName::ObjectTransitionAll => "s3:ObjectTransition:*",
EventName::ObjectTransitionFailed => "s3:ObjectTransition:Failed",
EventName::ObjectTransitionComplete => "s3:ObjectTransition:Complete",
EventName::ScannerManyVersions => "s3:Scanner:ManyVersions",
EventName::ScannerLargeVersions => "s3:Scanner:LargeVersions",
EventName::ScannerBigPrefix => "s3:Scanner:BigPrefix",
// Go's String() returns "" for ObjectScannerAll and Everything
EventName::ObjectScannerAll => "s3:Scanner:*", // Follow the pattern in Go Expand
EventName::Everything => "", // Go's String() returns "" for this unhandled case
}
}
/// Expands a compound "All" event type into its constituent single event types.
pub fn expand(&self) -> Vec<Self> {
match self {
EventName::ObjectAccessedAll => vec![
EventName::ObjectAccessedGet,
EventName::ObjectAccessedHead,
EventName::ObjectAccessedGetRetention,
EventName::ObjectAccessedGetLegalHold,
EventName::ObjectAccessedAttributes,
],
EventName::ObjectCreatedAll => vec![
EventName::ObjectCreatedCompleteMultipartUpload,
EventName::ObjectCreatedCopy,
EventName::ObjectCreatedPost,
EventName::ObjectCreatedPut,
EventName::ObjectCreatedPutRetention,
EventName::ObjectCreatedPutLegalHold,
EventName::ObjectCreatedPutTagging,
EventName::ObjectCreatedDeleteTagging,
],
EventName::ObjectRemovedAll => vec![
EventName::ObjectRemovedDelete,
EventName::ObjectRemovedDeleteMarkerCreated,
EventName::ObjectRemovedNoOP,
EventName::ObjectRemovedDeleteAllVersions,
],
EventName::ObjectReplicationAll => vec![
EventName::ObjectReplicationFailed,
EventName::ObjectReplicationComplete,
EventName::ObjectReplicationNotTracked,
EventName::ObjectReplicationMissedThreshold,
EventName::ObjectReplicationReplicatedAfterThreshold,
],
EventName::ObjectRestoreAll => vec![EventName::ObjectRestorePost, EventName::ObjectRestoreCompleted],
EventName::ObjectTransitionAll => vec![EventName::ObjectTransitionFailed, EventName::ObjectTransitionComplete],
EventName::ObjectScannerAll => vec![
// New
EventName::ScannerManyVersions,
EventName::ScannerLargeVersions,
EventName::ScannerBigPrefix,
],
EventName::Everything => {
// New
SINGLE_EVENT_NAMES_IN_ORDER.to_vec()
}
// A single type expands to just itself
_ => vec![*self],
}
}
/// Returns the bitmask of the event type.
/// The compound "All" type will be expanded.
pub fn mask(&self) -> u64 {
let value = *self as u32;
if value > 0 && value <= LAST_SINGLE_TYPE_VALUE {
// It's a single type
1u64 << (value - 1)
} else {
// It's a compound type
let mut mask = 0u64;
for n in self.expand() {
mask |= n.mask(); // Recursively call mask
}
mask
}
}
}
impl fmt::Display for EventName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Converts a string into an `EventName`
impl From<&str> for EventName {
fn from(event_str: &str) -> Self {
EventName::parse(event_str).unwrap_or_else(|e| panic!("{}", e))
}
}
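// Worked example (illustration only, not part of this diff): the mask is one bit
// per single event type, and compound types OR their expansions together.
//   EventName::ObjectCreatedPut as u32 == 9, so its mask is 1 << 8 == 256.
//   EventName::ObjectCreatedAll expands to the eight ObjectCreated* types
//   (values 6..=13), so its mask has bits 5..=12 set:
//   assert_eq!(EventName::ObjectCreatedAll.mask(), 0b1_1111_1110_0000);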
/// Represents the identity of the user who triggered the event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Identity {
@@ -532,17 +258,6 @@ fn initialize_response_elements(elements: &mut HashMap<String, String>, keys: &[
}
}
/// Represents a log of events for sending to targets
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventLog {
/// The event name
pub event_name: EventName,
/// The object key
pub key: String,
/// The list of events
pub records: Vec<Event>,
}
#[derive(Debug, Clone)]
pub struct EventArgs {
pub event_name: EventName,

View File

@@ -12,19 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
error::TargetError,
target::{Target, mqtt::MQTTArgs, webhook::WebhookArgs},
};
use async_trait::async_trait;
use rumqttc::QoS;
use rustfs_config::notify::{
DEFAULT_DIR, DEFAULT_LIMIT, ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL,
MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME,
NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT,
WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS,
MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS,
WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT,
};
use crate::Event;
use rustfs_config::{DEFAULT_DIR, DEFAULT_LIMIT};
use rustfs_ecstore::config::KVS;
use rustfs_targets::{
Target,
error::TargetError,
target::{mqtt::MQTTArgs, webhook::WebhookArgs},
};
use std::collections::HashSet;
use std::time::Duration;
use tracing::{debug, warn};
@@ -34,7 +37,7 @@ use url::Url;
#[async_trait]
pub trait TargetFactory: Send + Sync {
/// Creates a target from configuration
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError>;
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target<Event> + Send + Sync>, TargetError>;
/// Validates target configuration
fn validate_config(&self, id: &str, config: &KVS) -> Result<(), TargetError>;
@@ -53,7 +56,7 @@ pub struct WebhookTargetFactory;
#[async_trait]
impl TargetFactory for WebhookTargetFactory {
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target<Event> + Send + Sync>, TargetError> {
// All config values are now read directly from the merged `config` KVS.
let endpoint = config
.lookup(WEBHOOK_ENDPOINT)
@@ -72,9 +75,10 @@ impl TargetFactory for WebhookTargetFactory {
.unwrap_or(DEFAULT_LIMIT),
client_cert: config.lookup(WEBHOOK_CLIENT_CERT).unwrap_or_default(),
client_key: config.lookup(WEBHOOK_CLIENT_KEY).unwrap_or_default(),
target_type: rustfs_targets::target::TargetType::NotifyEvent,
};
let target = crate::target::webhook::WebhookTarget::new(id, args)?;
let target = rustfs_targets::target::webhook::WebhookTarget::new(id, args)?;
Ok(Box::new(target))
}
@@ -119,7 +123,7 @@ pub struct MQTTTargetFactory;
#[async_trait]
impl TargetFactory for MQTTTargetFactory {
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
async fn create_target(&self, id: String, config: &KVS) -> Result<Box<dyn Target<Event> + Send + Sync>, TargetError> {
let broker = config
.lookup(MQTT_BROKER)
.ok_or_else(|| TargetError::Configuration("Missing MQTT broker".to_string()))?;
@@ -161,9 +165,10 @@ impl TargetFactory for MQTTTargetFactory {
.lookup(MQTT_QUEUE_LIMIT)
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_LIMIT),
target_type: rustfs_targets::target::TargetType::NotifyEvent,
};
let target = crate::target::mqtt::MQTTTarget::new(id, args)?;
let target = rustfs_targets::target::mqtt::MQTTTarget::new(id, args)?;
Ok(Box::new(target))
}
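
For context, a rough sketch of driving a factory by hand (the instance id "1" and the `kvs` value are hypothetical, and this assumes an async context):

// Illustration only: create a webhook target directly from a merged KVS.
let factory = WebhookTargetFactory;
factory.validate_config("1", &kvs)?;
let target: Box<dyn Target<Event> + Send + Sync> = factory.create_target("1".to_string(), &kvs).await?;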

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{Event, EventArgs, NotificationError, NotificationSystem};
use crate::{BucketNotificationConfig, Event, EventArgs, NotificationError, NotificationSystem};
use once_cell::sync::Lazy;
use rustfs_ecstore::config::Config;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use std::sync::{Arc, OnceLock};
use tracing::instrument;
use tracing::{error, instrument};
static NOTIFICATION_SYSTEM: OnceLock<Arc<NotificationSystem>> = OnceLock::new();
// Create a globally unique Notifier instance
@@ -57,6 +59,14 @@ pub struct Notifier {}
impl Notifier {
/// Notify an event asynchronously.
/// This is the only entry point for all event notifications in the system.
/// # Parameters
/// - `args`: The event arguments containing details about the event to be notified.
///
/// # Returns
/// Returns `()`; the notification is dispatched asynchronously and failures are logged rather than propagated.
///
/// # Usage
/// This function is used to notify events in the system, such as object creation, deletion, or updates.
#[instrument(skip(self, args))]
pub async fn notify(&self, args: EventArgs) {
// Obtain the NotificationSystem instance via dependency injection or the service locator pattern
@@ -64,7 +74,7 @@ impl Notifier {
// If the notification system itself cannot be retrieved, it will be returned directly
Some(sys) => sys,
None => {
tracing::error!("Notification system is not initialized.");
error!("Notification system is not initialized.");
return;
}
};
@@ -76,6 +86,7 @@ impl Notifier {
// Check if any subscribers are interested in the event
if !notification_sys.has_subscriber(&args.bucket_name, &args.event_name).await {
error!("No subscribers for event: {} in bucket: {}", args.event_name, args.bucket_name);
return;
}
@@ -83,4 +94,96 @@ impl Notifier {
let event = Arc::new(Event::new(args));
notification_sys.send_event(event).await;
}
/// Add notification rules for the specified bucket and load configuration
/// # Parameters
/// - `bucket_name`: The name of the target bucket.
/// - `region`: The region where the bucket is located.
/// - `event_names`: A list of event names that trigger notifications.
/// - `prefix`: The prefix of the object key that triggers notifications.
/// - `suffix`: The suffix of the object key that triggers notifications.
/// - `target_ids`: A list of target IDs that will receive notifications.
///
/// # Returns
/// Returns `Result<(), NotificationError>`: `Ok(())` on success, an error on failure.
///
/// # Usage
/// This function allows you to dynamically add notification rules for a specific bucket.
pub async fn add_bucket_notification_rule(
&self,
bucket_name: &str,
region: &str,
event_names: &[EventName],
prefix: &str,
suffix: &str,
target_ids: &[TargetID],
) -> Result<(), NotificationError> {
// Construct the match pattern by simply joining prefix and suffix
let mut pattern = String::new();
if !prefix.is_empty() {
pattern.push_str(prefix);
}
pattern.push('*');
if !suffix.is_empty() {
pattern.push_str(suffix);
}
// Create BucketNotificationConfig
let mut bucket_config = BucketNotificationConfig::new(region);
for target_id in target_ids {
bucket_config.add_rule(event_names, pattern.clone(), target_id.clone());
}
// Get global NotificationSystem
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
// Loading configuration
notification_sys
.load_bucket_notification_config(bucket_name, &bucket_config)
.await
}
/// Dynamically add notification rules for different event types.
///
/// # Parameters
/// - `bucket_name`: The name of the target bucket.
/// - `region`: The region where the bucket is located.
/// - `event_rules`: Each rule contains a list of event types, a prefix, a suffix, and target IDs.
///
/// # Returns
/// Returns `Result<(), NotificationError>`: `Ok(())` on success, an error on failure.
///
/// # Usage
/// Supports adding notification rules for multiple event types, prefixes, suffixes, and targets to the same bucket in batches.
pub async fn add_event_specific_rules(
&self,
bucket_name: &str,
region: &str,
event_rules: &[(Vec<EventName>, &str, &str, Vec<TargetID>)],
) -> Result<(), NotificationError> {
let mut bucket_config = BucketNotificationConfig::new(region);
for (event_names, prefix, suffix, target_ids) in event_rules {
// Use `new_pattern` to construct a matching pattern
let pattern = crate::rules::pattern::new_pattern(Some(prefix), Some(suffix));
for target_id in target_ids {
bucket_config.add_rule(event_names, pattern.clone(), target_id.clone());
}
}
// Get global NotificationSystem instance
let notification_sys = match notification_system() {
Some(sys) => sys,
None => return Err(NotificationError::ServerNotInitialized),
};
// Loading configuration
notification_sys
.load_bucket_notification_config(bucket_name, &bucket_config)
.await
}
}
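
A short usage sketch of the rule API (the bucket, region, and pattern values are hypothetical; `notifier` and `target_id` are assumed to already exist):

// Illustration only: subscribe an existing target to PUT events under a prefix.
notifier
    .add_bucket_notification_rule(
        "my-bucket",
        "us-east-1",
        &[EventName::ObjectCreatedPut],
        "photos/",
        ".jpg",
        &[target_id],
    )
    .await?;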

View File

@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{
Event, EventName, StoreError, Target, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry,
rules::BucketNotificationConfig, stream,
Event, error::NotificationError, notifier::EventNotifier, registry::TargetRegistry, rules::BucketNotificationConfig, stream,
};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use rustfs_targets::{StoreError, Target};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -127,7 +129,7 @@ impl NotificationSystem {
let config = self.config.read().await;
debug!("Initializing notification system with config: {:?}", *config);
let targets: Vec<Box<dyn Target + Send + Sync>> = self.registry.create_targets_from_config(&config).await?;
let targets: Vec<Box<dyn Target<Event> + Send + Sync>> = self.registry.create_targets_from_config(&config).await?;
info!("{} notification targets were created", targets.len());
@@ -318,8 +320,8 @@ impl NotificationSystem {
/// Enhanced event stream startup function, including monitoring and concurrency control
fn enhanced_start_event_stream(
&self,
store: Box<dyn Store<Event, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target + Send + Sync>,
store: Box<dyn Store<EntityTarget<Event>, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target<Event> + Send + Sync>,
metrics: Arc<NotificationMetrics>,
semaphore: Arc<Semaphore>,
) -> mpsc::Sender<()> {
@@ -348,7 +350,7 @@ impl NotificationSystem {
// Create a new target from configuration
// This function will now be responsible for merging env, creating and persisting the final configuration.
let targets: Vec<Box<dyn Target + Send + Sync>> = self
let targets: Vec<Box<dyn Target<Event> + Send + Sync>> = self
.registry
.create_targets_from_config(&new_config)
.await

View File

@@ -18,7 +18,6 @@
//! It supports sending events to various targets
//! (like Webhook and MQTT) and includes features like event persistence and retry on failure.
pub mod arn;
pub mod error;
pub mod event;
pub mod factory;
@@ -27,59 +26,10 @@ pub mod integration;
pub mod notifier;
pub mod registry;
pub mod rules;
pub mod store;
pub mod stream;
pub mod target;
// Re-exports
pub use error::{NotificationError, StoreError, TargetError};
pub use event::{Event, EventArgs, EventLog, EventName};
pub use error::NotificationError;
pub use event::{Event, EventArgs};
pub use global::{initialize, is_notification_system_initialized, notification_system};
pub use integration::NotificationSystem;
pub use rules::BucketNotificationConfig;
use std::io::IsTerminal;
pub use target::Target;
use tracing_subscriber::{EnvFilter, fmt, prelude::*, util::SubscriberInitExt};
/// Initialize the tracing log system
///
/// # Example
/// ```
/// rustfs_notify::init_logger(rustfs_notify::LogLevel::Info);
/// ```
pub fn init_logger(level: LogLevel) {
let filter = EnvFilter::default().add_directive(level.into());
tracing_subscriber::registry()
.with(filter)
.with(
fmt::layer()
.with_target(true)
.with_ansi(std::io::stdout().is_terminal())
.with_thread_names(true)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true),
)
.init();
}
/// Log level definition
pub enum LogLevel {
Debug,
Info,
Warn,
Error,
}
impl From<LogLevel> for tracing_subscriber::filter::Directive {
fn from(level: LogLevel) -> Self {
match level {
LogLevel::Debug => "debug".parse().unwrap(),
LogLevel::Info => "info".parse().unwrap(),
LogLevel::Warn => "warn".parse().unwrap(),
LogLevel::Error => "error".parse().unwrap(),
}
}
}

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::arn::TargetID;
use crate::{EventName, error::NotificationError, event::Event, rules::RulesMap, target::Target};
use crate::{error::NotificationError, event::Event, rules::RulesMap};
use dashmap::DashMap;
use rustfs_targets::EventName;
use rustfs_targets::Target;
use rustfs_targets::arn::TargetID;
use rustfs_targets::target::EntityTarget;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument, warn};
@@ -121,7 +124,7 @@ impl EventNotifier {
}
/// Sends an event to the appropriate targets based on the bucket rules
#[instrument(skip(self, event))]
#[instrument(skip_all)]
pub async fn send(&self, event: Arc<Event>) {
let bucket_name = &event.s3.bucket.name;
let object_key = &event.s3.object.key;
@@ -149,8 +152,15 @@ impl EventNotifier {
let target_name_for_task = cloned_target_for_task.name(); // Get the name before spawning the task
debug!("Preparing to send event to target: {}", target_name_for_task);
// Use cloned data in closures to avoid borrowing conflicts
// Create an EntityTarget from the event
let entity_target: Arc<EntityTarget<Event>> = Arc::new(EntityTarget {
object_name: object_key.to_string(),
bucket_name: bucket_name.to_string(),
event_name,
data: event_clone.clone().as_ref().clone(),
});
let handle = tokio::spawn(async move {
if let Err(e) = cloned_target_for_task.save(event_clone).await {
if let Err(e) = cloned_target_for_task.save(entity_target.clone()).await {
error!("Failed to send event to target {}: {}", target_name_for_task, e);
} else {
debug!("Successfully saved event to target {}", target_name_for_task);
@@ -180,7 +190,7 @@ impl EventNotifier {
#[instrument(skip(self, targets_to_init))]
pub async fn init_bucket_targets(
&self,
targets_to_init: Vec<Box<dyn Target + Send + Sync>>,
targets_to_init: Vec<Box<dyn Target<Event> + Send + Sync>>,
) -> Result<(), NotificationError> {
// Currently active, simpler logic
let mut target_list_guard = self.target_list.write().await; // Gets a write lock for the TargetList
@@ -189,7 +199,7 @@ impl EventNotifier {
debug!("init bucket target: {}", target_boxed.name());
// TargetList::add expects Arc<dyn Target + Send + Sync>
// Therefore, you need to convert Box<dyn Target + Send + Sync> to Arc<dyn Target + Send + Sync>
let target_arc: Arc<dyn Target + Send + Sync> = Arc::from(target_boxed);
let target_arc: Arc<dyn Target<Event> + Send + Sync> = Arc::from(target_boxed);
target_list_guard.add(target_arc)?; // Add Arc<dyn Target> to the list
}
info!(
@@ -203,7 +213,7 @@ impl EventNotifier {
/// A thread-safe list of targets
pub struct TargetList {
targets: HashMap<TargetID, Arc<dyn Target + Send + Sync>>,
targets: HashMap<TargetID, Arc<dyn Target<Event> + Send + Sync>>,
}
impl Default for TargetList {
@@ -219,7 +229,7 @@ impl TargetList {
}
/// Adds a target to the list
pub fn add(&mut self, target: Arc<dyn Target + Send + Sync>) -> Result<(), NotificationError> {
pub fn add(&mut self, target: Arc<dyn Target<Event> + Send + Sync>) -> Result<(), NotificationError> {
let id = target.id();
if self.targets.contains_key(&id) {
// Potentially update or log a warning/error if replacing an existing target.
@@ -231,7 +241,7 @@ impl TargetList {
/// Removes a target by ID. Note: This does not stop its associated event stream.
/// Stream cancellation should be handled by EventNotifier.
pub async fn remove_target_only(&mut self, id: &TargetID) -> Option<Arc<dyn Target + Send + Sync>> {
pub async fn remove_target_only(&mut self, id: &TargetID) -> Option<Arc<dyn Target<Event> + Send + Sync>> {
if let Some(target_arc) = self.targets.remove(id) {
if let Err(e) = target_arc.close().await {
// Target's own close logic
@@ -258,7 +268,7 @@ impl TargetList {
}
/// Returns a target by ID
pub fn get(&self, id: &TargetID) -> Option<Arc<dyn Target + Send + Sync>> {
pub fn get(&self, id: &TargetID) -> Option<Arc<dyn Target<Event> + Send + Sync>> {
self.targets.get(id).cloned()
}

View File

@@ -12,16 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::target::ChannelTargetType;
use crate::{
error::TargetError,
factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory},
target::Target,
};
use crate::Event;
use crate::factory::{MQTTTargetFactory, TargetFactory, WebhookTargetFactory};
use futures::stream::{FuturesUnordered, StreamExt};
use rustfs_config::notify::{ENABLE_KEY, NOTIFY_ROUTE_PREFIX};
use rustfs_config::{DEFAULT_DELIMITER, ENV_PREFIX};
use rustfs_config::notify::NOTIFY_ROUTE_PREFIX;
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX};
use rustfs_ecstore::config::{Config, KVS};
use rustfs_targets::Target;
use rustfs_targets::TargetError;
use rustfs_targets::target::ChannelTargetType;
use std::collections::{HashMap, HashSet};
use tracing::{debug, error, info, warn};
@@ -61,7 +60,7 @@ impl TargetRegistry {
target_type: &str,
id: String,
config: &KVS,
) -> Result<Box<dyn Target + Send + Sync>, TargetError> {
) -> Result<Box<dyn Target<Event> + Send + Sync>, TargetError> {
let factory = self
.factories
.get(target_type)
@@ -83,7 +82,10 @@ impl TargetRegistry {
/// 4. Combine the default configuration, file configuration, and environment variable configuration for each instance.
/// 5. If the instance is enabled, create an asynchronous task for it to instantiate.
/// 6. Concurrency executes all creation tasks and collects results.
pub async fn create_targets_from_config(&self, config: &Config) -> Result<Vec<Box<dyn Target + Send + Sync>>, TargetError> {
pub async fn create_targets_from_config(
&self,
config: &Config,
) -> Result<Vec<Box<dyn Target<Event> + Send + Sync>>, TargetError> {
// Collect only environment variables with the relevant prefix to reduce memory usage
let all_env: Vec<(String, String)> = std::env::vars().filter(|(key, _)| key.starts_with(ENV_PREFIX)).collect();
// A collection of asynchronous tasks for concurrently executing target creation

View File

@@ -14,11 +14,11 @@
use super::rules_map::RulesMap;
use super::xml_config::ParseConfigError as BucketNotificationConfigError;
use crate::EventName;
use crate::arn::TargetID;
use crate::rules::NotificationConfiguration;
use crate::rules::pattern_rules;
use crate::rules::target_id_set;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Read;

View File

@@ -14,7 +14,7 @@
use super::pattern;
use super::target_id_set::TargetIdSet;
use crate::arn::TargetID;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View File

@@ -14,8 +14,8 @@
use super::pattern_rules::PatternRules;
use super::target_id_set::TargetIdSet;
use crate::arn::TargetID;
use crate::event::EventName;
use rustfs_targets::EventName;
use rustfs_targets::arn::TargetID;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::arn::TargetID;
use rustfs_targets::arn::TargetID;
use std::collections::HashSet;
/// TargetIDSet - A collection representation of TargetID.

View File

@@ -13,8 +13,8 @@
// limitations under the License.
use super::pattern;
use crate::arn::{ARN, ArnError, TargetIDError};
use crate::event::EventName;
use rustfs_targets::EventName;
use rustfs_targets::arn::{ARN, ArnError, TargetIDError};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::Read;

View File

@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
Event, StoreError,
error::TargetError,
integration::NotificationMetrics,
store::{Key, Store},
target::Target,
};
use crate::{Event, integration::NotificationMetrics};
use rustfs_targets::StoreError;
use rustfs_targets::Target;
use rustfs_targets::TargetError;
use rustfs_targets::store::{Key, Store};
use rustfs_targets::target::EntityTarget;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, mpsc};
@@ -28,7 +27,7 @@ use tracing::{debug, error, info, warn};
/// Streams events from the store to the target
pub async fn stream_events(
store: &mut (dyn Store<Event, Error = StoreError, Key = Key> + Send),
target: &dyn Target,
target: &dyn Target<Event>,
mut cancel_rx: mpsc::Receiver<()>,
) {
info!("Starting event stream for target: {}", target.name());
@@ -107,7 +106,7 @@ pub async fn stream_events(
/// Starts the event streaming process for a target
pub fn start_event_stream(
mut store: Box<dyn Store<Event, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target + Send + Sync>,
target: Arc<dyn Target<Event> + Send + Sync>,
) -> mpsc::Sender<()> {
let (cancel_tx, cancel_rx) = mpsc::channel(1);
@@ -121,8 +120,8 @@ pub fn start_event_stream(
/// Start event stream with batch processing
pub fn start_event_stream_with_batching(
mut store: Box<dyn Store<Event, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target + Send + Sync>,
mut store: Box<dyn Store<EntityTarget<Event>, Error = StoreError, Key = Key> + Send>,
target: Arc<dyn Target<Event> + Send + Sync>,
metrics: Arc<NotificationMetrics>,
semaphore: Arc<Semaphore>,
) -> mpsc::Sender<()> {
@@ -138,8 +137,8 @@ pub fn start_event_stream_with_batching(
/// Event stream processing with batch processing
pub async fn stream_events_with_batching(
store: &mut (dyn Store<Event, Error = StoreError, Key = Key> + Send),
target: &dyn Target,
store: &mut (dyn Store<EntityTarget<Event>, Error = StoreError, Key = Key> + Send),
target: &dyn Target<Event>,
mut cancel_rx: mpsc::Receiver<()>,
metrics: Arc<NotificationMetrics>,
semaphore: Arc<Semaphore>,
@@ -156,7 +155,7 @@ pub async fn stream_events_with_batching(
const MAX_RETRIES: usize = 5;
const BASE_RETRY_DELAY: Duration = Duration::from_secs(2);
let mut batch = Vec::with_capacity(batch_size);
let mut batch: Vec<EntityTarget<Event>> = Vec::with_capacity(batch_size);
let mut batch_keys = Vec::with_capacity(batch_size);
let mut last_flush = Instant::now();
@@ -234,9 +233,9 @@ pub async fn stream_events_with_batching(
/// Processing event batches
async fn process_batch(
batch: &mut Vec<Event>,
batch: &mut Vec<EntityTarget<Event>>,
batch_keys: &mut Vec<Key>,
target: &dyn Target,
target: &dyn Target<Event>,
max_retries: usize,
base_delay: Duration,
metrics: &Arc<NotificationMetrics>,

View File

@@ -45,4 +45,4 @@ serde_json.workspace = true
md-5 = { workspace = true }
[dev-dependencies]
tokio-test = { workspace = true }
tokio-test = { workspace = true }

crates/targets/Cargo.toml (new file)
View File

@@ -0,0 +1,31 @@
[package]
name = "rustfs-targets"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Notification target abstraction and implementations for RustFS"
keywords = ["file-system", "notification", "target", "rustfs", "Minio"]
categories = ["web-programming", "development-tools", "filesystem"]
documentation = "https://docs.rs/rustfs-targets/latest/rustfs_targets/"
[dependencies]
rustfs-config = { workspace = true, features = ["notify", "constants", "audit"] }
rustfs-utils = { workspace = true, features = ["sys"] }
async-trait = { workspace = true }
reqwest = { workspace = true }
rumqttc = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snap = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
[lints]
workspace = true

View File

@@ -0,0 +1,99 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use thiserror::Error;
/// Error types for the store
#[derive(Debug, Error)]
pub enum StoreError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Deserialization error: {0}")]
Deserialization(String),
#[error("Compression error: {0}")]
Compression(String),
#[error("Entry limit exceeded")]
LimitExceeded,
#[error("Entry not found")]
NotFound,
#[error("Invalid entry: {0}")]
Internal(String), // Added internal error type
}
/// Error types for targets
#[derive(Debug, Error)]
pub enum TargetError {
#[error("Storage error: {0}")]
Storage(String),
#[error("Network error: {0}")]
Network(String),
#[error("Request error: {0}")]
Request(String),
#[error("Timeout error: {0}")]
Timeout(String),
#[error("Authentication error: {0}")]
Authentication(String),
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Encoding error: {0}")]
Encoding(String),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Target not connected")]
NotConnected,
#[error("Target initialization failed: {0}")]
Initialization(String),
#[error("Invalid ARN: {0}")]
InvalidARN(String),
#[error("Unknown error: {0}")]
Unknown(String),
#[error("Target is disabled")]
Disabled,
#[error("Configuration parsing error: {0}")]
ParseError(String),
#[error("Failed to save configuration: {0}")]
SaveConfig(String),
#[error("Server not initialized: {0}")]
ServerNotInitialized(String),
}
impl From<url::ParseError> for TargetError {
fn from(err: url::ParseError) -> Self {
TargetError::Configuration(format!("URL parse error: {err}"))
}
}
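
This `From` impl keeps URL handling ergonomic at call sites: the `?` operator converts parse failures into `TargetError::Configuration` automatically. A minimal sketch, assuming the `url` crate is in scope:

// Illustration only: `?` maps url::ParseError into TargetError::Configuration.
fn parse_endpoint(raw: &str) -> Result<url::Url, TargetError> {
    let endpoint = url::Url::parse(raw)?;
    Ok(endpoint)
}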

View File

@@ -0,0 +1,290 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::fmt;
/// Error returned when parsing an event name string fails.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseEventNameError(String);
impl fmt::Display for ParseEventNameError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Invalid event name: {}", self.0)
}
}
impl std::error::Error for ParseEventNameError {}
/// Represents the type of event that occurs on the object.
/// Based on AWS S3 event type and includes RustFS extension.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum EventName {
// Single event type (values are 1-32 for compatible mask logic)
ObjectAccessedGet = 1,
ObjectAccessedGetRetention = 2,
ObjectAccessedGetLegalHold = 3,
ObjectAccessedHead = 4,
ObjectAccessedAttributes = 5,
ObjectCreatedCompleteMultipartUpload = 6,
ObjectCreatedCopy = 7,
ObjectCreatedPost = 8,
ObjectCreatedPut = 9,
ObjectCreatedPutRetention = 10,
ObjectCreatedPutLegalHold = 11,
ObjectCreatedPutTagging = 12,
ObjectCreatedDeleteTagging = 13,
ObjectRemovedDelete = 14,
ObjectRemovedDeleteMarkerCreated = 15,
ObjectRemovedDeleteAllVersions = 16,
ObjectRemovedNoOP = 17,
BucketCreated = 18,
BucketRemoved = 19,
ObjectReplicationFailed = 20,
ObjectReplicationComplete = 21,
ObjectReplicationMissedThreshold = 22,
ObjectReplicationReplicatedAfterThreshold = 23,
ObjectReplicationNotTracked = 24,
ObjectRestorePost = 25,
ObjectRestoreCompleted = 26,
ObjectTransitionFailed = 27,
ObjectTransitionComplete = 28,
ScannerManyVersions = 29, // ObjectManyVersions corresponding to Go
ScannerLargeVersions = 30, // ObjectLargeVersions corresponding to Go
ScannerBigPrefix = 31, // PrefixManyFolders corresponding to Go
LifecycleDelMarkerExpirationDelete = 32, // ILMDelMarkerExpirationDelete corresponding to Go
// Compound "All" event type (no sequential value for mask)
ObjectAccessedAll,
ObjectCreatedAll,
ObjectRemovedAll,
ObjectReplicationAll,
ObjectRestoreAll,
ObjectTransitionAll,
ObjectScannerAll, // New, from Go
Everything, // New, from Go
}
// All single event types in declaration order, used by Everything.expand()
const SINGLE_EVENT_NAMES_IN_ORDER: [EventName; 32] = [
EventName::ObjectAccessedGet,
EventName::ObjectAccessedGetRetention,
EventName::ObjectAccessedGetLegalHold,
EventName::ObjectAccessedHead,
EventName::ObjectAccessedAttributes,
EventName::ObjectCreatedCompleteMultipartUpload,
EventName::ObjectCreatedCopy,
EventName::ObjectCreatedPost,
EventName::ObjectCreatedPut,
EventName::ObjectCreatedPutRetention,
EventName::ObjectCreatedPutLegalHold,
EventName::ObjectCreatedPutTagging,
EventName::ObjectCreatedDeleteTagging,
EventName::ObjectRemovedDelete,
EventName::ObjectRemovedDeleteMarkerCreated,
EventName::ObjectRemovedDeleteAllVersions,
EventName::ObjectRemovedNoOP,
EventName::BucketCreated,
EventName::BucketRemoved,
EventName::ObjectReplicationFailed,
EventName::ObjectReplicationComplete,
EventName::ObjectReplicationMissedThreshold,
EventName::ObjectReplicationReplicatedAfterThreshold,
EventName::ObjectReplicationNotTracked,
EventName::ObjectRestorePost,
EventName::ObjectRestoreCompleted,
EventName::ObjectTransitionFailed,
EventName::ObjectTransitionComplete,
EventName::ScannerManyVersions,
EventName::ScannerLargeVersions,
EventName::ScannerBigPrefix,
EventName::LifecycleDelMarkerExpirationDelete,
];
const LAST_SINGLE_TYPE_VALUE: u32 = EventName::LifecycleDelMarkerExpirationDelete as u32;
impl EventName {
/// Parses a string into an `EventName`.
pub fn parse(s: &str) -> Result<Self, ParseEventNameError> {
match s {
"s3:BucketCreated:*" => Ok(EventName::BucketCreated),
"s3:BucketRemoved:*" => Ok(EventName::BucketRemoved),
"s3:ObjectAccessed:*" => Ok(EventName::ObjectAccessedAll),
"s3:ObjectAccessed:Get" => Ok(EventName::ObjectAccessedGet),
"s3:ObjectAccessed:GetRetention" => Ok(EventName::ObjectAccessedGetRetention),
"s3:ObjectAccessed:GetLegalHold" => Ok(EventName::ObjectAccessedGetLegalHold),
"s3:ObjectAccessed:Head" => Ok(EventName::ObjectAccessedHead),
"s3:ObjectAccessed:Attributes" => Ok(EventName::ObjectAccessedAttributes),
"s3:ObjectCreated:*" => Ok(EventName::ObjectCreatedAll),
"s3:ObjectCreated:CompleteMultipartUpload" => Ok(EventName::ObjectCreatedCompleteMultipartUpload),
"s3:ObjectCreated:Copy" => Ok(EventName::ObjectCreatedCopy),
"s3:ObjectCreated:Post" => Ok(EventName::ObjectCreatedPost),
"s3:ObjectCreated:Put" => Ok(EventName::ObjectCreatedPut),
"s3:ObjectCreated:PutRetention" => Ok(EventName::ObjectCreatedPutRetention),
"s3:ObjectCreated:PutLegalHold" => Ok(EventName::ObjectCreatedPutLegalHold),
"s3:ObjectCreated:PutTagging" => Ok(EventName::ObjectCreatedPutTagging),
"s3:ObjectCreated:DeleteTagging" => Ok(EventName::ObjectCreatedDeleteTagging),
"s3:ObjectRemoved:*" => Ok(EventName::ObjectRemovedAll),
"s3:ObjectRemoved:Delete" => Ok(EventName::ObjectRemovedDelete),
"s3:ObjectRemoved:DeleteMarkerCreated" => Ok(EventName::ObjectRemovedDeleteMarkerCreated),
"s3:ObjectRemoved:NoOP" => Ok(EventName::ObjectRemovedNoOP),
"s3:ObjectRemoved:DeleteAllVersions" => Ok(EventName::ObjectRemovedDeleteAllVersions),
"s3:LifecycleDelMarkerExpiration:Delete" => Ok(EventName::LifecycleDelMarkerExpirationDelete),
"s3:Replication:*" => Ok(EventName::ObjectReplicationAll),
"s3:Replication:OperationFailedReplication" => Ok(EventName::ObjectReplicationFailed),
"s3:Replication:OperationCompletedReplication" => Ok(EventName::ObjectReplicationComplete),
"s3:Replication:OperationMissedThreshold" => Ok(EventName::ObjectReplicationMissedThreshold),
"s3:Replication:OperationReplicatedAfterThreshold" => Ok(EventName::ObjectReplicationReplicatedAfterThreshold),
"s3:Replication:OperationNotTracked" => Ok(EventName::ObjectReplicationNotTracked),
"s3:ObjectRestore:*" => Ok(EventName::ObjectRestoreAll),
"s3:ObjectRestore:Post" => Ok(EventName::ObjectRestorePost),
"s3:ObjectRestore:Completed" => Ok(EventName::ObjectRestoreCompleted),
"s3:ObjectTransition:Failed" => Ok(EventName::ObjectTransitionFailed),
"s3:ObjectTransition:Complete" => Ok(EventName::ObjectTransitionComplete),
"s3:ObjectTransition:*" => Ok(EventName::ObjectTransitionAll),
"s3:Scanner:ManyVersions" => Ok(EventName::ScannerManyVersions),
"s3:Scanner:LargeVersions" => Ok(EventName::ScannerLargeVersions),
"s3:Scanner:BigPrefix" => Ok(EventName::ScannerBigPrefix),
// ObjectScannerAll and Everything cannot be parsed from strings; the Go version likewise defines no string representation for them.
_ => Err(ParseEventNameError(s.to_string())),
}
}
/// Returns a string representation of the event type.
pub fn as_str(&self) -> &'static str {
match self {
EventName::BucketCreated => "s3:BucketCreated:*",
EventName::BucketRemoved => "s3:BucketRemoved:*",
EventName::ObjectAccessedAll => "s3:ObjectAccessed:*",
EventName::ObjectAccessedGet => "s3:ObjectAccessed:Get",
EventName::ObjectAccessedGetRetention => "s3:ObjectAccessed:GetRetention",
EventName::ObjectAccessedGetLegalHold => "s3:ObjectAccessed:GetLegalHold",
EventName::ObjectAccessedHead => "s3:ObjectAccessed:Head",
EventName::ObjectAccessedAttributes => "s3:ObjectAccessed:Attributes",
EventName::ObjectCreatedAll => "s3:ObjectCreated:*",
EventName::ObjectCreatedCompleteMultipartUpload => "s3:ObjectCreated:CompleteMultipartUpload",
EventName::ObjectCreatedCopy => "s3:ObjectCreated:Copy",
EventName::ObjectCreatedPost => "s3:ObjectCreated:Post",
EventName::ObjectCreatedPut => "s3:ObjectCreated:Put",
EventName::ObjectCreatedPutTagging => "s3:ObjectCreated:PutTagging",
EventName::ObjectCreatedDeleteTagging => "s3:ObjectCreated:DeleteTagging",
EventName::ObjectCreatedPutRetention => "s3:ObjectCreated:PutRetention",
EventName::ObjectCreatedPutLegalHold => "s3:ObjectCreated:PutLegalHold",
EventName::ObjectRemovedAll => "s3:ObjectRemoved:*",
EventName::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
EventName::ObjectRemovedDeleteMarkerCreated => "s3:ObjectRemoved:DeleteMarkerCreated",
EventName::ObjectRemovedNoOP => "s3:ObjectRemoved:NoOP",
EventName::ObjectRemovedDeleteAllVersions => "s3:ObjectRemoved:DeleteAllVersions",
EventName::LifecycleDelMarkerExpirationDelete => "s3:LifecycleDelMarkerExpiration:Delete",
EventName::ObjectReplicationAll => "s3:Replication:*",
EventName::ObjectReplicationFailed => "s3:Replication:OperationFailedReplication",
EventName::ObjectReplicationComplete => "s3:Replication:OperationCompletedReplication",
EventName::ObjectReplicationNotTracked => "s3:Replication:OperationNotTracked",
EventName::ObjectReplicationMissedThreshold => "s3:Replication:OperationMissedThreshold",
EventName::ObjectReplicationReplicatedAfterThreshold => "s3:Replication:OperationReplicatedAfterThreshold",
EventName::ObjectRestoreAll => "s3:ObjectRestore:*",
EventName::ObjectRestorePost => "s3:ObjectRestore:Post",
EventName::ObjectRestoreCompleted => "s3:ObjectRestore:Completed",
EventName::ObjectTransitionAll => "s3:ObjectTransition:*",
EventName::ObjectTransitionFailed => "s3:ObjectTransition:Failed",
EventName::ObjectTransitionComplete => "s3:ObjectTransition:Complete",
EventName::ScannerManyVersions => "s3:Scanner:ManyVersions",
EventName::ScannerLargeVersions => "s3:Scanner:LargeVersions",
EventName::ScannerBigPrefix => "s3:Scanner:BigPrefix",
// Go's String() returns "" for ObjectScannerAll and Everything
EventName::ObjectScannerAll => "s3:Scanner:*", // follows the pattern of Go's Expand
EventName::Everything => "", // Go's String() returns "" for this unhandled case
}
}
/// Expands a compound "All" event type into its constituent single types.
pub fn expand(&self) -> Vec<Self> {
match self {
EventName::ObjectAccessedAll => vec![
EventName::ObjectAccessedGet,
EventName::ObjectAccessedHead,
EventName::ObjectAccessedGetRetention,
EventName::ObjectAccessedGetLegalHold,
EventName::ObjectAccessedAttributes,
],
EventName::ObjectCreatedAll => vec![
EventName::ObjectCreatedCompleteMultipartUpload,
EventName::ObjectCreatedCopy,
EventName::ObjectCreatedPost,
EventName::ObjectCreatedPut,
EventName::ObjectCreatedPutRetention,
EventName::ObjectCreatedPutLegalHold,
EventName::ObjectCreatedPutTagging,
EventName::ObjectCreatedDeleteTagging,
],
EventName::ObjectRemovedAll => vec![
EventName::ObjectRemovedDelete,
EventName::ObjectRemovedDeleteMarkerCreated,
EventName::ObjectRemovedNoOP,
EventName::ObjectRemovedDeleteAllVersions,
],
EventName::ObjectReplicationAll => vec![
EventName::ObjectReplicationFailed,
EventName::ObjectReplicationComplete,
EventName::ObjectReplicationNotTracked,
EventName::ObjectReplicationMissedThreshold,
EventName::ObjectReplicationReplicatedAfterThreshold,
],
EventName::ObjectRestoreAll => vec![EventName::ObjectRestorePost, EventName::ObjectRestoreCompleted],
EventName::ObjectTransitionAll => vec![EventName::ObjectTransitionFailed, EventName::ObjectTransitionComplete],
EventName::ObjectScannerAll => vec![
// New
EventName::ScannerManyVersions,
EventName::ScannerLargeVersions,
EventName::ScannerBigPrefix,
],
EventName::Everything => {
// New
SINGLE_EVENT_NAMES_IN_ORDER.to_vec()
}
// A single type expands to itself
_ => vec![*self],
}
}
/// Returns the bitmask for this event type.
/// Compound "All" types are expanded and their masks OR-ed together.
pub fn mask(&self) -> u64 {
let value = *self as u32;
if value > 0 && value <= LAST_SINGLE_TYPE_VALUE {
// It's a single type
1u64 << (value - 1)
} else {
// It's a compound type
let mut mask = 0u64;
for n in self.expand() {
mask |= n.mask(); // Recursively call mask
}
mask
}
}
}
impl fmt::Display for EventName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Converts a string into an `EventName`, panicking on an unknown name.
impl From<&str> for EventName {
fn from(event_str: &str) -> Self {
EventName::parse(event_str).unwrap_or_else(|e| panic!("{}", e))
}
}
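
As a quick illustration of how `parse`, `expand`, and `mask` fit together (a sketch, not part of the diff; the bit positions follow the discriminants above, e.g. `ObjectCreatedPut = 9` occupies bit 8):

```rust
fn event_name_demo() -> Result<(), ParseEventNameError> {
    let put = EventName::parse("s3:ObjectCreated:Put")?;
    assert_eq!(put.mask(), 1u64 << 8); // discriminant 9 -> bit 8

    // A compound type's mask is the OR of its expanded members.
    let all = EventName::ObjectCreatedAll;
    let union = all.expand().iter().fold(0u64, |m, e| m | e.mask());
    assert_eq!(all.mask(), union);

    // The string form round-trips for single types.
    assert_eq!(EventName::parse(put.as_str())?, put);
    Ok(())
}
```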

crates/targets/src/lib.rs

@@ -0,0 +1,35 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod arn;
pub mod error;
mod event_name;
pub mod store;
pub mod target;
pub use error::{StoreError, TargetError};
pub use event_name::EventName;
use serde::{Deserialize, Serialize};
pub use target::Target;
/// Represents a log of events for sending to targets
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetLog<E> {
/// The event name
pub event_name: EventName,
/// The object key
pub key: String,
/// The list of events
pub records: Vec<E>,
}
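
For a sense of the wire format, here is a sketch serializing a `TargetLog` with `serde_json::Value` standing in for `E` (the field layout follows the struct above; serde renders the unit variant as its name):

```rust
fn target_log_wire_format() {
    let log = TargetLog {
        event_name: EventName::ObjectCreatedPut,
        key: "my-bucket/photos/cat.png".to_string(),
        records: vec![serde_json::json!({ "etag": "abc123" })],
    };
    // Produces roughly:
    // {"event_name":"ObjectCreatedPut","key":"my-bucket/photos/cat.png","records":[{"etag":"abc123"}]}
    println!("{}", serde_json::to_string(&log).expect("TargetLog is serializable"));
}
```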


@@ -13,7 +13,8 @@
// limitations under the License.
use crate::error::StoreError;
use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT, DEFAULT_LIMIT};
use rustfs_config::DEFAULT_LIMIT;
use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT};
use serde::{Serialize, de::DeserializeOwned};
use snap::raw::{Decoder, Encoder};
use std::sync::{Arc, RwLock};
@@ -123,7 +124,10 @@ pub fn parse_key(s: &str) -> Key {
}
/// Trait for a store that can store and retrieve items of type T
pub trait Store<T>: Send + Sync {
pub trait Store<T>: Send + Sync
where
T: Send + Sync + 'static + Clone + Serialize,
{
/// The error type for the store
type Error;
/// The key type for the store


@@ -14,8 +14,11 @@
use crate::arn::TargetID;
use crate::store::{Key, Store};
use crate::{Event, StoreError, TargetError};
use crate::{EventName, StoreError, TargetError};
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt::Formatter;
use std::sync::Arc;
pub mod mqtt;
@@ -23,7 +26,10 @@ pub mod webhook;
/// Trait for notification targets
#[async_trait]
pub trait Target: Send + Sync + 'static {
pub trait Target<E>: Send + Sync + 'static
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
/// Returns the ID of the target
fn id(&self) -> TargetID;
@@ -36,7 +42,7 @@ pub trait Target: Send + Sync + 'static {
async fn is_active(&self) -> Result<bool, TargetError>;
/// Saves an event (either sends it immediately or stores it for later)
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError>;
async fn save(&self, event: Arc<EntityTarget<E>>) -> Result<(), TargetError>;
/// Sends an event from the store
async fn send_from_store(&self, key: Key) -> Result<(), TargetError>;
@@ -45,10 +51,10 @@ pub trait Target: Send + Sync + 'static {
async fn close(&self) -> Result<(), TargetError>;
/// Returns the store associated with the target (if any)
fn store(&self) -> Option<&(dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync)>;
fn store(&self) -> Option<&(dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync)>;
/// Returns the type of the target
fn clone_dyn(&self) -> Box<dyn Target + Send + Sync>;
fn clone_dyn(&self) -> Box<dyn Target<E> + Send + Sync>;
/// Initialize the target, such as establishing a connection, etc.
async fn init(&self) -> Result<(), TargetError> {
@@ -60,6 +66,17 @@ pub trait Target: Send + Sync + 'static {
fn is_enabled(&self) -> bool;
}
#[derive(Debug, Serialize, Clone, Deserialize)]
pub struct EntityTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize,
{
pub object_name: String,
pub bucket_name: String,
pub event_name: EventName,
pub data: E,
}
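
A sketch of the intended call pattern for `EntityTarget` (the helper and payload values are hypothetical; `save` comes from the `Target<E>` trait above):

```rust
use std::sync::Arc;

// Hypothetical helper: wrap a payload and hand it to any Target<E>.
async fn deliver<E>(target: &dyn Target<E>, data: E) -> Result<(), TargetError>
where
    E: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
    let entity = EntityTarget {
        object_name: "photos/cat.png".to_string(),
        bucket_name: "my-bucket".to_string(),
        event_name: EventName::ObjectCreatedPut,
        data,
    };
    // save() either sends immediately or queues into the target's store.
    target.save(Arc::new(entity)).await
}
```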
/// The `ChannelTargetType` enum represents the different types of channel Target
/// used in the notification system.
///
@@ -75,7 +92,7 @@ pub trait Target: Send + Sync + 'static {
///
/// example usage:
/// ```rust
/// use rustfs_notify::target::ChannelTargetType;
/// use rustfs_targets::target::ChannelTargetType;
///
/// let target_type = ChannelTargetType::Webhook;
/// assert_eq!(target_type.as_str(), "webhook");
@@ -101,7 +118,7 @@ impl ChannelTargetType {
}
impl std::fmt::Display for ChannelTargetType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ChannelTargetType::Webhook => write!(f, "webhook"),
ChannelTargetType::Kafka => write!(f, "kafka"),
@@ -117,3 +134,28 @@ pub fn parse_bool(value: &str) -> Result<bool, TargetError> {
_ => Err(TargetError::ParseError(format!("Unable to parse boolean: {value}"))),
}
}
/// `TargetType` enum represents the type of target in the notification system.
#[derive(Debug, Clone)]
pub enum TargetType {
AuditLog,
NotifyEvent,
}
impl TargetType {
pub fn as_str(&self) -> &'static str {
match self {
TargetType::AuditLog => "audit_log",
TargetType::NotifyEvent => "notify_event",
}
}
}
impl std::fmt::Display for TargetType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TargetType::AuditLog => write!(f, "audit_log"),
TargetType::NotifyEvent => write!(f, "notify_event"),
}
}
}
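
`TargetType` drives the choice of on-disk queue extension; a standalone sketch of that mapping, mirroring the `match` in `MQTTTarget::new` further down (constants referenced exactly as they appear in the diff):

```rust
// Sketch: select the queue-store file extension for a target type.
fn queue_extension(target_type: &TargetType) -> &'static str {
    match target_type {
        TargetType::AuditLog => rustfs_config::audit::AUDIT_STORE_EXTENSION,
        TargetType::NotifyEvent => rustfs_config::notify::STORE_EXTENSION,
    }
}
```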


@@ -13,18 +13,13 @@
// limitations under the License.
use crate::store::Key;
use crate::target::ChannelTargetType;
use crate::{
StoreError, Target,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::Store,
};
use crate::target::{ChannelTargetType, EntityTarget, TargetType};
use crate::{StoreError, Target, TargetLog, arn::TargetID, error::TargetError, store::Store};
use async_trait::async_trait;
use rumqttc::{AsyncClient, EventLoop, MqttOptions, Outgoing, Packet, QoS};
use rumqttc::{ConnectionError, mqttbytes::Error as MqttBytesError};
use rustfs_config::notify::STORE_EXTENSION;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use std::{
path::PathBuf,
@@ -62,6 +57,8 @@ pub struct MQTTArgs {
pub queue_dir: String,
/// The maximum number of events to store
pub queue_limit: u64,
/// The target type
pub target_type: TargetType,
}
impl MQTTArgs {
@@ -77,6 +74,10 @@ impl MQTTArgs {
}
}
if self.topic.is_empty() {
return Err(TargetError::Configuration("MQTT topic cannot be empty".to_string()));
}
if !self.queue_dir.is_empty() {
let path = std::path::Path::new(&self.queue_dir);
if !path.is_absolute() {
@@ -100,16 +101,22 @@ struct BgTaskManager {
}
/// A target that sends events to an MQTT broker
pub struct MQTTTarget {
pub struct MQTTTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
id: TargetID,
args: MQTTArgs,
client: Arc<Mutex<Option<AsyncClient>>>,
store: Option<Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>>,
store: Option<Box<dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync>>,
connected: Arc<AtomicBool>,
bg_task_manager: Arc<BgTaskManager>,
}
impl MQTTTarget {
impl<E> MQTTTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
/// Creates a new MQTTTarget
#[instrument(skip(args), fields(target_id_as_string = %id))]
pub fn new(id: String, args: MQTTArgs) -> Result<Self, TargetError> {
@@ -121,7 +128,12 @@ impl MQTTTarget {
// Ensure the directory name is valid for the filesystem
let specific_queue_path = base_path.join(unique_dir_name);
debug!(target_id = %target_id, path = %specific_queue_path.display(), "Initializing queue store for MQTT target");
let store = crate::store::QueueStore::<Event>::new(specific_queue_path, args.queue_limit, STORE_EXTENSION);
let extension = match args.target_type {
TargetType::AuditLog => rustfs_config::audit::AUDIT_STORE_EXTENSION,
TargetType::NotifyEvent => rustfs_config::notify::STORE_EXTENSION,
};
let store = crate::store::QueueStore::<EntityTarget<E>>::new(specific_queue_path, args.queue_limit, extension);
if let Err(e) = store.open() {
error!(
target_id = %target_id,
@@ -130,7 +142,7 @@ impl MQTTTarget {
);
return Err(TargetError::Storage(format!("{e}")));
}
Some(Box::new(store) as Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>)
Some(Box::new(store) as Box<dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync>)
} else {
None
};
@@ -237,18 +249,18 @@ impl MQTTTarget {
}
#[instrument(skip(self, event), fields(target_id = %self.id))]
async fn send(&self, event: &Event) -> Result<(), TargetError> {
async fn send(&self, event: &EntityTarget<E>) -> Result<(), TargetError> {
let client_guard = self.client.lock().await;
let client = client_guard
.as_ref()
.ok_or_else(|| TargetError::Configuration("MQTT client not initialized".to_string()))?;
let object_name = urlencoding::decode(&event.s3.object.key)
let object_name = urlencoding::decode(&event.object_name)
.map_err(|e| TargetError::Encoding(format!("Failed to decode object key: {e}")))?;
let key = format!("{}/{}", event.s3.bucket.name, object_name);
let key = format!("{}/{}", event.bucket_name, object_name);
let log = EventLog {
let log = TargetLog {
event_name: event.event_name,
key,
records: vec![event.clone()],
@@ -256,7 +268,6 @@ impl MQTTTarget {
let data = serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {e}")))?;
// Convert the Vec<u8> to a String, used only for logging
let data_string = String::from_utf8(data.clone())
.map_err(|e| TargetError::Encoding(format!("Failed to convert event data to UTF-8: {e}")))?;
debug!("Sending event to mqtt target: {}, event log: {}", self.id, data_string);
@@ -278,7 +289,7 @@ impl MQTTTarget {
Ok(())
}
pub fn clone_target(&self) -> Box<dyn Target + Send + Sync> {
pub fn clone_target(&self) -> Box<dyn Target<E> + Send + Sync> {
Box::new(MQTTTarget {
id: self.id.clone(),
args: self.args.clone(),
@@ -444,7 +455,10 @@ fn is_fatal_mqtt_error(err: &ConnectionError) -> bool {
}
#[async_trait]
impl Target for MQTTTarget {
impl<E> Target<E> for MQTTTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
fn id(&self) -> TargetID {
self.id.clone()
}
@@ -476,7 +490,7 @@ impl Target for MQTTTarget {
}
#[instrument(skip(self, event), fields(target_id = %self.id))]
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError> {
async fn save(&self, event: Arc<EntityTarget<E>>) -> Result<(), TargetError> {
if let Some(store) = &self.store {
debug!(target_id = %self.id, "Event saved to store start");
// If store is configured, ONLY put the event into the store.
@@ -620,11 +634,11 @@ impl Target for MQTTTarget {
Ok(())
}
fn store(&self) -> Option<&(dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync)> {
fn store(&self) -> Option<&(dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync)> {
self.store.as_deref()
}
fn clone_dyn(&self) -> Box<dyn Target + Send + Sync> {
fn clone_dyn(&self) -> Box<dyn Target<E> + Send + Sync> {
self.clone_target()
}


@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::target::ChannelTargetType;
use crate::target::{ChannelTargetType, EntityTarget, TargetType};
use crate::{
StoreError, Target,
StoreError, Target, TargetLog,
arn::TargetID,
error::TargetError,
event::{Event, EventLog},
store::{Key, Store},
};
use async_trait::async_trait;
use reqwest::{Client, StatusCode, Url};
use rustfs_config::notify::STORE_EXTENSION;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::{
path::PathBuf,
sync::{
@@ -53,6 +54,8 @@ pub struct WebhookArgs {
pub client_cert: String,
/// The client key for TLS (PEM format)
pub client_key: String,
/// The target type
pub target_type: TargetType,
}
impl WebhookArgs {
@@ -84,20 +87,26 @@ impl WebhookArgs {
}
/// A target that sends events to a webhook
pub struct WebhookTarget {
pub struct WebhookTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
id: TargetID,
args: WebhookArgs,
http_client: Arc<Client>,
// Add Send + Sync constraints to ensure thread safety
store: Option<Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>>,
store: Option<Box<dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync>>,
initialized: AtomicBool,
addr: String,
cancel_sender: mpsc::Sender<()>,
}
impl WebhookTarget {
impl<E> WebhookTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
/// Clones the WebhookTarget, creating a new instance with the same configuration
pub fn clone_box(&self) -> Box<dyn Target + Send + Sync> {
pub fn clone_box(&self) -> Box<dyn Target<E> + Send + Sync> {
Box::new(WebhookTarget {
id: self.id.clone(),
args: self.args.clone(),
@@ -144,7 +153,7 @@ impl WebhookTarget {
let queue_store = if !args.queue_dir.is_empty() {
let queue_dir =
PathBuf::from(&args.queue_dir).join(format!("rustfs-{}-{}", ChannelTargetType::Webhook.as_str(), target_id.id));
let store = crate::store::QueueStore::<Event>::new(queue_dir, args.queue_limit, STORE_EXTENSION);
let store = crate::store::QueueStore::<EntityTarget<E>>::new(queue_dir, args.queue_limit, STORE_EXTENSION);
if let Err(e) = store.open() {
error!("Failed to open store for Webhook target {}: {}", target_id.id, e);
@@ -152,7 +161,7 @@ impl WebhookTarget {
}
// Make sure that the Store trait implemented by QueueStore matches the expected error type
Some(Box::new(store) as Box<dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync>)
Some(Box::new(store) as Box<dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync>)
} else {
None
};
@@ -203,17 +212,17 @@ impl WebhookTarget {
Ok(())
}
async fn send(&self, event: &Event) -> Result<(), TargetError> {
async fn send(&self, event: &EntityTarget<E>) -> Result<(), TargetError> {
info!("Webhook Sending event to webhook target: {}", self.id);
let object_name = urlencoding::decode(&event.s3.object.key)
let object_name = urlencoding::decode(&event.object_name)
.map_err(|e| TargetError::Encoding(format!("Failed to decode object key: {e}")))?;
let key = format!("{}/{}", event.s3.bucket.name, object_name);
let key = format!("{}/{}", event.bucket_name, object_name);
let log = EventLog {
let log = TargetLog {
event_name: event.event_name,
key,
records: vec![event.clone()],
records: vec![event.data.clone()],
};
let data = serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {e}")))?;
@@ -275,12 +284,14 @@ impl WebhookTarget {
}
#[async_trait]
impl Target for WebhookTarget {
impl<E> Target<E> for WebhookTarget<E>
where
E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned,
{
fn id(&self) -> TargetID {
self.id.clone()
}
// Make sure Future is Send
async fn is_active(&self) -> Result<bool, TargetError> {
let socket_addr = lookup_host(&self.addr)
.await
@@ -305,7 +316,7 @@ impl Target for WebhookTarget {
}
}
async fn save(&self, event: Arc<Event>) -> Result<(), TargetError> {
async fn save(&self, event: Arc<EntityTarget<E>>) -> Result<(), TargetError> {
if let Some(store) = &self.store {
// Call the store method directly; no lock needs to be acquired
store
@@ -379,17 +390,15 @@ impl Target for WebhookTarget {
Ok(())
}
fn store(&self) -> Option<&(dyn Store<Event, Error = StoreError, Key = Key> + Send + Sync)> {
fn store(&self) -> Option<&(dyn Store<EntityTarget<E>, Error = StoreError, Key = Key> + Send + Sync)> {
// Return a reference to the internal store
self.store.as_deref()
}
fn clone_dyn(&self) -> Box<dyn Target + Send + Sync> {
fn clone_dyn(&self) -> Box<dyn Target<E> + Send + Sync> {
self.clone_box()
}
// The existing init logic already fits; we only align its return value with the Target trait
async fn init(&self) -> Result<(), TargetError> {
// If the target is disabled, return success immediately
if !self.is_enabled() {


@@ -49,8 +49,10 @@ rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-protos.workspace = true
rustfs-protos = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-audit-logger = { workspace = true }
rustfs-targets = { workspace = true }
atoi = { workspace = true }
atomic_enum = { workspace = true }
axum.workspace = true


@@ -18,9 +18,10 @@ use crate::admin::router::Operation;
use crate::auth::{check_key_valid, get_session_token};
use http::{HeaderMap, StatusCode};
use matchit::Params;
use rustfs_config::notify::{ENABLE_KEY, ENABLE_ON, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_notify::EventName;
use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS};
use rustfs_config::{ENABLE_KEY, EnableState};
use rustfs_notify::rules::{BucketNotificationConfig, PatternRules};
use rustfs_targets::EventName;
use s3s::header::CONTENT_LENGTH;
use s3s::{Body, S3Error, S3ErrorCode, S3Request, S3Response, S3Result, header::CONTENT_TYPE, s3_error};
use serde::{Deserialize, Serialize};
@@ -78,7 +79,7 @@ impl Operation for SetNotificationTarget {
.map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?;
// If the enable key is absent, default it to "on"
if !kvs_map.contains_key(ENABLE_KEY) {
kvs_map.insert(ENABLE_KEY.to_string(), ENABLE_ON.to_string());
kvs_map.insert(ENABLE_KEY.to_string(), EnableState::On.to_string());
}
let kvs = rustfs_ecstore::config::KVS(


@@ -25,7 +25,10 @@ use handlers::{
sts, tier, user,
};
use crate::admin::handlers::event::{ListNotificationTargets, RemoveNotificationTarget, SetNotificationTarget};
use crate::admin::handlers::event::{
GetBucketNotification, ListNotificationTargets, RemoveBucketNotification, RemoveNotificationTarget, SetBucketNotification,
SetNotificationTarget,
};
use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler};
use hyper::Method;
use router::{AdminOperation, S3Router};
@@ -389,5 +392,23 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> std::io::Result<()>
AdminOperation(&RemoveNotificationTarget {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-set-bucket").as_str(),
AdminOperation(&SetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-get-bucket").as_str(),
AdminOperation(&GetBucketNotification {}),
)?;
r.insert(
Method::POST,
format!("{}{}", ADMIN_PREFIX, "/v3/target-remove-bucket").as_str(),
AdminOperation(&RemoveBucketNotification {}),
)?;
Ok(())
}


@@ -72,7 +72,7 @@ pub struct Opt {
#[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_ENDPOINT.to_string(), env = "RUSTFS_OBS_ENDPOINT")]
pub obs_endpoint: String,
/// tls path for rustfs api and console.
/// tls path for rustfs API and console.
#[arg(long, env = "RUSTFS_TLS_PATH")]
pub tls_path: Option<String>,


@@ -0,0 +1,13 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod audit;
mod http;
mod hybrid;
mod layer;
mod service_state;
pub(crate) use http::start_http_server;
pub(crate) use service_state::SHUTDOWN_TIMEOUT;
pub(crate) use service_state::ServiceState;


@@ -76,7 +76,6 @@ use rustfs_ecstore::store_api::PutObjReader;
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
use rustfs_notify::EventName;
use rustfs_policy::auth;
use rustfs_policy::policy::action::Action;
use rustfs_policy::policy::action::S3Action;
@@ -86,6 +85,7 @@ use rustfs_rio::EtagReader;
use rustfs_rio::HashReader;
use rustfs_rio::Reader;
use rustfs_rio::WarpReader;
use rustfs_targets::EventName;
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::path::path_join_buf;
use rustfs_zip::CompressionFormat;