Compare commits

...

2 Commits

Author SHA1 Message Date
houseme
f7e188eee7 feat: upgrade datafusion to v50.0.0 and update related dependencies f… (#563)
* feat: upgrade datafusion to v50.0.0 and update related dependencies for compatibility

* fix

* fmt
2025-09-18 23:30:25 +08:00
houseme
4b9cb512f2 remove crate rustfs-audit-logger (#562) 2025-09-18 17:46:46 +08:00
22 changed files with 617 additions and 1827 deletions

865
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -16,7 +16,6 @@
members = [
"rustfs", # Core file system implementation
"crates/appauth", # Application authentication and authorization
"crates/audit-logger", # Audit logging system for file operations
"crates/common", # Shared utilities and data structures
"crates/config", # Configuration management
"crates/crypto", # Cryptography and security features
@@ -64,7 +63,6 @@ all = "warn"
rustfs-ahm = { path = "crates/ahm", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-appauth = { path = "crates/appauth", version = "0.0.5" }
rustfs-audit-logger = { path = "crates/audit-logger", version = "0.0.5" }
rustfs-common = { path = "crates/common", version = "0.0.5" }
rustfs-crypto = { path = "crates/crypto", version = "0.0.5" }
rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" }
@@ -98,7 +96,7 @@ async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.6" }
aws-sdk-s3 = "1.101.0"
aws-sdk-s3 = "1.106.0"
axum = "0.8.4"
axum-extra = "0.10.1"
axum-server = "0.7.2"
@@ -106,10 +104,10 @@ base64-simd = "0.8.0"
base64 = "0.22.1"
brotli = "8.0.2"
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.0.1"
bytesize = "2.1.0"
byteorder = "1.5.0"
cfg-if = "1.0.3"
crc-fast = "1.5.0"
crc-fast = "1.3.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.47", features = ["derive", "env"] }
@@ -118,7 +116,7 @@ crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
dashmap = "6.1.0"
datafusion = "46.0.1"
datafusion = "50.0.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.2.10"
@@ -134,7 +132,7 @@ highway = { version = "1.3.0" }
hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
hmac = "0.12.1"
hyper = "1.7.0"
hyper-util = { version = "0.1.16", features = [
hyper-util = { version = "0.1.17", features = [
"tokio",
"server-auto",
"server-graceful",
@@ -158,7 +156,7 @@ nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.11.0"
object_store = "0.11.2"
object_store = "0.12.3"
once_cell = "1.21.3"
opentelemetry = { version = "0.30.0" }
opentelemetry-appender-tracing = { version = "0.30.1", features = [
@@ -209,7 +207,7 @@ rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-minio-preview.3" }
schemars = "1.0.4"
serde = { version = "1.0.223", features = ["derive"] }
serde = { version = "1.0.225", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
@@ -224,7 +222,7 @@ snap = "1.1.1"
socket2 = "0.6.0"
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.0"
sysctl = "0.6.0"
sysctl = "0.7.1"
tempfile = "3.22.0"
temp-env = "0.3.6"
test-case = "3.3.1"
@@ -237,7 +235,7 @@ time = { version = "0.3.43", features = [
"serde",
] }
tokio = { version = "1.47.1", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-rustls = { version = "0.26.3", default-features = false }
tokio-stream = { version = "0.1.17" }
tokio-tar = "0.3.1"
tokio-test = "0.4.4"
@@ -260,14 +258,14 @@ uuid = { version = "1.18.1", features = [
"fast-rng",
"macro-diagnostics",
] }
wildmatch = { version = "2.4.0", features = ["serde"] }
wildmatch = { version = "2.5.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "2.4.2"
zip = "5.1.1"
zstd = "0.13.3"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "rustfs-audit-logger", "tokio-test"]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "tokio-test"]
[profile.wasm-dev]
inherits = "dev"

View File

@@ -1,44 +0,0 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-audit-logger"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Audit logging system for RustFS, providing detailed logging of file operations and system events."
documentation = "https://docs.rs/audit-logger/latest/audit_logger/"
keywords = ["audit", "logging", "file-operations", "system-events", "RustFS"]
categories = ["web-programming", "development-tools::profiling", "asynchronous", "api-bindings", "development-tools::debugging"]
[dependencies]
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
url = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
figment = { version = "0.10", features = ["json", "env"] }
[lints]
workspace = true

View File

@@ -1,34 +0,0 @@
{
"console": {
"enabled": true
},
"logger_webhook": {
"default": {
"enabled": true,
"endpoint": "http://localhost:3000/logs",
"auth_token": "secret-token-for-logs",
"batch_size": 5,
"queue_size": 1000,
"max_retry": 3,
"retry_interval": "2s"
}
},
"audit_webhook": {
"splunk": {
"enabled": true,
"endpoint": "http://localhost:3000/audit",
"auth_token": "secret-token-for-audit",
"batch_size": 10
}
},
"audit_kafka": {
"default": {
"enabled": false,
"brokers": [
"kafka1:9092",
"kafka2:9092"
],
"topic": "minio-audit-events"
}
}
}

View File

@@ -1,17 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
println!("Audit Logger Example");
}

View File

@@ -1,90 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::entry::ObjectVersion;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Args - defines the arguments for API operations
/// Args is used to define the arguments for API operations.
///
/// # Example
/// ```
/// use rustfs_audit_logger::Args;
/// use std::collections::HashMap;
///
/// let args = Args::new()
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_version_id(Some("123".to_string()))
/// .set_metadata(Some(HashMap::new()));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct Args {
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Option::is_none")]
pub objects: Option<Vec<ObjectVersion>>,
#[serde(rename = "metadata", skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
impl Args {
/// Create a new Args object
pub fn new() -> Self {
Args {
bucket: None,
object: None,
version_id: None,
objects: None,
metadata: None,
}
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Option<Vec<ObjectVersion>>) -> Self {
self.objects = objects;
self
}
/// Set the metadata
pub fn set_metadata(mut self, metadata: Option<HashMap<String, String>>) -> Self {
self.metadata = metadata;
self
}
}

View File

@@ -1,469 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{BaseLogEntry, LogRecord, ObjectVersion};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// API details structure
/// ApiDetails is used to define the details of an API operation
///
/// The `ApiDetails` structure contains the following fields:
/// - `name` - the name of the API operation
/// - `bucket` - the bucket name
/// - `object` - the object name
/// - `objects` - the list of objects
/// - `status` - the status of the API operation
/// - `status_code` - the status code of the API operation
/// - `input_bytes` - the input bytes
/// - `output_bytes` - the output bytes
/// - `header_bytes` - the header bytes
/// - `time_to_first_byte` - the time to first byte
/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds
/// - `time_to_response` - the time to response
/// - `time_to_response_in_ns` - the time to response in nanoseconds
///
/// The `ApiDetails` structure contains the following methods:
/// - `new` - create a new `ApiDetails` with default values
/// - `set_name` - set the name
/// - `set_bucket` - set the bucket
/// - `set_object` - set the object
/// - `set_objects` - set the objects
/// - `set_status` - set the status
/// - `set_status_code` - set the status code
/// - `set_input_bytes` - set the input bytes
/// - `set_output_bytes` - set the output bytes
/// - `set_header_bytes` - set the header bytes
/// - `set_time_to_first_byte` - set the time to first byte
/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds
/// - `set_time_to_response` - set the time to response
/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds
///
/// # Example
/// ```
/// use rustfs_audit_logger::ApiDetails;
/// use rustfs_audit_logger::ObjectVersion;
///
/// let api = ApiDetails::new()
/// .set_name(Some("GET".to_string()))
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())])
/// .set_status(Some("OK".to_string()))
/// .set_status_code(Some(200))
/// .set_input_bytes(100)
/// .set_output_bytes(200)
/// .set_header_bytes(Some(50))
/// .set_time_to_first_byte(Some("100ms".to_string()))
/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string()))
/// .set_time_to_response(Some("200ms".to_string()))
/// .set_time_to_response_in_ns(Some("200000000ns".to_string()));
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ApiDetails {
#[serde(rename = "name", skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Vec::is_empty", default)]
pub objects: Vec<ObjectVersion>,
#[serde(rename = "status", skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(rename = "statusCode", skip_serializing_if = "Option::is_none")]
pub status_code: Option<i32>,
#[serde(rename = "rx")]
pub input_bytes: i64,
#[serde(rename = "tx")]
pub output_bytes: i64,
#[serde(rename = "txHeaders", skip_serializing_if = "Option::is_none")]
pub header_bytes: Option<i64>,
#[serde(rename = "timeToFirstByte", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte: Option<String>,
#[serde(rename = "timeToFirstByteInNS", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte_in_ns: Option<String>,
#[serde(rename = "timeToResponse", skip_serializing_if = "Option::is_none")]
pub time_to_response: Option<String>,
#[serde(rename = "timeToResponseInNS", skip_serializing_if = "Option::is_none")]
pub time_to_response_in_ns: Option<String>,
}
impl ApiDetails {
/// Create a new `ApiDetails` with default values
pub fn new() -> Self {
ApiDetails {
name: None,
bucket: None,
object: None,
objects: Vec::new(),
status: None,
status_code: None,
input_bytes: 0,
output_bytes: 0,
header_bytes: None,
time_to_first_byte: None,
time_to_first_byte_in_ns: None,
time_to_response: None,
time_to_response_in_ns: None,
}
}
/// Set the name
pub fn set_name(mut self, name: Option<String>) -> Self {
self.name = name;
self
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Vec<ObjectVersion>) -> Self {
self.objects = objects;
self
}
/// Set the status
pub fn set_status(mut self, status: Option<String>) -> Self {
self.status = status;
self
}
/// Set the status code
pub fn set_status_code(mut self, status_code: Option<i32>) -> Self {
self.status_code = status_code;
self
}
/// Set the input bytes
pub fn set_input_bytes(mut self, input_bytes: i64) -> Self {
self.input_bytes = input_bytes;
self
}
/// Set the output bytes
pub fn set_output_bytes(mut self, output_bytes: i64) -> Self {
self.output_bytes = output_bytes;
self
}
/// Set the header bytes
pub fn set_header_bytes(mut self, header_bytes: Option<i64>) -> Self {
self.header_bytes = header_bytes;
self
}
/// Set the time to first byte
pub fn set_time_to_first_byte(mut self, time_to_first_byte: Option<String>) -> Self {
self.time_to_first_byte = time_to_first_byte;
self
}
/// Set the time to first byte in nanoseconds
pub fn set_time_to_first_byte_in_ns(mut self, time_to_first_byte_in_ns: Option<String>) -> Self {
self.time_to_first_byte_in_ns = time_to_first_byte_in_ns;
self
}
/// Set the time to response
pub fn set_time_to_response(mut self, time_to_response: Option<String>) -> Self {
self.time_to_response = time_to_response;
self
}
/// Set the time to response in nanoseconds
pub fn set_time_to_response_in_ns(mut self, time_to_response_in_ns: Option<String>) -> Self {
self.time_to_response_in_ns = time_to_response_in_ns;
self
}
}
/// Entry - audit entry logs
/// AuditLogEntry is used to define the structure of an audit log entry
///
/// The `AuditLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `version` - the version of the audit log entry
/// - `deployment_id` - the deployment ID
/// - `event` - the event
/// - `entry_type` - the type of audit message
/// - `api` - the API details
/// - `remote_host` - the remote host
/// - `user_agent` - the user agent
/// - `req_path` - the request path
/// - `req_host` - the request host
/// - `req_claims` - the request claims
/// - `req_query` - the request query
/// - `req_header` - the request header
/// - `resp_header` - the response header
/// - `access_key` - the access key
/// - `parent_user` - the parent user
/// - `error` - the error
///
/// The `AuditLogEntry` structure contains the following methods:
/// - `new` - create a new `AuditEntry` with default values
/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details
/// - `with_base` - set the base log entry
/// - `set_version` - set the version
/// - `set_deployment_id` - set the deployment ID
/// - `set_event` - set the event
/// - `set_entry_type` - set the entry type
/// - `set_api` - set the API details
/// - `set_remote_host` - set the remote host
/// - `set_user_agent` - set the user agent
/// - `set_req_path` - set the request path
/// - `set_req_host` - set the request host
/// - `set_req_claims` - set the request claims
/// - `set_req_query` - set the request query
/// - `set_req_header` - set the request header
/// - `set_resp_header` - set the response header
/// - `set_access_key` - set the access key
/// - `set_parent_user` - set the parent user
/// - `set_error` - set the error
///
/// # Example
/// ```
/// use rustfs_audit_logger::AuditLogEntry;
/// use rustfs_audit_logger::ApiDetails;
/// use std::collections::HashMap;
///
/// let entry = AuditLogEntry::new()
/// .set_version("1.0".to_string())
/// .set_deployment_id(Some("123".to_string()))
/// .set_event("event".to_string())
/// .set_entry_type(Some("type".to_string()))
/// .set_api(ApiDetails::new())
/// .set_remote_host(Some("remote-host".to_string()))
/// .set_user_agent(Some("user-agent".to_string()))
/// .set_req_path(Some("req-path".to_string()))
/// .set_req_host(Some("req-host".to_string()))
/// .set_req_claims(Some(HashMap::new()))
/// .set_req_query(Some(HashMap::new()))
/// .set_req_header(Some(HashMap::new()))
/// .set_resp_header(Some(HashMap::new()))
/// .set_access_key(Some("access-key".to_string()))
/// .set_parent_user(Some("parent-user".to_string()))
/// .set_error(Some("error".to_string()));
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct AuditLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub version: String,
#[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")]
pub deployment_id: Option<String>,
pub event: String,
// Class of audit message - S3, admin ops, bucket management
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub entry_type: Option<String>,
pub api: ApiDetails,
#[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")]
pub req_path: Option<String>,
#[serde(rename = "requestHost", skip_serializing_if = "Option::is_none")]
pub req_host: Option<String>,
#[serde(rename = "requestClaims", skip_serializing_if = "Option::is_none")]
pub req_claims: Option<HashMap<String, Value>>,
#[serde(rename = "requestQuery", skip_serializing_if = "Option::is_none")]
pub req_query: Option<HashMap<String, String>>,
#[serde(rename = "requestHeader", skip_serializing_if = "Option::is_none")]
pub req_header: Option<HashMap<String, String>>,
#[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")]
pub resp_header: Option<HashMap<String, String>>,
#[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")]
pub access_key: Option<String>,
#[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")]
pub parent_user: Option<String>,
#[serde(rename = "error", skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl AuditLogEntry {
/// Create a new `AuditEntry` with default values
pub fn new() -> Self {
AuditLogEntry {
base: BaseLogEntry::new(),
version: String::new(),
deployment_id: None,
event: String::new(),
entry_type: None,
api: ApiDetails::new(),
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Create a new `AuditEntry` with version, time, event and api details
pub fn new_with_values(version: String, time: DateTime<Utc>, event: String, api: ApiDetails) -> Self {
let mut base = BaseLogEntry::new();
base.timestamp = time;
AuditLogEntry {
base,
version,
deployment_id: None,
event,
entry_type: None,
api,
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the version
pub fn set_version(mut self, version: String) -> Self {
self.version = version;
self
}
/// Set the deployment ID
pub fn set_deployment_id(mut self, deployment_id: Option<String>) -> Self {
self.deployment_id = deployment_id;
self
}
/// Set the event
pub fn set_event(mut self, event: String) -> Self {
self.event = event;
self
}
/// Set the entry type
pub fn set_entry_type(mut self, entry_type: Option<String>) -> Self {
self.entry_type = entry_type;
self
}
/// Set the API details
pub fn set_api(mut self, api: ApiDetails) -> Self {
self.api = api;
self
}
/// Set the remote host
pub fn set_remote_host(mut self, remote_host: Option<String>) -> Self {
self.remote_host = remote_host;
self
}
/// Set the user agent
pub fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
self.user_agent = user_agent;
self
}
/// Set the request path
pub fn set_req_path(mut self, req_path: Option<String>) -> Self {
self.req_path = req_path;
self
}
/// Set the request host
pub fn set_req_host(mut self, req_host: Option<String>) -> Self {
self.req_host = req_host;
self
}
/// Set the request claims
pub fn set_req_claims(mut self, req_claims: Option<HashMap<String, Value>>) -> Self {
self.req_claims = req_claims;
self
}
/// Set the request query
pub fn set_req_query(mut self, req_query: Option<HashMap<String, String>>) -> Self {
self.req_query = req_query;
self
}
/// Set the request header
pub fn set_req_header(mut self, req_header: Option<HashMap<String, String>>) -> Self {
self.req_header = req_header;
self
}
/// Set the response header
pub fn set_resp_header(mut self, resp_header: Option<HashMap<String, String>>) -> Self {
self.resp_header = resp_header;
self
}
/// Set the access key
pub fn set_access_key(mut self, access_key: Option<String>) -> Self {
self.access_key = access_key;
self
}
/// Set the parent user
pub fn set_parent_user(mut self, parent_user: Option<String>) -> Self {
self.parent_user = parent_user;
self
}
/// Set the error
pub fn set_error(mut self, error: Option<String>) -> Self {
self.error = error;
self
}
}
impl LogRecord for AuditLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}

View File

@@ -1,108 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Base log entry structure shared by all log types
/// This structure is used to serialize log entries to JSON
/// and send them to the log sinks
/// This structure is also used to deserialize log entries from JSON
/// This structure is also used to store log entries in the database
/// This structure is also used to query log entries from the database
///
/// The `BaseLogEntry` structure contains the following fields:
/// - `timestamp` - the timestamp of the log entry
/// - `request_id` - the request ID of the log entry
/// - `message` - the message of the log entry
/// - `tags` - the tags of the log entry
///
/// The `BaseLogEntry` structure contains the following methods:
/// - `new` - create a new `BaseLogEntry` with default values
/// - `message` - set the message
/// - `request_id` - set the request ID
/// - `tags` - set the tags
/// - `timestamp` - set the timestamp
///
/// # Example
/// ```
/// use rustfs_audit_logger::BaseLogEntry;
/// use chrono::{DateTime, Utc};
/// use std::collections::HashMap;
///
/// let timestamp = Utc::now();
/// let request = Some("req-123".to_string());
/// let message = Some("This is a log message".to_string());
/// let tags = Some(HashMap::new());
///
/// let entry = BaseLogEntry::new()
/// .timestamp(timestamp)
/// .request_id(request)
/// .message(message)
/// .tags(tags);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct BaseLogEntry {
#[serde(rename = "time")]
pub timestamp: DateTime<Utc>,
#[serde(rename = "requestID", skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(rename = "message", skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "tags", skip_serializing_if = "Option::is_none")]
pub tags: Option<HashMap<String, Value>>,
}
impl BaseLogEntry {
/// Create a new BaseLogEntry with default values
pub fn new() -> Self {
BaseLogEntry {
timestamp: Utc::now(),
request_id: None,
message: None,
tags: None,
}
}
/// Set the message
pub fn message(mut self, message: Option<String>) -> Self {
self.message = message;
self
}
/// Set the request ID
pub fn request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the tags
pub fn tags(mut self, tags: Option<HashMap<String, Value>>) -> Self {
self.tags = tags;
self
}
/// Set the timestamp
pub fn timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.timestamp = timestamp;
self
}
}

View File

@@ -1,159 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub(crate) mod args;
pub(crate) mod audit;
pub(crate) mod base;
pub(crate) mod unified;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// ObjectVersion is used across multiple modules
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectVersion {
#[serde(rename = "name")]
pub object_name: String,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl ObjectVersion {
/// Create a new ObjectVersion object
pub fn new() -> Self {
ObjectVersion {
object_name: String::new(),
version_id: None,
}
}
/// Create a new ObjectVersion with object name
pub fn new_with_object_name(object_name: String) -> Self {
ObjectVersion {
object_name,
version_id: None,
}
}
/// Set the object name
pub fn set_object_name(mut self, object_name: String) -> Self {
self.object_name = object_name;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
}
impl Default for ObjectVersion {
fn default() -> Self {
Self::new()
}
}
/// Log kind/level enum
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum LogKind {
#[serde(rename = "INFO")]
#[default]
Info,
#[serde(rename = "WARNING")]
Warning,
#[serde(rename = "ERROR")]
Error,
#[serde(rename = "FATAL")]
Fatal,
}
/// Trait for types that can be serialized to JSON and have a timestamp
/// This trait is used by `ServerLogEntry` to convert the log entry to JSON
/// and get the timestamp of the log entry
/// This trait is implemented by `ServerLogEntry`
///
/// # Example
/// ```
/// use rustfs_audit_logger::LogRecord;
/// use chrono::{DateTime, Utc};
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string());
/// let json = log_entry.to_json();
/// let timestamp = log_entry.get_timestamp();
/// ```
pub trait LogRecord {
fn to_json(&self) -> String;
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
/// for `ServerLogEntry`
/// This is necessary because `tracing_core::Level` does not implement `Serialize`
/// and `Deserialize`
/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized
/// using `serde`
///
/// # Example
/// ```
/// use rustfs_audit_logger::SerializableLevel;
/// use tracing_core::Level;
///
/// let level = Level::INFO;
/// let serializable_level = SerializableLevel::from(level);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}

View File

@@ -1,266 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing_core::Level;
/// Server log entry with structured fields
/// ServerLogEntry is used to log structured log entries from the server
///
/// The `ServerLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `source` - the source of the log entry
/// - `user_id` - the user ID
/// - `fields` - the structured fields of the log entry
///
/// The `ServerLogEntry` structure contains the following methods:
/// - `new` - create a new `ServerLogEntry` with specified level and source
/// - `with_base` - set the base log entry
/// - `user_id` - set the user ID
/// - `fields` - set the fields
/// - `add_field` - add a field
///
/// # Example
/// ```
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string())
/// .user_id(Some("user-456".to_string()))
/// .add_field("operation".to_string(), "login".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: SerializableLevel,
pub source: String,
#[serde(rename = "userId", skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub fields: Vec<(String, String)>,
}
impl ServerLogEntry {
/// Create a new ServerLogEntry with specified level and source
pub fn new(level: Level, source: String) -> Self {
ServerLogEntry {
base: BaseLogEntry::new(),
level: SerializableLevel(level),
source,
user_id: None,
fields: Vec::new(),
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the user ID
pub fn user_id(mut self, user_id: Option<String>) -> Self {
self.user_id = user_id;
self
}
/// Set fields
pub fn fields(mut self, fields: Vec<(String, String)>) -> Self {
self.fields = fields;
self
}
/// Add a field
pub fn add_field(mut self, key: String, value: String) -> Self {
self.fields.push((key, value));
self
}
}
impl LogRecord for ServerLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Console log entry structure
/// ConsoleLogEntry is used to log console log entries
/// The `ConsoleLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `console_msg` - the console message
/// - `node_name` - the node name
/// - `err` - the error message
///
/// The `ConsoleLogEntry` structure contains the following methods:
/// - `new` - create a new `ConsoleLogEntry`
/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name
/// - `with_base` - set the base log entry
/// - `set_level` - set the log level
/// - `set_node_name` - set the node name
/// - `set_console_msg` - set the console message
/// - `set_err` - set the error message
///
/// # Example
/// ```
/// use rustfs_audit_logger::ConsoleLogEntry;
///
/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: LogKind,
pub console_msg: String,
pub node_name: String,
#[serde(skip)]
pub err: Option<String>,
}
impl ConsoleLogEntry {
/// Create a new ConsoleLogEntry
pub fn new() -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg: String::new(),
node_name: String::new(),
err: None,
}
}
/// Create a new ConsoleLogEntry with console message and node name
pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg,
node_name,
err: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the log level
pub fn set_level(mut self, level: LogKind) -> Self {
self.level = level;
self
}
/// Set the node name
pub fn set_node_name(mut self, node_name: String) -> Self {
self.node_name = node_name;
self
}
/// Set the console message
pub fn set_console_msg(mut self, console_msg: String) -> Self {
self.console_msg = console_msg;
self
}
/// Set the error message
pub fn set_err(mut self, err: Option<String>) -> Self {
self.err = err;
self
}
}
impl Default for ConsoleLogEntry {
fn default() -> Self {
Self::new()
}
}
impl LogRecord for ConsoleLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Unified log entry type
/// UnifiedLogEntry is used to log different types of log entries
///
/// The `UnifiedLogEntry` enum contains the following variants:
/// - `Server` - a server log entry
/// - `Audit` - an audit log entry
/// - `Console` - a console log entry
///
/// The `UnifiedLogEntry` enum contains the following methods:
/// - `to_json` - convert the log entry to JSON
/// - `get_timestamp` - get the timestamp of the log entry
///
/// # Example
/// ```
/// use rustfs_audit_logger::{UnifiedLogEntry, ServerLogEntry};
/// use tracing_core::Level;
///
/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string());
/// let unified = UnifiedLogEntry::Server(server_entry);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UnifiedLogEntry {
#[serde(rename = "server")]
Server(ServerLogEntry),
#[serde(rename = "audit")]
Audit(Box<AuditLogEntry>),
#[serde(rename = "console")]
Console(ConsoleLogEntry),
}
impl LogRecord for UnifiedLogEntry {
fn to_json(&self) -> String {
match self {
UnifiedLogEntry::Server(entry) => entry.to_json(),
UnifiedLogEntry::Audit(entry) => entry.to_json(),
UnifiedLogEntry::Console(entry) => entry.to_json(),
}
}
fn get_timestamp(&self) -> DateTime<Utc> {
match self {
UnifiedLogEntry::Server(entry) => entry.get_timestamp(),
UnifiedLogEntry::Audit(entry) => entry.get_timestamp(),
UnifiedLogEntry::Console(entry) => entry.get_timestamp(),
}
}
}

