From a95e549430851f3c34ee4cc8dd7be60aa200e8c4 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 7 Jan 2026 18:05:52 +0800 Subject: [PATCH] Fix/fix improve for audit (#1418) --- Cargo.lock | 12 +++--- Cargo.toml | 8 ++-- Dockerfile | 4 +- Dockerfile.source | 2 +- crates/audit/src/registry.rs | 77 ++++++++++++++++++++++++++++++++++++ crates/audit/src/system.rs | 43 ++++++++++---------- crates/targets/src/arn.rs | 9 +---- rustfs/src/server/http.rs | 19 +++------ rustfs/src/storage/helper.rs | 8 ++++ 9 files changed, 127 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 875747ac..5185f080 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8503,9 +8503,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.35" +version = "0.23.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", "log", @@ -8864,9 +8864,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.148" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", @@ -10466,9 +10466,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.7" +version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" dependencies = [ "form_urlencoded", "idna", diff --git a/Cargo.toml b/Cargo.toml index 083860d2..b34b260c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ resolver = "2" edition = "2024" license = "Apache-2.0" repository = "https://github.com/rustfs/rustfs" -rust-version = "1.88" +rust-version = "1.90" version = "0.0.5" homepage = "https://rustfs.com" description = "RustFS is a high-performance distributed object storage software built using Rust, one of the most popular languages worldwide. " @@ -136,7 +136,7 @@ rmcp = { version = "0.12.0" } rmp = { version = "0.8.15" } rmp-serde = { version = "1.3.1" } serde = { version = "1.0.228", features = ["derive"] } -serde_json = { version = "1.0.148", features = ["raw_value"] } +serde_json = { version = "1.0.149", features = ["raw_value"] } serde_urlencoded = "0.7.1" schemars = "1.2.0" @@ -150,7 +150,7 @@ hmac = { version = "0.13.0-rc.3" } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } pbkdf2 = "0.13.0-rc.5" rsa = { version = "0.10.0-rc.11" } -rustls = { version = "0.23.35" } +rustls = { version = "0.23.36", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] } rustls-pemfile = "2.2.0" rustls-pki-types = "1.13.2" sha1 = "0.11.0-rc.3" @@ -245,7 +245,7 @@ tracing-error = "0.2.1" tracing-opentelemetry = "0.32.0" tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] } transform-stream = "0.3.1" -url = "2.5.7" +url = "2.5.8" urlencoding = "2.1.3" uuid = { version = "1.19.0", features = ["v4", "fast-rng", "macro-diagnostics"] } vaultrs = { version = "0.7.4" } diff --git a/Dockerfile b/Dockerfile index 1f303ae6..8f22a10c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM alpine:3.22 AS build +FROM alpine:3.23 AS build ARG TARGETARCH ARG RELEASE=latest @@ -40,7 +40,7 @@ RUN set -eux; \ rm -rf rustfs.zip /build/.tmp || true -FROM alpine:3.22 +FROM alpine:3.23 ARG RELEASE=latest ARG BUILD_DATE diff --git a/Dockerfile.source b/Dockerfile.source index 6cea2782..280d606d 100644 --- a/Dockerfile.source +++ b/Dockerfile.source @@ -16,7 +16,7 @@ ARG BUILDPLATFORM # ----------------------------- # Build stage # ----------------------------- -FROM rust:1.88-bookworm AS builder +FROM rust:1.91-trixie AS builder # Re-declare args after FROM ARG TARGETPLATFORM diff --git a/crates/audit/src/registry.rs b/crates/audit/src/registry.rs index 76edcabf..e3620b66 100644 --- a/crates/audit/src/registry.rs +++ b/crates/audit/src/registry.rs @@ -21,6 +21,7 @@ use futures::stream::FuturesUnordered; use hashbrown::{HashMap, HashSet}; use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX}; use rustfs_ecstore::config::{Config, KVS}; +use rustfs_targets::arn::TargetID; use rustfs_targets::{Target, TargetError, target::ChannelTargetType}; use std::str::FromStr; use std::sync::Arc; @@ -392,4 +393,80 @@ impl AuditRegistry { Ok(()) } + + /// Creates a unique key for a target based on its type and ID + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `target_id` - The identifier for the target instance. + /// + /// # Returns + /// * `String` - The unique key for the target. + pub fn create_key(&self, target_type: &str, target_id: &str) -> String { + let key = TargetID::new(target_id.to_string(), target_type.to_string()); + info!(target_type = %target_type, "Create key for {}", key); + key.to_string() + } + + /// Enables a target (placeholder, assumes target exists) + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `target_id` - The identifier for the target instance. + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure. + pub fn enable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> { + let key = self.create_key(target_type, target_id); + if self.get_target(&key).is_some() { + info!("Target {}-{} enabled", target_type, target_id); + Ok(()) + } else { + Err(AuditError::Configuration( + format!("Target not found: {}-{}", target_type, target_id), + None, + )) + } + } + + /// Disables a target (placeholder, assumes target exists) + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `target_id` - The identifier for the target instance. + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure. + pub fn disable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> { + let key = self.create_key(target_type, target_id); + if self.get_target(&key).is_some() { + info!("Target {}-{} disabled", target_type, target_id); + Ok(()) + } else { + Err(AuditError::Configuration( + format!("Target not found: {}-{}", target_type, target_id), + None, + )) + } + } + + /// Upserts a target into the registry + /// + /// # Arguments + /// * `target_type` - The type of the target (e.g., "webhook", "mqtt"). + /// * `target_id` - The identifier for the target instance. + /// * `target` - The target instance to be upserted. + /// + /// # Returns + /// * `AuditResult<()>` - Result indicating success or failure. + pub fn upsert_target( + &mut self, + target_type: &str, + target_id: &str, + target: Box + Send + Sync>, + ) -> AuditResult<()> { + let key = self.create_key(target_type, target_id); + self.targets.insert(key, target); + Ok(()) + } } diff --git a/crates/audit/src/system.rs b/crates/audit/src/system.rs index 0441f280..d9116b65 100644 --- a/crates/audit/src/system.rs +++ b/crates/audit/src/system.rs @@ -274,9 +274,9 @@ impl AuditSystem { drop(state); let registry = self.registry.lock().await; - let target_ids = registry.list_targets(); + let target_keys = registry.list_targets(); - if target_ids.is_empty() { + if target_keys.is_empty() { warn!("No audit targets configured for dispatch"); return Ok(()); } @@ -284,22 +284,22 @@ impl AuditSystem { // Dispatch to all targets concurrently let mut tasks = Vec::new(); - for target_id in target_ids { - if let Some(target) = registry.get_target(&target_id) { + for target_key in target_keys { + if let Some(target) = registry.get_target(&target_key) { let entry_clone = Arc::clone(&entry); - let target_id_clone = target_id.clone(); + let target_key_clone = target_key.clone(); // Create EntityTarget for the audit log entry let entity_target = EntityTarget { object_name: entry.api.name.clone().unwrap_or_default(), bucket_name: entry.api.bucket.clone().unwrap_or_default(), - event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry + event_name: entry.event, // Default, should be derived from entry data: (*entry_clone).clone(), }; let task = async move { let result = target.save(Arc::new(entity_target)).await; - (target_id_clone, result) + (target_key_clone, result) }; tasks.push(task); @@ -312,14 +312,14 @@ impl AuditSystem { let mut errors = Vec::new(); let mut success_count = 0; - for (target_id, result) in results { + for (target_key, result) in results { match result { Ok(_) => { success_count += 1; observability::record_target_success(); } Err(e) => { - error!(target_id = %target_id, error = %e, "Failed to dispatch audit log to target"); + error!(target_id = %target_key, error = %e, "Failed to dispatch audit log to target"); errors.push(e); observability::record_target_failure(); } @@ -360,18 +360,18 @@ impl AuditSystem { drop(state); let registry = self.registry.lock().await; - let target_ids = registry.list_targets(); + let target_keys = registry.list_targets(); - if target_ids.is_empty() { + if target_keys.is_empty() { warn!("No audit targets configured for batch dispatch"); return Ok(()); } let mut tasks = Vec::new(); - for target_id in target_ids { - if let Some(target) = registry.get_target(&target_id) { + for target_key in target_keys { + if let Some(target) = registry.get_target(&target_key) { let entries_clone: Vec<_> = entries.iter().map(Arc::clone).collect(); - let target_id_clone = target_id.clone(); + let target_key_clone = target_key.clone(); let task = async move { let mut success_count = 0; @@ -380,7 +380,7 @@ impl AuditSystem { let entity_target = EntityTarget { object_name: entry.api.name.clone().unwrap_or_default(), bucket_name: entry.api.bucket.clone().unwrap_or_default(), - event_name: rustfs_targets::EventName::ObjectCreatedPut, + event_name: entry.event, data: (*entry).clone(), }; match target.save(Arc::new(entity_target)).await { @@ -388,7 +388,7 @@ impl AuditSystem { Err(e) => errors.push(e), } } - (target_id_clone, success_count, errors) + (target_key_clone, success_count, errors) }; tasks.push(task); } @@ -418,6 +418,7 @@ impl AuditSystem { } /// Starts the audit stream processing for a target with batching and retry logic + /// /// # Arguments /// * `store` - The store from which to read audit entries /// * `target` - The target to which audit entries will be sent @@ -501,7 +502,7 @@ impl AuditSystem { /// Enables a specific target /// /// # Arguments - /// * `target_id` - The ID of the target to enable + /// * `target_id` - The ID of the target to enable, TargetID to string /// /// # Returns /// * `AuditResult<()>` - Result indicating success or failure @@ -520,7 +521,7 @@ impl AuditSystem { /// Disables a specific target /// /// # Arguments - /// * `target_id` - The ID of the target to disable + /// * `target_id` - The ID of the target to disable, TargetID to string /// /// # Returns /// * `AuditResult<()>` - Result indicating success or failure @@ -539,7 +540,7 @@ impl AuditSystem { /// Removes a target from the system /// /// # Arguments - /// * `target_id` - The ID of the target to remove + /// * `target_id` - The ID of the target to remove, TargetID to string /// /// # Returns /// * `AuditResult<()>` - Result indicating success or failure @@ -559,7 +560,7 @@ impl AuditSystem { /// Updates or inserts a target /// /// # Arguments - /// * `target_id` - The ID of the target to upsert + /// * `target_id` - The ID of the target to upsert, TargetID to string /// * `target` - The target instance to insert or update /// /// # Returns @@ -596,7 +597,7 @@ impl AuditSystem { /// Gets information about a specific target /// /// # Arguments - /// * `target_id` - The ID of the target to retrieve + /// * `target_id` - The ID of the target to retrieve, TargetID to string /// /// # Returns /// * `Option` - Target ID if found diff --git a/crates/targets/src/arn.rs b/crates/targets/src/arn.rs index a2ef3528..1853ad9b 100644 --- a/crates/targets/src/arn.rs +++ b/crates/targets/src/arn.rs @@ -37,11 +37,6 @@ impl TargetID { Self { id, name } } - /// Convert to string representation - pub fn to_id_string(&self) -> String { - format!("{}:{}", self.id, self.name) - } - /// Create an ARN pub fn to_arn(&self, region: &str) -> ARN { ARN { @@ -80,7 +75,7 @@ impl Serialize for TargetID { where S: Serializer, { - serializer.serialize_str(&self.to_id_string()) + serializer.serialize_str(&self.to_string()) } } @@ -130,7 +125,7 @@ impl ARN { if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() { return String::new(); } - format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string()) + format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id) } /// Parsing ARN from string diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index 33c0bf40..95debdcc 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -378,20 +378,11 @@ pub async fn start_http_server( // Enable TCP Keepalive to detect dead clients (e.g. power loss) // Idle: 10s, Interval: 5s, Retries: 3 - let ka = { - #[cfg(not(target_os = "openbsd"))] - let ka = TcpKeepalive::new() - .with_time(Duration::from_secs(10)) - .with_interval(Duration::from_secs(5)) - .with_retries(3); - - // On OpenBSD socket2 only supports configuring the initial - // TCP keepalive timeout; intervals and retries cannot be set. - #[cfg(target_os = "openbsd")] - let ka = TcpKeepalive::new().with_time(Duration::from_secs(10)); - - ka - }; + let mut ka = TcpKeepalive::new().with_time(Duration::from_secs(10)); + #[cfg(not(target_os = "openbsd"))] + { + ka = ka.with_interval(Duration::from_secs(5)).with_retries(3); + } if let Err(err) = socket_ref.set_tcp_keepalive(&ka) { warn!(?err, "Failed to set TCP_KEEPALIVE"); diff --git a/rustfs/src/storage/helper.rs b/rustfs/src/storage/helper.rs index cf732fd1..aa4f0094 100644 --- a/rustfs/src/storage/helper.rs +++ b/rustfs/src/storage/helper.rs @@ -162,9 +162,17 @@ impl OperationHelper { .build(); let mut final_builder = builder.api(api_details.clone()); + if let Ok(res) = result { + final_builder = final_builder.resp_header(extract_resp_elements(res)); + } if let Some(err) = error_msg { final_builder = final_builder.error(err); } + + if let Some(sk) = rustfs_credentials::get_global_access_key_opt() { + final_builder = final_builder.access_key(&sk); + } + self.audit_builder = Some(final_builder); self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use }