mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Fix/fix improve for audit (#1418)
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -8503,9 +8503,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.23.35"
|
||||
version = "0.23.36"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
|
||||
checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"log",
|
||||
@@ -8864,9 +8864,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.148"
|
||||
version = "1.0.149"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da"
|
||||
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
@@ -10466,9 +10466,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.5.7"
|
||||
version = "2.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b"
|
||||
checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"idna",
|
||||
|
||||
@@ -50,7 +50,7 @@ resolver = "2"
|
||||
edition = "2024"
|
||||
license = "Apache-2.0"
|
||||
repository = "https://github.com/rustfs/rustfs"
|
||||
rust-version = "1.88"
|
||||
rust-version = "1.90"
|
||||
version = "0.0.5"
|
||||
homepage = "https://rustfs.com"
|
||||
description = "RustFS is a high-performance distributed object storage software built using Rust, one of the most popular languages worldwide. "
|
||||
@@ -136,7 +136,7 @@ rmcp = { version = "0.12.0" }
|
||||
rmp = { version = "0.8.15" }
|
||||
rmp-serde = { version = "1.3.1" }
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = { version = "1.0.148", features = ["raw_value"] }
|
||||
serde_json = { version = "1.0.149", features = ["raw_value"] }
|
||||
serde_urlencoded = "0.7.1"
|
||||
schemars = "1.2.0"
|
||||
|
||||
@@ -150,7 +150,7 @@ hmac = { version = "0.13.0-rc.3" }
|
||||
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
|
||||
pbkdf2 = "0.13.0-rc.5"
|
||||
rsa = { version = "0.10.0-rc.11" }
|
||||
rustls = { version = "0.23.35" }
|
||||
rustls = { version = "0.23.36", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] }
|
||||
rustls-pemfile = "2.2.0"
|
||||
rustls-pki-types = "1.13.2"
|
||||
sha1 = "0.11.0-rc.3"
|
||||
@@ -245,7 +245,7 @@ tracing-error = "0.2.1"
|
||||
tracing-opentelemetry = "0.32.0"
|
||||
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "time"] }
|
||||
transform-stream = "0.3.1"
|
||||
url = "2.5.7"
|
||||
url = "2.5.8"
|
||||
urlencoding = "2.1.3"
|
||||
uuid = { version = "1.19.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
||||
vaultrs = { version = "0.7.4" }
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM alpine:3.22 AS build
|
||||
FROM alpine:3.23 AS build
|
||||
|
||||
ARG TARGETARCH
|
||||
ARG RELEASE=latest
|
||||
@@ -40,7 +40,7 @@ RUN set -eux; \
|
||||
rm -rf rustfs.zip /build/.tmp || true
|
||||
|
||||
|
||||
FROM alpine:3.22
|
||||
FROM alpine:3.23
|
||||
|
||||
ARG RELEASE=latest
|
||||
ARG BUILD_DATE
|
||||
|
||||
@@ -16,7 +16,7 @@ ARG BUILDPLATFORM
|
||||
# -----------------------------
|
||||
# Build stage
|
||||
# -----------------------------
|
||||
FROM rust:1.88-bookworm AS builder
|
||||
FROM rust:1.91-trixie AS builder
|
||||
|
||||
# Re-declare args after FROM
|
||||
ARG TARGETPLATFORM
|
||||
|
||||
@@ -21,6 +21,7 @@ use futures::stream::FuturesUnordered;
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, ENV_PREFIX, EnableState, audit::AUDIT_ROUTE_PREFIX};
|
||||
use rustfs_ecstore::config::{Config, KVS};
|
||||
use rustfs_targets::arn::TargetID;
|
||||
use rustfs_targets::{Target, TargetError, target::ChannelTargetType};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -392,4 +393,80 @@ impl AuditRegistry {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a unique key for a target based on its type and ID
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
|
||||
/// * `target_id` - The identifier for the target instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// * `String` - The unique key for the target.
|
||||
pub fn create_key(&self, target_type: &str, target_id: &str) -> String {
|
||||
let key = TargetID::new(target_id.to_string(), target_type.to_string());
|
||||
info!(target_type = %target_type, "Create key for {}", key);
|
||||
key.to_string()
|
||||
}
|
||||
|
||||
/// Enables a target (placeholder, assumes target exists)
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
|
||||
/// * `target_id` - The identifier for the target instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure.
|
||||
pub fn enable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
|
||||
let key = self.create_key(target_type, target_id);
|
||||
if self.get_target(&key).is_some() {
|
||||
info!("Target {}-{} enabled", target_type, target_id);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(AuditError::Configuration(
|
||||
format!("Target not found: {}-{}", target_type, target_id),
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Disables a target (placeholder, assumes target exists)
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
|
||||
/// * `target_id` - The identifier for the target instance.
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure.
|
||||
pub fn disable_target(&self, target_type: &str, target_id: &str) -> AuditResult<()> {
|
||||
let key = self.create_key(target_type, target_id);
|
||||
if self.get_target(&key).is_some() {
|
||||
info!("Target {}-{} disabled", target_type, target_id);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(AuditError::Configuration(
|
||||
format!("Target not found: {}-{}", target_type, target_id),
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Upserts a target into the registry
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_type` - The type of the target (e.g., "webhook", "mqtt").
|
||||
/// * `target_id` - The identifier for the target instance.
|
||||
/// * `target` - The target instance to be upserted.
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure.
|
||||
pub fn upsert_target(
|
||||
&mut self,
|
||||
target_type: &str,
|
||||
target_id: &str,
|
||||
target: Box<dyn Target<AuditEntry> + Send + Sync>,
|
||||
) -> AuditResult<()> {
|
||||
let key = self.create_key(target_type, target_id);
|
||||
self.targets.insert(key, target);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,9 +274,9 @@ impl AuditSystem {
|
||||
drop(state);
|
||||
|
||||
let registry = self.registry.lock().await;
|
||||
let target_ids = registry.list_targets();
|
||||
let target_keys = registry.list_targets();
|
||||
|
||||
if target_ids.is_empty() {
|
||||
if target_keys.is_empty() {
|
||||
warn!("No audit targets configured for dispatch");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -284,22 +284,22 @@ impl AuditSystem {
|
||||
// Dispatch to all targets concurrently
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
for target_id in target_ids {
|
||||
if let Some(target) = registry.get_target(&target_id) {
|
||||
for target_key in target_keys {
|
||||
if let Some(target) = registry.get_target(&target_key) {
|
||||
let entry_clone = Arc::clone(&entry);
|
||||
let target_id_clone = target_id.clone();
|
||||
let target_key_clone = target_key.clone();
|
||||
|
||||
// Create EntityTarget for the audit log entry
|
||||
let entity_target = EntityTarget {
|
||||
object_name: entry.api.name.clone().unwrap_or_default(),
|
||||
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
|
||||
event_name: rustfs_targets::EventName::ObjectCreatedPut, // Default, should be derived from entry
|
||||
event_name: entry.event, // Default, should be derived from entry
|
||||
data: (*entry_clone).clone(),
|
||||
};
|
||||
|
||||
let task = async move {
|
||||
let result = target.save(Arc::new(entity_target)).await;
|
||||
(target_id_clone, result)
|
||||
(target_key_clone, result)
|
||||
};
|
||||
|
||||
tasks.push(task);
|
||||
@@ -312,14 +312,14 @@ impl AuditSystem {
|
||||
let mut errors = Vec::new();
|
||||
let mut success_count = 0;
|
||||
|
||||
for (target_id, result) in results {
|
||||
for (target_key, result) in results {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
success_count += 1;
|
||||
observability::record_target_success();
|
||||
}
|
||||
Err(e) => {
|
||||
error!(target_id = %target_id, error = %e, "Failed to dispatch audit log to target");
|
||||
error!(target_id = %target_key, error = %e, "Failed to dispatch audit log to target");
|
||||
errors.push(e);
|
||||
observability::record_target_failure();
|
||||
}
|
||||
@@ -360,18 +360,18 @@ impl AuditSystem {
|
||||
drop(state);
|
||||
|
||||
let registry = self.registry.lock().await;
|
||||
let target_ids = registry.list_targets();
|
||||
let target_keys = registry.list_targets();
|
||||
|
||||
if target_ids.is_empty() {
|
||||
if target_keys.is_empty() {
|
||||
warn!("No audit targets configured for batch dispatch");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for target_id in target_ids {
|
||||
if let Some(target) = registry.get_target(&target_id) {
|
||||
for target_key in target_keys {
|
||||
if let Some(target) = registry.get_target(&target_key) {
|
||||
let entries_clone: Vec<_> = entries.iter().map(Arc::clone).collect();
|
||||
let target_id_clone = target_id.clone();
|
||||
let target_key_clone = target_key.clone();
|
||||
|
||||
let task = async move {
|
||||
let mut success_count = 0;
|
||||
@@ -380,7 +380,7 @@ impl AuditSystem {
|
||||
let entity_target = EntityTarget {
|
||||
object_name: entry.api.name.clone().unwrap_or_default(),
|
||||
bucket_name: entry.api.bucket.clone().unwrap_or_default(),
|
||||
event_name: rustfs_targets::EventName::ObjectCreatedPut,
|
||||
event_name: entry.event,
|
||||
data: (*entry).clone(),
|
||||
};
|
||||
match target.save(Arc::new(entity_target)).await {
|
||||
@@ -388,7 +388,7 @@ impl AuditSystem {
|
||||
Err(e) => errors.push(e),
|
||||
}
|
||||
}
|
||||
(target_id_clone, success_count, errors)
|
||||
(target_key_clone, success_count, errors)
|
||||
};
|
||||
tasks.push(task);
|
||||
}
|
||||
@@ -418,6 +418,7 @@ impl AuditSystem {
|
||||
}
|
||||
|
||||
/// Starts the audit stream processing for a target with batching and retry logic
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `store` - The store from which to read audit entries
|
||||
/// * `target` - The target to which audit entries will be sent
|
||||
@@ -501,7 +502,7 @@ impl AuditSystem {
|
||||
/// Enables a specific target
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_id` - The ID of the target to enable
|
||||
/// * `target_id` - The ID of the target to enable, TargetID to string
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure
|
||||
@@ -520,7 +521,7 @@ impl AuditSystem {
|
||||
/// Disables a specific target
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_id` - The ID of the target to disable
|
||||
/// * `target_id` - The ID of the target to disable, TargetID to string
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure
|
||||
@@ -539,7 +540,7 @@ impl AuditSystem {
|
||||
/// Removes a target from the system
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_id` - The ID of the target to remove
|
||||
/// * `target_id` - The ID of the target to remove, TargetID to string
|
||||
///
|
||||
/// # Returns
|
||||
/// * `AuditResult<()>` - Result indicating success or failure
|
||||
@@ -559,7 +560,7 @@ impl AuditSystem {
|
||||
/// Updates or inserts a target
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_id` - The ID of the target to upsert
|
||||
/// * `target_id` - The ID of the target to upsert, TargetID to string
|
||||
/// * `target` - The target instance to insert or update
|
||||
///
|
||||
/// # Returns
|
||||
@@ -596,7 +597,7 @@ impl AuditSystem {
|
||||
/// Gets information about a specific target
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `target_id` - The ID of the target to retrieve
|
||||
/// * `target_id` - The ID of the target to retrieve, TargetID to string
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Option<String>` - Target ID if found
|
||||
|
||||
@@ -37,11 +37,6 @@ impl TargetID {
|
||||
Self { id, name }
|
||||
}
|
||||
|
||||
/// Convert to string representation
|
||||
pub fn to_id_string(&self) -> String {
|
||||
format!("{}:{}", self.id, self.name)
|
||||
}
|
||||
|
||||
/// Create an ARN
|
||||
pub fn to_arn(&self, region: &str) -> ARN {
|
||||
ARN {
|
||||
@@ -80,7 +75,7 @@ impl Serialize for TargetID {
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(&self.to_id_string())
|
||||
serializer.serialize_str(&self.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +125,7 @@ impl ARN {
|
||||
if self.target_id.id.is_empty() && self.target_id.name.is_empty() && self.region.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id.to_id_string())
|
||||
format!("{}:{}:{}", ARN_PREFIX, self.region, self.target_id)
|
||||
}
|
||||
|
||||
/// Parsing ARN from string
|
||||
|
||||
@@ -378,20 +378,11 @@ pub async fn start_http_server(
|
||||
|
||||
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
|
||||
// Idle: 10s, Interval: 5s, Retries: 3
|
||||
let ka = {
|
||||
#[cfg(not(target_os = "openbsd"))]
|
||||
let ka = TcpKeepalive::new()
|
||||
.with_time(Duration::from_secs(10))
|
||||
.with_interval(Duration::from_secs(5))
|
||||
.with_retries(3);
|
||||
|
||||
// On OpenBSD socket2 only supports configuring the initial
|
||||
// TCP keepalive timeout; intervals and retries cannot be set.
|
||||
#[cfg(target_os = "openbsd")]
|
||||
let ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
|
||||
|
||||
ka
|
||||
};
|
||||
let mut ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
|
||||
#[cfg(not(target_os = "openbsd"))]
|
||||
{
|
||||
ka = ka.with_interval(Duration::from_secs(5)).with_retries(3);
|
||||
}
|
||||
|
||||
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
|
||||
warn!(?err, "Failed to set TCP_KEEPALIVE");
|
||||
|
||||
@@ -162,9 +162,17 @@ impl OperationHelper {
|
||||
.build();
|
||||
|
||||
let mut final_builder = builder.api(api_details.clone());
|
||||
if let Ok(res) = result {
|
||||
final_builder = final_builder.resp_header(extract_resp_elements(res));
|
||||
}
|
||||
if let Some(err) = error_msg {
|
||||
final_builder = final_builder.error(err);
|
||||
}
|
||||
|
||||
if let Some(sk) = rustfs_credentials::get_global_access_key_opt() {
|
||||
final_builder = final_builder.access_key(&sk);
|
||||
}
|
||||
|
||||
self.audit_builder = Some(final_builder);
|
||||
self.api_builder = ApiDetailsBuilder(api_details); // Store final details for Drop use
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user