View File

@@ -1,8 +0,0 @@
mod entry;
mod logger;
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};

View File

@@ -1,29 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
// Default value function
fn default_batch_size() -> usize {
10
}
fn default_queue_size() -> usize {
10000
}
fn default_max_retry() -> u32 {
5
}
fn default_retry_interval() -> std::time::Duration {
std::time::Duration::from_secs(3)
}

View File

@@ -1,13 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

View File

@@ -1,108 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::collections::HashMap;
use uuid::Uuid;
///A Trait for a log entry that can be serialized and sent
pub trait Loggable: Serialize + Send + Sync + 'static {
fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
/// Standard log entries
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
pub deployment_id: String,
pub level: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace: Option<Trace>,
pub time: DateTime<Utc>,
pub request_id: String,
}
impl Loggable for LogEntry {}
/// Audit log entry
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct AuditEntry {
pub version: String,
pub deployment_id: String,
pub time: DateTime<Utc>,
pub trigger: String,
pub api: ApiDetails,
pub remote_host: String,
pub request_id: String,
pub user_agent: String,
pub access_key: String,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub tags: HashMap<String, String>,
}
impl Loggable for AuditEntry {}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
pub message: String,
pub source: Vec<String>,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub variables: HashMap<String, String>,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ApiDetails {
pub name: String,
pub bucket: String,
pub object: String,
pub status: String,
pub status_code: u16,
pub time_to_first_byte: String,
pub time_to_response: String,
}
// Helper functions to create entries
impl AuditEntry {
pub fn new(api_name: &str, bucket: &str, object: &str) -> Self {
AuditEntry {
version: "1".to_string(),
deployment_id: "global-deployment-id".to_string(),
time: Utc::now(),
trigger: "incoming".to_string(),
api: ApiDetails {
name: api_name.to_string(),
bucket: bucket.to_string(),
object: object.to_string(),
status: "OK".to_string(),
status_code: 200,
time_to_first_byte: "10ms".to_string(),
time_to_response: "50ms".to_string(),
},
remote_host: "127.0.0.1".to_string(),
request_id: Uuid::new_v4().to_string(),
user_agent: "Rust-Client/1.0".to_string(),
access_key: "minioadmin".to_string(),
tags: HashMap::new(),
}
}
}

