Compare commits

...

10 Commits

Author SHA1 Message Date
Copilot
e5f0760009 Fix entrypoint.sh incorrectly passing logs directory as data volume with improved separation (#561)
* Initial plan

* Fix entrypoint.sh: separate log directory from data volumes

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

* Improve separation: use functions and RUSTFS_OBS_LOG_DIRECTORY env var

Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: overtrue <1472352+overtrue@users.noreply.github.com>
2025-09-18 17:05:14 +08:00
houseme
a6c211f4ea Feature/add dns logs (#558)
* add logs

* improve code for dns and logger
2025-09-18 12:00:43 +08:00
shiro.lee
f049c656d9 fix: list_objects does not return common_prefixes field. (#543) (#554)
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-09-18 07:27:37 +08:00
majinghe
65dd947350 add tls support for docker compose (#553)
* add tls support for docker compose

* update docker compose file with comment
2025-09-17 22:45:23 +08:00
0xdx2
57f082ee2b fix: enforce max-keys limit to 1000 in S3 implementation (#549)
Co-authored-by: damon <damonxue2@gmail.com>
2025-09-16 18:02:24 +08:00
weisd
ae7e86d7ef refactor: simplify initialization flow and modernize string formatting (#548) 2025-09-16 15:44:50 +08:00
houseme
a12a3bedc3 feat(obs): optimize WriteMode selection logic in init_telemetry (#546)
- Refactor WriteMode selection to ensure all variables moved into thread closures are owned types, preventing lifetime issues.
- Simplify and clarify WriteMode assignment for production and non-production environments.
- Improve code readability and maintainability for logger initialization.
2025-09-16 08:25:37 +08:00
Copilot
cafec06b7e [Optimization] Enhance obs module telemetry.rs with environment-aware logging and production security (#539)
* Initial plan

* Implement environment-aware logging with production stdout auto-disable

Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>

* add mimalloc crate

* fix

* improve code

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: houseme <4829346+houseme@users.noreply.github.com>
Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <155562731+loverustfs@users.noreply.github.com>
2025-09-15 14:52:20 +08:00
Parm Gill
1770679e66 Adding a toggle for update check (#532) 2025-09-14 22:26:48 +08:00
jon
a4fbf596e6 add startup logo (#528)
* add startup logo

* Replace logo ASCII art in main.rs

---------

Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: 安正超 <anzhengchao@gmail.com>
2025-09-14 12:04:00 +08:00
32 changed files with 763 additions and 336 deletions

46
Cargo.lock generated
View File

@@ -2942,6 +2942,7 @@ dependencies = [
"chrono",
"crossbeam-channel",
"crossbeam-queue",
"flate2",
"log",
"notify-debouncer-mini",
"nu-ansi-term",
@@ -4021,6 +4022,16 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "libmimalloc-sys"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "libredox"
version = "0.1.9"
@@ -4244,6 +4255,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mimalloc"
version = "0.1.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8"
dependencies = [
"libmimalloc-sys",
]
[[package]]
name = "mime"
version = "0.3.17"
@@ -6015,6 +6035,7 @@ dependencies = [
"hyper-util",
"libsystemd",
"matchit",
"mimalloc",
"mime_guess",
"opentelemetry",
"percent-encoding",
@@ -6983,10 +7004,11 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]]
name = "serde"
version = "1.0.219"
version = "1.0.223"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
checksum = "a505d71960adde88e293da5cb5eda57093379f64e61cf77bf0e6a63af07a7bac"
dependencies = [
"serde_core",
"serde_derive",
]
@@ -7024,10 +7046,19 @@ dependencies = [
]
[[package]]
name = "serde_derive"
version = "1.0.219"
name = "serde_core"
version = "1.0.223"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
checksum = "20f57cbd357666aa7b3ac84a90b4ea328f1d4ddb6772b430caa5d9e1309bb9e9"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.223"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d428d07faf17e306e699ec1e91996e5a165ba5d6bce5b5155173e91a8a01a56"
dependencies = [
"proc-macro2",
"quote",
@@ -7056,14 +7087,15 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.143"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a"
checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
dependencies = [
"itoa",
"memchr",
"ryu",
"serde",
"serde_core",
]
[[package]]

View File

@@ -123,7 +123,7 @@ derive_builder = "0.20.2"
enumset = "1.1.10"
flatbuffers = "25.2.10"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv"] }
form_urlencoded = "1.2.2"
futures = "0.3.31"
futures-core = "0.3.31"
@@ -209,8 +209,8 @@ rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { version = "0.12.0-minio-preview.3" }
schemars = "1.0.4"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.143", features = ["raw_value"] }
serde = { version = "1.0.223", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["raw_value"] }
serde_urlencoded = "0.7.1"
serial_test = "3.2.0"
sha1 = "0.10.6"

View File

@@ -136,6 +136,8 @@ To get started with RustFS, follow these steps:
5. **Create a Bucket**: Use the console to create a new bucket for your objects.
6. **Upload Objects**: You can upload files directly through the console or use S3-compatible APIs to interact with your RustFS instance.
**NOTE**: If you want to access RustFS instance with `https`, you can refer to [TLS configuration docs](https://docs.rustfs.com/integration/tls-configured.html).
## Documentation
For detailed documentation, including configuration options, API references, and advanced usage, please visit our [Documentation](https://docs.rustfs.com).

View File

@@ -86,6 +86,8 @@ RustFS 是一个使用 Rust(全球最受欢迎的编程语言之一)构建
4. **创建存储桶**:使用控制台为您的对象创建新的存储桶。
5. **上传对象**:您可以直接通过控制台上传文件,或使用 S3 兼容的 API 与您的 RustFS 实例交互。
**注意**:如果你想通过 `https` 来访问 RustFS 实例,请参考 [TLS 配置文档](https://docs.rustfs.com/zh/integration/tls-configured.html)
## 文档
有关详细文档包括配置选项、API 参考和高级用法,请访问我们的[文档](https://docs.rustfs.com)。

View File

@@ -340,7 +340,7 @@ impl HealTask {
Ok((result, error)) => {
if let Some(e) = error {
// Check if this is a "File not found" error during delete operations
let error_msg = format!("{}", e);
let error_msg = format!("{e}");
if error_msg.contains("File not found") || error_msg.contains("not found") {
info!(
"Object {}/{} not found during heal - likely deleted intentionally, treating as successful",
@@ -395,7 +395,7 @@ impl HealTask {
}
Err(e) => {
// Check if this is a "File not found" error during delete operations
let error_msg = format!("{}", e);
let error_msg = format!("{e}");
if error_msg.contains("File not found") || error_msg.contains("not found") {
info!(
"Object {}/{} not found during heal - likely deleted intentionally, treating as successful",

View File

@@ -79,3 +79,13 @@ pub const ENV_CONSOLE_AUTH_TIMEOUT: &str = "RUSTFS_CONSOLE_AUTH_TIMEOUT";
/// Example: RUSTFS_CONSOLE_AUTH_TIMEOUT=3600
/// Example: --console-auth-timeout 3600
pub const DEFAULT_CONSOLE_AUTH_TIMEOUT: u64 = 3600;
/// Toggle update check
/// It controls whether to check for newer versions of rustfs
/// Default value: true
/// Environment variable: RUSTFS_CHECK_UPDATE
/// Example: RUSTFS_CHECK_UPDATE=false
pub const ENV_UPDATE_CHECK: &str = "RUSTFS_CHECK_UPDATE";
/// Default value for update toggle
pub const DEFAULT_UPDATE_CHECK: bool = true;

View File

@@ -29,7 +29,70 @@ pub const ENV_OBS_LOG_ROTATION_SIZE_MB: &str = "RUSTFS_OBS_LOG_ROTATION_SIZE_MB"
pub const ENV_OBS_LOG_ROTATION_TIME: &str = "RUSTFS_OBS_LOG_ROTATION_TIME";
pub const ENV_OBS_LOG_KEEP_FILES: &str = "RUSTFS_OBS_LOG_KEEP_FILES";
/// Log pool capacity for async logging
pub const ENV_OBS_LOG_POOL_CAPA: &str = "RUSTFS_OBS_LOG_POOL_CAPA";
/// Log message capacity for async logging
pub const ENV_OBS_LOG_MESSAGE_CAPA: &str = "RUSTFS_OBS_LOG_MESSAGE_CAPA";
/// Log flush interval in milliseconds for async logging
pub const ENV_OBS_LOG_FLUSH_MS: &str = "RUSTFS_OBS_LOG_FLUSH_MS";
/// Default values for log pool
pub const DEFAULT_OBS_LOG_POOL_CAPA: usize = 10240;
/// Default values for message capacity
pub const DEFAULT_OBS_LOG_MESSAGE_CAPA: usize = 32768;
/// Default values for flush interval in milliseconds
pub const DEFAULT_OBS_LOG_FLUSH_MS: u64 = 200;
/// Audit logger queue capacity environment variable key
pub const ENV_AUDIT_LOGGER_QUEUE_CAPACITY: &str = "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY";
// Default values for observability configuration
/// Default values for observability configuration
pub const DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY: usize = 10000;
/// Default values for observability configuration
// ### Supported Environment Values
// - `production` - Secure file-only logging
// - `development` - Full debugging with stdout
// - `test` - Test environment with stdout support
// - `staging` - Staging environment with stdout support
pub const DEFAULT_OBS_ENVIRONMENT_PRODUCTION: &str = "production";
pub const DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT: &str = "development";
pub const DEFAULT_OBS_ENVIRONMENT_TEST: &str = "test";
pub const DEFAULT_OBS_ENVIRONMENT_STAGING: &str = "staging";
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_env_keys() {
assert_eq!(ENV_OBS_ENDPOINT, "RUSTFS_OBS_ENDPOINT");
assert_eq!(ENV_OBS_USE_STDOUT, "RUSTFS_OBS_USE_STDOUT");
assert_eq!(ENV_OBS_SAMPLE_RATIO, "RUSTFS_OBS_SAMPLE_RATIO");
assert_eq!(ENV_OBS_METER_INTERVAL, "RUSTFS_OBS_METER_INTERVAL");
assert_eq!(ENV_OBS_SERVICE_NAME, "RUSTFS_OBS_SERVICE_NAME");
assert_eq!(ENV_OBS_SERVICE_VERSION, "RUSTFS_OBS_SERVICE_VERSION");
assert_eq!(ENV_OBS_ENVIRONMENT, "RUSTFS_OBS_ENVIRONMENT");
assert_eq!(ENV_OBS_LOGGER_LEVEL, "RUSTFS_OBS_LOGGER_LEVEL");
assert_eq!(ENV_OBS_LOCAL_LOGGING_ENABLED, "RUSTFS_OBS_LOCAL_LOGGING_ENABLED");
assert_eq!(ENV_OBS_LOG_DIRECTORY, "RUSTFS_OBS_LOG_DIRECTORY");
assert_eq!(ENV_OBS_LOG_FILENAME, "RUSTFS_OBS_LOG_FILENAME");
assert_eq!(ENV_OBS_LOG_ROTATION_SIZE_MB, "RUSTFS_OBS_LOG_ROTATION_SIZE_MB");
assert_eq!(ENV_OBS_LOG_ROTATION_TIME, "RUSTFS_OBS_LOG_ROTATION_TIME");
assert_eq!(ENV_OBS_LOG_KEEP_FILES, "RUSTFS_OBS_LOG_KEEP_FILES");
assert_eq!(ENV_AUDIT_LOGGER_QUEUE_CAPACITY, "RUSTFS_AUDIT_LOGGER_QUEUE_CAPACITY");
}
#[test]
fn test_default_values() {
assert_eq!(DEFAULT_AUDIT_LOGGER_QUEUE_CAPACITY, 10000);
assert_eq!(DEFAULT_OBS_ENVIRONMENT_PRODUCTION, "production");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_DEVELOPMENT, "development");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_TEST, "test");
assert_eq!(DEFAULT_OBS_ENVIRONMENT_STAGING, "staging");
}
}

View File

@@ -458,7 +458,7 @@ impl LocalDisk {
{
let cache = self.path_cache.read();
for (i, (bucket, key)) in requests.iter().enumerate() {
let cache_key = format!("{}/{}", bucket, key);
let cache_key = format!("{bucket}/{key}");
if let Some(cached_path) = cache.get(&cache_key) {
results.push((i, cached_path.clone()));
} else {

View File

@@ -13,13 +13,12 @@
// limitations under the License.
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{error, instrument, warn};
use tracing::{error, info, instrument, warn};
use crate::{
disk::endpoint::{Endpoint, EndpointType},
disks_layout::DisksLayout,
global::global_rustfs_port,
// utils::net::{self, XHost},
};
use std::io::{Error, Result};
use std::{
@@ -242,15 +241,32 @@ impl PoolEndpointList {
let host = ep.url.host().unwrap();
let host_ip_set = if let Some(set) = host_ip_cache.get(&host) {
info!(
target: "rustfs::ecstore::endpoints",
host = %host,
endpoint = %ep.to_string(),
from = "cache",
"Create pool endpoints host '{}' found in cache for endpoint '{}'", host, ep.to_string()
);
set
} else {
let ips = match get_host_ip(host.clone()).await {
Ok(ips) => ips,
Err(e) => {
error!("host {} not found, error:{}", host, e);
error!("Create pool endpoints host {} not found, error:{}", host, e);
return Err(Error::other(format!("host '{host}' cannot resolve: {e}")));
}
};
info!(
target: "rustfs::ecstore::endpoints",
host = %host,
endpoint = %ep.to_string(),
from = "get_host_ip",
"Create pool endpoints host '{}' resolved to ips {:?} for endpoint '{}'",
host,
ips,
ep.to_string()
);
host_ip_cache.insert(host.clone(), ips);
host_ip_cache.get(&host).unwrap()
};

View File

@@ -65,7 +65,7 @@ impl OptimizedFileCache {
// Cache miss, read file
let data = tokio::fs::read(&path)
.await
.map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?;
.map_err(|e| Error::other(format!("Read metadata failed: {e}")))?;
let mut meta = FileMeta::default();
meta.unmarshal_msg(&data)?;
@@ -86,7 +86,7 @@ impl OptimizedFileCache {
let data = tokio::fs::read(&path)
.await
.map_err(|e| Error::other(format!("Read file failed: {}", e)))?;
.map_err(|e| Error::other(format!("Read file failed: {e}")))?;
let bytes = Bytes::from(data);
self.file_content_cache.insert(path, bytes.clone()).await;
@@ -295,9 +295,9 @@ mod tests {
let mut paths = Vec::new();
for i in 0..5 {
let file_path = dir.path().join(format!("test_{}.txt", i));
let file_path = dir.path().join(format!("test_{i}.txt"));
let mut file = std::fs::File::create(&file_path).unwrap();
writeln!(file, "content {}", i).unwrap();
writeln!(file, "content {i}").unwrap();
paths.push(file_path);
}

View File

@@ -2430,7 +2430,7 @@ impl SetDisks {
.map_err(|e| {
let elapsed = start_time.elapsed();
error!("Failed to acquire write lock for heal operation after {:?}: {:?}", elapsed, e);
DiskError::other(format!("Failed to acquire write lock for heal operation: {:?}", e))
DiskError::other(format!("Failed to acquire write lock for heal operation: {e:?}"))
})?;
let elapsed = start_time.elapsed();
info!("Successfully acquired write lock for object: {} in {:?}", object, elapsed);
@@ -3045,7 +3045,7 @@ impl SetDisks {
.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {:?}", e)))?;
.map_err(|e| DiskError::other(format!("Failed to acquire write lock for heal directory operation: {e:?}")))?;
let disks = {
let disks = self.disks.read().await;
@@ -5594,7 +5594,7 @@ impl StorageAPI for SetDisks {
self.fast_lock_manager
.acquire_write_lock("", object, self.locker_owner.as_str())
.await
.map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {:?}", e)))?,
.map_err(|e| Error::other(format!("Failed to acquire write lock for heal operation: {e:?}")))?,
)
} else {
None

View File

@@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Lock system status: {}", if manager.is_disabled() { "DISABLED" } else { "ENABLED" });
match std::env::var("RUSTFS_ENABLE_LOCKS") {
Ok(value) => println!("RUSTFS_ENABLE_LOCKS set to: {}", value),
Ok(value) => println!("RUSTFS_ENABLE_LOCKS set to: {value}"),
Err(_) => println!("RUSTFS_ENABLE_LOCKS not set (defaults to enabled)"),
}
@@ -34,7 +34,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Lock acquired successfully! Disabled: {}", guard.is_disabled());
}
Err(e) => {
println!("Failed to acquire lock: {:?}", e);
println!("Failed to acquire lock: {e:?}");
}
}

View File

@@ -87,7 +87,7 @@ impl LockClient for LocalClient {
current_owner,
current_mode,
}) => Ok(LockResponse::failure(
format!("Lock conflict: resource held by {} in {:?} mode", current_owner, current_mode),
format!("Lock conflict: resource held by {current_owner} in {current_mode:?} mode"),
std::time::Duration::ZERO,
)),
Err(crate::fast_lock::LockResult::Acquired) => {
@@ -131,7 +131,7 @@ impl LockClient for LocalClient {
current_owner,
current_mode,
}) => Ok(LockResponse::failure(
format!("Lock conflict: resource held by {} in {:?} mode", current_owner, current_mode),
format!("Lock conflict: resource held by {current_owner} in {current_mode:?} mode"),
std::time::Duration::ZERO,
)),
Err(crate::fast_lock::LockResult::Acquired) => {

View File

@@ -409,14 +409,14 @@ mod tests {
// Acquire multiple guards and verify unique IDs
let mut guards = Vec::new();
for i in 0..100 {
let object_name = format!("object_{}", i);
let object_name = format!("object_{i}");
let guard = manager
.acquire_write_lock("bucket", object_name.as_str(), "owner")
.await
.expect("Failed to acquire lock");
let guard_id = guard.guard_id();
assert!(guard_ids.insert(guard_id), "Guard ID {} is not unique", guard_id);
assert!(guard_ids.insert(guard_id), "Guard ID {guard_id} is not unique");
guards.push(guard);
}
@@ -501,7 +501,7 @@ mod tests {
let handle = tokio::spawn(async move {
for i in 0..10 {
let object_name = format!("obj_{}_{}", task_id, i);
let object_name = format!("obj_{task_id}_{i}");
// Acquire lock
let mut guard = match manager.acquire_write_lock("bucket", object_name.as_str(), "owner").await {
@@ -535,7 +535,7 @@ mod tests {
let blocked = double_release_blocked.load(Ordering::SeqCst);
// Should have many successful releases and all double releases blocked
assert!(successes > 150, "Expected many successful releases, got {}", successes);
assert!(successes > 150, "Expected many successful releases, got {successes}");
assert_eq!(blocked, successes, "All double releases should be blocked");
// Verify no active guards remain
@@ -567,7 +567,7 @@ mod tests {
// Acquire multiple locks for the same object to ensure they're in the same shard
let mut guards = Vec::new();
for i in 0..10 {
let owner_name = format!("owner_{}", i);
let owner_name = format!("owner_{i}");
let guard = manager
.acquire_read_lock("bucket", "shared_object", owner_name.as_str())
.await
@@ -586,7 +586,7 @@ mod tests {
let cleaned = shard.adaptive_cleanup();
// Should clean very little due to active guards
assert!(cleaned <= 5, "Should be conservative with active guards, cleaned: {}", cleaned);
assert!(cleaned <= 5, "Should be conservative with active guards, cleaned: {cleaned}");
// Locks should be protected by active guards
let remaining_locks = shard.lock_count();
@@ -809,7 +809,7 @@ mod tests {
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
let _guard = manager_clone
.acquire_read_lock("bucket", format!("object-{}", i), "background")
.acquire_read_lock("bucket", format!("object-{i}"), "background")
.await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
});
@@ -842,7 +842,7 @@ mod tests {
// High priority should generally perform reasonably well
// This is more of a performance validation than a strict requirement
println!("High priority: {:?}, Normal: {:?}", high_priority_duration, normal_duration);
println!("High priority: {high_priority_duration:?}, Normal: {normal_duration:?}");
// Both operations should complete in reasonable time (less than 100ms in test environment)
// This validates that the priority system isn't causing severe degradation
@@ -858,7 +858,7 @@ mod tests {
let mut _guards = Vec::new();
for i in 0..100 {
if let Ok(guard) = manager
.acquire_write_lock("bucket", format!("load-object-{}", i), "loader")
.acquire_write_lock("bucket", format!("load-object-{i}"), "loader")
.await
{
_guards.push(guard);
@@ -909,8 +909,8 @@ mod tests {
// Try to acquire all locks for this "query"
for obj_id in 0..objects_per_query {
let bucket = "databend";
let object = format!("table_partition_{}_{}", query_id, obj_id);
let owner = format!("query_{}", query_id);
let object = format!("table_partition_{query_id}_{obj_id}");
let owner = format!("query_{query_id}");
match manager_clone.acquire_high_priority_read_lock(bucket, object, owner).await {
Ok(guard) => query_locks.push(guard),
@@ -935,8 +935,8 @@ mod tests {
let manager_clone = manager.clone();
let handle = tokio::spawn(async move {
let bucket = "databend";
let object = format!("write_target_{}", write_id);
let owner = format!("writer_{}", write_id);
let object = format!("write_target_{write_id}");
let owner = format!("writer_{write_id}");
match manager_clone.acquire_write_lock(bucket, object, owner).await {
Ok(_guard) => {
@@ -988,7 +988,7 @@ mod tests {
let handle = tokio::spawn(async move {
let bucket = "test";
let object = format!("extreme_load_object_{}", i % 20); // Force some contention
let owner = format!("user_{}", i);
let owner = format!("user_{i}");
// Mix of read and write locks to create realistic contention
let result = if i % 3 == 0 {
@@ -1066,8 +1066,8 @@ mod tests {
};
let bucket = "datacenter-shared";
let object = format!("table_{}_{}_partition_{}", table_id, client_id, table_idx);
let owner = format!("client_{}_query_{}", client_id, query_id);
let object = format!("table_{table_id}_{client_id}_partition_{table_idx}");
let owner = format!("client_{client_id}_query_{query_id}");
// Mix of operations - mostly reads with some writes
let lock_result = if table_idx == 0 && query_id % 7 == 0 {
@@ -1129,10 +1129,10 @@ mod tests {
}
println!("\n=== Multi-Client Datacenter Simulation Results ===");
println!("Total execution time: {:?}", total_time);
println!("Total clients: {}", num_clients);
println!("Queries per client: {}", queries_per_client);
println!("Total queries executed: {}", total_queries);
println!("Total execution time: {total_time:?}");
println!("Total clients: {num_clients}");
println!("Queries per client: {queries_per_client}");
println!("Total queries executed: {total_queries}");
println!(
"Successful queries: {} ({:.1}%)",
successful_queries,
@@ -1179,8 +1179,7 @@ mod tests {
// Performance assertion - should complete in reasonable time
assert!(
total_time < std::time::Duration::from_secs(120),
"Multi-client test took too long: {:?}",
total_time
"Multi-client test took too long: {total_time:?}"
);
}
@@ -1209,8 +1208,8 @@ mod tests {
client_attempts += 1;
let bucket = "hot-data";
let object = format!("popular_table_{}", obj_id);
let owner = format!("thundering_client_{}", client_id);
let object = format!("popular_table_{obj_id}");
let owner = format!("thundering_client_{client_id}");
// Simulate different access patterns
let result = match obj_id % 3 {
@@ -1265,12 +1264,12 @@ mod tests {
let success_rate = total_successes as f64 / total_attempts as f64;
println!("\n=== Thundering Herd Scenario Results ===");
println!("Concurrent clients: {}", num_concurrent_clients);
println!("Hot objects: {}", hot_objects);
println!("Total attempts: {}", total_attempts);
println!("Total successes: {}", total_successes);
println!("Concurrent clients: {num_concurrent_clients}");
println!("Hot objects: {hot_objects}");
println!("Total attempts: {total_attempts}");
println!("Total successes: {total_successes}");
println!("Success rate: {:.1}%", success_rate * 100.0);
println!("Total time: {:?}", total_time);
println!("Total time: {total_time:?}");
println!(
"Average time per operation: {:.1}ms",
total_time.as_millis() as f64 / total_attempts as f64
@@ -1286,8 +1285,7 @@ mod tests {
// Should handle this volume in reasonable time
assert!(
total_time < std::time::Duration::from_secs(180),
"Thundering herd test took too long: {:?}",
total_time
"Thundering herd test took too long: {total_time:?}"
);
}
@@ -1314,7 +1312,7 @@ mod tests {
for op_id in 0..operations_per_transaction {
let bucket = "oltp-data";
let object = format!("record_{}_{}", oltp_id * 10 + tx_id, op_id);
let owner = format!("oltp_{}_{}", oltp_id, tx_id);
let owner = format!("oltp_{oltp_id}_{tx_id}");
// OLTP is mostly writes
let result = manager_clone.acquire_write_lock(bucket, object, owner).await;
@@ -1358,7 +1356,7 @@ mod tests {
table_id % 20, // Some overlap between queries
query_id
);
let owner = format!("olap_{}_{}", olap_id, query_id);
let owner = format!("olap_{olap_id}_{query_id}");
// OLAP is mostly reads with high priority
let result = manager_clone.acquire_critical_read_lock(bucket, object, owner).await;
@@ -1408,7 +1406,7 @@ mod tests {
let olap_success_rate = olap_successes as f64 / olap_attempts as f64;
println!("\n=== Mixed Workload Stress Test Results ===");
println!("Total time: {:?}", total_time);
println!("Total time: {total_time:?}");
println!(
"OLTP: {}/{} transactions succeeded ({:.1}%)",
oltp_successes,

View File

@@ -152,14 +152,14 @@ pub mod performance_comparison {
for i in 0..1000 {
let guard = fast_manager
.acquire_write_lock("bucket", format!("object_{}", i), owner)
.acquire_write_lock("bucket", format!("object_{i}"), owner)
.await
.expect("Failed to acquire fast lock");
guards.push(guard);
}
let fast_duration = start.elapsed();
println!("Fast lock: 1000 acquisitions in {:?}", fast_duration);
println!("Fast lock: 1000 acquisitions in {fast_duration:?}");
// Release all
drop(guards);

View File

@@ -27,7 +27,7 @@ mod tests {
let mut guards = Vec::new();
for i in 0..100 {
let bucket = format!("test-bucket-{}", i % 10); // Reuse some bucket names
let object = format!("test-object-{}", i);
let object = format!("test-object-{i}");
let guard = manager
.acquire_write_lock(bucket.as_str(), object.as_str(), "test-owner")
@@ -53,10 +53,7 @@ mod tests {
0.0
};
println!(
"Pool stats - Hits: {}, Misses: {}, Releases: {}, Pool size: {}",
hits, misses, releases, pool_size
);
println!("Pool stats - Hits: {hits}, Misses: {misses}, Releases: {releases}, Pool size: {pool_size}");
println!("Hit rate: {:.2}%", hit_rate * 100.0);
// We should see some pool activity
@@ -82,7 +79,7 @@ mod tests {
.expect("Failed to acquire second read lock");
let duration = start.elapsed();
println!("Two read locks on different objects took: {:?}", duration);
println!("Two read locks on different objects took: {duration:?}");
// Should be very fast since no contention
assert!(duration < Duration::from_millis(10), "Read locks should be fast with no contention");
@@ -103,7 +100,7 @@ mod tests {
.expect("Failed to acquire second read lock on same object");
let duration = start.elapsed();
println!("Two read locks on same object took: {:?}", duration);
println!("Two read locks on same object took: {duration:?}");
// Should still be fast since read locks are compatible
assert!(duration < Duration::from_millis(10), "Compatible read locks should be fast");
@@ -132,7 +129,7 @@ mod tests {
.expect("Failed to acquire second read lock");
let second_duration = start.elapsed();
println!("First lock: {:?}, Second lock: {:?}", first_duration, second_duration);
println!("First lock: {first_duration:?}, Second lock: {second_duration:?}");
// Both should be very fast (sub-millisecond typically)
assert!(first_duration < Duration::from_millis(10));
@@ -157,7 +154,7 @@ mod tests {
let result = manager.acquire_locks_batch(batch).await;
let duration = start.elapsed();
println!("Batch operation took: {:?}", duration);
println!("Batch operation took: {duration:?}");
assert!(result.all_acquired, "All locks should be acquired");
assert_eq!(result.successful_locks.len(), 3);

View File

@@ -40,7 +40,7 @@ rustfs-config = { workspace = true, features = ["constants", "observability"] }
rustfs-utils = { workspace = true, features = ["ip", "path"] }
async-trait = { workspace = true }
chrono = { workspace = true }
flexi_logger = { workspace = true, features = ["trc", "kv"] }
flexi_logger = { workspace = true }
nu-ansi-term = { workspace = true }
nvml-wrapper = { workspace = true, optional = true }
opentelemetry = { workspace = true }
@@ -62,6 +62,7 @@ serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
# Only enable kafka features and related dependencies on Linux
[target.'cfg(target_os = "linux")'.dependencies]
rdkafka = { workspace = true, features = ["tokio"], optional = true }

View File

@@ -21,12 +21,57 @@
## ✨ Features
- **Environment-Aware Logging**: Automatically configures logging behavior based on deployment environment
- Production: File-only logging (stdout disabled by default for security and log aggregation)
- Development/Test: Full logging with stdout support for debugging
- OpenTelemetry integration for distributed tracing
- Prometheus metrics collection and exposition
- Structured logging with configurable levels
- Structured logging with configurable levels and rotation
- Performance profiling and analytics
- Real-time health checks and status monitoring
- Custom dashboards and alerting integration
- Enhanced error handling and resilience
## 🚀 Environment-Aware Logging
The obs module automatically adapts logging behavior based on your deployment environment:
### Production Environment
```bash
# Set production environment - disables stdout logging by default
export RUSTFS_OBS_ENVIRONMENT=production
# All logs go to files only (no stdout) for security and log aggregation
# Enhanced error handling with clear failure diagnostics
```
### Development/Test Environment
```bash
# Set development environment - enables stdout logging
export RUSTFS_OBS_ENVIRONMENT=development
# Logs appear both in files and stdout for easier debugging
# Full span tracking and verbose error messages
```
### Configuration Override
You can always override the environment defaults:
```rust
use rustfs_obs::OtelConfig;
let config = OtelConfig {
endpoint: "".to_string(),
use_stdout: Some(true), // Explicit override - forces stdout even in production
environment: Some("production".to_string()),
..Default::default()
};
```
### Supported Environment Values
- `production` - Secure file-only logging
- `development` - Full debugging with stdout
- `test` - Test environment with stdout support
- `staging` - Staging environment with stdout support
## 📚 Documentation

View File

@@ -13,15 +13,19 @@
// limitations under the License.
use crate::OtelConfig;
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode, style};
use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, LogSpecification, Naming, Record, WriteMode,
WriteMode::{AsyncWith, BufferAndFlush},
style,
};
use nu_ansi_term::Color;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::{
Resource,
logs::SdkLoggerProvider,
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
};
@@ -29,15 +33,19 @@ use opentelemetry_semantic_conventions::{
SCHEMA_URL,
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
};
use rustfs_config::observability::ENV_OBS_LOG_DIRECTORY;
use rustfs_config::{
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
observability::{
DEFAULT_OBS_ENVIRONMENT_PRODUCTION, DEFAULT_OBS_LOG_FLUSH_MS, DEFAULT_OBS_LOG_MESSAGE_CAPA, DEFAULT_OBS_LOG_POOL_CAPA,
ENV_OBS_LOG_DIRECTORY,
},
};
use rustfs_utils::get_local_ip_with_default;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::fs;
use std::io::IsTerminal;
use std::time::Duration;
use std::{env, fs};
use tracing::info;
use tracing_error::ErrorLayer;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
@@ -121,7 +129,7 @@ fn resource(config: &OtelConfig) -> Resource {
/// Creates a periodic reader for stdout metrics
fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
.with_interval(std::time::Duration::from_secs(interval))
.with_interval(Duration::from_secs(interval))
.build()
}
@@ -129,11 +137,23 @@ fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout:
pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// avoid repeated access to configuration fields
let endpoint = &config.endpoint;
let use_stdout = config.use_stdout.unwrap_or(USE_STDOUT);
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Environment-aware stdout configuration
// Check for explicit environment control via RUSTFS_OBS_ENVIRONMENT
let is_production = environment.to_lowercase() == DEFAULT_OBS_ENVIRONMENT_PRODUCTION;
// Default stdout behavior based on environment
let default_use_stdout = if is_production {
false // Disable stdout in production for security and log aggregation
} else {
USE_STDOUT // Use configured default for dev/test environments
};
let use_stdout = config.use_stdout.unwrap_or(default_use_stdout);
let meter_interval = config.meter_interval.unwrap_or(METER_INTERVAL);
let logger_level = config.logger_level.as_deref().unwrap_or(DEFAULT_LOG_LEVEL);
let service_name = config.service_name.as_deref().unwrap_or(APP_NAME);
let environment = config.environment.as_deref().unwrap_or(ENVIRONMENT);
// Configure flexi_logger to cut by time and size
let mut flexi_logger_handle = None;
@@ -144,7 +164,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
// initialize tracer provider
let tracer_provider = {
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
let sampler = if (0.0..1.0).contains(&sample_ratio) {
Sampler::TraceIdRatioBased(sample_ratio)
} else {
Sampler::AlwaysOn
@@ -197,7 +217,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
builder = builder.with_reader(
PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(meter_interval))
.with_interval(Duration::from_secs(meter_interval))
.build(),
);
@@ -249,7 +269,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
.with_line_number(true);
// Only add full span events tracking in the development environment
if environment != ENVIRONMENT {
if !is_production {
layer = layer.with_span_events(FmtSpan::FULL);
}
@@ -257,8 +277,7 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
};
let filter = build_env_filter(logger_level, None);
let otel_filter = build_env_filter(logger_level, None);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(otel_filter);
let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(build_env_filter(logger_level, None));
let tracer = tracer_provider.tracer(Cow::Borrowed(service_name).to_string());
// Configure registry to avoid repeated calls to filter methods
@@ -280,78 +299,96 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
"OpenTelemetry telemetry initialized with OTLP endpoint: {}, logger_level: {},RUST_LOG env: {}",
endpoint,
logger_level,
std::env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
env::var("RUST_LOG").unwrap_or_else(|_| "Not set".to_string())
);
}
}
OtelGuard {
return OtelGuard {
tracer_provider: Some(tracer_provider),
meter_provider: Some(meter_provider),
logger_provider: Some(logger_provider),
_flexi_logger_handles: flexi_logger_handle,
}
} else {
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("Failed to create log directory {log_directory}: {e}");
}
#[cfg(unix)]
{
// Linux/macOS Setting Permissions
// Set the log directory permissions to 755 (rwxr-xr-x)
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => eprintln!("Failed to set log directory permissions {log_directory}: {e}"),
}
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
}
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Obtain the log directory and file name configuration
let default_log_directory = rustfs_utils::dirs::get_log_directory_to_string(ENV_OBS_LOG_DIRECTORY);
let log_directory = config.log_directory.as_deref().unwrap_or(default_log_directory.as_str());
let log_filename = config.log_filename.as_deref().unwrap_or(service_name);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or(LogSpecification::info());
// Enhanced error handling for directory creation
if let Err(e) = fs::create_dir_all(log_directory) {
eprintln!("ERROR: Failed to create log directory '{log_directory}': {e}");
eprintln!("Ensure the parent directory exists and you have write permissions.");
eprintln!("Attempting to continue with logging, but file logging may fail.");
} else {
eprintln!("Log directory ready: {log_directory}");
}
// Convert the logger_level string to the corresponding LevelFilter
let level_filter = match logger_level.to_lowercase().as_str() {
#[cfg(unix)]
{
// Linux/macOS Setting Permissions with better error handling
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
match fs::set_permissions(log_directory, Permissions::from_mode(0o755)) {
Ok(_) => eprintln!("Log directory permissions set to 755: {log_directory}"),
Err(e) => {
eprintln!("WARNING: Failed to set log directory permissions for '{log_directory}': {e}");
eprintln!("This may affect log file access. Consider checking directory ownership and permissions.");
}
}
}
// Build log cutting conditions
let rotation_criterion = match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
// Cut by time and size at the same time
(Some(time), Some(size)) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::AgeOrSize(age, size * 1024 * 1024) // Convert to bytes
}
// Cut by time only
(Some(time), None) => {
let age = match time.to_lowercase().as_str() {
"hour" => Age::Hour,
"day" => Age::Day,
"minute" => Age::Minute,
"second" => Age::Second,
_ => Age::Day, // The default is by day
};
Criterion::Age(age)
}
// Cut by size only
(None, Some(size)) => {
Criterion::Size(size * 1024 * 1024) // Convert to bytes
}
// By default, it is cut by the day
_ => Criterion::Age(Age::Day),
};
// The number of log files retained
let keep_files = config.log_keep_files.unwrap_or(DEFAULT_LOG_KEEP_FILES);
// Parsing the log level
let log_spec = LogSpecification::parse(logger_level).unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger level '{logger_level}': {e}. Using default 'info' level.");
LogSpecification::info()
});
// Environment-aware stdout configuration
// In production: disable stdout completely (Duplicate::None)
// In development/test: use level-based filtering
let level_filter = if is_production {
flexi_logger::Duplicate::None // No stdout output in production
} else {
// Convert the logger_level string to the corresponding LevelFilter for dev/test
match logger_level.to_lowercase().as_str() {
"trace" => flexi_logger::Duplicate::Trace,
"debug" => flexi_logger::Duplicate::Debug,
"info" => flexi_logger::Duplicate::Info,
@@ -359,56 +396,114 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
"error" => flexi_logger::Duplicate::Error,
"off" => flexi_logger::Duplicate::None,
_ => flexi_logger::Duplicate::Info, // the default is info
};
}
};
// Configure the flexi_logger
let flexi_logger_result = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("Invalid logger level: {logger_level}, using default: {DEFAULT_LOG_LEVEL}, failed error: {e:?}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.duplicate_to_stdout(level_filter) // Use dynamic levels
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.write_mode(WriteMode::BufferAndFlush)
.append() // Avoid clearing existing logs at startup
.print_message() // Startup information output to console
.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
eprintln!("Flexi logger initialized with file logging to {log_directory}/{log_filename}.log");
// Log logging of log cutting conditions
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => eprintln!(
"Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files"
),
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
// Choose write mode based on environment
let write_mode = if is_production {
get_env_async_with().unwrap_or_else(|| {
eprintln!(
"Using default Async write mode in production. To customize, set RUSTFS_OBS_LOG_POOL_CAPA, RUSTFS_OBS_LOG_MESSAGE_CAPA, and RUSTFS_OBS_LOG_FLUSH_MS environment variables."
);
AsyncWith {
pool_capa: DEFAULT_OBS_LOG_POOL_CAPA,
message_capa: DEFAULT_OBS_LOG_MESSAGE_CAPA,
flush_interval: Duration::from_millis(DEFAULT_OBS_LOG_FLUSH_MS),
}
})
} else {
BufferAndFlush
};
// Configure the flexi_logger with enhanced error handling
let mut flexi_logger_builder = flexi_logger::Logger::try_with_env_or_str(logger_level)
.unwrap_or_else(|e| {
eprintln!("WARNING: Invalid logger configuration '{logger_level}': {e:?}");
eprintln!("Falling back to default configuration with level: {DEFAULT_LOG_LEVEL}");
flexi_logger::Logger::with(log_spec.clone())
})
.log_to_file(
FileSpec::default()
.directory(log_directory)
.basename(log_filename)
.suppress_timestamp(),
)
.rotate(rotation_criterion, Naming::TimestampsDirect, Cleanup::KeepLogFiles(keep_files.into()))
.format_for_files(format_for_file) // Add a custom formatting function for file output
.write_mode(write_mode)
.append(); // Avoid clearing existing logs at startup
// Environment-aware stdout configuration
flexi_logger_builder = flexi_logger_builder.duplicate_to_stdout(level_filter);
// Only add stdout formatting and startup messages in non-production environments
if !is_production {
flexi_logger_builder = flexi_logger_builder
.format_for_stdout(format_with_color) // Add a custom formatting function for terminal output
.print_message(); // Startup information output to console
}
let flexi_logger_result = flexi_logger_builder.start();
if let Ok(logger) = flexi_logger_result {
// Save the logger handle to keep the logging
flexi_logger_handle = Some(logger);
// Environment-aware success messages
if is_production {
eprintln!("Production logging initialized: file-only mode to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging disabled in production environment for security and log aggregation.");
} else {
eprintln!("Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Development/Test logging initialized with file logging to {log_directory}/{log_filename}.log");
eprintln!("Stdout logging enabled for debugging. Environment: {environment}");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
// Log rotation configuration details
match (config.log_rotation_time.as_deref(), config.log_rotation_size_mb) {
(Some(time), Some(size)) => {
eprintln!("Log rotation configured for: every {time} or when size exceeds {size}MB, keeping {keep_files} files")
}
(Some(time), None) => eprintln!("Log rotation configured for: every {time}, keeping {keep_files} files"),
(None, Some(size)) => {
eprintln!("Log rotation configured for: when size exceeds {size}MB, keeping {keep_files} files")
}
_ => eprintln!("Log rotation configured for: daily, keeping {keep_files} files"),
}
} else {
eprintln!("CRITICAL: Failed to initialize flexi_logger: {:?}", flexi_logger_result.err());
eprintln!("Possible causes:");
eprintln!(" 1. Insufficient permissions to write to log directory: {log_directory}");
eprintln!(" 2. Log directory does not exist or is not accessible");
eprintln!(" 3. Invalid log configuration parameters");
eprintln!(" 4. Disk space issues");
eprintln!("Application will continue but logging to files will not work properly.");
}
OtelGuard {
tracer_provider: None,
meter_provider: None,
logger_provider: None,
_flexi_logger_handles: flexi_logger_handle,
}
}
/// Build an [`AsyncWith`] write mode from environment variables.
///
/// Reads `RUSTFS_OBS_LOG_POOL_CAPA`, `RUSTFS_OBS_LOG_MESSAGE_CAPA` and
/// `RUSTFS_OBS_LOG_FLUSH_MS`; returns `None` unless all three are present
/// and parse as numbers, so the caller can fall back to built-in defaults.
fn get_env_async_with() -> Option<WriteMode> {
    // Helper: fetch one env var and parse it, treating absence and
    // parse failure identically (both yield None).
    fn parsed<T: std::str::FromStr>(key: &str) -> Option<T> {
        env::var(key).ok()?.parse().ok()
    }

    Some(AsyncWith {
        pool_capa: parsed::<usize>("RUSTFS_OBS_LOG_POOL_CAPA")?,
        message_capa: parsed::<usize>("RUSTFS_OBS_LOG_MESSAGE_CAPA")?,
        flush_interval: Duration::from_millis(parsed::<u64>("RUSTFS_OBS_LOG_FLUSH_MS")?),
    })
}
@@ -473,3 +568,140 @@ fn format_for_file(w: &mut dyn std::io::Write, now: &mut DeferredNow, record: &R
record.args()
)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Production detection must be case-insensitive on the environment name.
    #[test]
    fn test_production_environment_detection() {
        for candidate in ["production", "PRODUCTION", "Production"] {
            let is_production = candidate.to_lowercase() == "production";
            assert!(is_production, "Should detect '{candidate}' as production environment");
        }
    }

    /// Anything other than "production" (in any casing) is non-production.
    #[test]
    fn test_non_production_environment_detection() {
        for candidate in ["development", "test", "staging", "dev", "local"] {
            let is_production = candidate.to_lowercase() == "production";
            assert!(!is_production, "Should not detect '{candidate}' as production environment");
        }
    }

    /// Explicit config always wins; otherwise production disables stdout and
    /// non-production falls back to the compiled-in USE_STDOUT default.
    #[test]
    fn test_stdout_behavior_logic() {
        // (is_production, config_use_stdout, expected, description)
        let cases: [(bool, Option<bool>, bool, &str); 5] = [
            (true, None, false, "Production with no config should disable stdout"),
            (false, None, USE_STDOUT, "Non-production with no config should use default"),
            (true, Some(true), true, "Production with explicit true should enable stdout"),
            (true, Some(false), false, "Production with explicit false should disable stdout"),
            (false, Some(true), true, "Non-production with explicit true should enable stdout"),
        ];

        for (is_production, config_use_stdout, expected, description) in cases {
            let fallback = if is_production { false } else { USE_STDOUT };
            let actual = config_use_stdout.unwrap_or(fallback);
            assert_eq!(actual, expected, "Test case failed: {description}");
        }
    }

    /// Mirrors the level-name -> Duplicate variant mapping used at runtime,
    /// including the fallback to Info for unknown names.
    #[test]
    fn test_log_level_filter_mapping_logic() {
        let cases = [
            ("trace", "Trace"),
            ("debug", "Debug"),
            ("info", "Info"),
            ("warn", "Warn"),
            ("warning", "Warn"),
            ("error", "Error"),
            ("off", "None"),
            ("invalid_level", "Info"), // unknown input defaults to Info
        ];

        for (input_level, expected_variant) in cases {
            let filter_variant = match input_level.to_lowercase().as_str() {
                "trace" => "Trace",
                "debug" => "Debug",
                "info" => "Info",
                "warn" | "warning" => "Warn",
                "error" => "Error",
                "off" => "None",
                _ => "Info", // default case
            };
            assert_eq!(
                filter_variant, expected_variant,
                "Log level '{input_level}' should map to '{expected_variant}'"
            );
        }
    }

    /// An explicitly configured environment must override the ENVIRONMENT default.
    #[test]
    fn test_otel_config_environment_defaults() {
        for env_name in ["production", "development"] {
            let config = OtelConfig {
                endpoint: String::new(),
                use_stdout: None,
                environment: Some(env_name.to_string()),
                ..Default::default()
            };
            // Simulate the resolution logic from init_telemetry
            let resolved = config.environment.as_deref().unwrap_or(ENVIRONMENT);
            assert_eq!(resolved, env_name);
        }
    }
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
//! Layered DNS resolution utility for Kubernetes environments
//!
//! This module provides robust DNS resolution with multiple fallback layers:
@@ -396,7 +398,7 @@ mod tests {
// Test cache stats (note: moka cache might not immediately reflect changes)
let (total, _weighted_size) = resolver.cache_stats().await;
// Cache should have at least the entry we just added (might be 0 due to async nature)
assert!(total <= 1, "Cache should have at most 1 entry, got {}", total);
assert!(total <= 1, "Cache should have at most 1 entry, got {total}");
}
#[tokio::test]
@@ -407,12 +409,12 @@ mod tests {
match resolver.resolve("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Resolved localhost to: {:?}", ips);
println!("Resolved localhost to: {ips:?}");
}
Err(e) => {
// In some test environments, even localhost might fail
// This is acceptable as long as our error handling works
println!("DNS resolution failed (might be expected in test environments): {}", e);
println!("DNS resolution failed (might be expected in test environments): {e}");
}
}
}
@@ -428,7 +430,7 @@ mod tests {
assert!(result.is_err());
if let Err(e) = result {
println!("Expected error for invalid domain: {}", e);
println!("Expected error for invalid domain: {e}");
// Should be AllAttemptsFailed since both system and public DNS should fail
assert!(matches!(e, DnsError::AllAttemptsFailed { .. }));
}
@@ -464,10 +466,10 @@ mod tests {
match resolve_domain("localhost").await {
Ok(ips) => {
assert!(!ips.is_empty());
println!("Global resolver resolved localhost to: {:?}", ips);
println!("Global resolver resolved localhost to: {ips:?}");
}
Err(e) => {
println!("Global resolver DNS resolution failed (might be expected in test environments): {}", e);
println!("Global resolver DNS resolution failed (might be expected in test environments): {e}");
}
}
}

View File

@@ -118,17 +118,17 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
pub async fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => {
match crate::dns_resolver::resolve_domain(domain).await {
Ok(ips) => {
info!("Resolved domain {domain} using custom DNS resolver: {ips:?}");
return Ok(ips.into_iter().collect());
}
Err(err) => {
error!(
"Failed to resolve domain {domain} using custom DNS resolver, falling back to system resolver,err: {err}"
);
}
}
// match crate::dns_resolver::resolve_domain(domain).await {
// Ok(ips) => {
// info!("Resolved domain {domain} using custom DNS resolver: {ips:?}");
// return Ok(ips.into_iter().collect());
// }
// Err(err) => {
// error!(
// "Failed to resolve domain {domain} using custom DNS resolver, falling back to system resolver,err: {err}"
// );
// }
// }
// Check cache first
if let Ok(mut cache) = DNS_CACHE.lock() {
if let Some(entry) = cache.get(domain) {
@@ -430,7 +430,7 @@ mod test {
match get_host_ip(invalid_host.clone()).await {
Ok(ips) => {
// Depending on DNS resolver behavior, it might return empty set or error
assert!(ips.is_empty(), "Expected empty IP set for invalid domain, got: {:?}", ips);
assert!(ips.is_empty(), "Expected empty IP set for invalid domain, got: {ips:?}");
}
Err(_) => {
error!("Expected error for invalid domain");

View File

@@ -40,6 +40,7 @@ services:
- RUSTFS_ACCESS_KEY=rustfsadmin
- RUSTFS_SECRET_KEY=rustfsadmin
- RUSTFS_LOG_LEVEL=info
- RUSTFS_TLS_PATH=/opt/tls
- RUSTFS_OBS_ENDPOINT=http://otel-collector:4317
volumes:
- rustfs_data_0:/data/rustfs0
@@ -47,6 +48,7 @@ services:
- rustfs_data_2:/data/rustfs2
- rustfs_data_3:/data/rustfs3
- logs_data:/app/logs
- .docker/tls/:/opt/tls # TLS configuration: create a local "tls" directory, place your certificate and key files in it, and point this mount at its path
networks:
- rustfs-network
restart: unless-stopped

View File

@@ -12,49 +12,68 @@ elif [ "$1" = "rustfs" ]; then
set -- /usr/bin/rustfs "$@"
fi
# 2) Parse and create local mount directories (ignore http/https), ensure /logs is included
VOLUME_RAW="${RUSTFS_VOLUMES:-/data}"
# Convert comma/tab to space
VOLUME_LIST=$(echo "$VOLUME_RAW" | tr ',\t' ' ')
LOCAL_VOLUMES=""
for vol in $VOLUME_LIST; do
case "$vol" in
/*)
case "$vol" in
http://*|https://*) : ;;
*) LOCAL_VOLUMES="$LOCAL_VOLUMES $vol" ;;
esac
;;
*)
: # skip non-local paths
;;
esac
done
# Ensure /logs is included
case " $LOCAL_VOLUMES " in
*" /logs "*) : ;;
*) LOCAL_VOLUMES="$LOCAL_VOLUMES /logs" ;;
esac
# 2) Process data volumes (separate from log directory)
DATA_VOLUMES=""
# Collect local filesystem paths from RUSTFS_VOLUMES (default: /data) and
# create any that are missing. Remote (http/https) and non-absolute entries
# are skipped. The accepted paths accumulate in the global DATA_VOLUMES
# variable, which the caller later appends to the server command line.
process_data_volumes() {
    VOLUME_RAW="${RUSTFS_VOLUMES:-/data}"
    # Convert comma/tab to space so the list can be word-split by `for`
    VOLUME_LIST=$(echo "$VOLUME_RAW" | tr ',\t' ' ')
    for vol in $VOLUME_LIST; do
        case "$vol" in
        /*)
            # Absolute path: keep it unless it is actually a URL
            case "$vol" in
            http://*|https://*) : ;;
            *) DATA_VOLUMES="$DATA_VOLUMES $vol" ;;
            esac
            ;;
        *)
            : # skip non-local paths
            ;;
        esac
    done
    echo "Initializing data directories:$DATA_VOLUMES"
    for vol in $DATA_VOLUMES; do
        if [ ! -d "$vol" ]; then
            echo " mkdir -p $vol"
            mkdir -p "$vol"
            # If target user is specified, try to set directory owner to that user (non-recursive to avoid large disk overhead)
            if [ -n "$RUSTFS_UID" ] && [ -n "$RUSTFS_GID" ]; then
                chown "$RUSTFS_UID:$RUSTFS_GID" "$vol" 2>/dev/null || true
            elif [ -n "$RUSTFS_USERNAME" ] && [ -n "$RUSTFS_GROUPNAME" ]; then
                chown "$RUSTFS_USERNAME:$RUSTFS_GROUPNAME" "$vol" 2>/dev/null || true
            fi
        fi
    done
}
echo "Initializing mount directories:$LOCAL_VOLUMES"
for vol in $LOCAL_VOLUMES; do
if [ ! -d "$vol" ]; then
echo " mkdir -p $vol"
mkdir -p "$vol"
# 3) Process log directory (separate from data volumes)
# Create the observability log directory (kept separate from the data volumes
# so it is never passed to the server as a storage volume). The location is
# taken from RUSTFS_OBS_LOG_DIRECTORY, defaulting to /logs.
#
# Fix: removed leftover duplicate `chown ... "$vol"` lines — `$vol` is not
# defined in this function (diff residue from the old combined loop), so those
# lines would chown an empty/stale path instead of the log directory.
process_log_directory() {
    LOG_DIR="${RUSTFS_OBS_LOG_DIRECTORY:-/logs}"
    echo "Initializing log directory: $LOG_DIR"
    if [ ! -d "$LOG_DIR" ]; then
        echo " mkdir -p $LOG_DIR"
        mkdir -p "$LOG_DIR"
        # If target user is specified, try to set directory owner to that user (non-recursive to avoid large disk overhead)
        if [ -n "$RUSTFS_UID" ] && [ -n "$RUSTFS_GID" ]; then
            chown "$RUSTFS_UID:$RUSTFS_GID" "$LOG_DIR" 2>/dev/null || true
        elif [ -n "$RUSTFS_USERNAME" ] && [ -n "$RUSTFS_GROUPNAME" ]; then
            chown "$RUSTFS_USERNAME:$RUSTFS_GROUPNAME" "$LOG_DIR" 2>/dev/null || true
        fi
    fi
}
# 3) Default credentials warning
# Execute the separate processes
process_data_volumes
process_log_directory
# 4) Default credentials warning
if [ "${RUSTFS_ACCESS_KEY}" = "rustfsadmin" ] || [ "${RUSTFS_SECRET_KEY}" = "rustfsadmin" ]; then
echo "!!!WARNING: Using default RUSTFS_ACCESS_KEY or RUSTFS_SECRET_KEY. Override them in production!"
fi
echo "Starting: $*"
set -- "$@" $LOCAL_VOLUMES
set -- "$@" $DATA_VOLUMES
exec "$@"

View File

@@ -122,6 +122,9 @@ libsystemd.workspace = true
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
tikv-jemallocator = "0.6"
[target.'cfg(all(target_os = "linux", target_env = "musl"))'.dependencies]
mimalloc = "0.1"
[target.'cfg(not(target_os = "windows"))'.dependencies]
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }

View File

@@ -241,7 +241,7 @@ pub async fn config_handler(uri: Uri, Host(host): Host, headers: HeaderMap) -> i
if ip.is_ipv6() { format!("[{ip}]") } else { format!("{ip}") }
} else if let Ok(ip) = raw_host.parse::<IpAddr>() {
// Pure IP (no ports)
if ip.is_ipv6() { format!("[{}]", ip) } else { ip.to_string() }
if ip.is_ipv6() { format!("[{ip}]") } else { ip.to_string() }
} else {
// The domain name may not be able to resolve directly to IP, remove the port
raw_host.split(':').next().unwrap_or(raw_host).to_string()

View File

@@ -1322,7 +1322,7 @@ impl Operation for ProfileHandler {
error!("Failed to build profiler report: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to build profile report: {}", e)),
Body::from(format!("Failed to build profile report: {e}")),
)));
}
};
@@ -1353,7 +1353,7 @@ impl Operation for ProfileHandler {
error!("Failed to generate flamegraph: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to generate flamegraph: {}", e)),
Body::from(format!("Failed to generate flamegraph: {e}")),
)));
}
};

View File

@@ -41,6 +41,8 @@ use rustfs_ahm::{
};
use rustfs_common::globals::set_global_addr;
use rustfs_config::DEFAULT_DELIMITER;
use rustfs_config::DEFAULT_UPDATE_CHECK;
use rustfs_config::ENV_UPDATE_CHECK;
use rustfs_ecstore::bucket::metadata_sys;
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
@@ -62,7 +64,6 @@ use rustfs_iam::init_iam_sys;
use rustfs_notify::global::notifier_instance;
use rustfs_obs::{init_obs, set_global_guard};
use rustfs_targets::arn::TargetID;
use rustfs_utils::dns_resolver::init_global_dns_resolver;
use rustfs_utils::net::parse_and_resolve_address;
use s3s::s3_error;
use std::io::{Error, Result};
@@ -74,6 +75,18 @@ use tracing::{debug, error, info, instrument, warn};
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[cfg(all(target_os = "linux", target_env = "musl"))]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
const LOGO: &str = r#"
░█▀▄░█░█░█▀▀░▀█▀░█▀▀░█▀▀
░█▀▄░█░█░▀▀█░░█░░█▀▀░▀▀█
░▀░▀░▀▀▀░▀▀▀░░▀░░▀░░░▀▀▀
"#;
#[instrument]
fn print_server_info() {
let current_year = chrono::Utc::now().year();
@@ -97,6 +110,9 @@ async fn main() -> Result<()> {
// Initialize Observability
let (_logger, guard) = init_obs(Some(opt.clone().obs_endpoint)).await;
// print startup logo
info!("{}", LOGO);
// Store in global storage
set_global_guard(guard).map_err(Error::other)?;
@@ -112,12 +128,12 @@ async fn main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
let dns_init = tokio::spawn(async {
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
});
// // Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
// let dns_init = tokio::spawn(async {
// if let Err(e) = rustfs_utils::dns_resolver::init_global_dns_resolver().await {
// warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
// }
// });
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
@@ -136,8 +152,8 @@ async fn run(opt: config::Opt) -> Result<()> {
set_global_addr(&opt.address).await;
// Wait for DNS initialization to complete before network-heavy operations
dns_init.await.map_err(Error::other)?;
// // Wait for DNS initialization to complete before network-heavy operations
// dns_init.await.map_err(Error::other)?;
// For RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone())
@@ -244,31 +260,11 @@ async fn run(opt: config::Opt) -> Result<()> {
// Collect bucket names into a vector
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
// Parallelize initialization tasks for better network performance
let bucket_metadata_task = tokio::spawn({
let store = store.clone();
let buckets = buckets.clone();
async move {
init_bucket_metadata_sys(store, buckets).await;
}
});
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
let iam_init_task = tokio::spawn({
let store = store.clone();
async move { init_iam_sys(store).await }
});
init_iam_sys(store.clone()).await.map_err(Error::other)?;
let notification_config_task = tokio::spawn({
let buckets = buckets.clone();
async move {
add_bucket_notification_configuration(buckets).await;
}
});
// Wait for all parallel initialization tasks to complete
bucket_metadata_task.await.map_err(Error::other)?;
iam_init_task.await.map_err(Error::other)??;
notification_config_task.await.map_err(Error::other)?;
add_bucket_notification_configuration(buckets.clone()).await;
// Initialize the global notification system
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
@@ -313,41 +309,7 @@ async fn run(opt: config::Opt) -> Result<()> {
// initialize bucket replication pool
init_bucket_replication_pool().await;
// Async update check with timeout (optional)
tokio::spawn(async {
use crate::update::{UpdateCheckError, check_updates};
// Add timeout to prevent hanging network calls
match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await {
Ok(Ok(result)) => {
if result.update_available {
if let Some(latest) = &result.latest_version {
info!(
"🚀 Version check: New version available: {} -> {} (current: {})",
result.current_version, latest.version, result.current_version
);
if let Some(notes) = &latest.release_notes {
info!("📝 Release notes: {}", notes);
}
if let Some(url) = &latest.download_url {
info!("🔗 Download URL: {}", url);
}
}
} else {
debug!("✅ Version check: Current version is up to date: {}", result.current_version);
}
}
Ok(Err(UpdateCheckError::HttpError(e))) => {
debug!("Version check: network error (this is normal): {}", e);
}
Ok(Err(e)) => {
debug!("Version check: failed (this is normal): {}", e);
}
Err(_) => {
debug!("Version check: timeout after 30 seconds (this is normal)");
}
}
});
init_update_check();
// if opt.console_enable {
// debug!("console is enabled");
@@ -465,6 +427,53 @@ async fn init_event_notifier() {
}
}
fn init_update_check() {
let update_check_enable = std::env::var(ENV_UPDATE_CHECK)
.unwrap_or_else(|_| DEFAULT_UPDATE_CHECK.to_string())
.parse::<bool>()
.unwrap_or(DEFAULT_UPDATE_CHECK);
if !update_check_enable {
return;
}
// Async update check with timeout
tokio::spawn(async {
use crate::update::{UpdateCheckError, check_updates};
// Add timeout to prevent hanging network calls
match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await {
Ok(Ok(result)) => {
if result.update_available {
if let Some(latest) = &result.latest_version {
info!(
"🚀 Version check: New version available: {} -> {} (current: {})",
result.current_version, latest.version, result.current_version
);
if let Some(notes) = &latest.release_notes {
info!("📝 Release notes: {}", notes);
}
if let Some(url) = &latest.download_url {
info!("🔗 Download URL: {}", url);
}
}
} else {
debug!("✅ Version check: Current version is up to date: {}", result.current_version);
}
}
Ok(Err(UpdateCheckError::HttpError(e))) => {
debug!("Version check: network error (this is normal): {}", e);
}
Ok(Err(e)) => {
debug!("Version check: failed (this is normal): {}", e);
}
Err(_) => {
debug!("Version check: timeout after 30 seconds (this is normal)");
}
}
});
}
#[instrument(skip_all)]
async fn add_bucket_notification_configuration(buckets: Vec<String>) {
let region_opt = rustfs_ecstore::global::get_global_region();

View File

@@ -23,7 +23,7 @@ pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| format!("Failed to build profiler guard: {}", e))?;
.map_err(|e| format!("Failed to build profiler guard: {e}"))?;
PROFILER_GUARD
.set(Arc::new(Mutex::new(guard)))

View File

@@ -16,7 +16,7 @@ use crate::admin::console::static_handler;
use crate::config::Opt;
use axum::{Router, extract::Request, middleware, response::Json, routing::get};
use axum_server::tls_rustls::RustlsConfig;
use http::{HeaderValue, Method, header};
use http::{HeaderValue, Method};
use rustfs_config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_utils::net::parse_and_resolve_address;
use serde_json::json;
@@ -230,15 +230,15 @@ async fn health_check() -> Json<serde_json::Value> {
pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
let cors_layer = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS])
.allow_headers([header::CONTENT_TYPE, header::AUTHORIZATION, header::ACCEPT, header::ORIGIN]);
.allow_headers(Any);
match origins {
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
Some(origins_str) => {
let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
if origins.is_empty() {
warn!("Empty CORS origins provided, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
// Parse origins with proper error handling
let mut valid_origins = Vec::new();
@@ -255,10 +255,10 @@ pub fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
if valid_origins.is_empty() {
warn!("No valid CORS origins found, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
info!("Console CORS origins configured: {:?}", valid_origins);
cors_layer.allow_origin(AllowOrigin::list(valid_origins))
cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
}
}
}

View File

@@ -65,26 +65,15 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
Method::HEAD,
Method::OPTIONS,
])
.allow_headers([
http::header::CONTENT_TYPE,
http::header::AUTHORIZATION,
http::header::ACCEPT,
http::header::ORIGIN,
// Note: X_AMZ_* headers are custom and may need to be defined
// http::header::X_AMZ_CONTENT_SHA256,
// http::header::X_AMZ_DATE,
// http::header::X_AMZ_SECURITY_TOKEN,
// http::header::X_AMZ_USER_AGENT,
http::header::RANGE,
]);
.allow_headers(Any);
match origins {
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any),
Some(origins_str) if origins_str == "*" => cors_layer.allow_origin(Any).expose_headers(Any),
Some(origins_str) => {
let origins: Vec<&str> = origins_str.split(',').map(|s| s.trim()).collect();
if origins.is_empty() {
warn!("Empty CORS origins provided, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
// Parse origins with proper error handling
let mut valid_origins = Vec::new();
@@ -101,16 +90,16 @@ fn parse_cors_origins(origins: Option<&String>) -> CorsLayer {
if valid_origins.is_empty() {
warn!("No valid CORS origins found, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
} else {
info!("Endpoint CORS origins configured: {:?}", valid_origins);
cors_layer.allow_origin(AllowOrigin::list(valid_origins))
cors_layer.allow_origin(AllowOrigin::list(valid_origins)).expose_headers(Any)
}
}
}
None => {
debug!("No CORS origins configured for endpoint, using permissive CORS");
cors_layer.allow_origin(Any)
cors_layer.allow_origin(Any).expose_headers(Any)
}
}
}

View File

@@ -1209,6 +1209,7 @@ impl S3 for FS {
name: v2.name,
prefix: v2.prefix,
max_keys: v2.max_keys,
common_prefixes: v2.common_prefixes,
..Default::default()
}))
}
@@ -1230,6 +1231,7 @@ impl S3 for FS {
let prefix = prefix.unwrap_or_default();
let max_keys = match max_keys {
Some(v) if v > 0 && v <= 1000 => v,
Some(v) if v > 1000 => 1000,
None => 1000,
_ => return Err(s3_error!(InvalidArgument, "max-keys must be between 1 and 1000")),
};

View File

@@ -64,6 +64,9 @@ export RUSTFS_OBS_LOCAL_LOGGING_ENABLED=true # Whether to enable local logging
export RUSTFS_OBS_LOG_DIRECTORY="$current_dir/deploy/logs" # Log directory
export RUSTFS_OBS_LOG_ROTATION_TIME="hour" # Log rotation time unit, can be "second", "minute", "hour", "day"
export RUSTFS_OBS_LOG_ROTATION_SIZE_MB=100 # Log rotation size in MB
export RUSTFS_OBS_LOG_POOL_CAPA=10240
export RUSTFS_OBS_LOG_MESSAGE_CAPA=32768
export RUSTFS_OBS_LOG_FLUSH_MS=300
export RUSTFS_SINKS_FILE_PATH="$current_dir/deploy/logs"
export RUSTFS_SINKS_FILE_BUFFER_SIZE=12