Compare commits

...

5 Commits

Author SHA1 Message Date
majinghe
4595bf7db6 fix docker compose running with no such file error (#519)
* fix docker compose running with no such file error

* fix observability docker compose
2025-09-13 13:04:06 +08:00
guojidan
f372ccf4a8 disable pprof on win (#524)
Signed-off-by: junxiang Mu <1948535941@qq.com>
2025-09-12 18:43:45 +08:00
guojidan
9ce867f585 feat(lock): Optimize lock management performance in high-concurrency scenarios (#523)
Increase the size of the notification pool to reduce the thundering herd effect under high concurrency
Implement an adaptive timeout mechanism that dynamically adjusts based on system load and priority
Add a lock protection mechanism to prevent premature cleanup of active locks
Add lock acquisition methods for high-priority and critical-priority locks
Improve the cleanup strategy to be more conservative under high load
Add detailed debug logs to assist in diagnosing lock issues

Signed-off-by: junxiang Mu <1948535941@qq.com>
2025-09-12 18:17:07 +08:00
guojidan
124c31a68b refactor(profiling): Remove performance profiling support for Windows and optimize dependency management (#518)
Remove the pprof performance profiling functionality on the Windows platform, as this platform does not support the relevant features
Move the pprof dependency to the platform-specific configuration for non-Windows systems
Update the performance profiling endpoint handling logic to distinguish between platform support statuses
Add the CLAUDE.md document to explain project build and architecture information

Signed-off-by: RustFS Developer <dandan@rustfs.com>
Co-authored-by: RustFS Developer <dandan@rustfs.com>
2025-09-12 09:11:44 +08:00
guojidan
62a01f3801 Performance: improve (#514)
* Performance: improve

Signed-off-by: junxiang Mu <1948535941@qq.com>

* remove dirty

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix some err

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
2025-09-11 19:48:28 +08:00
32 changed files with 3343 additions and 238 deletions

View File

@@ -14,18 +14,27 @@
services:
tempo-init:
image: busybox:latest
command: ["sh", "-c", "chown -R 10001:10001 /var/tempo"]
volumes:
- ./tempo-data:/var/tempo
user: root
networks:
- otel-network
restart: "no"
tempo:
image: grafana/tempo:latest
#user: root # The container must be started with root to execute chown in the script
#entrypoint: [ "/etc/tempo/entrypoint.sh" ] # Specify a custom entry point
user: "10001" # Run as the dedicated tempo user; the tempo-init container fixes directory ownership beforehand
command: [ "-config.file=/etc/tempo.yaml" ] # This is passed as a parameter to the entry point script
volumes:
- ./tempo-entrypoint.sh:/etc/tempo/entrypoint.sh # Mount entry point script
- ./tempo.yaml:/etc/tempo.yaml
- ./tempo.yaml:/etc/tempo.yaml:ro
- ./tempo-data:/var/tempo
ports:
- "3200:3200" # tempo
- "24317:4317" # otlp grpc
restart: unless-stopped
networks:
- otel-network
@@ -94,4 +103,4 @@ networks:
driver: bridge
name: "network_otel_config"
driver_opts:
com.docker.network.enable_ipv6: "true"
com.docker.network.enable_ipv6: "true"

View File

@@ -1,8 +0,0 @@
#!/bin/sh
# Run as root to fix directory permissions
chown -R 10001:10001 /var/tempo
# Use su-exec (a lightweight sudo/gosu alternative, commonly used in Alpine-based images)
# Switch to user 10001 and execute the original command (CMD) passed to the script
# "$@" represents all parameters passed to this script, i.e. command in docker-compose
exec su-exec 10001:10001 /tempo "$@"

122
CLAUDE.md Normal file
View File

@@ -0,0 +1,122 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
RustFS is a high-performance distributed object storage software built with Rust, providing S3-compatible APIs and advanced features like data lakes, AI, and big data support. It's designed as an alternative to MinIO with better performance and a more business-friendly Apache 2.0 license.
## Build Commands
### Primary Build Commands
- `cargo build --release` - Build the main RustFS binary
- `./build-rustfs.sh` - Recommended build script that handles console resources and cross-platform compilation
- `./build-rustfs.sh --dev` - Development build with debug symbols
- `make build` or `just build` - Use Make/Just for standardized builds
### Platform-Specific Builds
- `./build-rustfs.sh --platform x86_64-unknown-linux-musl` - Build for musl target
- `./build-rustfs.sh --platform aarch64-unknown-linux-gnu` - Build for ARM64
- `make build-musl` or `just build-musl` - Build musl variant
- `make build-cross-all` - Build all supported architectures
### Testing Commands
- `cargo test --workspace --exclude e2e_test` - Run unit tests (excluding e2e tests)
- `cargo nextest run --all --exclude e2e_test` - Use nextest if available (faster)
- `cargo test --all --doc` - Run documentation tests
- `make test` or `just test` - Run full test suite
### Code Quality
- `cargo fmt --all` - Format code
- `cargo clippy --all-targets --all-features -- -D warnings` - Lint code
- `make pre-commit` or `just pre-commit` - Run all quality checks (fmt, clippy, check, test)
### Docker Build Commands
- `make docker-buildx` - Build multi-architecture production images
- `make docker-dev-local` - Build development image for local use
- `./docker-buildx.sh --push` - Build and push production images
## Architecture Overview
### Core Components
**Main Binary (`rustfs/`):**
- Entry point at `rustfs/src/main.rs`
- Core modules: admin, auth, config, server, storage, license management, profiling
- HTTP server with S3-compatible APIs
- Service state management and graceful shutdown
- Parallel service initialization with DNS resolver, bucket metadata, and IAM
**Key Crates (`crates/`):**
- `ecstore` - Erasure coding storage implementation (core storage layer)
- `iam` - Identity and Access Management
- `madmin` - Management dashboard and admin API interface
- `s3select-api` & `s3select-query` - S3 Select API and query engine
- `config` - Configuration management with notify features
- `crypto` - Cryptography and security features
- `lock` - Distributed locking implementation
- `filemeta` - File metadata management
- `rio` - Rust I/O utilities and abstractions
- `common` - Shared utilities and data structures
- `protos` - Protocol buffer definitions
- `audit-logger` - Audit logging for file operations
- `notify` - Event notification system
- `obs` - Observability utilities
- `workers` - Worker thread pools and task scheduling
- `appauth` - Application authentication and authorization
### Build System
- Cargo workspace with 25+ crates
- Custom `build-rustfs.sh` script for advanced build options
- Multi-architecture Docker builds via `docker-buildx.sh`
- Both Make and Just task runners supported
- Cross-compilation support for multiple Linux targets
### Key Dependencies
- `axum` - HTTP framework for S3 API server
- `tokio` - Async runtime
- `s3s` - S3 protocol implementation library
- `datafusion` - For S3 Select query processing
- `hyper`/`hyper-util` - HTTP client/server utilities
- `rustls` - TLS implementation
- `serde`/`serde_json` - Serialization
- `tracing` - Structured logging and observability
- `pprof` - Performance profiling with flamegraph support
- `tikv-jemallocator` - Memory allocator for Linux GNU builds
### Development Workflow
- Console resources are embedded during build via `rust-embed`
- Protocol buffers generated via custom `gproto` binary
- E2E tests in separate crate (`e2e_test`)
- Shadow build for version/metadata embedding
- Support for both GNU and musl libc targets
### Performance & Observability
- Performance profiling available with `pprof` integration (disabled on Windows)
- Profiling enabled via environment variables in production
- Built-in observability with OpenTelemetry integration
- Background services (scanner, heal) can be controlled via environment variables:
- `RUSTFS_ENABLE_SCANNER` (default: true)
- `RUSTFS_ENABLE_HEAL` (default: true)
### Service Architecture
- Service state management with graceful shutdown handling
- Parallel initialization of core systems (DNS, bucket metadata, IAM)
- Event notification system with MQTT and webhook support
- Auto-heal and data scanner for storage integrity
- Jemalloc allocator for Linux GNU targets for better performance
## Environment Variables
- `RUSTFS_ENABLE_SCANNER` - Enable/disable background data scanner
- `RUSTFS_ENABLE_HEAL` - Enable/disable auto-heal functionality
- Various profiling and observability controls
## Code Style
- Communicate with me in Chinese, but only English can be used in code files.
- Code that may cause program crashes (such as unwrap/expect) must not be used, except for testing purposes.
- Code that may cause performance issues (such as blocking IO) must not be used, except for testing purposes.
- Code that may cause memory leaks must not be used, except for testing purposes.
- Code that may cause deadlocks must not be used, except for testing purposes.
- Code that may cause undefined behavior must not be used, except for testing purposes.
- Code that may cause panics must not be used, except for testing purposes.
- Code that may cause data races must not be used, except for testing purposes.

256
Cargo.lock generated
View File

@@ -75,6 +75,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "aligned-vec"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b"
dependencies = [
"equator",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -1645,6 +1654,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpp_demangle"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
dependencies = [
"cfg-if",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -2439,6 +2457,15 @@ dependencies = [
"sqlparser",
]
[[package]]
name = "debugid"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"uuid",
]
[[package]]
name = "deflate64"
version = "0.1.9"
@@ -2696,6 +2723,26 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "equator"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc"
dependencies = [
"equator-macro",
]
[[package]]
name = "equator-macro"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "equivalent"
version = "1.0.2"
@@ -2785,6 +2832,18 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2856,7 +2915,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"futures-core",
"futures-sink",
"spin",
"spin 0.9.8",
]
[[package]]
@@ -3620,6 +3679,24 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "inferno"
version = "0.11.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap",
"is-terminal",
"itoa",
"log",
"num-format",
"once_cell",
"quick-xml 0.26.0",
"rgb",
"str_stack",
]
[[package]]
name = "inlinable_string"
version = "0.1.15"
@@ -3709,6 +3786,17 @@ dependencies = [
"serde",
]
[[package]]
name = "is-terminal"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "is_debug"
version = "1.1.0"
@@ -3815,7 +3903,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin",
"spin 0.9.8",
]
[[package]]
@@ -4115,6 +4203,15 @@ version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "memmap2"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.9.1"
@@ -4230,6 +4327,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "nix"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
dependencies = [
"bitflags 1.3.2",
"cfg-if",
"libc",
]
[[package]]
name = "nix"
version = "0.29.0"
@@ -4393,6 +4501,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.46"
@@ -5044,6 +5162,30 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"findshlibs",
"inferno",
"libc",
"log",
"nix 0.26.4",
"once_cell",
"protobuf",
"protobuf-codegen",
"smallvec",
"spin 0.10.0",
"symbolic-demangle",
"tempfile",
"thiserror 2.0.16",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@@ -5181,6 +5323,57 @@ dependencies = [
"prost 0.14.1",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-codegen"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
dependencies = [
"anyhow",
"once_cell",
"protobuf",
"protobuf-parse",
"regex",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-parse"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
dependencies = [
"anyhow",
"indexmap",
"log",
"protobuf",
"protobuf-support",
"tempfile",
"thiserror 1.0.69",
"which",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "psm"
version = "0.1.26"
@@ -5210,6 +5403,15 @@ dependencies = [
"pulldown-cmark",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.37.5"
@@ -5592,6 +5794,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rgb"
version = "0.8.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.17.14"
@@ -5786,6 +5997,7 @@ dependencies = [
"opentelemetry",
"percent-encoding",
"pin-project-lite",
"pprof",
"reqwest",
"rust-embed",
"rustfs-ahm",
@@ -5981,9 +6193,11 @@ dependencies = [
"hyper-util",
"lazy_static",
"md-5",
"moka",
"nix 0.30.1",
"num_cpus",
"once_cell",
"parking_lot",
"path-absolutize",
"pin-project-lite",
"quick-xml 0.38.3",
@@ -7146,6 +7360,15 @@ dependencies = [
"lock_api",
]
[[package]]
name = "spin"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -7223,6 +7446,12 @@ dependencies = [
"thiserror 2.0.16",
]
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -7334,6 +7563,29 @@ dependencies = [
"sval_nested",
]
[[package]]
name = "symbolic-common"
version = "12.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9da12f8fecbbeaa1ee62c1d50dc656407e007c3ee7b2a41afce4b5089eaef15e"
dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "12.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fd35afe0ef9d35d3dcd41c67ddf882fc832a387221338153b7cd685a105495c"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.109"

View File

@@ -101,6 +101,8 @@ rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
async-recursion.workspace = true
parking_lot = "0.12"
moka = { version = "0.12", features = ["future"] }
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -0,0 +1,231 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-performance batch processor using JoinSet
//!
//! This module provides optimized batching utilities to reduce async runtime overhead
//! and improve concurrent operation performance.
use crate::disk::error::{Error, Result};
use std::future::Future;
use std::sync::Arc;
use tokio::task::JoinSet;
/// Batch processor that executes tasks concurrently with a semaphore
pub struct AsyncBatchProcessor {
    // Upper bound on how many tasks may run at the same time; enforced
    // with a tokio semaphore inside `execute_batch`.
    max_concurrent: usize,
}
impl AsyncBatchProcessor {
pub fn new(max_concurrent: usize) -> Self {
Self { max_concurrent }
}
/// Execute a batch of tasks concurrently with concurrency control
pub async fn execute_batch<T, F>(&self, tasks: Vec<F>) -> Vec<Result<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
if tasks.is_empty() {
return Vec::new();
}
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
let mut join_set = JoinSet::new();
let mut results = Vec::with_capacity(tasks.len());
for _ in 0..tasks.len() {
results.push(Err(Error::other("Not completed")));
}
// Spawn all tasks with semaphore control
for (i, task) in tasks.into_iter().enumerate() {
let sem = semaphore.clone();
join_set.spawn(async move {
let _permit = sem.acquire().await.map_err(|_| Error::other("Semaphore error"))?;
let result = task.await;
Ok::<(usize, Result<T>), Error>((i, result))
});
}
// Collect results
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok(Ok((index, task_result))) => {
if index < results.len() {
results[index] = task_result;
}
}
Ok(Err(e)) => {
// Semaphore or other system error - this is rare
tracing::warn!("Batch processor system error: {:?}", e);
}
Err(join_error) => {
// Task panicked - log but continue
tracing::warn!("Task panicked in batch processor: {:?}", join_error);
}
}
}
results
}
/// Execute batch with early termination when sufficient successful results are obtained
pub async fn execute_batch_with_quorum<T, F>(&self, tasks: Vec<F>, required_successes: usize) -> Result<Vec<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
let results = self.execute_batch(tasks).await;
let mut successes = Vec::new();
for value in results.into_iter().flatten() {
successes.push(value);
if successes.len() >= required_successes {
return Ok(successes);
}
}
if successes.len() >= required_successes {
Ok(successes)
} else {
Err(Error::other(format!(
"Insufficient successful results: got {}, needed {}",
successes.len(),
required_successes
)))
}
}
}
/// Global batch processor instances
pub struct GlobalBatchProcessors {
    // Processor for read paths; highest concurrency (16 in `new`).
    read_processor: AsyncBatchProcessor,
    // Processor for write paths; lowest concurrency (8 in `new`).
    write_processor: AsyncBatchProcessor,
    // Processor for metadata operations; medium concurrency (12 in `new`).
    metadata_processor: AsyncBatchProcessor,
}
impl GlobalBatchProcessors {
    /// Build the three shared processors with their tuned concurrency limits.
    pub fn new() -> Self {
        // Reads tolerate more parallelism than writes; metadata sits in between.
        const READ_CONCURRENCY: usize = 16;
        const WRITE_CONCURRENCY: usize = 8;
        const METADATA_CONCURRENCY: usize = 12;

        Self {
            read_processor: AsyncBatchProcessor::new(READ_CONCURRENCY),
            write_processor: AsyncBatchProcessor::new(WRITE_CONCURRENCY),
            metadata_processor: AsyncBatchProcessor::new(METADATA_CONCURRENCY),
        }
    }

    /// Processor tuned for read operations.
    pub fn read_processor(&self) -> &AsyncBatchProcessor {
        &self.read_processor
    }

    /// Processor tuned for write operations.
    pub fn write_processor(&self) -> &AsyncBatchProcessor {
        &self.write_processor
    }

    /// Processor tuned for metadata operations.
    pub fn metadata_processor(&self) -> &AsyncBatchProcessor {
        &self.metadata_processor
    }
}
impl Default for GlobalBatchProcessors {
fn default() -> Self {
Self::new()
}
}
// Process-wide singleton, created lazily on first access.
use std::sync::OnceLock;

static GLOBAL_PROCESSORS: OnceLock<GlobalBatchProcessors> = OnceLock::new();

/// Return the lazily-initialized global processor set.
pub fn get_global_processors() -> &'static GlobalBatchProcessors {
    GLOBAL_PROCESSORS.get_or_init(Default::default)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Every task succeeds: one result per task, returned in task order.
    #[tokio::test]
    async fn test_batch_processor_basic() {
        let processor = AsyncBatchProcessor::new(4);

        let tasks: Vec<_> = (0..10)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok::<i32, Error>(i)
            })
            .collect();

        let results = processor.execute_batch(tasks).await;
        assert_eq!(results.len(), 10);

        for (i, result) in results.iter().enumerate() {
            assert!(result.is_ok());
            assert_eq!(result.as_ref().unwrap(), &(i as i32));
        }
    }

    /// Odd-indexed tasks fail: errors stay in their slots, successes keep their values.
    #[tokio::test]
    async fn test_batch_processor_with_errors() {
        let processor = AsyncBatchProcessor::new(2);

        let tasks: Vec<_> = (0..5)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                if i % 2 == 0 {
                    Ok::<i32, Error>(i)
                } else {
                    Err(Error::other("Test error"))
                }
            })
            .collect();

        let results = processor.execute_batch(tasks).await;
        assert_eq!(results.len(), 5);

        for (i, result) in results.iter().enumerate() {
            let should_succeed = i % 2 == 0;
            assert_eq!(result.is_ok(), should_succeed);
            if should_succeed {
                assert_eq!(result.as_ref().unwrap(), &(i as i32));
            }
        }
    }

    /// Only the first three of ten tasks succeed; a quorum of two is still reachable.
    #[tokio::test]
    async fn test_batch_processor_quorum() {
        let processor = AsyncBatchProcessor::new(4);

        let tasks: Vec<_> = (0..10)
            .map(|i| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                if i < 3 {
                    Ok::<i32, Error>(i)
                } else {
                    Err(Error::other("Test error"))
                }
            })
            .collect();

        let quorum = processor.execute_batch_with_quorum(tasks, 2).await;
        assert!(quorum.is_ok());
        assert!(quorum.unwrap().len() >= 2);
    }
}