View File

@@ -1,13 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

View File

@@ -1,36 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub mod config;
pub mod dispatch;
pub mod entry;
pub mod factory;
use async_trait::async_trait;
use std::error::Error;
/// General Log Target Trait
#[async_trait]
pub trait Target: Send + Sync {
/// Send a single logizable entry
async fn send(&self, entry: Box<Self>) -> Result<(), Box<dyn Error + Send>>;
/// Returns the unique name of the target
fn name(&self) -> &str;
/// Close target gracefully, ensuring all buffered logs are processed
async fn shutdown(&self);
}

View File

@@ -173,6 +173,7 @@ impl PartialEq for Functions {
}
#[derive(Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct Value;
#[cfg(test)]

View File

@@ -19,19 +19,10 @@ use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::Attributes;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::path::Path;
use object_store::{Error as o_Error, Result};
use object_store::{
Attributes, Error as o_Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
};
use pin_project_lite::pin_project;
use rustfs_common::DEFAULT_DELIMITER;
use rustfs_ecstore::StorageAPI;
@@ -102,7 +93,7 @@ impl ObjectStore for EcObjectStore {
unimplemented!()
}
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOpts) -> Result<Box<dyn MultipartUpload>> {
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOptions) -> Result<Box<dyn MultipartUpload>> {
unimplemented!()
}
@@ -122,7 +113,7 @@ impl ObjectStore for EcObjectStore {
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: reader.object_info.size as usize,
size: reader.object_info.size as u64,
e_tag: reader.object_info.etag,
version: None,
};
@@ -151,12 +142,12 @@ impl ObjectStore for EcObjectStore {
Ok(GetResult {
payload,
meta,
range: 0..reader.object_info.size as usize,
range: 0..reader.object_info.size as u64,
attributes,
})
}
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
unimplemented!()
}
@@ -175,7 +166,7 @@ impl ObjectStore for EcObjectStore {
Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: info.size as usize,
size: info.size as u64,
e_tag: info.etag,
version: None,
})
@@ -185,7 +176,7 @@ impl ObjectStore for EcObjectStore {
unimplemented!()
}
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
unimplemented!()
}

