Compare commits

...

12 Commits

Author SHA1 Message Date
houseme
f7e188eee7 feat: upgrade datafusion to v50.0.0 and update related dependencies f… (#563)
* feat: upgrade datafusion to v50.0.0 and update related dependencies for compatibility

* fix

* fmt
2025-09-18 23:30:25 +08:00
houseme
4b9cb512f2 remove crate rustfs-audit-logger (#562) 2025-09-18 17:46:46 +08:00
Copilot
e5f0760009 Fix entrypoint.sh incorrectly passing logs directory as data volume with improved separation (#561)
* Initial plan

* Fix entrypoint.sh: separate log directory from data volumes

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

* Improve separation: use functions and RUSTFS_OBS_LOG_DIRECTORY env var

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
2025-09-18 17:05:14 +08:00
houseme
a6c211f4ea Feature/add dns logs (#558)
* add logs

* improve code for dns and logger
2025-09-18 12:00:43 +08:00
shiro.lee
f049c656d9 fix: list_objects does not return common_prefixes field. (#543) (#554)
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-09-18 07:27:37 +08:00
majinghe
65dd947350 add tls support for docker compose (#553)
* add tls support for docker compose

* update docker compose file with comment
2025-09-17 22:45:23 +08:00
0xdx2
57f082ee2b fix: enforce max-keys limit to 1000 in S3 implementation (#549)
Co-authored-by: damon <damonxue2@gmail.com>
2025-09-16 18:02:24 +08:00
weisd
ae7e86d7ef refactor: simplify initialization flow and modernize string formatting (#548) 2025-09-16 15:44:50 +08:00
houseme
a12a3bedc3 feat(obs): optimize WriteMode selection logic in init_telemetry (#546)
- Refactor WriteMode selection to ensure all variables moved into thread closures are owned types, preventing lifetime issues.
- Simplify and clarify WriteMode assignment for production and non-production environments.
- Improve code readability and maintainability for logger initialization.
2025-09-16 08:25:37 +08:00
Copilot
cafec06b7e [Optimization] Enhance obs module telemetry.rs with environment-aware logging and production security (#539)
* Initial plan

* Implement environment-aware logging with production stdout auto-disable

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>

* add mimalloc crate

* fix

* improve code

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-09-15 14:52:20 +08:00
Parm Gill
1770679e66 Adding a toggle for update check (#532) 2025-09-14 22:26:48 +08:00
jon
a4fbf596e6 add startup logo (#528)
* add startup logo

* Replace logo ASCII art in main.rs

---------

Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2025-09-14 12:04:00 +08:00
50 changed files with 1428 additions and 2211 deletions

901
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -16,7 +16,6 @@
members = [
"rustfs", # Core file system implementation
"crates/appauth", # Application authentication and authorization
"crates/audit-logger", # Audit logging system for file operations
"crates/common", # Shared utilities and data structures
"crates/config", # Configuration management
"crates/crypto", # Cryptography and security features
@@ -64,7 +63,6 @@ all = "warn"
rustfs-ahm = { path = "crates/ahm", version = "0.0.5" }
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
rustfs-appauth = { path = "crates/appauth", version = "0.0.5" }
rustfs-audit-logger = { path = "crates/audit-logger", version = "0.0.5" }
rustfs-common = { path = "crates/common", version = "0.0.5" }
rustfs-crypto = { path = "crates/crypto", version = "0.0.5" }
rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" }
@@ -98,7 +96,7 @@ async-trait = "0.1.89"
async-compression = { version = "0.4.19" }
atomic_enum = "0.3.0"
aws-config = { version = "1.8.6" }
aws-sdk-s3 = "1.101.0"
aws-sdk-s3 = "1.106.0"
axum = "0.8.4"
axum-extra = "0.10.1"
axum-server = "0.7.2"
@@ -106,10 +104,10 @@ base64-simd = "0.8.0"
base64 = "0.22.1"
brotli = "8.0.2"
bytes = { version = "1.10.1", features = ["serde"] }
bytesize = "2.0.1"
bytesize = "2.1.0"
byteorder = "1.5.0"
cfg-if = "1.0.3"
crc-fast = "1.5.0"
crc-fast = "1.3.0"
chacha20poly1305 = { version = "0.10.1" }
chrono = { version = "0.4.42", features = ["serde"] }
clap = { version = "4.5.47", features = ["derive", "env"] }
@@ -118,12 +116,12 @@ crc32fast = "1.5.0"
criterion = { version = "0.7", features = ["html_reports"] }
crossbeam-queue = "0.3.12"
dashmap = "6.1.0"
datafusion = "46.0.1"
datafusion = "50.0.0"
derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.2.10"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
@@ -134,7 +132,7 @@ highway = { version = "1.3.0" }
hickory-resolver = { version = "0.25.2", features = ["tls-ring"] }
hmac = "0.12.1"
hyper = "1.7.0"
hyper-util = { version = "0.1.16", features = [
hyper-util = { version = "0.1.17", features = [
"tokio",
"server-auto",
"server-graceful",
@@ -158,7 +156,7 @@ nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.11.0"
object_store = "0.11.2"
object_store = "0.12.3"
once_cell = "1.21.3"
opentelemetry = { version = "0.30.0" }
opentelemetry-appender-tracing = { version = "0.30.1", features = [
@@ -209,8 +207,8 @@ rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-minio-preview.3" }
schemars = "1.0.4"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.143", features = ["raw_value"] }
serde = { version = "1.0.225", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
sha1 = "0.10.6"
@@ -224,7 +222,7 @@ snap = "1.1.1"
socket2 = "0.6.0"
strum = { version = "0.27.2", features = ["derive"] }
sysinfo = "0.37.0"
sysctl = "0.6.0"
sysctl = "0.7.1"
tempfile = "3.22.0"
temp-env = "0.3.6"
test-case = "3.3.1"
@@ -237,7 +235,7 @@ time = { version = "0.3.43", features = [
"serde",
] }
tokio = { version = "1.47.1", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-rustls = { version = "0.26.3", default-features = false }
tokio-stream = { version = "0.1.17" }
tokio-tar = "0.3.1"
tokio-test = "0.4.4"
@@ -260,14 +258,14 @@ uuid = { version = "1.18.1", features = [
"fast-rng",
"macro-diagnostics",
] }
wildmatch = { version = "2.4.0", features = ["serde"] }
wildmatch = { version = "2.5.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "2.4.2"
zip = "5.1.1"
zstd = "0.13.3"
[workspace.metadata.cargo-shear]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "rustfs-audit-logger", "tokio-test"]
ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "tokio-test"]
[profile.wasm-dev]
inherits = "dev"

View File

@@ -136,6 +136,8 @@ To get started with RustFS, follow these steps:
5. **Create a Bucket**: Use the console to create a new bucket for your objects.
6. **Upload Objects**: You can upload files directly through the console or use S3-compatible APIs to interact with your RustFS instance.
**NOTE**: If you want to access RustFS instance with `https`, you can refer to [TLS configuration docs](https://docs.rustfs.com/integration/tls-configured.html).
## Documentation
For detailed documentation, including configuration options, API references, and advanced usage, please visit our [Documentation](https://docs.rustfs.com).

View File

@@ -86,6 +86,8 @@ RustFS 是一个使用 Rust全球最受欢迎的编程语言之一构建
4. **创建存储桶**:使用控制台为您的对象创建新的存储桶。
5. **上传对象**:您可以直接通过控制台上传文件,或使用 S3 兼容的 API 与您的 RustFS 实例交互。
**注意**:如果你想通过 `https` 来访问 RustFS 实例,请参考 [TLS 配置文档](https://docs.rustfs.com/zh/integration/tls-configured.html)
## 文档
有关详细文档包括配置选项、API 参考和高级用法,请访问我们的[文档](https://docs.rustfs.com)。

View File

@@ -340,7 +340,7 @@ impl HealTask {
Ok((result, error)) => {
if let Some(e) = error {
// Check if this is a "File not found" error during delete operations
let error_msg = format!("{}", e);
let error_msg = format!("{e}");
if error_msg.contains("File not found") || error_msg.contains("not found") {
info!(
"Object {}/{} not found during heal - likely deleted intentionally, treating as successful",
@@ -395,7 +395,7 @@ impl HealTask {
}
Err(e) => {
// Check if this is a "File not found" error during delete operations
let error_msg = format!("{}", e);
let error_msg = format!("{e}");
if error_msg.contains("File not found") || error_msg.contains("not found") {
info!(
"Object {}/{} not found during heal - likely deleted intentionally, treating as successful",

View File

@@ -1,44 +0,0 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-audit-logger"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Audit logging system for RustFS, providing detailed logging of file operations and system events."
documentation = "https://docs.rs/audit-logger/latest/audit_logger/"
keywords = ["audit", "logging", "file-operations", "system-events", "RustFS"]
categories = ["web-programming", "development-tools::profiling", "asynchronous", "api-bindings", "development-tools::debugging"]
[dependencies]
rustfs-targets = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-core = { workspace = true }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
url = { workspace = true }
uuid = { workspace = true }
thiserror = { workspace = true }
figment = { version = "0.10", features = ["json", "env"] }
[lints]
workspace = true

View File

@@ -1,34 +0,0 @@
{
"console": {
"enabled": true
},
"logger_webhook": {
"default": {
"enabled": true,
"endpoint": "http://localhost:3000/logs",
"auth_token": "secret-token-for-logs",
"batch_size": 5,
"queue_size": 1000,
"max_retry": 3,
"retry_interval": "2s"
}
},
"audit_webhook": {
"splunk": {
"enabled": true,
"endpoint": "http://localhost:3000/audit",
"auth_token": "secret-token-for-audit",
"batch_size": 10
}
},
"audit_kafka": {
"default": {
"enabled": false,
"brokers": [
"kafka1:9092",
"kafka2:9092"
],
"topic": "minio-audit-events"
}
}
}

View File

@@ -1,17 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
println!("Audit Logger Example");
}

View File

@@ -1,90 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::entry::ObjectVersion;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Args - defines the arguments for API operations
/// Args is used to define the arguments for API operations.
///
/// # Example
/// ```
/// use rustfs_audit_logger::Args;
/// use std::collections::HashMap;
///
/// let args = Args::new()
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_version_id(Some("123".to_string()))
/// .set_metadata(Some(HashMap::new()));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)]
pub struct Args {
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Option::is_none")]
pub objects: Option<Vec<ObjectVersion>>,
#[serde(rename = "metadata", skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
}
impl Args {
/// Create a new Args object
pub fn new() -> Self {
Args {
bucket: None,
object: None,
version_id: None,
objects: None,
metadata: None,
}
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Option<Vec<ObjectVersion>>) -> Self {
self.objects = objects;
self
}
/// Set the metadata
pub fn set_metadata(mut self, metadata: Option<HashMap<String, String>>) -> Self {
self.metadata = metadata;
self
}
}

View File

@@ -1,469 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{BaseLogEntry, LogRecord, ObjectVersion};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// API details structure
/// ApiDetails is used to define the details of an API operation
///
/// The `ApiDetails` structure contains the following fields:
/// - `name` - the name of the API operation
/// - `bucket` - the bucket name
/// - `object` - the object name
/// - `objects` - the list of objects
/// - `status` - the status of the API operation
/// - `status_code` - the status code of the API operation
/// - `input_bytes` - the input bytes
/// - `output_bytes` - the output bytes
/// - `header_bytes` - the header bytes
/// - `time_to_first_byte` - the time to first byte
/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds
/// - `time_to_response` - the time to response
/// - `time_to_response_in_ns` - the time to response in nanoseconds
///
/// The `ApiDetails` structure contains the following methods:
/// - `new` - create a new `ApiDetails` with default values
/// - `set_name` - set the name
/// - `set_bucket` - set the bucket
/// - `set_object` - set the object
/// - `set_objects` - set the objects
/// - `set_status` - set the status
/// - `set_status_code` - set the status code
/// - `set_input_bytes` - set the input bytes
/// - `set_output_bytes` - set the output bytes
/// - `set_header_bytes` - set the header bytes
/// - `set_time_to_first_byte` - set the time to first byte
/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds
/// - `set_time_to_response` - set the time to response
/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds
///
/// # Example
/// ```
/// use rustfs_audit_logger::ApiDetails;
/// use rustfs_audit_logger::ObjectVersion;
///
/// let api = ApiDetails::new()
/// .set_name(Some("GET".to_string()))
/// .set_bucket(Some("my-bucket".to_string()))
/// .set_object(Some("my-object".to_string()))
/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())])
/// .set_status(Some("OK".to_string()))
/// .set_status_code(Some(200))
/// .set_input_bytes(100)
/// .set_output_bytes(200)
/// .set_header_bytes(Some(50))
/// .set_time_to_first_byte(Some("100ms".to_string()))
/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string()))
/// .set_time_to_response(Some("200ms".to_string()))
/// .set_time_to_response_in_ns(Some("200000000ns".to_string()));
/// ```
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
pub struct ApiDetails {
#[serde(rename = "name", skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
#[serde(rename = "bucket", skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(rename = "object", skip_serializing_if = "Option::is_none")]
pub object: Option<String>,
#[serde(rename = "objects", skip_serializing_if = "Vec::is_empty", default)]
pub objects: Vec<ObjectVersion>,
#[serde(rename = "status", skip_serializing_if = "Option::is_none")]
pub status: Option<String>,
#[serde(rename = "statusCode", skip_serializing_if = "Option::is_none")]
pub status_code: Option<i32>,
#[serde(rename = "rx")]
pub input_bytes: i64,
#[serde(rename = "tx")]
pub output_bytes: i64,
#[serde(rename = "txHeaders", skip_serializing_if = "Option::is_none")]
pub header_bytes: Option<i64>,
#[serde(rename = "timeToFirstByte", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte: Option<String>,
#[serde(rename = "timeToFirstByteInNS", skip_serializing_if = "Option::is_none")]
pub time_to_first_byte_in_ns: Option<String>,
#[serde(rename = "timeToResponse", skip_serializing_if = "Option::is_none")]
pub time_to_response: Option<String>,
#[serde(rename = "timeToResponseInNS", skip_serializing_if = "Option::is_none")]
pub time_to_response_in_ns: Option<String>,
}
impl ApiDetails {
/// Create a new `ApiDetails` with default values
pub fn new() -> Self {
ApiDetails {
name: None,
bucket: None,
object: None,
objects: Vec::new(),
status: None,
status_code: None,
input_bytes: 0,
output_bytes: 0,
header_bytes: None,
time_to_first_byte: None,
time_to_first_byte_in_ns: None,
time_to_response: None,
time_to_response_in_ns: None,
}
}
/// Set the name
pub fn set_name(mut self, name: Option<String>) -> Self {
self.name = name;
self
}
/// Set the bucket
pub fn set_bucket(mut self, bucket: Option<String>) -> Self {
self.bucket = bucket;
self
}
/// Set the object
pub fn set_object(mut self, object: Option<String>) -> Self {
self.object = object;
self
}
/// Set the objects
pub fn set_objects(mut self, objects: Vec<ObjectVersion>) -> Self {
self.objects = objects;
self
}
/// Set the status
pub fn set_status(mut self, status: Option<String>) -> Self {
self.status = status;
self
}
/// Set the status code
pub fn set_status_code(mut self, status_code: Option<i32>) -> Self {
self.status_code = status_code;
self
}
/// Set the input bytes
pub fn set_input_bytes(mut self, input_bytes: i64) -> Self {
self.input_bytes = input_bytes;
self
}
/// Set the output bytes
pub fn set_output_bytes(mut self, output_bytes: i64) -> Self {
self.output_bytes = output_bytes;
self
}
/// Set the header bytes
pub fn set_header_bytes(mut self, header_bytes: Option<i64>) -> Self {
self.header_bytes = header_bytes;
self
}
/// Set the time to first byte
pub fn set_time_to_first_byte(mut self, time_to_first_byte: Option<String>) -> Self {
self.time_to_first_byte = time_to_first_byte;
self
}
/// Set the time to first byte in nanoseconds
pub fn set_time_to_first_byte_in_ns(mut self, time_to_first_byte_in_ns: Option<String>) -> Self {
self.time_to_first_byte_in_ns = time_to_first_byte_in_ns;
self
}
/// Set the time to response
pub fn set_time_to_response(mut self, time_to_response: Option<String>) -> Self {
self.time_to_response = time_to_response;
self
}
/// Set the time to response in nanoseconds
pub fn set_time_to_response_in_ns(mut self, time_to_response_in_ns: Option<String>) -> Self {
self.time_to_response_in_ns = time_to_response_in_ns;
self
}
}
/// Entry - audit entry logs
/// AuditLogEntry is used to define the structure of an audit log entry
///
/// The `AuditLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `version` - the version of the audit log entry
/// - `deployment_id` - the deployment ID
/// - `event` - the event
/// - `entry_type` - the type of audit message
/// - `api` - the API details
/// - `remote_host` - the remote host
/// - `user_agent` - the user agent
/// - `req_path` - the request path
/// - `req_host` - the request host
/// - `req_claims` - the request claims
/// - `req_query` - the request query
/// - `req_header` - the request header
/// - `resp_header` - the response header
/// - `access_key` - the access key
/// - `parent_user` - the parent user
/// - `error` - the error
///
/// The `AuditLogEntry` structure contains the following methods:
/// - `new` - create a new `AuditEntry` with default values
/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details
/// - `with_base` - set the base log entry
/// - `set_version` - set the version
/// - `set_deployment_id` - set the deployment ID
/// - `set_event` - set the event
/// - `set_entry_type` - set the entry type
/// - `set_api` - set the API details
/// - `set_remote_host` - set the remote host
/// - `set_user_agent` - set the user agent
/// - `set_req_path` - set the request path
/// - `set_req_host` - set the request host
/// - `set_req_claims` - set the request claims
/// - `set_req_query` - set the request query
/// - `set_req_header` - set the request header
/// - `set_resp_header` - set the response header
/// - `set_access_key` - set the access key
/// - `set_parent_user` - set the parent user
/// - `set_error` - set the error
///
/// # Example
/// ```
/// use rustfs_audit_logger::AuditLogEntry;
/// use rustfs_audit_logger::ApiDetails;
/// use std::collections::HashMap;
///
/// let entry = AuditLogEntry::new()
/// .set_version("1.0".to_string())
/// .set_deployment_id(Some("123".to_string()))
/// .set_event("event".to_string())
/// .set_entry_type(Some("type".to_string()))
/// .set_api(ApiDetails::new())
/// .set_remote_host(Some("remote-host".to_string()))
/// .set_user_agent(Some("user-agent".to_string()))
/// .set_req_path(Some("req-path".to_string()))
/// .set_req_host(Some("req-host".to_string()))
/// .set_req_claims(Some(HashMap::new()))
/// .set_req_query(Some(HashMap::new()))
/// .set_req_header(Some(HashMap::new()))
/// .set_resp_header(Some(HashMap::new()))
/// .set_access_key(Some("access-key".to_string()))
/// .set_parent_user(Some("parent-user".to_string()))
/// .set_error(Some("error".to_string()));
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct AuditLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub version: String,
#[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")]
pub deployment_id: Option<String>,
pub event: String,
// Class of audit message - S3, admin ops, bucket management
#[serde(rename = "type", skip_serializing_if = "Option::is_none")]
pub entry_type: Option<String>,
pub api: ApiDetails,
#[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")]
pub remote_host: Option<String>,
#[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
#[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")]
pub req_path: Option<String>,
#[serde(rename = "requestHost", skip_serializing_if = "Option::is_none")]
pub req_host: Option<String>,
#[serde(rename = "requestClaims", skip_serializing_if = "Option::is_none")]
pub req_claims: Option<HashMap<String, Value>>,
#[serde(rename = "requestQuery", skip_serializing_if = "Option::is_none")]
pub req_query: Option<HashMap<String, String>>,
#[serde(rename = "requestHeader", skip_serializing_if = "Option::is_none")]
pub req_header: Option<HashMap<String, String>>,
#[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")]
pub resp_header: Option<HashMap<String, String>>,
#[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")]
pub access_key: Option<String>,
#[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")]
pub parent_user: Option<String>,
#[serde(rename = "error", skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl AuditLogEntry {
/// Create a new `AuditEntry` with default values
pub fn new() -> Self {
AuditLogEntry {
base: BaseLogEntry::new(),
version: String::new(),
deployment_id: None,
event: String::new(),
entry_type: None,
api: ApiDetails::new(),
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Create a new `AuditEntry` with version, time, event and api details
pub fn new_with_values(version: String, time: DateTime<Utc>, event: String, api: ApiDetails) -> Self {
let mut base = BaseLogEntry::new();
base.timestamp = time;
AuditLogEntry {
base,
version,
deployment_id: None,
event,
entry_type: None,
api,
remote_host: None,
user_agent: None,
req_path: None,
req_host: None,
req_claims: None,
req_query: None,
req_header: None,
resp_header: None,
access_key: None,
parent_user: None,
error: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the version
pub fn set_version(mut self, version: String) -> Self {
self.version = version;
self
}
/// Set the deployment ID
pub fn set_deployment_id(mut self, deployment_id: Option<String>) -> Self {
self.deployment_id = deployment_id;
self
}
/// Set the event
pub fn set_event(mut self, event: String) -> Self {
self.event = event;
self
}
/// Set the entry type
pub fn set_entry_type(mut self, entry_type: Option<String>) -> Self {
self.entry_type = entry_type;
self
}
/// Set the API details
pub fn set_api(mut self, api: ApiDetails) -> Self {
self.api = api;
self
}
/// Set the remote host
pub fn set_remote_host(mut self, remote_host: Option<String>) -> Self {
self.remote_host = remote_host;
self
}
/// Set the user agent
pub fn set_user_agent(mut self, user_agent: Option<String>) -> Self {
self.user_agent = user_agent;
self
}
/// Set the request path
pub fn set_req_path(mut self, req_path: Option<String>) -> Self {
self.req_path = req_path;
self
}
/// Set the request host
pub fn set_req_host(mut self, req_host: Option<String>) -> Self {
self.req_host = req_host;
self
}
/// Set the request claims
pub fn set_req_claims(mut self, req_claims: Option<HashMap<String, Value>>) -> Self {
self.req_claims = req_claims;
self
}
/// Set the request query
pub fn set_req_query(mut self, req_query: Option<HashMap<String, String>>) -> Self {
self.req_query = req_query;
self
}
/// Set the request header
pub fn set_req_header(mut self, req_header: Option<HashMap<String, String>>) -> Self {
self.req_header = req_header;
self
}
/// Set the response header
pub fn set_resp_header(mut self, resp_header: Option<HashMap<String, String>>) -> Self {
self.resp_header = resp_header;
self
}
/// Set the access key
pub fn set_access_key(mut self, access_key: Option<String>) -> Self {
self.access_key = access_key;
self
}
/// Set the parent user
pub fn set_parent_user(mut self, parent_user: Option<String>) -> Self {
self.parent_user = parent_user;
self
}
/// Set the error
pub fn set_error(mut self, error: Option<String>) -> Self {
self.error = error;
self
}
}
impl LogRecord for AuditLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}

View File

@@ -1,108 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Base log entry structure shared by all log types
/// This structure is used to serialize log entries to JSON
/// and send them to the log sinks
/// This structure is also used to deserialize log entries from JSON
/// This structure is also used to store log entries in the database
/// This structure is also used to query log entries from the database
///
/// The `BaseLogEntry` structure contains the following fields:
/// - `timestamp` - the timestamp of the log entry
/// - `request_id` - the request ID of the log entry
/// - `message` - the message of the log entry
/// - `tags` - the tags of the log entry
///
/// The `BaseLogEntry` structure contains the following methods:
/// - `new` - create a new `BaseLogEntry` with default values
/// - `message` - set the message
/// - `request_id` - set the request ID
/// - `tags` - set the tags
/// - `timestamp` - set the timestamp
///
/// # Example
/// ```
/// use rustfs_audit_logger::BaseLogEntry;
/// use chrono::{DateTime, Utc};
/// use std::collections::HashMap;
///
/// let timestamp = Utc::now();
/// let request = Some("req-123".to_string());
/// let message = Some("This is a log message".to_string());
/// let tags = Some(HashMap::new());
///
/// let entry = BaseLogEntry::new()
/// .timestamp(timestamp)
/// .request_id(request)
/// .message(message)
/// .tags(tags);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct BaseLogEntry {
#[serde(rename = "time")]
pub timestamp: DateTime<Utc>,
#[serde(rename = "requestID", skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
#[serde(rename = "message", skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(rename = "tags", skip_serializing_if = "Option::is_none")]
pub tags: Option<HashMap<String, Value>>,
}
impl BaseLogEntry {
/// Create a new BaseLogEntry with default values
pub fn new() -> Self {
BaseLogEntry {
timestamp: Utc::now(),
request_id: None,
message: None,
tags: None,
}
}
/// Set the message
pub fn message(mut self, message: Option<String>) -> Self {
self.message = message;
self
}
/// Set the request ID
pub fn request_id(mut self, request_id: Option<String>) -> Self {
self.request_id = request_id;
self
}
/// Set the tags
pub fn tags(mut self, tags: Option<HashMap<String, Value>>) -> Self {
self.tags = tags;
self
}
/// Set the timestamp
pub fn timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.timestamp = timestamp;
self
}
}

View File

@@ -1,159 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub(crate) mod args;
pub(crate) mod audit;
pub(crate) mod base;
pub(crate) mod unified;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tracing_core::Level;
/// ObjectVersion is used across multiple modules
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectVersion {
#[serde(rename = "name")]
pub object_name: String,
#[serde(rename = "versionId", skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl ObjectVersion {
/// Create a new ObjectVersion object
pub fn new() -> Self {
ObjectVersion {
object_name: String::new(),
version_id: None,
}
}
/// Create a new ObjectVersion with object name
pub fn new_with_object_name(object_name: String) -> Self {
ObjectVersion {
object_name,
version_id: None,
}
}
/// Set the object name
pub fn set_object_name(mut self, object_name: String) -> Self {
self.object_name = object_name;
self
}
/// Set the version ID
pub fn set_version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
}
impl Default for ObjectVersion {
fn default() -> Self {
Self::new()
}
}
/// Log kind/level enum
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum LogKind {
#[serde(rename = "INFO")]
#[default]
Info,
#[serde(rename = "WARNING")]
Warning,
#[serde(rename = "ERROR")]
Error,
#[serde(rename = "FATAL")]
Fatal,
}
/// Trait for types that can be serialized to JSON and have a timestamp
/// This trait is used by `ServerLogEntry` to convert the log entry to JSON
/// and get the timestamp of the log entry
/// This trait is implemented by `ServerLogEntry`
///
/// # Example
/// ```
/// use rustfs_audit_logger::LogRecord;
/// use chrono::{DateTime, Utc};
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string());
/// let json = log_entry.to_json();
/// let timestamp = log_entry.get_timestamp();
/// ```
pub trait LogRecord {
fn to_json(&self) -> String;
fn get_timestamp(&self) -> chrono::DateTime<chrono::Utc>;
}
/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize`
/// for `ServerLogEntry`
/// This is necessary because `tracing_core::Level` does not implement `Serialize`
/// and `Deserialize`
/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized
/// using `serde`
///
/// # Example
/// ```
/// use rustfs_audit_logger::SerializableLevel;
/// use tracing_core::Level;
///
/// let level = Level::INFO;
/// let serializable_level = SerializableLevel::from(level);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerializableLevel(pub Level);
impl From<Level> for SerializableLevel {
fn from(level: Level) -> Self {
SerializableLevel(level)
}
}
impl From<SerializableLevel> for Level {
fn from(serializable_level: SerializableLevel) -> Self {
serializable_level.0
}
}
impl Serialize for SerializableLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.as_str())
}
}
impl<'de> Deserialize<'de> for SerializableLevel {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
"TRACE" => Ok(SerializableLevel(Level::TRACE)),
"DEBUG" => Ok(SerializableLevel(Level::DEBUG)),
"INFO" => Ok(SerializableLevel(Level::INFO)),
"WARN" => Ok(SerializableLevel(Level::WARN)),
"ERROR" => Ok(SerializableLevel(Level::ERROR)),
_ => Err(D::Error::custom("unknown log level")),
}
}
}

View File

@@ -1,266 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tracing_core::Level;
/// Server log entry with structured fields
/// ServerLogEntry is used to log structured log entries from the server
///
/// The `ServerLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `source` - the source of the log entry
/// - `user_id` - the user ID
/// - `fields` - the structured fields of the log entry
///
/// The `ServerLogEntry` structure contains the following methods:
/// - `new` - create a new `ServerLogEntry` with specified level and source
/// - `with_base` - set the base log entry
/// - `user_id` - set the user ID
/// - `fields` - set the fields
/// - `add_field` - add a field
///
/// # Example
/// ```
/// use rustfs_audit_logger::ServerLogEntry;
/// use tracing_core::Level;
///
/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string())
/// .user_id(Some("user-456".to_string()))
/// .add_field("operation".to_string(), "login".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ServerLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: SerializableLevel,
pub source: String,
#[serde(rename = "userId", skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub fields: Vec<(String, String)>,
}
impl ServerLogEntry {
/// Create a new ServerLogEntry with specified level and source
pub fn new(level: Level, source: String) -> Self {
ServerLogEntry {
base: BaseLogEntry::new(),
level: SerializableLevel(level),
source,
user_id: None,
fields: Vec::new(),
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the user ID
pub fn user_id(mut self, user_id: Option<String>) -> Self {
self.user_id = user_id;
self
}
/// Set fields
pub fn fields(mut self, fields: Vec<(String, String)>) -> Self {
self.fields = fields;
self
}
/// Add a field
pub fn add_field(mut self, key: String, value: String) -> Self {
self.fields.push((key, value));
self
}
}
impl LogRecord for ServerLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Console log entry structure
/// ConsoleLogEntry is used to log console log entries
/// The `ConsoleLogEntry` structure contains the following fields:
/// - `base` - the base log entry
/// - `level` - the log level
/// - `console_msg` - the console message
/// - `node_name` - the node name
/// - `err` - the error message
///
/// The `ConsoleLogEntry` structure contains the following methods:
/// - `new` - create a new `ConsoleLogEntry`
/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name
/// - `with_base` - set the base log entry
/// - `set_level` - set the log level
/// - `set_node_name` - set the node name
/// - `set_console_msg` - set the console message
/// - `set_err` - set the error message
///
/// # Example
/// ```
/// use rustfs_audit_logger::ConsoleLogEntry;
///
/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string());
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleLogEntry {
#[serde(flatten)]
pub base: BaseLogEntry,
pub level: LogKind,
pub console_msg: String,
pub node_name: String,
#[serde(skip)]
pub err: Option<String>,
}
impl ConsoleLogEntry {
/// Create a new ConsoleLogEntry
pub fn new() -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg: String::new(),
node_name: String::new(),
err: None,
}
}
/// Create a new ConsoleLogEntry with console message and node name
pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self {
ConsoleLogEntry {
base: BaseLogEntry::new(),
level: LogKind::Info,
console_msg,
node_name,
err: None,
}
}
/// Set the base log entry
pub fn with_base(mut self, base: BaseLogEntry) -> Self {
self.base = base;
self
}
/// Set the log level
pub fn set_level(mut self, level: LogKind) -> Self {
self.level = level;
self
}
/// Set the node name
pub fn set_node_name(mut self, node_name: String) -> Self {
self.node_name = node_name;
self
}
/// Set the console message
pub fn set_console_msg(mut self, console_msg: String) -> Self {
self.console_msg = console_msg;
self
}
/// Set the error message
pub fn set_err(mut self, err: Option<String>) -> Self {
self.err = err;
self
}
}
impl Default for ConsoleLogEntry {
fn default() -> Self {
Self::new()
}
}
impl LogRecord for ConsoleLogEntry {
fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| String::from("{}"))
}
fn get_timestamp(&self) -> DateTime<Utc> {
self.base.timestamp
}
}
/// Unified log entry type
/// UnifiedLogEntry is used to log different types of log entries
///
/// The `UnifiedLogEntry` enum contains the following variants:
/// - `Server` - a server log entry
/// - `Audit` - an audit log entry
/// - `Console` - a console log entry
///
/// The `UnifiedLogEntry` enum contains the following methods:
/// - `to_json` - convert the log entry to JSON
/// - `get_timestamp` - get the timestamp of the log entry
///
/// # Example
/// ```
/// use rustfs_audit_logger::{UnifiedLogEntry, ServerLogEntry};
/// use tracing_core::Level;
///
/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string());
/// let unified = UnifiedLogEntry::Server(server_entry);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UnifiedLogEntry {
#[serde(rename = "server")]
Server(ServerLogEntry),
#[serde(rename = "audit")]
Audit(Box<AuditLogEntry>),
#[serde(rename = "console")]
Console(ConsoleLogEntry),
}
impl LogRecord for UnifiedLogEntry {
fn to_json(&self) -> String {
match self {
UnifiedLogEntry::Server(entry) => entry.to_json(),
UnifiedLogEntry::Audit(entry) => entry.to_json(),
UnifiedLogEntry::Console(entry) => entry.to_json(),
}
}
fn get_timestamp(&self) -> DateTime<Utc> {
match self {
UnifiedLogEntry::Server(entry) => entry.get_timestamp(),
UnifiedLogEntry::Audit(entry) => entry.get_timestamp(),
UnifiedLogEntry::Console(entry) => entry.get_timestamp(),
}
}
}

View File

@@ -1,8 +0,0 @@
mod entry;
mod logger;
pub use entry::args::Args;
pub use entry::audit::{ApiDetails, AuditLogEntry};
pub use entry::base::BaseLogEntry;
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};

View File

@@ -1,29 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
// Default value function
fn default_batch_size() -> usize {
10
}
fn default_queue_size() -> usize {
10000
}
fn default_max_retry() -> u32 {
5
}
fn default_retry_interval() -> std::time::Duration {
std::time::Duration::from_secs(3)
}

View File

@@ -1,13 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

View File

@@ -1,108 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::collections::HashMap;
use uuid::Uuid;
///A Trait for a log entry that can be serialized and sent
pub trait Loggable: Serialize + Send + Sync + 'static {
fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
/// Standard log entries
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LogEntry {
pub deployment_id: String,
pub level: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub trace: Option<Trace>,
pub time: DateTime<Utc>,
pub request_id: String,
}
impl Loggable for LogEntry {}
/// Audit log entry
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct AuditEntry {
pub version: String,
pub deployment_id: String,
pub time: DateTime<Utc>,
pub trigger: String,
pub api: ApiDetails,
pub remote_host: String,
pub request_id: String,
pub user_agent: String,
pub access_key: String,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub tags: HashMap<String, String>,
}
impl Loggable for AuditEntry {}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Trace {
pub message: String,
pub source: Vec<String>,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub variables: HashMap<String, String>,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ApiDetails {
pub name: String,
pub bucket: String,
pub object: String,
pub status: String,
pub status_code: u16,
pub time_to_first_byte: String,
pub time_to_response: String,
}
// Helper functions to create entries
impl AuditEntry {
pub fn new(api_name: &str, bucket: &str, object: &str) -> Self {
AuditEntry {
version: "1".to_string(),
deployment_id: "global-deployment-id".to_string(),
time: Utc::now(),
trigger: "incoming".to_string(),
api: ApiDetails {
name: api_name.to_string(),
bucket: bucket.to_string(),
object: object.to_string(),
status: "OK".to_string(),
status_code: 200,
time_to_first_byte: "10ms".to_string(),
time_to_response: "50ms".to_string(),
},
remote_host: "127.0.0.1".to_string(),
request_id: Uuid::new_v4().to_string(),
user_agent: "Rust-Client/1.0".to_string(),
access_key: "minioadmin".to_string(),
tags: HashMap::new(),
}
}
}

View File

@@ -1,13 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

View File

@@ -1,36 +0,0 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub mod config;
pub mod dispatch;
pub mod entry;
pub mod factory;
use async_trait::async_trait;
use std::error::Error;
/// General Log Target Trait
#[async_trait]
pub trait Target: Send + Sync {
/// Send a single logizable entry
async fn send(&self, entry: Box<Self>) -> Result<(), Box<dyn Error + Send>>;
/// Returns the unique name of the target
fn name(&self) -> &str;
/// Close target gracefully, ensuring all buffered logs are processed
async fn shutdown(&self);
}

View File

@@ -79,3 +79,13 @@ pub const ENV_CONSOLE_AUTH_TIMEOUT: &str = "RUSTFS_CONSOLE_AUTH_TIMEOUT";
/// Example: RUSTFS_CONSOLE_AUTH_TIMEOUT=3600
/// Example: --console-auth-timeout 3600
pub const DEFAULT_CONSOLE_AUTH_TIMEOUT: u64 = 3600;
/// Toggle update check
/// It controls whether to check for newer versions of rustfs
/// Default value: true
/// Environment variable: RUSTFS_CHECK_UPDATE
/// Example: RUSTFS_CHECK_UPDATE=false
pub const ENV_UPDATE_CHECK: &str = "RUSTFS_CHECK_UPDATE";
/// Default value for update toggle
pub const DEFAULT_UPDATE_CHECK: bool = true;

View File

@@ -29,7 +29,70 @@ pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB"
pub const ENV_OBS_LOG_ROTATION_TIME: &str = "RUSTFS_OBS_LOG_ROTATION_TIME";
pub const ENV_OBS_LOG_KEEP_FILES: &str = "RUSTFS_OBS_LOG_KEEP_FILES";
/// Log pool capacity for async logging
pub const ENV_OBS_LOG_POOL_CAPA: &str = "RUSTFS_OBS_LOG_POOL_CAPA";
/// Log message capacity for async logging
pub const ENV_OBS_LOG_MESSAGE_CAPA: &str = "RUSTFS_OBS_LOG_MESSAGE_CAPA";
/// Log flush interval in milliseconds for async logging
pub const ENV_OBS_LOG_FLUSH_MS: &str = "RUSTFS_OBS_LOG_FLUSH_MS";
/// Default values for log pool
pub const DEFAULT_OBS_LOG_POOL_CAPA: usize = 10240;
/// Default values for message capacity
pub const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768;
/// Default values for flush interval in milliseconds
pub const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200;
/// Audit logger queue capacity environment variable key
pub const ENV_AUDIT_LOGGER_QUEUE_CAPACITY: &str = "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY";
// Default values for observability configuration
/// Default values for observability configuration
pub const DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY: usize = 10000;
/// Default values for observability configuration
// ### Supported Environment Values
// - `production` - Secure file-only logging
// - `development` - Full debugging with stdout
// - `test` - Test environment with stdout support
// - `staging` - Staging environment with stdout support
pub const DEFAULT_OBS_ENVIRONMENT_PRODUCTION: &str = "production";
pub const DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT: &str = "development";
pub const DEFAULT_OBS_ENVIRONMENT_TEST: &str = "test";
pub const DEFAULT_OBS_ENVIRONMENT_STAGING: &str = "staging";
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_env_keys() {
assert_eq!(ENV_OBS_ENDPOINT, "RUSTFS_OBS_ENDPOINT");
assert_eq!(ENV_OBS_USE_STDOUT, "RUSTFS_OBS_USE_STDOUT");
assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO");
assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL");
assert_eq!(ENV_OBS_SERVICE_NAME, "RUSTFS_OBS_SERVICE_NAME");
assert_eq!(ENV_OBS_SERVICE_VERSION, "RUSTFS_OBS_SERVICE_VERSION");
assert_eq!(ENV_OBS_ENVIRONMENT, "RUSTFS_OBS_ENVIRONMENT");
assert_eq!(ENV_OBS_LOGGER_LEVEL, "RUSTFS_OBS_LOGGER_LEVEL");
assert_eq!(ENV_OBS_LOCAL_LOGGING_ENABLED, "RUSTFS_OBS_LOCAL_LOGGING_ENABLED");
assert_eq!(ENV_OBS_LOG_DIRECTORY, "RUSTFS_OBS_LOG_DIRECTORY");
assert_eq!(ENV_OBS_LOG_FILENAME, "RUSTFS_OBS_LOG_FILENAME");
assert_eq!(ENV_OBS_LOG_ROTATION_SIZE_MB, "RUSTFS_OBS_LOG_ROTATION_SIZE_MB");
assert_eq!(ENV_OBS_LOG_ROTATION_TIME, "RUSTFS_OBS_LOG_ROTATION_TIME");
assert_eq!(ENV_OBS_LOG_KEEP_FILES, "RUSTFS_OBS_LOG_KEEP_FILES");
assert_eq!(ENV_AUDIT_LOGGER_QUEUE_CAPACITY, "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY");
}
#[test]
fn test_default_values() {
assert_eq!(DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY, 10000);
assert_eq!(DEFAULT_OBS_ENVIRONMENT_PRODUCTION, "production");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging");
}
}

View File

@@ -458,7 +458,7 @@ impl LocalDisk {
{
let cache = self.path_cache.read();
for (i, (bucket, key)) in requests.iter().enumerate() {
let cache_key = format!("{}/{}", bucket, key);
let cache_key = format!("{bucket}/{key}");
if let Some(cached_path) = cache.get(&cache_key) {
results.push((i, cached_path.clone()));
} else {

View File

@@ -13,13 +13,12 @@
// limitations under the License.
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{error, instrument, warn};
use tracing::{error, info, instrument, warn};
use crate::{
disk::endpoint::{Endpoint, EndpointType},
disks_layout::DisksLayout,
global::global_rustfs_port,
// utils::net::{self, XHost},
};
use std::io::{Error, Result};
use std::{
@@ -242,15 +241,32 @@ impl PoolEndpointList {
let host = ep.url.host().unwrap();
let host_ip_set = if let Some(set) = host_ip_cache.get(&host) {
info!(
target: "rustfs::ecstore::endpoints",
host = %host,
endpoint = %ep.to_string(),
from = "cache",
"Create pool endpoints host '{}' found in cache for endpoint '{}'", host, ep.to_string()
);
set
} else {
let ips = match get_host_ip(host.clone()).await {
Ok(ips) => ips,
Err(e) => {
error!("host {} not found, error:{}", host, e);
error!("Create pool endpoints host {} not found, error:{}", host, e);
return Err(Error::other(format!("host '{host}' cannot resolve: {e}")));
}
};
info!(
target: "rustfs::ecstore::endpoints",
host = %host,
endpoint = %ep.to_string(),
from = "get_host_ip",
"Create pool endpoints host '{}' resolved to ips {:?} for endpoint '{}'",
host,
ips,
ep.to_string()
);
host_ip_cache.insert(host.clone(), ips);
host_ip_cache.get(&host).unwrap()
};

View File

@@ -65,7 +65,7 @@ impl OptimizedFileCache {
// Cache miss, read file
let data = tokio::fs::read(&path)
.await
.map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?;
.map_err(|e| Error::other(format!("Read metadata failed: {e}")))?;
let mut meta = FileMeta::default();
meta.unmarshal_msg(&data)?;
@@ -86,7 +86,7 @@ impl OptimizedFileCache {
let data = tokio::fs::read(&path)
.await
.map_err(|e| Error::other(format!("Read file failed: {}", e)))?;
.map_err(|e| Error::other(format!("Read file failed: {e}")))?;
let bytes = Bytes::from(data);
self.file_content_cache.insert(path, bytes.clone()).await;
@@ -295,9 +295,9 @@ mod tests {
let mut paths = Vec::new();
for i in 0..5 {
let file_path = dir.path().join(format!("test_{}.txt", i));
let file_path = dir.path().join(format!("test_{i}.txt"));
let mut file = std::fs::File::create(&file_path).unwrap();
writeln!(file, "content {}", i).unwrap();
writeln!(file, "content {i}").unwrap();
paths.push(file_path);
}

View File

@@ -2430,7 +2430,7 @@ impl SetDisks {
.map_err(|e| {
let elapsed = start_time.elapsed();
error!("Failed to acquire write lock for heal operation after {:?}: {:?}", elapsed, e);
DiskError::other(format!("Failed to acquire write lock for heal operation: {:?}", e))
DiskError::other(format!("Failed to acquire write lock for heal operation: {e:?}"))
})?;
let elapsed = start_time.elapsed();
info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed);
@@ -3045,7 +3045,7 @@ impl SetDisks {
.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {:?}", e)))?;
.map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {e:?}")))?;
let disks = {
let disks = self.disks.read().await;
@@ -5594,7 +5594,7 @@ impl StorageAPI for SetDisks {
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {:?}", e)))?,
.map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {e:?}")))?,
)
} else {
None

View File

@@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Lock system status: {}", if manager.is_disabled() { "DISABLED" } else { "ENABLED" });
match std::env::var("RUSTFS_ENABLE_LOCKS") {
Ok(value) => println!("RUSTFS_ENABLE_LOCKS set to: {}", value),
Ok(value) => println!("RUSTFS_ENABLE_LOCKS set to: {value}"),
Err(_) => println!("RUSTFS_ENABLE_LOCKS not set (defaults to enabled)"),
}
@@ -34,7 +34,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Lock acquired successfully! Disabled: {}", guard.is_disabled());
}
Err(e) => {
println!("Failed to acquire lock: {:?}", e);
println!("Failed to acquire lock: {e:?}");
}
}

View File

@@ -87,7 +87,7 @@ impl LockClient for LocalClient {
current_owner,
current_mode,
}) => Ok(LockResponse::failure(
format!("Lock conflict: resource held by {} in {:?} mode", current_owner, current_mode),
format!("Lock conflict: resource held by {current_owner} in {current_mode:?} mode"),
std::time::Duration::ZERO,
)),
Err(crate::fast_lock::LockResult::Acquired) => {
@@ -131,7 +131,7 @@ impl LockClient for LocalClient {
current_owner,
current_mode,
}) => Ok(LockResponse::failure(
format!("Lock conflict: resource held by {} in {:?} mode", current_owner, current_mode),
format!("Lock conflict: resource held by {current_owner} in {current_mode:?} mode"),
std::time::Duration::ZERO,
)),
Err(crate::fast_lock::LockResult::Acquired) => {

View File

@@ -409,14 +409,14 @@ mod tests {
// Acquire multiple guards and verify unique IDs
let mut guards = Vec::new();
for i in 0..100 {
let object_name = format!("object_{}", i);
let object_name = format!("object_{i}");
let guard = manager
.acquire_write_lock("bucket", object_name.as_str(), "owner")
.await
.expect("Failed to acquire lock");
let guard_id = guard.guard_id();
assert!(guard_ids.insert(guard_id), "Guard ID {} is not unique", guard_id);
assert!(guard_ids.insert(guard_id), "Guard ID {guard_id} is not unique");
guards.push(guard);
}
@@ -501,7 +501,7 @@ mod tests {
let handle = tokio::spawn(async move {
for i in 0..10 {
let object_name = format!("obj_{}_{}", task_id, i);
let object_name = format!("obj_{task_id}_{i}");
// Acquire lock
let mut guard = match manager.acquire_write_lock("bucket", object_name.as_str(), "owner").await {
@@ -535,7 +535,7 @@ mod tests {
let blocked = double_release_blocked.load(Ordering::SeqCst);
// Should have many successful releases and all double releases blocked
assert!(successes > 150, "Expected many successful releases, got {}", successes);
assert!(successes > 150, "Expected many successful releases, got {successes}");
assert_eq!(blocked, successes, "All double releases should be blocked");
// Verify no active guards remain
@@ -567,7 +567,7 @@ mod tests {
// Acquire multiple locks for the same object to ensure they're in the same shard
let mut guards = Vec::new();
for i in 0..10 {
let owner_name = format!("owner_{}", i);
let owner_name = format!("owner_{i}");
let guard = manager
.acquire_read_lock("bucket", "shared_object", owner_name.as_str())
.await
@@ -586,7 +586,7 @@ mod tests {
let cleaned = shard.adaptive_cleanup();
// Should clean very little due to active guards
assert!(cleaned <= 5, "Should be conservative with active guards, cleaned: {}", cleaned);
assert!(cleaned <= 5, "Should be conservative with active guards, cleaned: {cleaned}");
// Locks should be protected by active guards
let remaining_locks = shard.lock_count();
@@ -809,7 +809,7 @@ mod tests {
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
let _guard = manager_clone
.acquire_read_lock("bucket", format!("object-{}", i), "background")
.acquire_read_lock("bucket", format!("object-{i}"), "background")
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
});
@@ -842,7 +842,7 @@ mod tests {
// High priority should generally perform reasonably well
// This is more of a performance validation than a strict requirement
println!("High priority: {:?}, Normal: {:?}", high_priority_duration, normal_duration);
println!("High priority: {high_priority_duration:?}, Normal: {normal_duration:?}");
// Both operations should complete in reasonable time (less than 100ms in test environment)
// This validates that the priority system isn't causing severe degradation
@@ -858,7 +858,7 @@ mod tests {
let mut _guards = Vec::new();
for i in 0..100 {
if let Ok(guard) = manager
.acquire_write_lock("bucket", format!("load-object-{}", i), "loader")
.acquire_write_lock("bucket", format!("load-object-{i}"), "loader")
.await
{
_guards.push(guard);
@@ -909,8 +909,8 @@ mod tests {
// Try to acquire all locks for this "query"
for obj_id in 0..objects_per_query {
let bucket = "databend";
let object = format!("table_partition_{}_{}", query_id, obj_id);
let owner = format!("query_{}", query_id);
let object = format!("table_partition_{query_id}_{obj_id}");
let owner = format!("query_{query_id}");
match manager_clone.acquire_high_priority_read_lock(bucket, object, owner).await {
Ok(guard) => query_locks.push(guard),
@@ -935,8 +935,8 @@ mod tests {
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
let bucket = "databend";
let object = format!("write_target_{}", write_id);
let owner = format!("writer_{}", write_id);
let object = format!("write_target_{write_id}");
let owner = format!("writer_{write_id}");
match manager_clone.acquire_write_lock(bucket, object, owner).await {
Ok(_guard) => {
@@ -988,7 +988,7 @@ mod tests {
let handle = tokio::spawn(async move {
let bucket = "test";
let object = format!("extreme_load_object_{}", i % 20); // Force some contention
let owner = format!("user_{}", i);
let owner = format!("user_{i}");
// Mix of read and write locks to create realistic contention
let result = if i % 3 == 0 {
@@ -1066,8 +1066,8 @@ mod tests {
};
let bucket = "datacenter-shared";
let object = format!("table_{}_{}_partition_{}", table_id, client_id, table_idx);
let owner = format!("client_{}_query_{}", client_id, query_id);
let object = format!("table_{table_id}_{client_id}_partition_{table_idx}");
let owner = format!("client_{client_id}_query_{query_id}");
// Mix of operations - mostly reads with some writes
let lock_result = if table_idx == 0 && query_id % 7 == 0 {
@@ -1129,10 +1129,10 @@ mod tests {
}
println!("\n=== Multi-Client Datacenter Simulation Results ===");
println!("Total execution time: {:?}", total_time);
println!("Total clients: {}", num_clients);
println!("Queries per client: {}", queries_per_client);
println!("Total queries executed: {}", total_queries);
println!("Total execution time: {total_time:?}");
println!("Total clients: {num_clients}");
println!("Queries per client: {queries_per_client}");
println!("Total queries executed: {total_queries}");
println!(
"Successful queries: {} ({:.1}%)",
successful_queries,
@@ -1179,8 +1179,7 @@ mod tests {
// Performance assertion - should complete in reasonable time
assert!(
total_time < std::time::Duration::from_secs(120),
"Multi-client test took too long: {:?}",
total_time
"Multi-client test took too long: {total_time:?}"
);
}
@@ -1209,8 +1208,8 @@ mod tests {
client_attempts += 1;
let bucket = "hot-data";
let object = format!("popular_table_{}", obj_id);
let owner = format!("thundering_client_{}", client_id);
let object = format!("popular_table_{obj_id}");
let owner = format!("thundering_client_{client_id}");
// Simulate different access patterns
let result = match obj_id % 3 {
@@ -1265,12 +1264,12 @@ mod tests {
let success_rate = total_successes as f64 / total_attempts as f64;
println!("\n=== Thundering Herd Scenario Results ===");
println!("Concurrent clients: {}", num_concurrent_clients);
println!("Hot objects: {}", hot_objects);
println!("Total attempts: {}", total_attempts);
println!("Total successes: {}", total_successes);
println!("Concurrent clients: {num_concurrent_clients}");
println!("Hot objects: {hot_objects}");
println!("Total attempts: {total_attempts}");
println!("Total successes: {total_successes}");
println!("Success rate: {:.1}%", success_rate * 100.0);
println!("Total time: {:?}", total_time);
println!("Total time: {total_time:?}");
println!(
"Average time per operation: {:.1}ms",
total_time.as_millis() as f64 / total_attempts as f64
@@ -1286,8 +1285,7 @@ mod tests {
// Should handle this volume in reasonable time
assert!(
total_time < std::time::Duration::from_secs(180),
"Thundering herd test took too long: {:?}",
total_time
"Thundering herd test took too long: {total_time:?}"
);
}
@@ -1314,7 +1312,7 @@ mod tests {
for op_id in 0..operations_per_transaction {
let bucket = "oltp-data";
let object = format!("record_{}_{}", oltp_id * 10 + tx_id, op_id);
let owner = format!("oltp_{}_{}", oltp_id, tx_id);
let owner = format!("oltp_{oltp_id}_{tx_id}");
// OLTP is mostly writes
let result = manager_clone.acquire_write_lock(bucket, object, owner).await;
@@ -1358,7 +1356,7 @@ mod tests {
table_id % 20, // Some overlap between queries
query_id
);
let owner = format!("olap_{}_{}", olap_id, query_id);
let owner = format!("olap_{olap_id}_{query_id}");
// OLAP is mostly reads with high priority
let result = manager_clone.acquire_critical_read_lock(bucket, object, owner).await;
@@ -1408,7 +1406,7 @@ mod tests {
let olap_success_rate = olap_successes as f64 / olap_attempts as f64;
println!("\n=== Mixed Workload Stress Test Results ===");
println!("Total time: {:?}", total_time);
println!("Total time: {total_time:?}");
println!(
"OLTP: {}/{} transactions succeeded ({:.1}%)",
oltp_successes,

View File

@@ -152,14 +152,14 @@ pub mod performance_comparison {
for i in 0..1000 {
let guard = fast_manager
.acquire_write_lock("bucket", format!("object_{}", i), owner)
.acquire_write_lock("bucket", format!("object_{i}"), owner)
.await
.expect("Failed to acquire fast lock");
guards.push(guard);
}
let fast_duration = start.elapsed();
println!("Fast lock: 1000 acquisitions in {:?}", fast_duration);
println!("Fast lock: 1000 acquisitions in {fast_duration:?}");
// Release all
drop(guards);

View File

@@ -27,7 +27,7 @@ mod tests {
let mut guards = Vec::new();
for i in 0..100 {
let bucket = format!("test-bucket-{}", i % 10); // Reuse some bucket names
let object = format!("test-object-{}", i);
let object = format!("test-object-{i}");
let guard = manager
.acquire_write_lock(bucket.as_str(), object.as_str(), "test-owner")
@@ -53,10 +53,7 @@ mod tests {
0.0
};
println!(
"Pool stats - Hits: {}, Misses: {}, Releases: {}, Pool size: {}",
hits, misses, releases, pool_size
);
println!("Pool stats - Hits: {hits}, Misses: {misses}, Releases: {releases}, Pool size: {pool_size}");
println!("Hit rate: {:.2}%", hit_rate * 100.0);
// We should see some pool activity
@@ -82,7 +79,7 @@ mod tests {
.expect("Failed to acquire second read lock");
let duration = start.elapsed();
println!("Two read locks on different objects took: {:?}", duration);
println!("Two read locks on different objects took: {duration:?}");
// Should be very fast since no contention
assert!(duration < Duration::from_millis(10), "Read locks should be fast with no contention");
@@ -103,7 +100,7 @@ mod tests {
.expect("Failed to acquire second read lock on same object");
let duration = start.elapsed();
println!("Two read locks on same object took: {:?}", duration);
println!("Two read locks on same object took: {duration:?}");
// Should still be fast since read locks are compatible
assert!(duration < Duration::from_millis(10), "Compatible read locks should be fast");
@@ -132,7 +129,7 @@ mod tests {
.expect("Failed to acquire second read lock");
let second_duration = start.elapsed();
println!("First lock: {:?}, Second lock: {:?}", first_duration, second_duration);
println!("First lock: {first_duration:?}, Second lock: {second_duration:?}");
// Both should be very fast (sub-millisecond typically)
assert!(first_duration < Duration::from_millis(10));
@@ -157,7 +154,7 @@ mod tests {
let result = manager.acquire_locks_batch(batch).await;
let duration = start.elapsed();
println!("Batch operation took: {:?}", duration);
println!("Batch operation took: {duration:?}");
assert!(result.all_acquired, "All locks should be acquired");
assert_eq!(result.successful_locks.len(), 3);

View File

@@ -40,7 +40,7 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] }
rustfs-utils = { workspace = true, features = ["ip", "path"] }
async-trait = { workspace = true }
chrono = { workspace = true }
flexi_logger = { workspace = true, features = ["trc", "kv"] }
flexi_logger = { workspace = true }
nu-ansi-term = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
@@ -62,6 +62,7 @@ serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
# Only enable kafka features and related dependencies on Linux
[target.'cfg(target_os = "linux")'.dependencies]
rdkafka = { workspace = true, features = ["tokio"], optional = true }

View File

@@ -21,12 +21,57 @@
## ✨ Features
- **Environment-Aware Logging**: Automatically configures logging behavior based on deployment environment
- Production: File-only logging (stdout disabled by default for security and log aggregation)
- Development/Test: Full logging with stdout support for debugging
- OpenTelemetry integration for distributed tracing
- Prometheus metrics collection and exposition
- Structured logging with configurable levels
- Structured logging with configurable levels and rotation
- Performance profiling and analytics
- Real-time health checks and status monitoring
- Custom dashboards and alerting integration
- Enhanced error handling and resilience
## 🚀 Environment-Aware Logging
The obs module automatically adapts logging behavior based on your deployment environment:
### Production Environment
```bash
# Set production environment - disables stdout logging by default
export RUSTFS_OBS_ENVIRONMENT=production
# All logs go to files only (no stdout) for security and log aggregation
# Enhanced error handling with clear failure diagnostics
```
### Development/Test Environment
```bash
# Set development environment - enables stdout logging
export RUSTFS_OBS_ENVIRONMENT=development
# Logs appear both in files and stdout for easier debugging
# Full span tracking and verbose error messages
```
### Configuration Override
You can always override the environment defaults:
```rust
use rustfs_obs::OtelConfig;
let config = OtelConfig {
endpoint: "".to_string(),
use_stdout: Some(true), // Explicit override - forces stdout even in production
environment: Some("production".to_string()),
..Default::default()
};
```
### Supported Environment Values
- `production` - Secure file-only logging
- `development` - Full debugging with stdout
- `test` - Test environment with stdout support
- `staging` - Staging environment with stdout support
## 📚 Documentation

View File

@@ -13,15 +13,19 @@
// limitations under the License.
use crate::OtelConfig;
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode,
WriteMode::{AsyncWith, BufferAndFlush},
style,
};
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
};
@@ -29,15 +33,19 @@ use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::observability::ENV_OBS_LOG_DIRECTORY;
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
ENV_OBS_LOG_DIRECTORY,
},
};
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::fs;
use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
@@ -121,7 +129,7 @@ fn resource(config: &OtelConfig) -> Resource {
/// Creates a periodic reader for stdout metrics
fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
.with_interval(std::time::Duration::from_secs(interval))
.with_interval(Duration::from_secs(interval))
.build()
}
@@ -129,11 +137,23 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Environment-aware stdout configuration
// Check for explicit environment control via RUSTFS_OBS_ENVIRONMENT
let is_production = environment.to_lowercase() == DEFAULT_OBS_ENVIRONMENT_PRODUCTION;
// Default stdout behavior based on environment
let default_use_stdout = if is_production {
false // Disable stdout in production for security and log aggregation
} else {
USE_STDOUT // Use configured default for dev/test environments
};
let use_stdout = config.use_stdout.unwrap_or(default_use_stdout);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Configure flexi_logger to cut by time and size
let mut flexi_logger_handle = None;
@@ -144,7 +164,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
@@ -197,7 +217,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
@@ -249,7 +269,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.with_line_number(true);
// Only add full span events tracking in the development environment
if environment != ENVIRONMENT {
if !is_production {
layer = layer.with_span_events(FmtSpan::FULL);
}
@@ -257,8 +277,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
};
let filter = build_env_filter(logger_level, None);
let otel_filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure registry to avoid repeated calls to filter methods
@@ -280,78 +299,96 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
}
}
OtelGuard {
return OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
}
} else {
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("Failed to create log directory {log_directory}: {e}");
}
#[cfg(unix)]
{
// Linux/macOS Setting Permissions
// Set the log directory permissions to 755 (rwxr-xr-x)
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => eprintln!("Failed to set log directory permissions {log_directory}: {e}"),
}
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
}
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or(LogSpecification::info());
// Enhanced error handling for directory creation
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: Failed to create log directory '{log_directory}': {e}");
eprintln!("Ensure the parent directory exists and you have write permissions.");
eprintln!("Attempting to continue with logging, but file logging may fail.");
} else {
eprintln!("Log directory ready: {log_directory}");
}
// Convert the logger_level string to the corresponding LevelFilter
let level_filter = match logger_level.to_lowercase().as_str() {
#[cfg(unix)]
{
// Linux/macOS Setting Permissions with better error handling
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => {
eprintln!("WARNING: Failed to set log directory permissions for '{log_directory}': {e}");
eprintln!("This may affect log file access. Consider checking directory ownership and permissions.");
}
}
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger level '{logger_level}': {e}. Using default 'info' level.");
LogSpecification::info()
});
// Environment-aware stdout configuration
// In production: disable stdout completely (Duplicate::None)
// In development/test: use level-based filtering
let level_filter = if is_production {
flexi_logger::Duplicate::None // No stdout output in production
} else {
// Convert the logger_level string to the corresponding LevelFilter for dev/test
match logger_level.to_lowercase().as_str() {
"trace" => flexi_logger::Duplicate::Trace,
"debug" => flexi_logger::Duplicate::Debug,
"info" => flexi_logger::Duplicate::Info,
@@ -359,56 +396,114 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
"error" => flexi_logger::Duplicate::Error,
"off" => flexi_logger::Duplicate::None,
_ => flexi_logger::Duplicate::Info, // the default is info
};
}
};
// Configure the flexi_logger
let flexi_logger_result = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("Invalid logger level: {logger_level}, using default: {DEFAULT_LOG_LEVEL}, failed error: {e:?}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.duplicate_to_stdout(level_filter) // Use dynamic levels
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.write_mode(WriteMode::BufferAndFlush)
.append() // Avoid clearing existing logs at startup
.print_message() // Startup information output to console
.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
eprintln!("Flexi logger initialized with file logging to {log_directory}/{log_filename}.log");
// Log logging of log cutting conditions
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => eprintln!(
"Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files"
),
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
// Choose write mode based on environment
let write_mode = if is_production {
get_env_async_with().unwrap_or_else(|| {
eprintln!(
"Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables."
);
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
})
} else {
BufferAndFlush
};
// Configure the flexi_logger with enhanced error handling
let mut flexi_logger_builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.write_mode(write_mode)
.append(); // Avoid clearing existing logs at startup
// Environment-aware stdout configuration
flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter);
// Only add stdout formatting and startup messages in non-production environments
if !is_production {
flexi_logger_builder = flexi_logger_builder
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.print_message(); // Startup information output to console
}
let flexi_logger_result = flexi_logger_builder.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
// Environment-aware success messages
if is_production {
eprintln!("Production logging initialized: file-only mode to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging disabled in production environment for security and log aggregation.");
} else {
eprintln!("Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Development/Test logging initialized with file logging to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging enabled for debugging. Environment: {environment}");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
// Log rotation configuration details
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => {
eprintln!("Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files")
}
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
}
} else {
eprintln!("CRITICAL: Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Possible causes:");
eprintln!(" 1. Insufficient permissions to write to log directory: {log_directory}");
eprintln!(" 2. Log directory does not exist or is not accessible");
eprintln!(" 3. Invalid log configuration parameters");
eprintln!(" 4. Disk space issues");
eprintln!("Application will continue but logging to files will not work properly.");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
}
}
// Read the AsyncWith parameter from the environment variable
fn get_env_async_with() -> Option<WriteMode> {
let pool_capa = env::var("RUSTFS_OBS_LOG_POOL_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let message_capa = env::var("RUSTFS_OBS_LOG_MESSAGE_CAPA")
.ok()
.and_then(|v| v.parse::<usize>().ok());
let flush_ms = env::var("RUSTFS_OBS_LOG_FLUSH_MS").ok().and_then(|v| v.parse::<u64>().ok());
match (pool_capa, message_capa, flush_ms) {
(Some(pool), Some(msg), Some(flush)) => Some(AsyncWith {
pool_capa: pool,
message_capa: msg,
flush_interval: Duration::from_millis(flush),
}),
_ => None,
}
}
@@ -473,3 +568,140 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
record.args()
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_production_environment_detection() {
// Test production environment logic
let production_envs = vec!["production", "PRODUCTION", "Production"];
for env_value in production_envs {
let is_production = env_value.to_lowercase() == "production";
assert!(is_production, "Should detect '{env_value}' as production environment");
}
}
#[test]
fn test_non_production_environment_detection() {
// Test non-production environment logic
let non_production_envs = vec!["development", "test", "staging", "dev", "local"];
for env_value in non_production_envs {
let is_production = env_value.to_lowercase() == "production";
assert!(!is_production, "Should not detect '{env_value}' as production environment");
}
}
#[test]
fn test_stdout_behavior_logic() {
// Test the stdout behavior logic without environment manipulation
struct TestCase {
is_production: bool,
config_use_stdout: Option<bool>,
expected_use_stdout: bool,
description: &'static str,
}
let test_cases = vec![
TestCase {
is_production: true,
config_use_stdout: None,
expected_use_stdout: false,
description: "Production with no config should disable stdout",
},
TestCase {
is_production: false,
config_use_stdout: None,
expected_use_stdout: USE_STDOUT,
description: "Non-production with no config should use default",
},
TestCase {
is_production: true,
config_use_stdout: Some(true),
expected_use_stdout: true,
description: "Production with explicit true should enable stdout",
},
TestCase {
is_production: true,
config_use_stdout: Some(false),
expected_use_stdout: false,
description: "Production with explicit false should disable stdout",
},
TestCase {
is_production: false,
config_use_stdout: Some(true),
expected_use_stdout: true,
description: "Non-production with explicit true should enable stdout",
},
];
for case in test_cases {
let default_use_stdout = if case.is_production { false } else { USE_STDOUT };
let actual_use_stdout = case.config_use_stdout.unwrap_or(default_use_stdout);
assert_eq!(actual_use_stdout, case.expected_use_stdout, "Test case failed: {}", case.description);
}
}
#[test]
fn test_log_level_filter_mapping_logic() {
// Test the log level mapping logic used in the real implementation
let test_cases = vec![
("trace", "Trace"),
("debug", "Debug"),
("info", "Info"),
("warn", "Warn"),
("warning", "Warn"),
("error", "Error"),
("off", "None"),
("invalid_level", "Info"), // Should default to Info
];
for (input_level, expected_variant) in test_cases {
let filter_variant = match input_level.to_lowercase().as_str() {
"trace" => "Trace",
"debug" => "Debug",
"info" => "Info",
"warn" | "warning" => "Warn",
"error" => "Error",
"off" => "None",
_ => "Info", // default case
};
assert_eq!(
filter_variant, expected_variant,
"Log level '{input_level}' should map to '{expected_variant}'"
);
}
}
#[test]
fn test_otel_config_environment_defaults() {
// Test that OtelConfig properly handles environment detection logic
let config = OtelConfig {
endpoint: "".to_string(),
use_stdout: None,
environment: Some("production".to_string()),
..Default::default()
};
// Simulate the logic from init_telemetry
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
assert_eq!(environment, "production");
// Test with development environment
let dev_config = OtelConfig {
endpoint: "".to_string(),
use_stdout: None,
environment: Some("development".to_string()),
..Default::default()
};
let dev_environment = dev_config.environment.as_deref().unwrap_or(ENVIRONMENT);
assert_eq!(dev_environment, "development");
}
}

View File

@@ -173,6 +173,7 @@ impl PartialEq for Functions {
}
#[derive(Clone, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct Value;
#[cfg(test)]

View File

@@ -19,19 +19,10 @@ use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::Attributes;
use object_store::GetOptions;
use object_store::GetResult;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use object_store::PutMultipartOpts;
use object_store::PutOptions;
use object_store::PutPayload;
use object_store::PutResult;
use object_store::path::Path;
use object_store::{Error as o_Error, Result};
use object_store::{
Attributes, Error as o_Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path,
};
use pin_project_lite::pin_project;
use rustfs_common::DEFAULT_DELIMITER;
use rustfs_ecstore::StorageAPI;
@@ -102,7 +93,7 @@ impl ObjectStore for EcObjectStore {
unimplemented!()
}
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOpts) -> Result<Box<dyn MultipartUpload>> {
async fn put_multipart_opts(&self, _location: &Path, _opts: PutMultipartOptions) -> Result<Box<dyn MultipartUpload>> {
unimplemented!()
}
@@ -122,7 +113,7 @@ impl ObjectStore for EcObjectStore {
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: reader.object_info.size as usize,
size: reader.object_info.size as u64,
e_tag: reader.object_info.etag,
version: None,
};
@@ -151,12 +142,12 @@ impl ObjectStore for EcObjectStore {
Ok(GetResult {
payload,
meta,
range: 0..reader.object_info.size as usize,
range: 0..reader.object_info.size as u64,
attributes,
})
}
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, _location: &Path, _ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
unimplemented!()
}
@@ -175,7 +166,7 @@ impl ObjectStore for EcObjectStore {
Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: info.size as usize,
size: info.size as u64,
e_tag: info.etag,
version: None,
})
@@ -185,7 +176,7 @@ impl ObjectStore for EcObjectStore {
unimplemented!()
}
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, _prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
unimplemented!()
}

View File

@@ -61,7 +61,7 @@ impl MetadataProvider {
Self {
provider,
current_session_table_provider,
config_options: session.inner().config_options().clone(),
config_options: session.inner().config_options().as_ref().clone(),
session,
func_manager,
}

View File

@@ -26,7 +26,6 @@ use datafusion::{
propagate_empty_relation::PropagateEmptyRelation, push_down_filter::PushDownFilter, push_down_limit::PushDownLimit,
replace_distinct_aggregate::ReplaceDistinctWithAggregate, scalar_subquery_to_join::ScalarSubqueryToJoin,
simplify_expressions::SimplifyExpressions, single_distinct_to_groupby::SingleDistinctToGroupBy,
unwrap_cast_in_comparison::UnwrapCastInComparison,
},
};
use rustfs_s3select_api::{
@@ -66,7 +65,6 @@ impl Default for DefaultLogicalOptimizer {
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
// df default rules start
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(ReplaceDistinctWithAggregate::new()),
Arc::new(EliminateJoin::new()),
Arc::new(DecorrelatePredicateSubquery::new()),
@@ -91,7 +89,6 @@ impl Default for DefaultLogicalOptimizer {
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Arc::new(CommonSubexprEliminate::new()),
// PushDownProjection can pushdown Projections through Limits, do PushDownLimit again.
Arc::new(PushDownLimit::new()),

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
//! Layered DNS resolution utility for Kubernetes environments
//!
//! This module provides robust DNS resolution with multiple fallback layers:
@@ -396,7 +398,7 @@ mod tests {
// Test cache stats (note: moka cache might not immediately reflect changes)
let (total, _weighted_size) = resolver.cache_stats().await;
// Cache should have at least the entry we just added (might be 0 due to async nature)
assert!(total <= 1, "Cache should have at most 1 entry, got {}", total);
assert!(total <= 1, "Cache should have at most 1 entry, got {total}");
}
#[tokio::test]
@@ -407,12 +409,12 @@ mod tests {
match resolver.resolve("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Resolved localhost to: {:?}", ips);
println!("Resolved localhost to: {ips:?}");
}
Err(e) => {
// In some test environments, even localhost might fail
// This is acceptable as long as our error handling works
println!("DNS resolution failed (might be expected in test environments): {}", e);
println!("DNS resolution failed (might be expected in test environments): {e}");
}
}
}
@@ -428,7 +430,7 @@ mod tests {
assert!(result.is_err());
if let Err(e) = result {
println!("Expected error for invalid domain: {}", e);
println!("Expected error for invalid domain: {e}");
// Should be AllAttemptsFailed since both system and public DNS should fail
assert!(matches!(e, DnsError::AllAttemptsFailed { .. }));
}
@@ -464,10 +466,10 @@ mod tests {
match resolve_domain("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Global resolver resolved localhost to: {:?}", ips);
println!("Global resolver resolved localhost to: {ips:?}");
}
Err(e) => {
println!("Global resolver DNS resolution failed (might be expected in test environments): {}", e);
println!("Global resolver DNS resolution failed (might be expected in test environments): {e}");
}
}
}

View File

@@ -118,17 +118,17 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
pub async fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => {
match crate::dns_resolver::resolve_domain(domain).await {
Ok(ips) => {
info!("Resolved domain {domain} using custom DNS resolver: {ips:?}");
return Ok(ips.into_iter().collect());
}
Err(err) => {
error!(
"Failed to resolve domain {domain} using custom DNS resolver, falling back to system resolver,err: {err}"
);
}
}
// match crate::dns_resolver::resolve_domain(domain).await {
// Ok(ips) => {
// info!("Resolved domain {domain} using custom DNS resolver: {ips:?}");
// return Ok(ips.into_iter().collect());
// }
// Err(err) => {
// error!(
// "Failed to resolve domain {domain} using custom DNS resolver, falling back to system resolver,err: {err}"
// );
// }
// }
// Check cache first
if let Ok(mut cache) = DNS_CACHE.lock() {
if let Some(entry) = cache.get(domain) {
@@ -430,7 +430,7 @@ mod test {
match get_host_ip(invalid_host.clone()).await {
Ok(ips) => {
// Depending on DNS resolver behavior, it might return empty set or error
assert!(ips.is_empty(), "Expected empty IP set for invalid domain, got: {:?}", ips);
assert!(ips.is_empty(), "Expected empty IP set for invalid domain, got: {ips:?}");
}
Err(_) => {
error!("Expected error for invalid domain");

View File

@@ -40,6 +40,7 @@ services:
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_LOG_LEVEL=info
- RUSTFS_TLS_PATH=/opt/tls
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4317
volumes:
- rustfs_data_0:/data/rustfs0
@@ -47,6 +48,7 @@ services:
- rustfs_data_2:/data/rustfs2
- rustfs_data_3:/data/rustfs3
- logs_data:/app/logs
- .docker/tls/:/opt/tls # TLS configuration, you should create tls directory and put your tls files in it and then specify the path here
networks:
- rustfs-network
restart: unless-stopped

View File

@@ -12,49 +12,68 @@ elif [ "$1" = "rustfs" ]; then
set -- /usr/bin/rustfs "$@"
fi
# 2) Parse and create local mount directories (ignore http/https), ensure /logs is included
VOLUME_RAW="${RUSTFS_VOLUMES:-/data}"
# Convert comma/tab to space
VOLUME_LIST=$(echo "$VOLUME_RAW" | tr ',\t' ' ')
LOCAL_VOLUMES=""
for vol in $VOLUME_LIST; do
case "$vol" in
/*)
case "$vol" in
http://*|https://*) : ;;
*) LOCAL_VOLUMES="$LOCAL_VOLUMES $vol" ;;
esac
;;
*)
: # skip non-local paths
;;
esac
done
# Ensure /logs is included
case " $LOCAL_VOLUMES " in
*" /logs "*) : ;;
*) LOCAL_VOLUMES="$LOCAL_VOLUMES /logs" ;;
esac
# 2) Process data volumes (separate from log directory)
DATA_VOLUMES=""
process_data_volumes() {
VOLUME_RAW="${RUSTFS_VOLUMES:-/data}"
# Convert comma/tab to space
VOLUME_LIST=$(echo "$VOLUME_RAW" | tr ',\t' ' ')
for vol in $VOLUME_LIST; do
case "$vol" in
/*)
case "$vol" in
http://*|https://*) : ;;
*) DATA_VOLUMES="$DATA_VOLUMES $vol" ;;
esac
;;
*)
: # skip non-local paths
;;
esac
done
echo "Initializing data directories:$DATA_VOLUMES"
for vol in $DATA_VOLUMES; do
if [ ! -d "$vol" ]; then
echo " mkdir -p $vol"
mkdir -p "$vol"
# If target user is specified, try to set directory owner to that user (non-recursive to avoid large disk overhead)
if [ -n "$RUSTFS_UID" ] && [ -n "$RUSTFS_GID" ]; then
chown "$RUSTFS_UID:$RUSTFS_GID" "$vol" 2>/dev/null || true
elif [ -n "$RUSTFS_USERNAME" ] && [ -n "$RUSTFS_GROUPNAME" ]; then
chown "$RUSTFS_USERNAME:$RUSTFS_GROUPNAME" "$vol" 2>/dev/null || true
fi
fi
done
}
echo "Initializing mount directories:$LOCAL_VOLUMES"
for vol in $LOCAL_VOLUMES; do
if [ ! -d "$vol" ]; then
echo " mkdir -p $vol"
mkdir -p "$vol"
# 3) Process log directory (separate from data volumes)
process_log_directory() {
LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY:-/logs}"
echo "Initializing log directory: $LOG_DIR"
if [ ! -d "$LOG_DIR" ]; then
echo " mkdir -p $LOG_DIR"
mkdir -p "$LOG_DIR"
# If target user is specified, try to set directory owner to that user (non-recursive to avoid large disk overhead)
if [ -n "$RUSTFS_UID" ] && [ -n "$RUSTFS_GID" ]; then
chown "$RUSTFS_UID:$RUSTFS_GID" "$vol" 2>/dev/null || true
chown "$RUSTFS_UID:$RUSTFS_GID" "$LOG_DIR" 2>/dev/null || true
elif [ -n "$RUSTFS_USERNAME" ] && [ -n "$RUSTFS_GROUPNAME" ]; then
chown "$RUSTFS_USERNAME:$RUSTFS_GROUPNAME" "$vol" 2>/dev/null || true
chown "$RUSTFS_USERNAME:$RUSTFS_GROUPNAME" "$LOG_DIR" 2>/dev/null || true
fi
fi
done
}
# 3) Default credentials warning
# Execute the separate processes
process_data_volumes
process_log_directory
# 4) Default credentials warning
if [ "${RUSTFS_ACCESS_KEY}" = "rustfsadmin" ] || [ "${RUSTFS_SECRET_KEY}" = "rustfsadmin" ]; then
echo "!!!WARNING: Using default RUSTFS_ACCESS_KEY or RUSTFS_SECRET_KEY. Override them in production!"
fi
echo "Starting: $*"
set -- "$@" $LOCAL_VOLUMES
set -- "$@" $DATA_VOLUMES
exec "$@"

View File

@@ -51,7 +51,6 @@ rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-protos = { workspace = true }
rustfs-s3select-query = { workspace = true }
rustfs-audit-logger = { workspace = true }
rustfs-targets = { workspace = true }
atoi = { workspace = true }
atomic_enum = { workspace = true }
@@ -122,6 +121,9 @@ libsystemd.workspace = true
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
tikv-jemallocator = "0.6"
[target.'cfg(all(target_os = "linux", target_env = "musl"))'.dependencies]
mimalloc = "0.1"
[target.'cfg(not(target_os = "windows"))'.dependencies]
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }

View File

@@ -241,7 +241,7 @@ pub async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> i
if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") }
} else if let Ok(ip) = raw_host.parse::<IpAddr>() {
// Pure IP (no ports)
if ip.is_ipv6() { format!("[{}]", ip) } else { ip.to_string() }
if ip.is_ipv6() { format!("[{ip}]") } else { ip.to_string() }
} else {
// The domain name may not be able to resolve directly to IP, remove the port
raw_host.split(':').next().unwrap_or(raw_host).to_string()

View File

@@ -1322,7 +1322,7 @@ impl Operation for ProfileHandler {
error!("Failed to build profiler report: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to build profile report: {}", e)),
Body::from(format!("Failed to build profile report: {e}")),
)));
}
};
@@ -1353,7 +1353,7 @@ impl Operation for ProfileHandler {
error!("Failed to generate flamegraph: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to generate flamegraph: {}", e)),
Body::from(format!("Failed to generate flamegraph: {e}")),
)));
}
};

View File

@@ -41,6 +41,8 @@ use rustfs_ahm::{
};
use rustfs_common::globals::set_global_addr;
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_config::DEFAULT_UPDATE_CHECK;
use rustfs_config::ENV_UPDATE_CHECK;
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
@@ -62,7 +64,6 @@ use rustfs_iam::init_iam_sys;
use rustfs_notify::global::notifier_instance;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_targets::arn::TargetID;
use rustfs_utils::dns_resolver::init_global_dns_resolver;
use rustfs_utils::net::parse_and_resolve_address;
use s3s::s3_error;
use std::io::{Error, Result};
@@ -74,6 +75,18 @@ use tracing::{debug, error, info, instrument, warn};
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[cfg(all(target_os = "linux", target_env = "musl"))]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
const LOGO: &str = r#"
░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀
░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█
░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀
"#;
#[instrument]
fn print_server_info() {
let current_year = chrono::Utc::now().year();
@@ -97,6 +110,9 @@ async fn main() -> Result<()> {
// Initialize Observability
let (_logger, guard) = init_obs(Some(opt.clone().obs_endpoint)).await;
// print startup logo
info!("{}", LOGO);
// Store in global storage
set_global_guard(guard).map_err(Error::other)?;
@@ -112,12 +128,12 @@ async fn main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
let dns_init = tokio::spawn(async {
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
});
// // Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
// let dns_init = tokio::spawn(async {
// if let Err(e) = rustfs_utils::dns_resolver::init_global_dns_resolver().await {
// warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
// }
// });
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
@@ -127,7 +143,15 @@ async fn run(opt: config::Opt) -> Result<()> {
let server_port = server_addr.port();
let server_address = server_addr.to_string();
info!("server_address {}, ip:{}", &server_address, server_addr.ip());
info!(
target: "rustfs::main::run",
server_address = %server_address,
ip = %server_addr.ip(),
port = %server_port,
version = %version::get_version(),
"Starting RustFS server at {}",
&server_address
);
// Set up AK and SK
rustfs_ecstore::global::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()));
@@ -136,8 +160,8 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_addr(&opt.address).await;
// Wait for DNS initialization to complete before network-heavy operations
dns_init.await.map_err(Error::other)?;
// // Wait for DNS initialization to complete before network-heavy operations
// dns_init.await.map_err(Error::other)?;
// For RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
@@ -146,6 +170,7 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
target: "rustfs::main::run",
"Formatting {}st pool, {} set(s), {} drives per set.",
i + 1,
eps.set_count,
@@ -159,12 +184,20 @@ async fn run(opt: config::Opt) -> Result<()> {
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
info!(
target: "rustfs::main::run",
id = i,
set_count = eps.set_count,
drives_per_set = eps.drives_per_set,
cmd = ?eps.cmd_line,
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line
);
for ep in eps.endpoints.as_ref().iter() {
info!(" - {}", ep);
info!(
target: "rustfs::main::run",
" - endpoint: {}", ep
);
}
}
@@ -186,7 +219,12 @@ async fn run(opt: config::Opt) -> Result<()> {
external_port, server_port
);
}
info!("Using external address {} for endpoint access", external_addr);
info!(
target: "rustfs::main::run",
external_address = %external_addr,
external_port = %external_port,
"Using external address {} for endpoint access", external_addr
);
rustfs_ecstore::global::set_global_rustfs_external_port(external_port);
set_global_addr(&opt.external_address).await;
(external_addr.ip(), external_port)
@@ -244,31 +282,11 @@ async fn run(opt: config::Opt) -> Result<()> {
// Collect bucket names into a vector
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
// Parallelize initialization tasks for better network performance
let bucket_metadata_task = tokio::spawn({
let store = store.clone();
let buckets = buckets.clone();
async move {
init_bucket_metadata_sys(store, buckets).await;
}
});
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
let iam_init_task = tokio::spawn({
let store = store.clone();
async move { init_iam_sys(store).await }
});
init_iam_sys(store.clone()).await.map_err(Error::other)?;
let notification_config_task = tokio::spawn({
let buckets = buckets.clone();
async move {
add_bucket_notification_configuration(buckets).await;
}
});
// Wait for all parallel initialization tasks to complete
bucket_metadata_task.await.map_err(Error::other)?;
iam_init_task.await.map_err(Error::other)??;
notification_config_task.await.map_err(Error::other)?;
add_bucket_notification_configuration(buckets.clone()).await;
// Initialize the global notification system
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
@@ -283,7 +301,12 @@ async fn run(opt: config::Opt) -> Result<()> {
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
info!("Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal);
info!(
target: "rustfs::main::run",
enable_scanner = enable_scanner,
enable_heal = enable_heal,
"Background services configuration: scanner={}, heal={}", enable_scanner, enable_heal
);
// Initialize heal manager and scanner based on environment variables
if enable_heal || enable_scanner {
@@ -293,11 +316,11 @@ async fn run(opt: config::Opt) -> Result<()> {
let heal_manager = init_heal_manager(heal_storage, None).await?;
if enable_scanner {
info!("Starting scanner with heal manager...");
info!(target: "rustfs::main::run","Starting scanner with heal manager...");
let scanner = Scanner::new(Some(ScannerConfig::default()), Some(heal_manager));
scanner.start().await?;
} else {
info!("Scanner disabled, but heal manager is initialized and available");
info!(target: "rustfs::main::run","Scanner disabled, but heal manager is initialized and available");
}
} else if enable_scanner {
info!("Starting scanner without heal manager...");
@@ -305,7 +328,7 @@ async fn run(opt: config::Opt) -> Result<()> {
scanner.start().await?;
}
} else {
info!("Both scanner and heal are disabled, skipping AHM service initialization");
info!(target: "rustfs::main::run","Both scanner and heal are disabled, skipping AHM service initialization");
}
// print server info
@@ -313,7 +336,153 @@ async fn run(opt: config::Opt) -> Result<()> {
// initialize bucket replication pool
init_bucket_replication_pool().await;
// Async update check with timeout (optional)
init_update_check();
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
match wait_for_shutdown().await {
#[cfg(unix)]
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
#[cfg(not(unix))]
ShutdownSignal::CtrlC => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
}
info!(target: "rustfs::main::run","server is stopped state: {:?}", state_manager.current_state());
Ok(())
}
/// Parse a boolean environment variable with default value
///
/// Returns true if the environment variable is not set or set to true/1/yes/on/enabled,
/// false if set to false/0/no/off/disabled
fn parse_bool_env_var(var_name: &str, default: bool) -> bool {
std::env::var(var_name)
.unwrap_or_else(|_| default.to_string())
.parse::<bool>()
.unwrap_or(default)
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!(
target: "rustfs::main::handle_shutdown",
"Shutdown signal received in main thread"
);
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Check environment variables to determine what services need to be stopped
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
// Stop background services based on what was enabled
if enable_scanner || enable_heal {
info!(
target: "rustfs::main::handle_shutdown",
"Stopping background services (data scanner and auto heal)..."
);
shutdown_background_services();
info!(
target: "rustfs::main::handle_shutdown",
"Stopping AHM services..."
);
shutdown_ahm_services();
} else {
info!(
target: "rustfs::main::handle_shutdown",
"Background services were disabled, skipping AHM shutdown"
);
}
// Stop the notification system
shutdown_event_notifier().await;
info!(
target: "rustfs::main::handle_shutdown",
"Server is stopping..."
);
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!(
target: "rustfs::main::handle_shutdown",
"Server stopped current "
);
}
#[instrument]
async fn init_event_notifier() {
info!(
target: "rustfs::main::init_event_notifier",
"Initializing event notifier..."
);
// 1. Get the global configuration loaded by ecstore
let server_config = match GLOBAL_SERVER_CONFIG.get() {
Some(config) => config.clone(), // Clone the config to pass ownership
None => {
error!("Event notifier initialization failed: Global server config not loaded.");
return;
}
};
info!(
target: "rustfs::main::init_event_notifier",
"Global server configuration loaded successfully"
);
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
if server_config
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
|| server_config
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
{
info!(
target: "rustfs::main::init_event_notifier",
"'notify' subsystem not configured, skipping event notifier initialization."
);
return;
}
info!(
target: "rustfs::main::init_event_notifier",
"Event notifier configuration found, proceeding with initialization."
);
// 3. Initialize the notification system asynchronously with a global configuration
// Use direct await for better error handling and faster initialization
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!(
target: "rustfs::main::init_event_notifier",
"Event notifier system initialized successfully."
);
}
}
fn init_update_check() {
let update_check_enable = std::env::var(ENV_UPDATE_CHECK)
.unwrap_or_else(|_| DEFAULT_UPDATE_CHECK.to_string())
.parse::<bool>()
.unwrap_or(DEFAULT_UPDATE_CHECK);
if !update_check_enable {
return;
}
// Async update check with timeout
tokio::spawn(async {
use crate::update::{UpdateCheckError, check_updates};
@@ -348,121 +517,6 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
});
// if opt.console_enable {
// debug!("console is enabled");
// let console_address = opt.console_address.clone();
// let tls_path = opt.tls_path.clone();
//
// if console_address.is_empty() {
// error!("console_address is empty");
// return Err(Error::other("console_address is empty".to_string()));
// }
//
// tokio::spawn(async move {
// console::start_static_file_server(&console_address, tls_path).await;
// });
// }
// Perform hibernation for 1 second
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// listen to the shutdown signal
match wait_for_shutdown().await {
#[cfg(unix)]
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
#[cfg(not(unix))]
ShutdownSignal::CtrlC => {
handle_shutdown(&state_manager, &shutdown_tx).await;
}
}
info!("server is stopped state: {:?}", state_manager.current_state());
Ok(())
}
/// Parse a boolean environment variable with default value
///
/// Returns true if the environment variable is not set or set to true/1/yes/on/enabled,
/// false if set to false/0/no/off/disabled
fn parse_bool_env_var(var_name: &str, default: bool) -> bool {
std::env::var(var_name)
.unwrap_or_else(|_| default.to_string())
.parse::<bool>()
.unwrap_or(default)
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");
// update the status to stopping first
state_manager.update(ServiceState::Stopping);
// Check environment variables to determine what services need to be stopped
let enable_scanner = parse_bool_env_var("RUSTFS_ENABLE_SCANNER", true);
let enable_heal = parse_bool_env_var("RUSTFS_ENABLE_HEAL", true);
// Stop background services based on what was enabled
if enable_scanner || enable_heal {
info!("Stopping background services (data scanner and auto heal)...");
shutdown_background_services();
info!("Stopping AHM services...");
shutdown_ahm_services();
} else {
info!("Background services were disabled, skipping AHM shutdown");
}
// Stop the notification system
shutdown_event_notifier().await;
info!("Server is stopping...");
let _ = shutdown_tx.send(());
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!("Server stopped current ");
}
#[instrument]
async fn init_event_notifier() {
info!("Initializing event notifier...");
// 1. Get the global configuration loaded by ecstore
let server_config = match GLOBAL_SERVER_CONFIG.get() {
Some(config) => config.clone(), // Clone the config to pass ownership
None => {
error!("Event notifier initialization failed: Global server config not loaded.");
return;
}
};
info!("Global server configuration loaded successfully");
// 2. Check if the notify subsystem exists in the configuration, and skip initialization if it doesn't
if server_config
.get_value(rustfs_config::notify::NOTIFY_MQTT_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
|| server_config
.get_value(rustfs_config::notify::NOTIFY_WEBHOOK_SUB_SYS, DEFAULT_DELIMITER)
.is_none()
{
info!("'notify' subsystem not configured, skipping event notifier initialization.");
return;
}
info!("Event notifier configuration found, proceeding with initialization.");
// 3. Initialize the notification system asynchronously with a global configuration
// Use direct await for better error handling and faster initialization
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
}
#[instrument(skip_all)]
@@ -483,7 +537,10 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
match has_notification_config {
Some(cfg) => {
info!("Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
info!(
target: "rustfs::main::add_bucket_notification_configuration",
bucket = %bucket,
"Bucket '{}' has existing notification configuration: {:?}", bucket, cfg);
let mut event_rules = Vec::new();
process_queue_configurations(&mut event_rules, cfg.queue_configurations.clone(), TargetID::from_str);
@@ -499,7 +556,10 @@ async fn add_bucket_notification_configuration(buckets: Vec<String>) {
}
}
None => {
info!("Bucket '{}' has no existing notification configuration.", bucket);
info!(
target: "rustfs::main::add_bucket_notification_configuration",
bucket = %bucket,
"Bucket '{}' has no existing notification configuration.", bucket);
}
}
}

View File

@@ -23,7 +23,7 @@ pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| format!("Failed to build profiler guard: {}", e))?;
.map_err(|e| format!("Failed to build profiler guard: {e}"))?;
PROFILER_GUARD
.set(Arc::new(Mutex::new(guard)))

View File

@@ -16,7 +16,7 @@ use crate::admin::console::static_handler;
use crate::config::Opt;
use axum::{Router, extract::Request, middleware, response::Json, routing::get};
use axum_server::tls_rustls::RustlsConfig;
use http::{HeaderValue, Method, header};
use http::{HeaderValue, Method};
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_utils::net::parse_and_resolve_address;
use serde_json::json;
@@ -230,15 +230,15 @@ async fn health_check() -> Json<serde_json::Value> {
pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
let cors_layer = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS])
.allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::ACCEPT, header::ORIGIN]);
.allow_headers(Any);
match origins {
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
Some(origins_str) => {
let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
if origins.is_empty() {
warn!("Empty CORS origins provided, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
// Parse origins with proper error handling
let mut valid_origins = Vec::new();
@@ -255,10 +255,10 @@ pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
if valid_origins.is_empty() {
warn!("No valid CORS origins found, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
info!("Console CORS origins configured: {:?}", valid_origins);
cors_layer.allow_origin(AllowOrigin::list(valid_origins))
cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
}
}
}

View File

@@ -65,26 +65,15 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
Method::HEAD,
Method::OPTIONS,
])
.allow_headers([
http::header::CONTENT_TYPE,
http::header::AUTHORIZATION,
http::header::ACCEPT,
http::header::ORIGIN,
// Note: X_AMZ_* headers are custom and may need to be defined
// http::header::X_AMZ_CONTENT_SHA256,
// http::header::X_AMZ_DATE,
// http::header::X_AMZ_SECURITY_TOKEN,
// http::header::X_AMZ_USER_AGENT,
http::header::RANGE,
]);
.allow_headers(Any);
match origins {
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
Some(origins_str) => {
let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
if origins.is_empty() {
warn!("Empty CORS origins provided, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
// Parse origins with proper error handling
let mut valid_origins = Vec::new();
@@ -101,16 +90,16 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
if valid_origins.is_empty() {
warn!("No valid CORS origins found, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
info!("Endpoint CORS origins configured: {:?}", valid_origins);
cors_layer.allow_origin(AllowOrigin::list(valid_origins))
cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
}
}
}
None => {
debug!("No CORS origins configured for endpoint, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
}
}
}

View File

@@ -1209,6 +1209,7 @@ impl S3 for FS {
name: v2.name,
prefix: v2.prefix,
max_keys: v2.max_keys,
common_prefixes: v2.common_prefixes,
..Default::default()
}))
}
@@ -1230,6 +1231,7 @@ impl S3 for FS {
let prefix = prefix.unwrap_or_default();
let max_keys = match max_keys {
Some(v) if v > 0 && v <= 1000 => v,
Some(v) if v > 1000 => 1000,
None => 1000,
_ => return Err(s3_error!(InvalidArgument, "max-keys must be between 1 and 1000")),
};

View File

@@ -64,6 +64,9 @@ export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB
export RUSTFS_OBS_LOG_POOL_CAPA=10240
export RUSTFS_OBS_LOG_MESSAGE_CAPA=32768
export RUSTFS_OBS_LOG_FLUSH_MS=300
export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs"
export RUSTFS_SINKS_FILE_BUFFER_SIZE=12