View File

@@ -321,7 +321,7 @@ impl ExpiryState {
let mut state = GLOBAL_ExpiryState.write().await;
while state.tasks_tx.len() < n {
let (tx, rx) = mpsc::channel(10000);
let (tx, rx) = mpsc::channel(1000);
let api = api.clone();
let rx = Arc::new(tokio::sync::Mutex::new(rx));
state.tasks_tx.push(tx);
@@ -432,7 +432,7 @@ pub struct TransitionState {
impl TransitionState {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> Arc<Self> {
let (tx1, rx1) = bounded(100000);
let (tx1, rx1) = bounded(1000);
let (tx2, rx2) = bounded(1);
Arc::new(Self {
transition_tx: tx1,
@@ -467,8 +467,12 @@ impl TransitionState {
}
pub async fn init(api: Arc<ECStore>) {
let mut n = 10; //globalAPIConfig.getTransitionWorkers();
let tw = 10; //globalILMConfig.getTransitionWorkers();
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
let mut n = max_workers;
let tw = 8; //globalILMConfig.getTransitionWorkers();
if tw > 0 {
n = tw;
}
@@ -561,8 +565,18 @@ impl TransitionState {
pub async fn update_workers_inner(api: Arc<ECStore>, n: i64) {
let mut n = n;
if n == 0 {
n = 100;
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
n = max_workers;
}
// Allow environment override of maximum workers
let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(32);
n = std::cmp::min(n, absolute_max);
let mut num_workers = GLOBAL_TransitionState.num_workers.load(Ordering::SeqCst);
while num_workers < n {
@@ -585,7 +599,10 @@ impl TransitionState {
}
pub async fn init_background_expiry(api: Arc<ECStore>) {
let mut workers = num_cpus::get() / 2;
let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16));
//globalILMConfig.getExpirationWorkers()
if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") {
if let Ok(num_expirations) = env_expiration_workers.parse::<usize>() {
@@ -594,7 +611,10 @@ pub async fn init_background_expiry(api: Arc<ECStore>) {
}
if workers == 0 {
workers = 100;
workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(8);
}
//let expiry_state = GLOBAL_ExpiryStSate.write().await;

View File

@@ -12,13 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs::Metadata, path::Path};
use std::{
fs::Metadata,
path::Path,
sync::{Arc, OnceLock},
};
use tokio::{
fs::{self, File},
io,
};
static READONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
static WRITEONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
static READWRITE_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
fn get_readonly_options() -> &'static Arc<fs::OpenOptions> {
READONLY_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.read(true);
Arc::new(opts)
})
}
fn get_writeonly_options() -> &'static Arc<fs::OpenOptions> {
WRITEONLY_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.write(true);
Arc::new(opts)
})
}
fn get_readwrite_options() -> &'static Arc<fs::OpenOptions> {
READWRITE_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.read(true).write(true);
Arc::new(opts)
})
}
#[cfg(not(windows))]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
use std::os::unix::fs::MetadataExt;
@@ -84,35 +116,28 @@ pub const O_APPEND: FileMode = 0x00400;
// create_new: bool,
pub async fn open_file(path: impl AsRef<Path>, mode: FileMode) -> io::Result<File> {
let mut opts = fs::OpenOptions::new();
match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
O_RDONLY => {
opts.read(true);
}
O_WRONLY => {
opts.write(true);
}
O_RDWR => {
opts.read(true);
opts.write(true);
}
_ => (),
let base_opts = match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
O_RDONLY => get_readonly_options(),
O_WRONLY => get_writeonly_options(),
O_RDWR => get_readwrite_options(),
_ => get_readonly_options(),
};
if mode & O_CREATE != 0 {
opts.create(true);
if (mode & (O_CREATE | O_APPEND | O_TRUNC)) != 0 {
let mut opts = (**base_opts).clone();
if mode & O_CREATE != 0 {
opts.create(true);
}
if mode & O_APPEND != 0 {
opts.append(true);
}
if mode & O_TRUNC != 0 {
opts.truncate(true);
}
opts.open(path.as_ref()).await
} else {
base_opts.open(path.as_ref()).await
}
if mode & O_APPEND != 0 {
opts.append(true);
}
if mode & O_TRUNC != 0 {
opts.truncate(true);
}
opts.open(path.as_ref()).await
}
pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
@@ -121,7 +146,7 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
}
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
tokio::task::block_in_place(|| std::fs::metadata(path))?;
std::fs::metadata(path)?;
Ok(())
}
@@ -130,7 +155,7 @@ pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
}
pub fn lstat_std(path: impl AsRef<Path>) -> io::Result<Metadata> {
tokio::task::block_in_place(|| std::fs::metadata(path))
std::fs::metadata(path)
}
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
@@ -159,26 +184,22 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
#[tracing::instrument(level = "debug", skip_all)]
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
tokio::task::block_in_place(|| {
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir(path)
} else {
std::fs::remove_file(path)
}
})
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir(path)
} else {
std::fs::remove_file(path)
}
}
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
tokio::task::block_in_place(|| {
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir_all(path)
} else {
std::fs::remove_file(path)
}
})
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir_all(path)
} else {
std::fs::remove_file(path)
}
}
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
@@ -190,7 +211,7 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
}
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
tokio::task::block_in_place(|| std::fs::rename(from, to))
std::fs::rename(from, to)
}
#[tracing::instrument(level = "debug", skip_all)]