View File

@@ -61,7 +61,7 @@ impl MetadataProvider {
Self {
provider,
current_session_table_provider,
config_options: session.inner().config_options().clone(),
config_options: session.inner().config_options().as_ref().clone(),
session,
func_manager,
}

View File

@@ -26,7 +26,6 @@ use datafusion::{
propagate_empty_relation::PropagateEmptyRelation, push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
replace_distinct_aggregate::ReplaceDistinctWithAggregate, scalar_subquery_to_join::ScalarSubqueryToJoin,
simplify_expressions::SimplifyExpressions, single_distinct_to_groupby::SingleDistinctToGroupBy,
unwrap_cast_in_comparison::UnwrapCastInComparison,
},
};
use rustfs_s3select_api::{
@@ -66,7 +65,6 @@ impl Default for DefaultLogicalOptimizer {
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
// df default rules start
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
@@ -91,7 +89,6 @@ impl Default for DefaultLogicalOptimizer {
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
// PushDownProjection can pushdown Projections through Limits, do PushDownLimit again.
Arc::new(PushDownLimit::new()),

View File

@@ -51,7 +51,6 @@ rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-protos = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-audit-logger = { workspace = true }
rustfs-targets = { workspace = true }
atoi = { workspace = true }
atomic_enum = { workspace = true }

View File

@@ -143,7 +143,15 @@ async fn run(opt: config::Opt) -> Result<()> {
let server_port = server_addr.port();
let server_address = server_addr.to_string();
info!("server_address {}, ip:{}", &server_address, server_addr.ip());
info!(
target: "rustfs::main::run",
server_address = %server_address,
ip = %server_addr.ip(),
port = %server_port,
version = %version::get_version(),
"Starting RustFS server at {}",
&server_address
);
// Set up AK and SK
rustfs_ecstore::global::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()));
@@ -162,6 +170,7 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
target: "rustfs::main::run",
"Formatting {}st pool, {} set(s), {} drives per set.",
i + 1,
eps.set_count,
@@ -175,12 +184,20 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
target: "rustfs::main::run",
id = i,
set_count = eps.set_count,
drives_per_set = eps.drives_per_set,
cmd = ?eps.cmd_line,
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line
);
for ep in eps.endpoints.as_ref().iter() {
info!(" - {}", ep);
info!(
target: "rustfs::main::run",
" - endpoint: {}", ep
);
}
}
@@ -202,7 +219,12 @@ async fn run(opt: config::Opt) -> Result<()> {
external_port, server_port
);
}
info!("Using external address {} for endpoint access", external_addr);
info!(
target: "rustfs::main::run",
external_address = %external_addr,
external_port = %external_port,
"Using external address {} for endpoint access", external_addr
);
rustfs_ecstore::global::set_global_rustfs_external_port(external_port);
set_global_addr(&opt.external_address).await;
(external_addr.ip(), external_port)
@@ -279,7 +301,12 @@ async fn run(opt: config::Opt) -> Result<()> {
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
info!("Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal);
info!(
target: "rustfs::main::run",
enable_scanner = enable_scanner,
enable_heal = enable_heal,
"Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal
);
// Initialize heal manager and scanner based on environment variables
if enable_heal || enable_scanner {
@@ -289,11 +316,11 @@ async fn run(opt: config::Opt) -> Result<()> {
let heal_manager = init_heal_manager(heal_storage, None).await?;
if enable_scanner {
info!("Starting scanner with heal manager...");
info!(target: "rustfs::main::run","Starting scanner with heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
scanner.start().await?;
} else {
info!("Scanner disabled, but heal manager is initialized and available");
info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
}
} else if enable_scanner {
info!("Starting scanner without heal manager...");
@@ -301,7 +328,7 @@ async fn run(opt: config::Opt) -> Result<()> {
scanner.start().await?;
}
} else {
info!("Both scanner and heal are disabled, skipping AHM service initialization");
info!(target: "rustfs::main::run","Both scanner and heal are disabled, skipping AHM service initialization");
}
// print server info
@@ -311,21 +338,6 @@ async fn run(opt: config::Opt) -> Result<()> {
init_update_check();
// if opt.console_enable {
// debug!("console is enabled");
// let console_address = opt.console_address.clone();
// let tls_path = opt.tls_path.clone();
//
// if console_address.is_empty() {
// error!("console_address is empty");
// return Err(Error::other("console_address is empty".to_string()));
// }
//
// tokio::spawn(async move {
// console::start_static_file_server(&console_address, tls_path).await;
// });
// }
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
@@ -340,7 +352,7 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
info!("server is stopped state: {:?}", state_manager.current_state());
info!(target: "rustfs::main::run","server is stopped state: {:?}", state_manager.current_state());
Ok(())
}
@@ -357,7 +369,10 @@ fn parse_bool_env_var(var_name: &str, default: bool) -> bool {
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");
info!(
target: "rustfs::main::handle_shutdown",
"Shutdown signal received in main thread"
);
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
@@ -367,19 +382,31 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki
// Stop background services based on what was enabled
if enable_scanner || enable_heal {
info!("Stopping background services (data scanner and auto heal)...");
info!(
target: "rustfs::main::handle_shutdown",
"Stopping background services (data scanner and auto heal)..."
);
shutdown_background_services();
info!("Stopping AHM services...");
info!(
target: "rustfs::main::handle_shutdown",
"Stopping AHM services..."
);
shutdown_ahm_services();
} else {
info!("Background services were disabled, skipping AHM shutdown");
info!(
target: "rustfs::main::handle_shutdown",
"Background services were disabled, skipping AHM shutdown"
);
}
// Stop the notification system
shutdown_event_notifier().await;
info!("Server is stopping...");
info!(
target: "rustfs::main::handle_shutdown",
"Server is stopping..."
);
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
@@ -387,12 +414,18 @@ async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &toki
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!("Server stopped current ");
info!(
target: "rustfs::main::handle_shutdown",
"Server stopped current "
);
}
#[instrument]
async fn init_event_notifier() {
info!("Initializing event notifier...");
info!(
target: "rustfs::main::init_event_notifier",
"Initializing event notifier..."
);
// 1. Get the global configuration loaded by ecstore
let server_config = match GLOBAL_SERVER_CONFIG.get() {
@@ -403,7 +436,10 @@ async fn init_event_notifier() {
}
};
info!("Global server configuration loaded successfully");
info!(
target: "rustfs::main::init_event_notifier",
"Global server configuration loaded successfully"
);
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
if server_config
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
@@ -412,18 +448,27 @@ async fn init_event_notifier() {
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
{
info!("'notify' subsystem not configured, skipping event notifier initialization.");
info!(
target: "rustfs::main::init_event_notifier",
"'notify' subsystem not configured, skipping event notifier initialization."
);
return;
}
info!("Event notifier configuration found, proceeding with initialization.");
info!(
target: "rustfs::main::init_event_notifier",
"Event notifier configuration found, proceeding with initialization."
);
// 3. Initialize the notification system asynchronously with a global configuration
// Use direct await for better error handling and faster initialization
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
info!(
target: "rustfs::main::init_event_notifier",
"Event notifier system initialized successfully."
);
}
}
@@ -492,7 +537,10 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
match has_notification_config {
Some(cfg) => {
info!("Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
info!(
target: "rustfs::main::add_bucket_notification_configuration",
bucket = %bucket,
"Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
let mut event_rules = Vec::new();
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str);
@@ -508,7 +556,10 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
}
}
None => {
info!("Bucket '{}' has no existing notification configuration.", bucket);
info!(
target: "rustfs::main::add_bucket_notification_configuration",
bucket = %bucket,
"Bucket '{}' has no existing notification configuration.", bucket);
}
}
}