View File

@@ -41,18 +41,21 @@ use tokio::time::interval;
use crate::erasure_coding::bitrot_verify;
use bytes::Bytes;
use path_absolutize::Absolutize;
// use path_absolutize::Absolutize; // Replaced with direct path operations for better performance
use crate::file_cache::{get_global_file_cache, prefetch_metadata_patterns, read_metadata_cached};
use parking_lot::RwLock as ParkingLotRwLock;
use rustfs_filemeta::{
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
get_file_info, read_xl_meta_no_data,
};
use rustfs_utils::HashAlgorithm;
use rustfs_utils::os::get_info;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io::SeekFrom;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use std::{
fs::Metadata,
@@ -101,6 +104,9 @@ pub struct LocalDisk {
pub major: u64,
pub minor: u64,
pub nrrequests: u64,
// Performance optimization fields
path_cache: Arc<ParkingLotRwLock<HashMap<String, PathBuf>>>,
current_dir: Arc<OnceLock<PathBuf>>,
// pub id: Mutex<Option<Uuid>>,
// pub format_data: Mutex<Vec<u8>>,
// pub format_file_info: Mutex<Option<Metadata>>,
@@ -130,8 +136,9 @@ impl Debug for LocalDisk {
impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
debug!("Creating local disk");
let root = match PathBuf::from(ep.get_file_path()).absolutize() {
Ok(path) => path.into_owned(),
// Use optimized path resolution instead of absolutize() for better performance
let root = match std::fs::canonicalize(ep.get_file_path()) {
Ok(path) => path,
Err(e) => {
if e.kind() == ErrorKind::NotFound {
return Err(DiskError::VolumeNotFound);
@@ -144,10 +151,8 @@ impl LocalDisk {
// TODO: delete tmp data
}
let format_path = Path::new(RUSTFS_META_BUCKET)
.join(Path::new(super::FORMAT_CONFIG_FILE))
.absolutize_virtually(&root)?
.into_owned();
// Use optimized path resolution instead of absolutize_virtually
let format_path = root.join(RUSTFS_META_BUCKET).join(super::FORMAT_CONFIG_FILE);
debug!("format_path: {:?}", format_path);
let (format_data, format_meta) = read_file_exists(&format_path).await?;
@@ -227,6 +232,8 @@ impl LocalDisk {
// format_file_info: Mutex::new(format_meta),
// format_data: Mutex::new(format_data),
// format_last_check: Mutex::new(format_last_check),
path_cache: Arc::new(ParkingLotRwLock::new(HashMap::with_capacity(2048))),
current_dir: Arc::new(OnceLock::new()),
exit_signal: None,
};
let (info, _root) = get_disk_info(root).await?;
@@ -351,19 +358,178 @@ impl LocalDisk {
self.make_volumes(defaults).await
}
// Optimized path resolution with caching
pub fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
let path_ref = path.as_ref();
let path_str = path_ref.to_string_lossy();
// Fast cache read
{
let cache = self.path_cache.read();
if let Some(cached_path) = cache.get(path_str.as_ref()) {
return Ok(cached_path.clone());
}
}
// Calculate absolute path without using path_absolutize for better performance
let abs_path = if path_ref.is_absolute() {
path_ref.to_path_buf()
} else {
self.root.join(path_ref)
};
// Normalize path components to avoid filesystem calls
let normalized = self.normalize_path_components(&abs_path);
// Cache the result
{
let mut cache = self.path_cache.write();
// Simple cache size control
if cache.len() >= 4096 {
// Clear half the cache - simple eviction strategy
let keys_to_remove: Vec<_> = cache.keys().take(cache.len() / 2).cloned().collect();
for key in keys_to_remove {
cache.remove(&key);
}
}
cache.insert(path_str.into_owned(), normalized.clone());
}
Ok(normalized)
}
// Lightweight path normalization without filesystem calls
/// Collapses `.` and `..` components of `path` purely lexically (no syscalls,
/// no symlink resolution), returning the rebuilt `PathBuf`.
///
/// NOTE(review): `..` is handled by `result.pop()`, which silently stops at an
/// empty buffer — so a path with more `..` components than depth normalizes to
/// a shorter path rather than erroring. Callers that join untrusted keys onto
/// a root before calling this should confirm the result cannot lexically
/// escape the intended root — TODO confirm with resolve_abs_path's callers.
fn normalize_path_components(&self, path: &Path) -> PathBuf {
    let mut result = PathBuf::new();
    for component in path.components() {
        match component {
            std::path::Component::Normal(name) => {
                result.push(name);
            }
            std::path::Component::ParentDir => {
                // Lexical parent: drop the last pushed component (no-op at root).
                result.pop();
            }
            std::path::Component::CurDir => {
                // Ignore current directory components
            }
            std::path::Component::RootDir => {
                result.push(component);
            }
            std::path::Component::Prefix(_prefix) => {
                // Windows drive/UNC prefix is preserved verbatim.
                result.push(component);
            }
        }
    }
    result
}
// Highly optimized object path generation
pub fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
let dir = Path::new(&bucket);
let file_path = Path::new(&key);
self.resolve_abs_path(dir.join(file_path))
// For high-frequency paths, use faster string concatenation
let cache_key = if key.is_empty() {
bucket.to_string()
} else {
// Use with_capacity to pre-allocate, reducing memory reallocations
let mut path_str = String::with_capacity(bucket.len() + key.len() + 1);
path_str.push_str(bucket);
path_str.push('/');
path_str.push_str(key);
path_str
};
// Fast path: directly calculate based on root, avoiding cache lookup overhead for simple cases
Ok(self.root.join(&cache_key))
}
pub fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
let dir = Path::new(&bucket);
self.resolve_abs_path(dir)
Ok(self.root.join(bucket))
}
// Batch path generation with single lock acquisition
pub fn get_object_paths_batch(&self, requests: &[(String, String)]) -> Result<Vec<PathBuf>> {
let mut results = Vec::with_capacity(requests.len());
let mut cache_misses = Vec::new();
// First attempt to get all paths from cache
{
let cache = self.path_cache.read();
for (i, (bucket, key)) in requests.iter().enumerate() {
let cache_key = format!("{}/{}", bucket, key);
if let Some(cached_path) = cache.get(&cache_key) {
results.push((i, cached_path.clone()));
} else {
cache_misses.push((i, bucket, key, cache_key));
}
}
}
// Handle cache misses
if !cache_misses.is_empty() {
let mut new_entries = Vec::new();
for (i, _bucket, _key, cache_key) in cache_misses {
let path = self.root.join(&cache_key);
results.push((i, path.clone()));
new_entries.push((cache_key, path));
}
// Batch update cache
{
let mut cache = self.path_cache.write();
for (key, path) in new_entries {
cache.insert(key, path);
}
}
}
// Sort results back to original order
results.sort_by_key(|(i, _)| *i);
Ok(results.into_iter().map(|(_, path)| path).collect())
}
// Optimized metadata reading with caching
pub async fn read_metadata_cached(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
read_metadata_cached(path).await
}
// Smart prefetching for related files
pub async fn read_version_with_prefetch(
&self,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
let file_path = self.get_object_path(volume, path)?;
// Async prefetch related files, don't block current read
if let Some(parent) = file_path.parent() {
prefetch_metadata_patterns(parent, &[super::STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await;
}
// Main read logic
let file_dir = self.get_bucket_path(volume)?;
let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;
get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
.await
.map_err(|_e| DiskError::Unexpected)
}
// Batch metadata reading for multiple objects
pub async fn read_metadata_batch(&self, requests: Vec<(String, String)>) -> Result<Vec<Option<Arc<FileMeta>>>> {
let paths: Vec<PathBuf> = requests
.iter()
.map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, super::STORAGE_FORMAT_FILE)))
.collect::<Result<Vec<_>>>()?;
let cache = get_global_file_cache();
let results = cache.get_metadata_batch(paths).await;
Ok(results.into_iter().map(|r| r.ok()).collect())
}
// /// Write to the filesystem atomically.
@@ -549,7 +715,15 @@ impl LocalDisk {
}
/// Reads a metadata file's raw bytes, preferring the global content cache and
/// falling back to a direct disk read on any cache error.
///
/// NOTE(review): the content cache has a TTL (120s per OptimizedFileCache), so
/// this can serve stale bytes after an overwrite unless writers invalidate the
/// cache for this path — confirm the write path calls `invalidate`.
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
    // TODO: support timeout
    // Try to use cached file content reading for better performance, with safe fallback
    let path = file_path.as_ref().to_path_buf();
    // First, try the cache
    if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await {
        // Bytes -> Vec copies; callers expect an owned Vec<u8>.
        return Ok(bytes.to_vec());
    }
    // Fallback to direct read if cache fails
    let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
    Ok(data)
}

View File

@@ -668,7 +668,7 @@ pub struct VolumeInfo {
pub created: Option<OffsetDateTime>,
}
#[derive(Deserialize, Serialize, Debug, Default)]
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
pub struct ReadOptions {
pub incl_free_versions: bool,
pub read_data: bool,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, get_host_ip_async, is_local_host};
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{error, instrument, warn};
use crate::{
@@ -248,8 +248,7 @@ impl PoolEndpointList {
Ok(ips) => ips,
Err(e) => {
error!("host {} not found, error:{}", host, e);
get_host_ip_async(host.clone())
.map_err(|e| Error::other(format!("host '{host}' cannot resolve: {e}")))?
return Err(Error::other(format!("host '{host}' cannot resolve: {e}")));
}
};
host_ip_cache.insert(host.clone(), ips);

View File

@@ -0,0 +1,332 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-performance file content and metadata caching using moka
//!
//! This module provides optimized caching for file operations to reduce
//! redundant I/O and improve overall system performance.
use super::disk::error::{Error, Result};
use bytes::Bytes;
use moka::future::Cache;
use rustfs_filemeta::FileMeta;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
/// Process-wide async cache for parsed file metadata and raw file contents,
/// with simple hit/miss counters for diagnostics.
pub struct OptimizedFileCache {
    // Use moka as high-performance async cache
    /// Parsed `FileMeta` keyed by absolute file path.
    metadata_cache: Cache<PathBuf, Arc<FileMeta>>,
    /// Raw file bytes keyed by absolute file path.
    file_content_cache: Cache<PathBuf, Bytes>,
    // Performance monitoring
    /// Total cache hits across both caches (relaxed counter, advisory only).
    cache_hits: std::sync::atomic::AtomicU64,
    /// Total cache misses across both caches (relaxed counter, advisory only).
    cache_misses: std::sync::atomic::AtomicU64,
}
impl OptimizedFileCache {
    /// Builds the two moka caches: metadata (2048 entries, 5 min TTL, 1 min
    /// idle) and file content (2 min TTL, weighed by byte length).
    ///
    /// NOTE(review): moka treats `max_capacity` as total *weight* once a
    /// `weigher` is set, so `max_capacity(512)` here likely caps the content
    /// cache at 512 bytes, not 512 entries — confirm against moka docs.
    pub fn new() -> Self {
        Self {
            metadata_cache: Cache::builder()
                .max_capacity(2048)
                .time_to_live(Duration::from_secs(300)) // 5 minutes TTL
                .time_to_idle(Duration::from_secs(60)) // 1 minute idle
                .build(),
            file_content_cache: Cache::builder()
                .max_capacity(512) // Smaller file content cache
                .time_to_live(Duration::from_secs(120))
                .weigher(|_key: &PathBuf, value: &Bytes| value.len() as u32)
                .build(),
            cache_hits: std::sync::atomic::AtomicU64::new(0),
            cache_misses: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Returns the parsed `FileMeta` for `path`, reading and unmarshalling the
    /// file on a cache miss. Read errors are wrapped via `Error::other`;
    /// unmarshal errors propagate through `?`.
    pub async fn get_metadata(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
        if let Some(cached) = self.metadata_cache.get(&path).await {
            self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Ok(cached);
        }
        self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // Cache miss, read file
        let data = tokio::fs::read(&path)
            .await
            .map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?;
        let mut meta = FileMeta::default();
        meta.unmarshal_msg(&data)?;
        let arc_meta = Arc::new(meta);
        self.metadata_cache.insert(path, arc_meta.clone()).await;
        Ok(arc_meta)
    }

    /// Returns the raw bytes of `path`, reading the whole file on a miss.
    /// `Bytes` clones are cheap (refcounted), so cache hits do not copy data.
    pub async fn get_file_content(&self, path: PathBuf) -> Result<Bytes> {
        if let Some(cached) = self.file_content_cache.get(&path).await {
            self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Ok(cached);
        }
        self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let data = tokio::fs::read(&path)
            .await
            .map_err(|e| Error::other(format!("Read file failed: {}", e)))?;
        let bytes = Bytes::from(data);
        self.file_content_cache.insert(path, bytes.clone()).await;
        Ok(bytes)
    }

    // Prefetch related files
    /// Best-effort warm-up: for each `base_path/pattern` that exists, loads it
    /// into the metadata cache on a detached background task. Errors are
    /// ignored by design (prefetch must never fail the caller).
    ///
    /// NOTE(review): `self.clone()` snapshots the hit/miss counters (see the
    /// Clone impl), so prefetch accounting is not visible to `get_stats` on
    /// the original instance — confirm that is acceptable.
    pub async fn prefetch_related(&self, base_path: &Path, patterns: &[&str]) {
        let mut prefetch_tasks = Vec::new();
        for pattern in patterns {
            let path = base_path.join(pattern);
            if tokio::fs::metadata(&path).await.is_ok() {
                let cache = self.clone();
                let path_clone = path.clone();
                prefetch_tasks.push(async move {
                    let _ = cache.get_metadata(path_clone).await;
                });
            }
        }
        // Parallel prefetch, don't wait for completion
        if !prefetch_tasks.is_empty() {
            tokio::spawn(async move {
                futures::future::join_all(prefetch_tasks).await;
            });
        }
    }

    // Batch metadata reading with deduplication
    /// Reads metadata for many paths, serving what it can from cache and
    /// reading misses from disk sequentially. Output order matches `paths`;
    /// per-entry errors are returned in place rather than failing the batch.
    pub async fn get_metadata_batch(
        &self,
        paths: Vec<PathBuf>,
    ) -> Vec<std::result::Result<Arc<FileMeta>, rustfs_filemeta::Error>> {
        let mut results = Vec::with_capacity(paths.len());
        let mut cache_futures = Vec::new();
        // First, attempt to get from cache
        for (i, path) in paths.iter().enumerate() {
            if let Some(cached) = self.metadata_cache.get(path).await {
                results.push((i, Ok(cached)));
                self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            } else {
                cache_futures.push((i, path.clone()));
            }
        }
        // For cache misses, read from filesystem
        if !cache_futures.is_empty() {
            let mut fs_results = Vec::new();
            for (i, path) in cache_futures {
                self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match tokio::fs::read(&path).await {
                    Ok(data) => {
                        let mut meta = FileMeta::default();
                        match meta.unmarshal_msg(&data) {
                            Ok(_) => {
                                let arc_meta = Arc::new(meta);
                                self.metadata_cache.insert(path, arc_meta.clone()).await;
                                fs_results.push((i, Ok(arc_meta)));
                            }
                            Err(e) => {
                                fs_results.push((i, Err(e)));
                            }
                        }
                    }
                    Err(_e) => {
                        // I/O failure collapsed to a generic filemeta error.
                        fs_results.push((i, Err(rustfs_filemeta::Error::Unexpected)));
                    }
                }
            }
            results.extend(fs_results);
        }
        // Sort results back to original order
        results.sort_by_key(|(i, _)| *i);
        results.into_iter().map(|(_, result)| result).collect()
    }

    // Invalidate cache entries for a path
    /// Drops both the metadata and content entries for `path` (e.g. after a
    /// write or delete touched the file).
    pub async fn invalidate(&self, path: &Path) {
        self.metadata_cache.remove(path).await;
        self.file_content_cache.remove(path).await;
    }

    // Get cache statistics
    /// Snapshot of entry counts and hit/miss counters. `total_weight` is a
    /// hard-coded 0 placeholder kept for struct compatibility.
    pub fn get_stats(&self) -> FileCacheStats {
        let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed);
        let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
        let hit_rate = if hits + misses > 0 {
            (hits as f64 / (hits + misses) as f64) * 100.0
        } else {
            0.0
        };
        FileCacheStats {
            metadata_cache_size: self.metadata_cache.entry_count(),
            content_cache_size: self.file_content_cache.entry_count(),
            cache_hits: hits,
            cache_misses: misses,
            hit_rate,
            total_weight: 0, // Simplified for compatibility
        }
    }

    // Clear all caches
    /// Invalidates everything in both caches and waits for moka's pending
    /// maintenance tasks so the clear is observable before returning.
    pub async fn clear(&self) {
        self.metadata_cache.invalidate_all();
        self.file_content_cache.invalidate_all();
        // Wait for invalidation to complete
        self.metadata_cache.run_pending_tasks().await;
        self.file_content_cache.run_pending_tasks().await;
    }
}
impl Clone for OptimizedFileCache {
    /// Clones share the underlying moka caches (cheap handle clones) but take
    /// a point-in-time *snapshot* of the hit/miss counters into fresh atomics.
    /// Counter updates on a clone are therefore invisible to the original —
    /// callers relying on `get_stats` should use the original instance.
    fn clone(&self) -> Self {
        Self {
            metadata_cache: self.metadata_cache.clone(),
            file_content_cache: self.file_content_cache.clone(),
            cache_hits: std::sync::atomic::AtomicU64::new(self.cache_hits.load(std::sync::atomic::Ordering::Relaxed)),
            cache_misses: std::sync::atomic::AtomicU64::new(self.cache_misses.load(std::sync::atomic::Ordering::Relaxed)),
        }
    }
}
/// Point-in-time snapshot of cache size and hit/miss statistics,
/// produced by `OptimizedFileCache::get_stats`.
#[derive(Debug)]
pub struct FileCacheStats {
    /// Number of entries currently in the metadata cache.
    pub metadata_cache_size: u64,
    /// Number of entries currently in the content cache.
    pub content_cache_size: u64,
    /// Cumulative cache hits (both caches combined).
    pub cache_hits: u64,
    /// Cumulative cache misses (both caches combined).
    pub cache_misses: u64,
    /// Hit percentage in [0, 100]; 0.0 when no lookups have happened.
    pub hit_rate: f64,
    /// Always 0 in the current implementation (placeholder field).
    pub total_weight: u64,
}
impl Default for OptimizedFileCache {
    /// Equivalent to `OptimizedFileCache::new()` (fixed capacities and TTLs).
    fn default() -> Self {
        Self::new()
    }
}
// Global cache instance
use std::sync::OnceLock;
/// Lazily-initialized process-wide cache shared by all disks.
static GLOBAL_FILE_CACHE: OnceLock<OptimizedFileCache> = OnceLock::new();
/// Returns the global cache, creating it on first use (thread-safe).
pub fn get_global_file_cache() -> &'static OptimizedFileCache {
    GLOBAL_FILE_CACHE.get_or_init(OptimizedFileCache::new)
}
// Utility functions for common operations
/// Reads parsed metadata for `path` through the global cache.
pub async fn read_metadata_cached(path: PathBuf) -> Result<Arc<FileMeta>> {
    get_global_file_cache().get_metadata(path).await
}
/// Reads raw file bytes for `path` through the global cache.
pub async fn read_file_content_cached(path: PathBuf) -> Result<Bytes> {
    get_global_file_cache().get_file_content(path).await
}
/// Fire-and-forget warm-up of `base_path/pattern` entries in the global cache.
pub async fn prefetch_metadata_patterns(base_path: &Path, patterns: &[&str]) {
    get_global_file_cache().prefetch_related(base_path, patterns).await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::tempdir;

    /// Miss-then-hit round trip on the content cache; checks both counters move.
    #[tokio::test]
    async fn test_file_cache_basic() {
        let cache = OptimizedFileCache::new();
        // Create a temporary file
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let mut file = std::fs::File::create(&file_path).unwrap();
        writeln!(file, "test content").unwrap();
        drop(file);
        // First read should be cache miss
        let content1 = cache.get_file_content(file_path.clone()).await.unwrap();
        assert_eq!(content1, Bytes::from("test content\n"));
        // Second read should be cache hit
        let content2 = cache.get_file_content(file_path.clone()).await.unwrap();
        assert_eq!(content2, content1);
        let stats = cache.get_stats();
        assert!(stats.cache_hits > 0);
        assert!(stats.cache_misses > 0);
    }

    /// Smoke test: batch read over plain-text files returns one result per
    /// input (entries will be unmarshal errors since they are not FileMeta).
    #[tokio::test]
    async fn test_metadata_batch_read() {
        let cache = OptimizedFileCache::new();
        // Create test files
        let dir = tempdir().unwrap();
        let mut paths = Vec::new();
        for i in 0..5 {
            let file_path = dir.path().join(format!("test_{}.txt", i));
            let mut file = std::fs::File::create(&file_path).unwrap();
            writeln!(file, "content {}", i).unwrap();
            paths.push(file_path);
        }
        // Note: This test would need actual FileMeta files to work properly
        // For now, we just test that the function runs without errors
        let results = cache.get_metadata_batch(paths).await;
        assert_eq!(results.len(), 5);
    }

    /// Populate, invalidate, re-read: the second read must miss again.
    #[tokio::test]
    async fn test_cache_invalidation() {
        let cache = OptimizedFileCache::new();
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test.txt");
        let mut file = std::fs::File::create(&file_path).unwrap();
        writeln!(file, "test content").unwrap();
        drop(file);
        // Read file to populate cache
        let _ = cache.get_file_content(file_path.clone()).await.unwrap();
        // Invalidate cache
        cache.invalidate(&file_path).await;
        // Next read should be cache miss again
        let _ = cache.get_file_content(file_path.clone()).await.unwrap();
        let stats = cache.get_stats();
        assert!(stats.cache_misses >= 2);
    }
}

View File

@@ -16,6 +16,7 @@
extern crate core;
pub mod admin_server_info;
pub mod batch_processor;
pub mod bitrot;
pub mod bucket;
pub mod cache_value;
@@ -29,6 +30,7 @@ pub mod disks_layout;
pub mod endpoints;
pub mod erasure_coding;
pub mod error;
pub mod file_cache;
pub mod global;
pub mod lock_utils;
pub mod metrics_realtime;

View File

@@ -15,6 +15,7 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use crate::batch_processor::{AsyncBatchProcessor, get_global_processors};
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE;
use crate::bucket::versioning::VersioningApi;
@@ -232,7 +233,10 @@ impl SetDisks {
});
}
let results = join_all(futures).await;
// Use optimized batch processor for disk info retrieval
let processor = get_global_processors().metadata_processor();
let results = processor.execute_batch(futures).await;
for result in results {
match result {
Ok(res) => {
@@ -507,21 +511,28 @@ impl SetDisks {
#[tracing::instrument(skip(disks))]
async fn cleanup_multipart_path(disks: &[Option<DiskStore>], paths: &[String]) {
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
futures.push(async move {
if let Some(disk) = disk {
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await
} else {
Err(DiskError::DiskNotFound)
// Use improved simple batch processor instead of join_all for better performance
let processor = get_global_processors().write_processor();
let tasks: Vec<_> = disks
.iter()
.map(|disk| {
let disk = disk.clone();
let paths = paths.to_vec();
async move {
if let Some(disk) = disk {
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, &paths).await
} else {
Err(DiskError::DiskNotFound)
}
}
})
}
.collect();
let results = join_all(futures).await;
let results = processor.execute_batch(tasks).await;
for result in results {
match result {
Ok(_) => {
@@ -545,21 +556,32 @@ impl SetDisks {
part_numbers: &[usize],
read_quorum: usize,
) -> disk::error::Result<Vec<ObjectPartInfo>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
futures.push(async move {
if let Some(disk) = disk {
disk.read_parts(bucket, part_meta_paths).await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let mut errs = Vec::with_capacity(disks.len());
let mut object_parts = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
// Use batch processor for better performance
let processor = get_global_processors().read_processor();
let bucket = bucket.to_string();
let part_meta_paths = part_meta_paths.to_vec();
let tasks: Vec<_> = disks
.iter()
.map(|disk| {
let disk = disk.clone();
let bucket = bucket.clone();
let part_meta_paths = part_meta_paths.clone();
async move {
if let Some(disk) = disk {
disk.read_parts(&bucket, &part_meta_paths).await
} else {
Err(DiskError::DiskNotFound)
}
}
})
.collect();
let results = processor.execute_batch(tasks).await;
for result in results {
match result {
Ok(res) => {
@@ -1369,22 +1391,71 @@ impl SetDisks {
})
});
// Wait for all tasks to complete
let results = join_all(futures).await;
for result in results {
match result? {
Ok(res) => {
ress.push(res);
errors.push(None);
}
Err(e) => {
match result {
Ok(res) => match res {
Ok(file_info) => {
ress.push(file_info);
errors.push(None);
}
Err(e) => {
ress.push(FileInfo::default());
errors.push(Some(e));
}
},
Err(_) => {
ress.push(FileInfo::default());
errors.push(Some(e));
errors.push(Some(DiskError::Unexpected));
}
}
}
Ok((ress, errors))
}
// Optimized version using batch processor with quorum support
pub async fn read_version_optimized(
&self,
bucket: &str,
object: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<Vec<rustfs_filemeta::FileInfo>> {
// Use existing disk selection logic
let disks = self.disks.read().await;
let required_reads = self.format.erasure.sets.len();
// Clone parameters outside the closure to avoid lifetime issues
let bucket = bucket.to_string();
let object = object.to_string();
let version_id = version_id.to_string();
let opts = opts.clone();
let processor = get_global_processors().read_processor();
let tasks: Vec<_> = disks
.iter()
.take(required_reads + 2) // Read a few extra for reliability
.filter_map(|disk| {
disk.as_ref().map(|d| {
let disk = d.clone();
let bucket = bucket.clone();
let object = object.clone();
let version_id = version_id.clone();
let opts = opts.clone();
async move { disk.read_version(&bucket, &bucket, &object, &version_id, &opts).await }
})
})
.collect();
match processor.execute_batch_with_quorum(tasks, required_reads).await {
Ok(results) => Ok(results),
Err(_) => Err(DiskError::FileNotFound.into()), // Use existing error type
}
}
async fn read_all_xl(
disks: &[Option<DiskStore>],
bucket: &str,
@@ -1403,10 +1474,11 @@ impl SetDisks {
object: &str,
read_data: bool,
) -> (Vec<Option<RawFileInfo>>, Vec<Option<DiskError>>) {
let mut futures = Vec::with_capacity(disks.len());
let mut ress = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let mut futures = Vec::with_capacity(disks.len());
for disk in disks.iter() {
futures.push(async move {
if let Some(disk) = disk {

View File

@@ -302,17 +302,19 @@ impl TierConfigMgr {
}
pub async fn get_driver<'a>(&'a mut self, tier_name: &str) -> std::result::Result<&'a WarmBackendImpl, AdminError> {
Ok(match self.driver_cache.entry(tier_name.to_string()) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
let t = self.tiers.get(tier_name);
if t.is_none() {
return Err(ERR_TIER_NOT_FOUND.clone());
}
let d = new_warm_backend(t.expect("err"), false).await?;
e.insert(d)
}
})
// Return cached driver if present
if self.driver_cache.contains_key(tier_name) {
return Ok(self.driver_cache.get(tier_name).unwrap());
}
// Get tier configuration and create new driver
let tier_config = self.tiers.get(tier_name).ok_or_else(|| ERR_TIER_NOT_FOUND.clone())?;
let driver = new_warm_backend(tier_config, false).await?;
// Insert and return reference
self.driver_cache.insert(tier_name.to_string(), driver);
Ok(self.driver_cache.get(tier_name).unwrap())
}
pub async fn reload(&mut self, api: Arc<ECStore>) -> std::result::Result<(), std::io::Error> {

File diff suppressed because it is too large Load Diff

View File

@@ -27,7 +27,7 @@ use crate::fast_lock::{
/// High-performance object lock manager
#[derive(Debug)]
pub struct FastObjectLockManager {
shards: Vec<Arc<LockShard>>,
pub shards: Vec<Arc<LockShard>>,
shard_mask: usize,
config: LockConfig,
metrics: Arc<GlobalMetrics>,
@@ -66,7 +66,12 @@ impl FastObjectLockManager {
pub async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {
let shard = self.get_shard(&request.key);
match shard.acquire_lock(&request).await {
Ok(()) => Ok(FastLockGuard::new(request.key, request.mode, request.owner, shard.clone())),
Ok(()) => {
let guard = FastLockGuard::new(request.key, request.mode, request.owner, shard.clone());
// Register guard to prevent premature cleanup
shard.register_guard(guard.guard_id());
Ok(guard)
}
Err(err) => Err(err),
}
}
@@ -117,6 +122,54 @@ impl FastObjectLockManager {
self.acquire_lock(request).await
}
/// Acquire a shared (read) lock at `High` priority; intended for
/// latency-sensitive query paths. Delegates to `acquire_lock` with a
/// priority-tagged request.
pub async fn acquire_high_priority_read_lock(
    &self,
    bucket: impl Into<Arc<str>>,
    object: impl Into<Arc<str>>,
    owner: impl Into<Arc<str>>,
) -> Result<FastLockGuard, LockResult> {
    use crate::fast_lock::types::LockPriority;
    let req = ObjectLockRequest::new_read(bucket, object, owner).with_priority(LockPriority::High);
    self.acquire_lock(req).await
}
/// Acquire an exclusive (write) lock at `High` priority; intended for
/// latency-sensitive update paths. Delegates to `acquire_lock` with a
/// priority-tagged request.
pub async fn acquire_high_priority_write_lock(
    &self,
    bucket: impl Into<Arc<str>>,
    object: impl Into<Arc<str>>,
    owner: impl Into<Arc<str>>,
) -> Result<FastLockGuard, LockResult> {
    use crate::fast_lock::types::LockPriority;
    let req = ObjectLockRequest::new_write(bucket, object, owner).with_priority(LockPriority::High);
    self.acquire_lock(req).await
}
/// Acquire a shared (read) lock at `Critical` priority, reserved for
/// internal/system operations. Delegates to `acquire_lock` with a
/// priority-tagged request.
pub async fn acquire_critical_read_lock(
    &self,
    bucket: impl Into<Arc<str>>,
    object: impl Into<Arc<str>>,
    owner: impl Into<Arc<str>>,
) -> Result<FastLockGuard, LockResult> {
    use crate::fast_lock::types::LockPriority;
    let req = ObjectLockRequest::new_read(bucket, object, owner).with_priority(LockPriority::Critical);
    self.acquire_lock(req).await
}
/// Acquire an exclusive (write) lock at `Critical` priority, reserved for
/// internal/system operations. Delegates to `acquire_lock` with a
/// priority-tagged request.
pub async fn acquire_critical_write_lock(
    &self,
    bucket: impl Into<Arc<str>>,
    object: impl Into<Arc<str>>,
    owner: impl Into<Arc<str>>,
) -> Result<FastLockGuard, LockResult> {
    use crate::fast_lock::types::LockPriority;
    let req = ObjectLockRequest::new_write(bucket, object, owner).with_priority(LockPriority::Critical);
    self.acquire_lock(req).await
}
/// Acquire multiple locks atomically - optimized version
pub async fn acquire_locks_batch(&self, batch_request: BatchLockRequest) -> BatchLockResult {
// Pre-sort requests by (shard_id, key) to avoid deadlocks
@@ -304,7 +357,7 @@ impl FastObjectLockManager {
}
/// Get shard for object key
fn get_shard(&self, key: &crate::fast_lock::types::ObjectKey) -> &Arc<LockShard> {
pub fn get_shard(&self, key: &crate::fast_lock::types::ObjectKey) -> &Arc<LockShard> {
let index = key.shard_index(self.shard_mask);
&self.shards[index]
}
@@ -362,6 +415,18 @@ impl Drop for FastObjectLockManager {
}
}
impl Clone for FastObjectLockManager {
    /// Clones share the shard vector, config, and metrics (Arc handle clones),
    /// but the clone gets an empty `cleanup_handle` — the background cleanup
    /// task belongs to the original and is deliberately not duplicated.
    /// NOTE(review): confirm clones are never the sole surviving handle, or
    /// cleanup would stop when the original is dropped.
    fn clone(&self) -> Self {
        Self {
            shards: self.shards.clone(),
            shard_mask: self.shard_mask,
            config: self.config.clone(),
            metrics: self.metrics.clone(),
            cleanup_handle: RwLock::new(None), // Don't clone the cleanup task
        }
    }
}
#[async_trait::async_trait]
impl LockManager for FastObjectLockManager {
async fn acquire_lock(&self, request: ObjectLockRequest) -> Result<FastLockGuard, LockResult> {

View File

@@ -53,8 +53,11 @@ pub const DEFAULT_SHARD_COUNT: usize = 1024;
/// Default lock timeout
pub const DEFAULT_LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Default acquire timeout
pub const DEFAULT_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
/// Default acquire timeout - increased for database workloads
pub const DEFAULT_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
/// Maximum acquire timeout for high-load scenarios
pub const MAX_ACQUIRE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
/// Lock cleanup interval
pub const CLEANUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

View File

@@ -18,7 +18,8 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use tokio::sync::Notify;
/// Optimized notification pool to reduce memory overhead and thundering herd effects
static NOTIFY_POOL: Lazy<Vec<Arc<Notify>>> = Lazy::new(|| (0..64).map(|_| Arc::new(Notify::new())).collect());
/// Increased pool size for better performance under high concurrency
static NOTIFY_POOL: Lazy<Vec<Arc<Notify>>> = Lazy::new(|| (0..128).map(|_| Arc::new(Notify::new())).collect());
/// Optimized notification system for object locks
#[derive(Debug)]

View File

@@ -24,6 +24,7 @@ use crate::fast_lock::{
state::ObjectLockState,
types::{LockMode, LockResult, ObjectKey, ObjectLockRequest},
};
use std::collections::HashSet;
/// Lock shard to reduce global contention
#[derive(Debug)]
@@ -36,6 +37,8 @@ pub struct LockShard {
metrics: ShardMetrics,
/// Shard ID for debugging
_shard_id: usize,
/// Active guard IDs to prevent cleanup of locks with live guards
active_guards: parking_lot::Mutex<HashSet<u64>>,
}
impl LockShard {
@@ -45,6 +48,7 @@ impl LockShard {
object_pool: ObjectStatePool::new(),
metrics: ShardMetrics::new(),
_shard_id: shard_id,
active_guards: parking_lot::Mutex::new(HashSet::new()),
}
}
@@ -123,7 +127,12 @@ impl LockShard {
/// Slow path with async waiting
async fn acquire_lock_slow_path(&self, request: &ObjectLockRequest, start_time: Instant) -> Result<(), LockResult> {
let deadline = start_time + request.acquire_timeout;
// Use adaptive timeout based on current load and request priority
let adaptive_timeout = self.calculate_adaptive_timeout(request);
let deadline = start_time + adaptive_timeout;
let mut retry_count = 0u32;
const MAX_RETRIES: u32 = 10;
loop {
// Get or create object state
@@ -157,8 +166,22 @@ impl LockShard {
return Err(LockResult::Timeout);
}
// Wait for notification using optimized notify system
// Use intelligent wait strategy: mix of notification wait and exponential backoff
let remaining = deadline - Instant::now();
if retry_count < MAX_RETRIES && remaining > Duration::from_millis(10) {
// For early retries, use a brief exponential backoff instead of full notification wait
let backoff_ms = std::cmp::min(10 << retry_count, 100); // 10ms, 20ms, 40ms, 80ms, 100ms max
let backoff_duration = Duration::from_millis(backoff_ms);
if backoff_duration < remaining {
tokio::time::sleep(backoff_duration).await;
retry_count += 1;
continue;
}
}
// If we've exhausted quick retries or have little time left, use notification wait
let wait_result = match request.mode {
LockMode::Shared => {
state.atomic_state.inc_readers_waiting();
@@ -179,7 +202,7 @@ impl LockShard {
return Err(LockResult::Timeout);
}
// Continue the loop to try acquisition again
retry_count += 1;
}
}
@@ -203,10 +226,30 @@ impl LockShard {
should_cleanup = !state.is_locked() && !state.atomic_state.has_waiters();
} else {
should_cleanup = false;
// Additional diagnostics for release failures
let current_mode = state.current_mode();
let is_locked = state.is_locked();
let has_waiters = state.atomic_state.has_waiters();
tracing::debug!(
"Lock release failed in shard: key={}, owner={}, mode={:?}, current_mode={:?}, is_locked={}, has_waiters={}",
key,
owner,
mode,
current_mode,
is_locked,
has_waiters
);
}
} else {
result = false;
should_cleanup = false;
tracing::debug!(
"Lock release failed - key not found in shard: key={}, owner={}, mode={:?}",
key,
owner,
mode
);
}
}
@@ -218,6 +261,134 @@ impl LockShard {
result
}
/// Release lock with guard ID tracking for double-release prevention
/// Releases a lock on behalf of a specific guard. The guard id is atomically
/// removed from `active_guards` first; a second call with the same id is
/// rejected (returns false) instead of double-releasing.
///
/// NOTE(review): the guard id is consumed *before* the release is attempted,
/// so if the release itself fails (wrong owner/mode, or key missing) the id is
/// already gone and a corrected retry would be treated as a double-release —
/// confirm callers never retry after a false return.
pub fn release_lock_with_guard(&self, key: &ObjectKey, owner: &Arc<str>, mode: LockMode, guard_id: u64) -> bool {
    // First, try to remove the guard from active set
    let guard_was_active = {
        let mut guards = self.active_guards.lock();
        guards.remove(&guard_id)
    };
    // If guard was not active, this is a double-release attempt
    if !guard_was_active {
        tracing::debug!(
            "Double-release attempt blocked: key={}, owner={}, mode={:?}, guard_id={}",
            key,
            owner,
            mode,
            guard_id
        );
        return false;
    }
    // Proceed with normal release
    let should_cleanup;
    let result;
    {
        // Shared read of the shard map is enough: per-object state mutates
        // through its own atomics/ownership tracking.
        let objects = self.objects.read();
        if let Some(state) = objects.get(key) {
            result = match mode {
                LockMode::Shared => state.release_shared(owner),
                LockMode::Exclusive => state.release_exclusive(owner),
            };
            if result {
                self.metrics.record_release();
                // Only reap the entry once nothing holds or waits on it.
                should_cleanup = !state.is_locked() && !state.atomic_state.has_waiters();
            } else {
                should_cleanup = false;
            }
        } else {
            result = false;
            should_cleanup = false;
        }
    }
    if should_cleanup {
        self.schedule_cleanup(key.clone());
    }
    result
}
/// Register a guard to prevent premature cleanup
pub fn register_guard(&self, guard_id: u64) {
    // Track the guard so cleanup passes know the associated lock is still in use.
    self.active_guards.lock().insert(guard_id);
}
/// Unregister a guard (called when guard is dropped)
pub fn unregister_guard(&self, guard_id: u64) {
    // Removing the entry allows the associated lock entry to be reclaimed.
    self.active_guards.lock().remove(&guard_id);
}
/// Get count of active guards (for testing)
#[cfg(test)]
pub fn active_guard_count(&self) -> usize {
    self.active_guards.lock().len()
}
/// Check if a guard is active (for testing)
#[cfg(test)]
pub fn is_guard_active(&self, guard_id: u64) -> bool {
    self.active_guards.lock().contains(&guard_id)
}
/// Calculate adaptive timeout based on current system load and request priority
fn calculate_adaptive_timeout(&self, request: &ObjectLockRequest) -> Duration {
    use crate::fast_lock::types::LockPriority;

    let base_timeout = request.acquire_timeout;

    // Snapshot shard load: tracked lock entries plus currently live guards.
    let lock_count = self.objects.read().len();
    let active_guard_count = self.active_guards.lock().len();

    // Normalize against a generous threshold suited to database workloads.
    let load_factor = (lock_count + active_guard_count) as f64 / 500.0;

    // Higher-priority requests are granted proportionally longer timeouts.
    let priority_multiplier = match request.priority {
        LockPriority::Critical => 3.0,
        LockPriority::High => 2.0,
        LockPriority::Normal => 1.2,
        LockPriority::Low => 0.9,
    };

    // Scale further with load; even idle shards get a small buffer.
    let load_multiplier = if load_factor > 2.0 {
        // Very high load: drastically extend timeout
        1.0 + (load_factor * 2.0)
    } else if load_factor > 1.0 {
        // High load: significantly extend timeout
        1.0 + (load_factor * 1.8)
    } else if load_factor > 0.3 {
        // Medium load: moderately extend timeout
        1.0 + (load_factor * 1.2)
    } else {
        // Low load: still give some buffer
        1.1
    };

    // Cap at the global maximum, but never drop below 80% of the caller's own timeout.
    let scaled = (base_timeout.as_secs_f64() * priority_multiplier * load_multiplier)
        .min(crate::fast_lock::MAX_ACQUIRE_TIMEOUT.as_secs_f64());
    let floor = base_timeout.as_secs_f64() * 0.8;
    Duration::from_secs_f64(scaled.max(floor))
}
/// Batch acquire locks with ordering to prevent deadlocks
pub async fn acquire_locks_batch(
&self,
@@ -324,22 +495,44 @@ impl LockShard {
pub fn adaptive_cleanup(&self) -> usize {
let current_load = self.current_load_factor();
let lock_count = self.lock_count();
let active_guard_count = self.active_guards.lock().len();
// Be much more conservative if there are active guards or very high load
if active_guard_count > 0 && current_load > 0.8 {
tracing::debug!(
"Skipping aggressive cleanup due to {} active guards and high load ({:.2})",
active_guard_count,
current_load
);
// Only clean very old entries when under high load with active guards
return self.cleanup_expired_batch(3, 1_200_000); // 20 minutes, smaller batches
}
// Under extreme load, skip cleanup entirely to reduce contention
if current_load > 1.5 && active_guard_count > 10 {
tracing::debug!(
"Skipping all cleanup due to extreme load ({:.2}) and {} active guards",
current_load,
active_guard_count
);
return 0;
}
// Dynamically adjust cleanup strategy based on load
let cleanup_batch_size = match current_load {
load if load > 0.9 => lock_count / 20, // High load: small batch cleanup
load if load > 0.7 => lock_count / 10, // Medium load: moderate cleanup
_ => lock_count / 5, // Low load: aggressive cleanup
load if load > 0.9 => lock_count / 50, // Much smaller batches for high load
load if load > 0.7 => lock_count / 20, // Smaller batches for medium load
_ => lock_count / 10, // More conservative even for low load
};
// Use longer timeout for high load scenarios
// Use much longer timeouts to prevent premature cleanup
let cleanup_threshold_millis = match current_load {
load if load > 0.8 => 300_000, // 5 minutes for high load
load if load > 0.5 => 180_000, // 3 minutes for medium load
_ => 60_000, // 1 minute for low load
load if load > 0.8 => 600_000, // 10 minutes for high load
load if load > 0.5 => 300_000, // 5 minutes for medium load
_ => 120_000, // 2 minutes for low load
};
self.cleanup_expired_batch(cleanup_batch_size.max(10), cleanup_threshold_millis)
self.cleanup_expired_batch_protected(cleanup_batch_size.max(5), cleanup_threshold_millis)
}
/// Cleanup expired and unused locks
@@ -378,6 +571,19 @@ impl LockShard {
cleaned
}
/// Protected batch cleanup that respects active guards
fn cleanup_expired_batch_protected(&self, max_batch_size: usize, cleanup_threshold_millis: u64) -> usize {
    // Hold the guard lock only long enough to read the count.
    let guard_count = self.active_guards.lock().len();
    if guard_count > 0 {
        tracing::debug!("Cleanup with {} active guards, being conservative", guard_count);
    }
    // Delegate to the regular batch cleanup; callers pass conservative limits.
    self.cleanup_expired_batch(max_batch_size, cleanup_threshold_millis)
}
/// Batch cleanup with limited processing to avoid blocking
fn cleanup_expired_batch(&self, max_batch_size: usize, cleanup_threshold_millis: u64) -> usize {
let mut cleaned = 0;

View File

@@ -373,11 +373,23 @@ impl ObjectLockState {
}
true
} else {
// Inconsistency - re-add owner
// Inconsistency detected - atomic state shows no shared lock but owner was found
tracing::warn!(
"Atomic state inconsistency during shared lock release: owner={}, remaining_owners={}",
owner,
shared.len()
);
// Re-add owner to maintain consistency
shared.push(owner.clone());
false
}
} else {
// Owner not found in shared owners list
tracing::debug!(
"Shared lock release failed - owner not found: owner={}, current_owners={:?}",
owner,
shared.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
);
false
}
}
@@ -401,9 +413,21 @@ impl ObjectLockState {
}
true
} else {
// Atomic state inconsistency - current owner matches but atomic release failed
tracing::warn!(
"Atomic state inconsistency during exclusive lock release: owner={}, atomic_state={:b}",
owner,
self.atomic_state.state.load(Ordering::Acquire)
);
false
}
} else {
// Owner mismatch
tracing::debug!(
"Exclusive lock release failed - owner mismatch: expected_owner={}, actual_owner={:?}",
owner,
current.as_ref().map(|s| s.as_ref())
);
false
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::Error};
use time::OffsetDateTime;
use super::Policy;
@@ -59,15 +59,17 @@ impl TryFrom<Vec<u8>> for PolicyDoc {
type Error = serde_json::Error;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
match serde_json::from_slice::<PolicyDoc>(&value) {
Ok(res) => Ok(res),
Err(err) => match serde_json::from_slice::<Policy>(&value) {
Ok(res2) => Ok(Self {
policy: res2,
..Default::default()
}),
Err(_) => Err(err),
},
// Try to parse as PolicyDoc first
if let Ok(policy_doc) = serde_json::from_slice::<PolicyDoc>(&value) {
return Ok(policy_doc);
}
// Fall back to parsing as Policy and wrap in PolicyDoc
serde_json::from_slice::<Policy>(&value)
.map(|policy| Self {
policy,
..Default::default()
})
.map_err(|_| serde_json::Error::custom("Failed to parse as PolicyDoc or Policy".to_string()))
}
}

View File

@@ -16,17 +16,40 @@ use bytes::Bytes;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use std::net::Ipv6Addr;
use std::sync::LazyLock;
use std::sync::{LazyLock, Mutex};
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
fmt::Display,
net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
time::{Duration, Instant},
};
use transform_stream::AsyncTryStream;
use url::{Host, Url};
static LOCAL_IPS: LazyLock<Vec<IpAddr>> = LazyLock::new(|| must_get_local_ips().unwrap());
#[derive(Debug, Clone)]
struct DnsCacheEntry {
    // Resolved addresses for the cached domain.
    ips: HashSet<IpAddr>,
    // When this resolution result was stored.
    cached_at: Instant,
}

impl DnsCacheEntry {
    /// Wrap a freshly resolved address set, stamping it with the current time.
    fn new(ips: HashSet<IpAddr>) -> Self {
        DnsCacheEntry {
            cached_at: Instant::now(),
            ips,
        }
    }

    /// True once the entry has outlived the given time-to-live.
    fn is_expired(&self, ttl: Duration) -> bool {
        self.cached_at.elapsed() > ttl
    }
}
static DNS_CACHE: LazyLock<Mutex<HashMap<String, DnsCacheEntry>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
const DNS_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
/// helper for validating if the provided arg is an ip address.
pub fn is_socket_addr(addr: &str) -> bool {
// TODO IPv6 zone information?
@@ -87,19 +110,19 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
}
/// returns IP address of given host using layered DNS resolution.
pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
///
/// This is the async version of `get_host_ip()` that provides enhanced DNS resolution
/// with Kubernetes support when the "net" feature is enabled.
pub async fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => {
#[cfg(feature = "net")]
{
use crate::dns_resolver::resolve_domain;
let handle = tokio::runtime::Handle::current();
handle.block_on(async {
match resolve_domain(domain).await {
Ok(ips) => Ok(ips.into_iter().collect()),
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
}
})
match resolve_domain(domain).await {
Ok(ips) => Ok(ips.into_iter().collect()),
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
}
}
#[cfg(not(feature = "net"))]
{
@@ -128,17 +151,41 @@ pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
/// returns IP address of given host using standard resolution.
///
/// **Note**: This function uses standard library DNS resolution.
/// **Note**: This function uses standard library DNS resolution with caching.
/// For enhanced DNS resolution with Kubernetes support, use `get_host_ip_async()`.
pub fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => match (domain, 0)
.to_socket_addrs()
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
{
Ok(ips) => Ok(ips),
Err(err) => Err(std::io::Error::other(err)),
},
Host::Domain(domain) => {
// Check cache first
if let Ok(mut cache) = DNS_CACHE.lock() {
if let Some(entry) = cache.get(domain) {
if !entry.is_expired(DNS_CACHE_TTL) {
return Ok(entry.ips.clone());
}
// Remove expired entry
cache.remove(domain);
}
}
// Perform DNS resolution
match (domain, 0)
.to_socket_addrs()
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
{
Ok(ips) => {
// Cache the result
if let Ok(mut cache) = DNS_CACHE.lock() {
cache.insert(domain.to_string(), DnsCacheEntry::new(ips.clone()));
// Limit cache size to prevent memory bloat
if cache.len() > 1000 {
cache.retain(|_, v| !v.is_expired(DNS_CACHE_TTL));
}
}
Ok(ips)
}
Err(err) => Err(std::io::Error::other(err)),
}
}
Host::Ipv4(ip) => {
let mut set = HashSet::with_capacity(1);
set.insert(IpAddr::V4(ip));

View File

@@ -41,7 +41,7 @@ services:
- rustfs_data_1:/data/rustfs1
- rustfs_data_2:/data/rustfs2
- rustfs_data_3:/data/rustfs3
- ./logs:/app/logs
- logs_data:/app/logs
networks:
- rustfs-network
restart: unless-stopped
@@ -95,7 +95,7 @@ services:
command:
- --config=/etc/otelcol-contrib/otel-collector.yml
volumes:
- ./.docker/observability/otel-collector.yml:/etc/otelcol-contrib/otel-collector.yml:ro
- ./.docker/observability/otel-collector-config.yaml:/etc/otelcol-contrib/otel-collector.yml:ro
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
@@ -219,3 +219,5 @@ volumes:
driver: local
redis_data:
driver: local
logs_data:
driver: local

326
docs/PERFORMANCE_TESTING.md Normal file
View File

@@ -0,0 +1,326 @@
# RustFS 性能测试指南
本文档提供了对 RustFS 进行性能测试和性能分析的完整方法和工具。
## 概览
RustFS 提供了多种性能测试和分析工具:
1. **性能分析Profiling** - 使用内置的 pprof 接口收集 CPU 性能数据
2. **负载测试Load Testing** - 使用多种客户端工具模拟高并发请求
3. **监控和分析** - 查看性能指标和识别性能瓶颈
## 前置条件
### 1. 启用性能分析
在启动 RustFS 时,需要设置环境变量启用性能分析功能:
```bash
export RUSTFS_ENABLE_PROFILING=true
./rustfs
```
### 2. 安装依赖工具
确保系统中安装了以下工具:
```bash
# 基础工具
curl # HTTP 请求
jq # JSON 处理 (可选)
# 分析工具
go # Go pprof 工具 (可选,用于 protobuf 格式)
python3 # Python 负载测试脚本
# macOS 用户
brew install curl jq go python3
# Ubuntu/Debian 用户
sudo apt-get install curl jq golang-go python3
```
## 性能测试方法
### 方法 1使用专业脚本推荐
项目提供了完整的性能分析脚本:
```bash
# 查看脚本帮助
./scripts/profile_rustfs.sh help
# 检查性能分析状态
./scripts/profile_rustfs.sh status
# 收集火焰图(30秒)
./scripts/profile_rustfs.sh flamegraph
# 收集 protobuf 格式性能数据
./scripts/profile_rustfs.sh protobuf
# 收集两种格式的性能数据
./scripts/profile_rustfs.sh both
# 自定义参数
./scripts/profile_rustfs.sh -d 60 -u http://192.168.1.100:9000 both
```
### 方法 2使用 Python 综合测试
Python 脚本提供了负载测试和性能分析的一体化解决方案:
```bash
# 运行综合性能分析
python3 test_load.py
```
此脚本会:
1. 启动后台负载测试(多线程 S3 操作)
2. 并行收集性能分析数据
3. 生成火焰图用于分析
### 方法 3使用简单负载测试
对于快速测试,可以使用 bash 脚本:
```bash
# 运行简单负载测试
./simple_load_test.sh
```
## 性能分析输出格式
### 1. 火焰图SVG 格式)
- **用途**: 可视化 CPU 使用情况
- **文件**: `rustfs_profile_TIMESTAMP.svg`
- **查看方式**: 使用浏览器打开 SVG 文件
- **分析要点**:
- 宽度表示 CPU 使用时间
- 高度表示调用栈深度
- 点击可以放大特定函数
```bash
# 在浏览器中打开
open profiles/rustfs_profile_20240911_143000.svg
```
### 2. Protobuf 格式
- **用途**: 使用 Go pprof 工具进行详细分析
- **文件**: `rustfs_profile_TIMESTAMP.pb`
- **分析工具**: `go tool pprof`
```bash
# 使用 Go pprof 分析
go tool pprof profiles/rustfs_profile_20240911_143000.pb
# pprof 常用命令
(pprof) top # 显示 CPU 使用率最高的函数
(pprof) list func # 显示指定函数的源代码
(pprof) web # 生成 web 界面(需要 graphviz
(pprof) png # 生成 PNG 图片
(pprof) help # 查看所有命令
```
## API 接口使用
### 检查性能分析状态
```bash
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/status"
```
返回示例:
```json
{
  "enabled": "true",
  "status": "running",
  "supported_formats": "protobuf, flamegraph",
  "max_duration_seconds": "300",
  "endpoint": "/rustfs/admin/debug/pprof/profile"
}
```
### 收集性能数据
```bash
# 收集 30 秒的火焰图
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=flamegraph" \
-o profile.svg
# 收集 protobuf 格式数据
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=protobuf" \
-o profile.pb
```
**参数说明**:
- `seconds`: 收集时长(1-300 秒)
- `format`: 输出格式(`flamegraph`/`svg` 或 `protobuf`/`pb`)
## 负载测试场景
### 1. S3 API 负载测试
使用 Python 脚本进行完整的 S3 操作负载测试:
```python
# 基本配置
tester = S3LoadTester(
endpoint="http://127.0.0.1:9000",
access_key="rustfsadmin",
secret_key="rustfsadmin"
)
# 运行负载测试
# 4 个线程,每个线程执行 10 次操作
tester.run_load_test(num_threads=4, operations_per_thread=10)
```
每次操作包括:
1. 上传 1MB 对象
2. 下载对象
3. 删除对象
### 2. 自定义负载测试
```bash
# 创建测试桶
curl -X PUT "http://127.0.0.1:9000/test-bucket"
# 并发上传测试
for i in {1..10}; do
echo "test data $i" | curl -X PUT "http://127.0.0.1:9000/test-bucket/object-$i" -d @- &
done
wait
# 并发下载测试
for i in {1..10}; do
curl "http://127.0.0.1:9000/test-bucket/object-$i" > /dev/null &
done
wait
```
## 性能分析最佳实践
### 1. 测试环境准备
- 确保 RustFS 已启用性能分析: `RUSTFS_ENABLE_PROFILING=true`
- 使用独立的测试环境,避免其他程序干扰
- 确保有足够的磁盘空间存储分析文件
### 2. 数据收集建议
- **预热阶段**: 先运行 5-10 分钟的轻量负载
- **数据收集**: 在稳定负载下收集 30-60 秒的性能数据
- **多次采样**: 收集多个样本进行对比分析
### 3. 分析重点
在火焰图中重点关注:
1. **宽度最大的函数** - CPU 使用时间最长
2. **平顶函数** - 可能的性能瓶颈
3. **深度调用栈** - 可能的递归或复杂逻辑
4. **意外的系统调用** - I/O 或内存分配问题
### 4. 常见性能问题
- **锁竞争**: 查找 `std::sync` 相关函数
- **内存分配**: 查找 `alloc` 相关函数
- **I/O 等待**: 查找文件系统或网络 I/O 函数
- **序列化开销**: 查找 JSON/XML 解析函数
## 故障排除
### 1. 性能分析未启用
错误信息:`{"enabled":"false"}`
解决方案:
```bash
export RUSTFS_ENABLE_PROFILING=true
# 重启 RustFS
```
### 2. 连接被拒绝
错误信息:`Connection refused`
检查项:
- RustFS 是否正在运行
- 端口是否正确(默认 9000)
- 防火墙设置
### 3. 分析文件过大
如果生成的分析文件过大:
- 减少收集时间(如 15-30 秒)
- 降低负载测试的并发度
- 使用 protobuf 格式而非 SVG
## 配置参数
### 环境变量
| 变量 | 默认值 | 描述 |
|------|--------|------|
| `RUSTFS_ENABLE_PROFILING` | `false` | 启用性能分析 |
| `RUSTFS_URL` | `http://127.0.0.1:9000` | RustFS 服务器地址 |
| `PROFILE_DURATION` | `30` | 性能数据收集时长(秒) |
| `OUTPUT_DIR` | `./profiles` | 输出文件目录 |
### 脚本参数
```bash
./scripts/profile_rustfs.sh [OPTIONS] [COMMAND]
OPTIONS:
-u, --url URL RustFS URL
-d, --duration SECONDS Profile duration
-o, --output DIR Output directory
COMMANDS:
status 检查状态
flamegraph 收集火焰图
protobuf 收集 protobuf 数据
both 收集两种格式(默认)
```
## 输出文件位置
- **脚本输出**: `./profiles/` 目录
- **Python 脚本**: `/tmp/rustfs_profiles/` 目录
- **文件命名**: `rustfs_profile_TIMESTAMP.{svg|pb}`
## 示例工作流程
1. **启动 RustFS**:
```bash
RUSTFS_ENABLE_PROFILING=true ./rustfs
```
2. **验证性能分析可用**:
```bash
./scripts/profile_rustfs.sh status
```
3. **开始负载测试**:
```bash
python3 test_load.py &
```
4. **收集性能数据**:
```bash
./scripts/profile_rustfs.sh -d 60 both
```
5. **分析结果**:
```bash
# 查看火焰图
open profiles/rustfs_profile_*.svg
# 或使用 pprof 分析
go tool pprof profiles/rustfs_profile_*.pb
```
通过这个完整的性能测试流程,你可以系统地分析 RustFS 的性能特征,识别瓶颈,并进行有针对性的优化。

View File

@@ -56,4 +56,5 @@ if [ "${RUSTFS_ACCESS_KEY}" = "rustfsadmin" ] || [ "${RUSTFS_SECRET_KEY}" = "rus
fi
echo "Starting: $*"
set -- "$@" $LOCAL_VOLUMES
exec "$@"

View File

@@ -118,6 +118,9 @@ libsystemd.workspace = true
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
tikv-jemallocator = "0.6"
[target.'cfg(not(target_os = "windows"))'.dependencies]
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
[build-dependencies]
http.workspace = true
futures.workspace = true

View File

@@ -81,6 +81,8 @@ pub mod sts;
pub mod tier;
pub mod trace;
pub mod user;
#[cfg(not(target_os = "windows"))]
use pprof::protos::Message;
use urlencoding::decode;
#[allow(dead_code)]
@@ -1233,6 +1235,172 @@ async fn count_bucket_objects(
}
}
pub struct ProfileHandler {}
#[async_trait::async_trait]
impl Operation for ProfileHandler {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
#[cfg(target_os = "windows")]
{
return Ok(S3Response::new((
StatusCode::NOT_IMPLEMENTED,
Body::from("CPU profiling is not supported on Windows platform".to_string()),
)));
}
#[cfg(not(target_os = "windows"))]
{
use crate::profiling;
if !profiling::is_profiler_enabled() {
return Ok(S3Response::new((
StatusCode::SERVICE_UNAVAILABLE,
Body::from("Profiler not enabled. Set RUSTFS_ENABLE_PROFILING=true to enable profiling".to_string()),
)));
}
let queries = extract_query_params(&req.uri);
let seconds = queries.get("seconds").and_then(|s| s.parse::<u64>().ok()).unwrap_or(30);
let format = queries.get("format").cloned().unwrap_or_else(|| "protobuf".to_string());
if seconds > 300 {
return Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Profile duration cannot exceed 300 seconds".to_string()),
)));
}
let guard = match profiling::get_profiler_guard() {
Some(guard) => guard,
None => {
return Ok(S3Response::new((
StatusCode::SERVICE_UNAVAILABLE,
Body::from("Profiler not initialized".to_string()),
)));
}
};
info!("Starting CPU profile collection for {} seconds", seconds);
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
let guard_lock = match guard.lock() {
Ok(guard) => guard,
Err(_) => {
error!("Failed to acquire profiler guard lock");
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from("Failed to acquire profiler lock".to_string()),
)));
}
};
let report = match guard_lock.report().build() {
Ok(report) => report,
Err(e) => {
error!("Failed to build profiler report: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to build profile report: {}", e)),
)));
}
};
info!("CPU profile collection completed");
match format.as_str() {
"protobuf" | "pb" => {
let profile = report.pprof().unwrap();
let mut body = Vec::new();
if let Err(e) = profile.write_to_vec(&mut body) {
error!("Failed to serialize protobuf profile: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from("Failed to serialize profile".to_string()),
)));
}
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), headers))
}
"flamegraph" | "svg" => {
let mut flamegraph_buf = Vec::new();
match report.flamegraph(&mut flamegraph_buf) {
Ok(()) => (),
Err(e) => {
error!("Failed to generate flamegraph: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to generate flamegraph: {}", e)),
)));
}
};
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(flamegraph_buf)), headers))
}
_ => Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Unsupported format. Use 'protobuf' or 'flamegraph'".to_string()),
))),
}
}
}
}
pub struct ProfileStatusHandler {}

#[async_trait::async_trait]
impl Operation for ProfileStatusHandler {
    /// Report the profiler's availability and configuration as a JSON document.
    async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        use std::collections::HashMap;

        // Windows builds ship without pprof support at all.
        #[cfg(target_os = "windows")]
        let status = HashMap::from([
            ("enabled", "false"),
            ("status", "not_supported"),
            ("platform", "windows"),
            ("message", "CPU profiling is not supported on Windows platform"),
        ]);

        // On other platforms the answer depends on runtime initialization.
        #[cfg(not(target_os = "windows"))]
        let status = {
            use crate::profiling;
            if profiling::is_profiler_enabled() {
                HashMap::from([
                    ("enabled", "true"),
                    ("status", "running"),
                    ("supported_formats", "protobuf, flamegraph"),
                    ("max_duration_seconds", "300"),
                    ("endpoint", "/rustfs/admin/debug/pprof/profile"),
                ])
            } else {
                HashMap::from([
                    ("enabled", "false"),
                    ("status", "disabled"),
                    ("message", "Set RUSTFS_ENABLE_PROFILING=true to enable profiling"),
                ])
            }
        };

        // Serialize first; only build headers once we know we have a body.
        let json = match serde_json::to_string(&status) {
            Ok(json) => json,
            Err(e) => {
                error!("Failed to serialize status: {}", e);
                return Ok(S3Response::new((
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Body::from("Failed to serialize status".to_string()),
                )));
            }
        };
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
        Ok(S3Response::with_headers((StatusCode::OK, Body::from(json)), headers))
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -214,6 +214,21 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route>
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;
// Performance profiling endpoints (available on all platforms, with platform-specific responses)
#[cfg(not(target_os = "windows"))]
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/profile").as_str(),
AdminOperation(&handlers::ProfileHandler {}),
)?;
#[cfg(not(target_os = "windows"))]
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/status").as_str(),
AdminOperation(&handlers::ProfileStatusHandler {}),
)?;
Ok(r)
}

View File

@@ -18,6 +18,8 @@ mod config;
mod error;
// mod grpc;
pub mod license;
#[cfg(not(target_os = "windows"))]
mod profiling;
mod server;
mod storage;
mod update;
@@ -94,6 +96,10 @@ async fn main() -> Result<()> {
// Store in global storage
set_global_guard(guard).map_err(Error::other)?;
// Initialize performance profiling if enabled
#[cfg(not(target_os = "windows"))]
profiling::start_profiling_if_enabled();
// Run parameters
run(opt).await
}
@@ -102,10 +108,12 @@ async fn main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize global DNS resolver early for enhanced DNS resolution
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
let dns_init = tokio::spawn(async {
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
});
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
@@ -176,6 +184,9 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize event notifier
init_event_notifier().await;
// Wait for DNS initialization to complete before network-heavy operations
dns_init.await.map_err(Error::other)?;
let buckets_list = store
.list_bucket(&BucketOptions {
no_metadata: true,
@@ -187,14 +198,31 @@ async fn run(opt: config::Opt) -> Result<()> {
// Collect bucket names into a vector
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
// Initialize the bucket metadata system
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
// Parallelize initialization tasks for better network performance
let bucket_metadata_task = tokio::spawn({
let store = store.clone();
let buckets = buckets.clone();
async move {
init_bucket_metadata_sys(store, buckets).await;
}
});
// Initialize the IAM system
init_iam_sys(store.clone()).await?;
let iam_init_task = tokio::spawn({
let store = store.clone();
async move { init_iam_sys(store).await }
});
// add bucket notification configuration
add_bucket_notification_configuration(buckets).await;
let notification_config_task = tokio::spawn({
let buckets = buckets.clone();
async move {
add_bucket_notification_configuration(buckets).await;
}
});
// Wait for all parallel initialization tasks to complete
bucket_metadata_task.await.map_err(Error::other)?;
iam_init_task.await.map_err(Error::other)??;
notification_config_task.await.map_err(Error::other)?;
// Initialize the global notification system
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
@@ -239,12 +267,13 @@ async fn run(opt: config::Opt) -> Result<()> {
// initialize bucket replication pool
init_bucket_replication_pool().await;
// Async update check (optional)
// Async update check with timeout (optional)
tokio::spawn(async {
use crate::update::{UpdateCheckError, check_updates};
match check_updates().await {
Ok(result) => {
// Add timeout to prevent hanging network calls
match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await {
Ok(Ok(result)) => {
if result.update_available {
if let Some(latest) = &result.latest_version {
info!(
@@ -262,12 +291,15 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("✅ Version check: Current version is up to date: {}", result.current_version);
}
}
Err(UpdateCheckError::HttpError(e)) => {
Ok(Err(UpdateCheckError::HttpError(e))) => {
debug!("Version check: network error (this is normal): {}", e);
}
Err(e) => {
Ok(Err(e)) => {
debug!("Version check: failed (this is normal): {}", e);
}
Err(_) => {
debug!("Version check: timeout after 30 seconds (this is normal)");
}
}
});
@@ -364,14 +396,12 @@ async fn init_event_notifier() {
info!("Event notifier configuration found, proceeding with initialization.");
// 3. Initialize the notification system asynchronously with a global configuration
// Put it into a separate task to avoid blocking the main initialization process
tokio::spawn(async move {
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
});
// Use direct await for better error handling and faster initialization
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
}
#[instrument(skip_all)]

63
rustfs/src/profiling.rs Normal file
View File

@@ -0,0 +1,63 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use pprof::ProfilerGuard;
use std::sync::{Arc, Mutex, OnceLock};
use tracing::info;
static PROFILER_GUARD: OnceLock<Arc<Mutex<ProfilerGuard<'static>>>> = OnceLock::new();
/// Build the pprof profiler guard and publish it in the process-wide slot.
///
/// # Errors
/// Returns an error if the guard cannot be built or if the profiler was
/// already initialized.
pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
    // Sample at 1 kHz, filtering out frames from low-level runtime libraries.
    let guard = pprof::ProfilerGuardBuilder::default()
        .frequency(1000)
        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
        .build()
        .map_err(|e| format!("Failed to build profiler guard: {}", e))?;

    // OnceLock::set fails only when a guard was already stored.
    if PROFILER_GUARD.set(Arc::new(Mutex::new(guard))).is_err() {
        return Err("Failed to set profiler guard (already initialized)".into());
    }

    info!("Performance profiler initialized");
    Ok(())
}
/// Whether the profiler has been initialized (i.e. `init_profiler` succeeded).
pub fn is_profiler_enabled() -> bool {
    matches!(PROFILER_GUARD.get(), Some(_))
}
/// Hand out a cheap refcounted clone of the shared profiler guard, if any.
pub fn get_profiler_guard() -> Option<Arc<Mutex<ProfilerGuard<'static>>>> {
    PROFILER_GUARD.get().map(Arc::clone)
}
/// Initialize the profiler when `RUSTFS_ENABLE_PROFILING` requests it.
///
/// Initialization failure is logged but never aborts startup.
pub fn start_profiling_if_enabled() {
    // Accept common truthy spellings ("true", "1", "yes", "on"), case-insensitively.
    // The previous `str::parse::<bool>()` silently rejected values like "1" or "TRUE",
    // leaving profiling disabled despite the operator's intent.
    let enable_profiling = std::env::var("RUSTFS_ENABLE_PROFILING")
        .map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1" | "yes" | "on"))
        .unwrap_or(false);

    if enable_profiling {
        match init_profiler() {
            Ok(()) => {
                info!("Performance profiling enabled via RUSTFS_ENABLE_PROFILING environment variable");
            }
            Err(e) => {
                tracing::error!("Failed to initialize profiler: {}", e);
                info!("Performance profiling disabled due to initialization error");
            }
        }
    } else {
        info!("Performance profiling disabled. Set RUSTFS_ENABLE_PROFILING=true to enable");
    }
}

View File

@@ -157,6 +157,8 @@ pub async fn start_http_server(
b.build()
};
// Server will be created per connection - this ensures isolation
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");