Performance: improve (#514)

* Performance: improve

Signed-off-by: junxiang Mu <1948535941@qq.com>

* remove dirty

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix some err

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
guojidan
2025-09-11 19:48:28 +08:00
committed by GitHub
parent 70e6bec2a4
commit 62a01f3801
23 changed files with 1902 additions and 167 deletions

257
Cargo.lock generated
View File

@@ -75,6 +75,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "aligned-vec"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b"
dependencies = [
"equator",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@@ -1645,6 +1654,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpp_demangle"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
dependencies = [
"cfg-if",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -2439,6 +2457,15 @@ dependencies = [
"sqlparser",
]
[[package]]
name = "debugid"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d"
dependencies = [
"uuid",
]
[[package]]
name = "deflate64"
version = "0.1.9"
@@ -2696,6 +2723,26 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "equator"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc"
dependencies = [
"equator-macro",
]
[[package]]
name = "equator-macro"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "equivalent"
version = "1.0.2"
@@ -2785,6 +2832,18 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "findshlibs"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
dependencies = [
"cc",
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -2856,7 +2915,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
dependencies = [
"futures-core",
"futures-sink",
"spin",
"spin 0.9.8",
]
[[package]]
@@ -3620,6 +3679,24 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "inferno"
version = "0.11.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
dependencies = [
"ahash",
"indexmap",
"is-terminal",
"itoa",
"log",
"num-format",
"once_cell",
"quick-xml 0.26.0",
"rgb",
"str_stack",
]
[[package]]
name = "inlinable_string"
version = "0.1.15"
@@ -3709,6 +3786,17 @@ dependencies = [
"serde",
]
[[package]]
name = "is-terminal"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "is_debug"
version = "1.1.0"
@@ -3815,7 +3903,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin",
"spin 0.9.8",
]
[[package]]
@@ -4115,6 +4203,15 @@ version = "2.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
[[package]]
name = "memmap2"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7"
dependencies = [
"libc",
]
[[package]]
name = "memoffset"
version = "0.9.1"
@@ -4230,6 +4327,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "nix"
version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
dependencies = [
"bitflags 1.3.2",
"cfg-if",
"libc",
]
[[package]]
name = "nix"
version = "0.29.0"
@@ -4393,6 +4501,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-format"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa",
]
[[package]]
name = "num-integer"
version = "0.1.46"
@@ -5044,6 +5162,30 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "pprof"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"findshlibs",
"inferno",
"libc",
"log",
"nix 0.26.4",
"once_cell",
"protobuf",
"protobuf-codegen",
"smallvec",
"spin 0.10.0",
"symbolic-demangle",
"tempfile",
"thiserror 2.0.16",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@@ -5181,6 +5323,57 @@ dependencies = [
"prost 0.14.1",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-codegen"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
dependencies = [
"anyhow",
"once_cell",
"protobuf",
"protobuf-parse",
"regex",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-parse"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
dependencies = [
"anyhow",
"indexmap",
"log",
"protobuf",
"protobuf-support",
"tempfile",
"thiserror 1.0.69",
"which",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "psm"
version = "0.1.26"
@@ -5210,6 +5403,15 @@ dependencies = [
"pulldown-cmark",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
dependencies = [
"memchr",
]
[[package]]
name = "quick-xml"
version = "0.37.5"
@@ -5592,6 +5794,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rgb"
version = "0.8.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce"
dependencies = [
"bytemuck",
]
[[package]]
name = "ring"
version = "0.17.14"
@@ -5786,6 +5997,7 @@ dependencies = [
"opentelemetry",
"percent-encoding",
"pin-project-lite",
"pprof",
"reqwest",
"rust-embed",
"rustfs-ahm",
@@ -5981,9 +6193,11 @@ dependencies = [
"hyper-util",
"lazy_static",
"md-5",
"moka",
"nix 0.30.1",
"num_cpus",
"once_cell",
"parking_lot",
"path-absolutize",
"pin-project-lite",
"quick-xml 0.38.3",
@@ -6086,6 +6300,7 @@ dependencies = [
"heapless",
"once_cell",
"parking_lot",
"pprof",
"rustfs-protos",
"serde",
"serde_json",
@@ -7146,6 +7361,15 @@ dependencies = [
"lock_api",
]
[[package]]
name = "spin"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -7223,6 +7447,12 @@ dependencies = [
"thiserror 2.0.16",
]
[[package]]
name = "str_stack"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -7334,6 +7564,29 @@ dependencies = [
"sval_nested",
]
[[package]]
name = "symbolic-common"
version = "12.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9da12f8fecbbeaa1ee62c1d50dc656407e007c3ee7b2a41afce4b5089eaef15e"
dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
]
[[package]]
name = "symbolic-demangle"
version = "12.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fd35afe0ef9d35d3dcd41c67ddf882fc832a387221338153b7cd685a105495c"
dependencies = [
"cpp_demangle",
"rustc-demangle",
"symbolic-common",
]
[[package]]
name = "syn"
version = "1.0.109"

View File

@@ -176,6 +176,7 @@ opentelemetry-semantic-conventions = { version = "0.30.0", features = [
parking_lot = "0.12.4"
path-absolutize = "3.1.1"
path-clean = "1.0.1"
pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] }
blake3 = { version = "1.8.2" }
pbkdf2 = "0.12.2"
percent-encoding = "2.3.2"

View File

@@ -101,6 +101,8 @@ rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
async-recursion.workspace = true
parking_lot = "0.12"
moka = { version = "0.12", features = ["future"] }
[target.'cfg(not(windows))'.dependencies]
nix = { workspace = true }

View File

@@ -0,0 +1,231 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-performance batch processor using JoinSet
//!
//! This module provides optimized batching utilities to reduce async runtime overhead
//! and improve concurrent operation performance.
use crate::disk::error::{Error, Result};
use std::future::Future;
use std::sync::Arc;
use tokio::task::JoinSet;
/// Batch processor that executes tasks concurrently with a semaphore
/// bounding how many futures run at the same time.
pub struct AsyncBatchProcessor {
// Maximum number of tasks allowed to execute concurrently (semaphore permits).
max_concurrent: usize,
}
impl AsyncBatchProcessor {
pub fn new(max_concurrent: usize) -> Self {
Self { max_concurrent }
}
/// Execute a batch of tasks concurrently with concurrency control
pub async fn execute_batch<T, F>(&self, tasks: Vec<F>) -> Vec<Result<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
if tasks.is_empty() {
return Vec::new();
}
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
let mut join_set = JoinSet::new();
let mut results = Vec::with_capacity(tasks.len());
for _ in 0..tasks.len() {
results.push(Err(Error::other("Not completed")));
}
// Spawn all tasks with semaphore control
for (i, task) in tasks.into_iter().enumerate() {
let sem = semaphore.clone();
join_set.spawn(async move {
let _permit = sem.acquire().await.map_err(|_| Error::other("Semaphore error"))?;
let result = task.await;
Ok::<(usize, Result<T>), Error>((i, result))
});
}
// Collect results
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok(Ok((index, task_result))) => {
if index < results.len() {
results[index] = task_result;
}
}
Ok(Err(e)) => {
// Semaphore or other system error - this is rare
tracing::warn!("Batch processor system error: {:?}", e);
}
Err(join_error) => {
// Task panicked - log but continue
tracing::warn!("Task panicked in batch processor: {:?}", join_error);
}
}
}
results
}
/// Execute batch with early termination when sufficient successful results are obtained
pub async fn execute_batch_with_quorum<T, F>(&self, tasks: Vec<F>, required_successes: usize) -> Result<Vec<T>>
where
T: Send + 'static,
F: Future<Output = Result<T>> + Send + 'static,
{
let results = self.execute_batch(tasks).await;
let mut successes = Vec::new();
for value in results.into_iter().flatten() {
successes.push(value);
if successes.len() >= required_successes {
return Ok(successes);
}
}
if successes.len() >= required_successes {
Ok(successes)
} else {
Err(Error::other(format!(
"Insufficient successful results: got {}, needed {}",
successes.len(),
required_successes
)))
}
}
}
/// Global batch processor instances
/// One processor per workload class, each with its own concurrency cap.
pub struct GlobalBatchProcessors {
// Tuned for read-heavy workloads (highest parallelism, 16 permits).
read_processor: AsyncBatchProcessor,
// Writes are throttled harder (8 permits).
write_processor: AsyncBatchProcessor,
// Metadata operations sit in between (12 permits).
metadata_processor: AsyncBatchProcessor,
}
impl GlobalBatchProcessors {
    /// Build the three processors with workload-specific concurrency caps.
    pub fn new() -> Self {
        // Reads tolerate more parallelism than writes; metadata sits between.
        const READ_CONCURRENCY: usize = 16;
        const WRITE_CONCURRENCY: usize = 8;
        const METADATA_CONCURRENCY: usize = 12;

        Self {
            read_processor: AsyncBatchProcessor::new(READ_CONCURRENCY),
            write_processor: AsyncBatchProcessor::new(WRITE_CONCURRENCY),
            metadata_processor: AsyncBatchProcessor::new(METADATA_CONCURRENCY),
        }
    }

    /// Processor used for read batches.
    pub fn read_processor(&self) -> &AsyncBatchProcessor {
        &self.read_processor
    }

    /// Processor used for write batches.
    pub fn write_processor(&self) -> &AsyncBatchProcessor {
        &self.write_processor
    }

    /// Processor used for metadata batches.
    pub fn metadata_processor(&self) -> &AsyncBatchProcessor {
        &self.metadata_processor
    }
}
impl Default for GlobalBatchProcessors {
// Delegate to `new()` so `Default` and `new` always agree.
fn default() -> Self {
Self::new()
}
}
// Process-wide singleton, lazily constructed on first access.
use std::sync::OnceLock;

static GLOBAL_PROCESSORS: OnceLock<GlobalBatchProcessors> = OnceLock::new();

/// Access the lazily-initialized global batch processors.
pub fn get_global_processors() -> &'static GlobalBatchProcessors {
    GLOBAL_PROCESSORS.get_or_init(GlobalBatchProcessors::default)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn test_batch_processor_basic() {
        let processor = AsyncBatchProcessor::new(4);

        let tasks: Vec<_> = (0..10)
            .map(|n| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok::<i32, Error>(n)
            })
            .collect();

        let outcomes = processor.execute_batch(tasks).await;
        assert_eq!(outcomes.len(), 10);

        // Every task succeeds and lands back at its original index.
        for (idx, outcome) in outcomes.iter().enumerate() {
            assert!(outcome.is_ok());
            assert_eq!(outcome.as_ref().unwrap(), &(idx as i32));
        }
    }

    #[tokio::test]
    async fn test_batch_processor_with_errors() {
        let processor = AsyncBatchProcessor::new(2);

        // Even indices succeed, odd indices fail.
        let tasks: Vec<_> = (0..5)
            .map(|n| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                match n % 2 {
                    0 => Ok::<i32, Error>(n),
                    _ => Err(Error::other("Test error")),
                }
            })
            .collect();

        let outcomes = processor.execute_batch(tasks).await;
        assert_eq!(outcomes.len(), 5);

        // The success/failure pattern must survive result reordering.
        for (idx, outcome) in outcomes.iter().enumerate() {
            if idx % 2 == 0 {
                assert!(outcome.is_ok());
                assert_eq!(outcome.as_ref().unwrap(), &(idx as i32));
            } else {
                assert!(outcome.is_err());
            }
        }
    }

    #[tokio::test]
    async fn test_batch_processor_quorum() {
        let processor = AsyncBatchProcessor::new(4);

        // Only the first three tasks succeed; quorum of 2 is still reachable.
        let tasks: Vec<_> = (0..10)
            .map(|n| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                if n < 3 { Ok::<i32, Error>(n) } else { Err(Error::other("Test error")) }
            })
            .collect();

        let quorum = processor.execute_batch_with_quorum(tasks, 2).await;
        assert!(quorum.is_ok());
        assert!(quorum.unwrap().len() >= 2);
    }
}

View File

@@ -321,7 +321,7 @@ impl ExpiryState {
let mut state = GLOBAL_ExpiryState.write().await;
while state.tasks_tx.len() < n {
let (tx, rx) = mpsc::channel(10000);
let (tx, rx) = mpsc::channel(1000);
let api = api.clone();
let rx = Arc::new(tokio::sync::Mutex::new(rx));
state.tasks_tx.push(tx);
@@ -432,7 +432,7 @@ pub struct TransitionState {
impl TransitionState {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> Arc<Self> {
let (tx1, rx1) = bounded(100000);
let (tx1, rx1) = bounded(1000);
let (tx2, rx2) = bounded(1);
Arc::new(Self {
transition_tx: tx1,
@@ -467,8 +467,12 @@ impl TransitionState {
}
pub async fn init(api: Arc<ECStore>) {
let mut n = 10; //globalAPIConfig.getTransitionWorkers();
let tw = 10; //globalILMConfig.getTransitionWorkers();
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
let mut n = max_workers;
let tw = 8; //globalILMConfig.getTransitionWorkers();
if tw > 0 {
n = tw;
}
@@ -561,8 +565,18 @@ impl TransitionState {
pub async fn update_workers_inner(api: Arc<ECStore>, n: i64) {
let mut n = n;
if n == 0 {
n = 100;
let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16));
n = max_workers;
}
// Allow environment override of maximum workers
let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS")
.ok()
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(32);
n = std::cmp::min(n, absolute_max);
let mut num_workers = GLOBAL_TransitionState.num_workers.load(Ordering::SeqCst);
while num_workers < n {
@@ -585,7 +599,10 @@ impl TransitionState {
}
pub async fn init_background_expiry(api: Arc<ECStore>) {
let mut workers = num_cpus::get() / 2;
let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16));
//globalILMConfig.getExpirationWorkers()
if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") {
if let Ok(num_expirations) = env_expiration_workers.parse::<usize>() {
@@ -594,7 +611,10 @@ pub async fn init_background_expiry(api: Arc<ECStore>) {
}
if workers == 0 {
workers = 100;
workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(8);
}
//let expiry_state = GLOBAL_ExpiryStSate.write().await;

View File

@@ -12,13 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs::Metadata, path::Path};
use std::{
fs::Metadata,
path::Path,
sync::{Arc, OnceLock},
};
use tokio::{
fs::{self, File},
io,
};
static READONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
static WRITEONLY_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
static READWRITE_OPTIONS: OnceLock<Arc<fs::OpenOptions>> = OnceLock::new();
fn get_readonly_options() -> &'static Arc<fs::OpenOptions> {
READONLY_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.read(true);
Arc::new(opts)
})
}
fn get_writeonly_options() -> &'static Arc<fs::OpenOptions> {
WRITEONLY_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.write(true);
Arc::new(opts)
})
}
fn get_readwrite_options() -> &'static Arc<fs::OpenOptions> {
READWRITE_OPTIONS.get_or_init(|| {
let mut opts = fs::OpenOptions::new();
opts.read(true).write(true);
Arc::new(opts)
})
}
#[cfg(not(windows))]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
use std::os::unix::fs::MetadataExt;
@@ -84,35 +116,28 @@ pub const O_APPEND: FileMode = 0x00400;
// create_new: bool,
pub async fn open_file(path: impl AsRef<Path>, mode: FileMode) -> io::Result<File> {
let mut opts = fs::OpenOptions::new();
match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
O_RDONLY => {
opts.read(true);
}
O_WRONLY => {
opts.write(true);
}
O_RDWR => {
opts.read(true);
opts.write(true);
}
_ => (),
let base_opts = match mode & (O_RDONLY | O_WRONLY | O_RDWR) {
O_RDONLY => get_readonly_options(),
O_WRONLY => get_writeonly_options(),
O_RDWR => get_readwrite_options(),
_ => get_readonly_options(),
};
if mode & O_CREATE != 0 {
opts.create(true);
if (mode & (O_CREATE | O_APPEND | O_TRUNC)) != 0 {
let mut opts = (**base_opts).clone();
if mode & O_CREATE != 0 {
opts.create(true);
}
if mode & O_APPEND != 0 {
opts.append(true);
}
if mode & O_TRUNC != 0 {
opts.truncate(true);
}
opts.open(path.as_ref()).await
} else {
base_opts.open(path.as_ref()).await
}
if mode & O_APPEND != 0 {
opts.append(true);
}
if mode & O_TRUNC != 0 {
opts.truncate(true);
}
opts.open(path.as_ref()).await
}
pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
@@ -121,7 +146,7 @@ pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
}
pub fn access_std(path: impl AsRef<Path>) -> io::Result<()> {
tokio::task::block_in_place(|| std::fs::metadata(path))?;
std::fs::metadata(path)?;
Ok(())
}
@@ -130,7 +155,7 @@ pub async fn lstat(path: impl AsRef<Path>) -> io::Result<Metadata> {
}
pub fn lstat_std(path: impl AsRef<Path>) -> io::Result<Metadata> {
tokio::task::block_in_place(|| std::fs::metadata(path))
std::fs::metadata(path)
}
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
@@ -159,26 +184,22 @@ pub async fn remove_all(path: impl AsRef<Path>) -> io::Result<()> {
#[tracing::instrument(level = "debug", skip_all)]
pub fn remove_std(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
tokio::task::block_in_place(|| {
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir(path)
} else {
std::fs::remove_file(path)
}
})
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir(path)
} else {
std::fs::remove_file(path)
}
}
pub fn remove_all_std(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
tokio::task::block_in_place(|| {
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir_all(path)
} else {
std::fs::remove_file(path)
}
})
let meta = std::fs::metadata(path)?;
if meta.is_dir() {
std::fs::remove_dir_all(path)
} else {
std::fs::remove_file(path)
}
}
pub async fn mkdir(path: impl AsRef<Path>) -> io::Result<()> {
@@ -190,7 +211,7 @@ pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<
}
pub fn rename_std(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
tokio::task::block_in_place(|| std::fs::rename(from, to))
std::fs::rename(from, to)
}
#[tracing::instrument(level = "debug", skip_all)]

View File

@@ -41,18 +41,21 @@ use tokio::time::interval;
use crate::erasure_coding::bitrot_verify;
use bytes::Bytes;
use path_absolutize::Absolutize;
// use path_absolutize::Absolutize; // Replaced with direct path operations for better performance
use crate::file_cache::{get_global_file_cache, prefetch_metadata_patterns, read_metadata_cached};
use parking_lot::RwLock as ParkingLotRwLock;
use rustfs_filemeta::{
Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn,
get_file_info, read_xl_meta_no_data,
};
use rustfs_utils::HashAlgorithm;
use rustfs_utils::os::get_info;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::io::SeekFrom;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use std::{
fs::Metadata,
@@ -101,6 +104,9 @@ pub struct LocalDisk {
pub major: u64,
pub minor: u64,
pub nrrequests: u64,
// Performance optimization fields
path_cache: Arc<ParkingLotRwLock<HashMap<String, PathBuf>>>,
current_dir: Arc<OnceLock<PathBuf>>,
// pub id: Mutex<Option<Uuid>>,
// pub format_data: Mutex<Vec<u8>>,
// pub format_file_info: Mutex<Option<Metadata>>,
@@ -130,8 +136,9 @@ impl Debug for LocalDisk {
impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
debug!("Creating local disk");
let root = match PathBuf::from(ep.get_file_path()).absolutize() {
Ok(path) => path.into_owned(),
// Use optimized path resolution instead of absolutize() for better performance
let root = match std::fs::canonicalize(ep.get_file_path()) {
Ok(path) => path,
Err(e) => {
if e.kind() == ErrorKind::NotFound {
return Err(DiskError::VolumeNotFound);
@@ -144,10 +151,8 @@ impl LocalDisk {
// TODO: 删除 tmp 数据
}
let format_path = Path::new(RUSTFS_META_BUCKET)
.join(Path::new(super::FORMAT_CONFIG_FILE))
.absolutize_virtually(&root)?
.into_owned();
// Use optimized path resolution instead of absolutize_virtually
let format_path = root.join(RUSTFS_META_BUCKET).join(super::FORMAT_CONFIG_FILE);
debug!("format_path: {:?}", format_path);
let (format_data, format_meta) = read_file_exists(&format_path).await?;
@@ -227,6 +232,8 @@ impl LocalDisk {
// format_file_info: Mutex::new(format_meta),
// format_data: Mutex::new(format_data),
// format_last_check: Mutex::new(format_last_check),
path_cache: Arc::new(ParkingLotRwLock::new(HashMap::with_capacity(2048))),
current_dir: Arc::new(OnceLock::new()),
exit_signal: None,
};
let (info, _root) = get_disk_info(root).await?;
@@ -351,19 +358,178 @@ impl LocalDisk {
self.make_volumes(defaults).await
}
// Optimized path resolution with caching
pub fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
let path_ref = path.as_ref();
let path_str = path_ref.to_string_lossy();
// Fast cache read
{
let cache = self.path_cache.read();
if let Some(cached_path) = cache.get(path_str.as_ref()) {
return Ok(cached_path.clone());
}
}
// Calculate absolute path without using path_absolutize for better performance
let abs_path = if path_ref.is_absolute() {
path_ref.to_path_buf()
} else {
self.root.join(path_ref)
};
// Normalize path components to avoid filesystem calls
let normalized = self.normalize_path_components(&abs_path);
// Cache the result
{
let mut cache = self.path_cache.write();
// Simple cache size control
if cache.len() >= 4096 {
// Clear half the cache - simple eviction strategy
let keys_to_remove: Vec<_> = cache.keys().take(cache.len() / 2).cloned().collect();
for key in keys_to_remove {
cache.remove(&key);
}
}
cache.insert(path_str.into_owned(), normalized.clone());
}
Ok(normalized)
}
// Lightweight path normalization without filesystem calls
fn normalize_path_components(&self, path: &Path) -> PathBuf {
let mut result = PathBuf::new();
for component in path.components() {
match component {
std::path::Component::Normal(name) => {
result.push(name);
}
std::path::Component::ParentDir => {
result.pop();
}
std::path::Component::CurDir => {
// Ignore current directory components
}
std::path::Component::RootDir => {
result.push(component);
}
std::path::Component::Prefix(_prefix) => {
result.push(component);
}
}
}
result
}
// Highly optimized object path generation
pub fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
let dir = Path::new(&bucket);
let file_path = Path::new(&key);
self.resolve_abs_path(dir.join(file_path))
// For high-frequency paths, use faster string concatenation
let cache_key = if key.is_empty() {
bucket.to_string()
} else {
// Use with_capacity to pre-allocate, reducing memory reallocations
let mut path_str = String::with_capacity(bucket.len() + key.len() + 1);
path_str.push_str(bucket);
path_str.push('/');
path_str.push_str(key);
path_str
};
// Fast path: directly calculate based on root, avoiding cache lookup overhead for simple cases
Ok(self.root.join(&cache_key))
}
pub fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
let dir = Path::new(&bucket);
self.resolve_abs_path(dir)
Ok(self.root.join(bucket))
}
// Batch path generation with single lock acquisition
pub fn get_object_paths_batch(&self, requests: &[(String, String)]) -> Result<Vec<PathBuf>> {
let mut results = Vec::with_capacity(requests.len());
let mut cache_misses = Vec::new();
// First attempt to get all paths from cache
{
let cache = self.path_cache.read();
for (i, (bucket, key)) in requests.iter().enumerate() {
let cache_key = format!("{}/{}", bucket, key);
if let Some(cached_path) = cache.get(&cache_key) {
results.push((i, cached_path.clone()));
} else {
cache_misses.push((i, bucket, key, cache_key));
}
}
}
// Handle cache misses
if !cache_misses.is_empty() {
let mut new_entries = Vec::new();
for (i, _bucket, _key, cache_key) in cache_misses {
let path = self.root.join(&cache_key);
results.push((i, path.clone()));
new_entries.push((cache_key, path));
}
// Batch update cache
{
let mut cache = self.path_cache.write();
for (key, path) in new_entries {
cache.insert(key, path);
}
}
}
// Sort results back to original order
results.sort_by_key(|(i, _)| *i);
Ok(results.into_iter().map(|(_, path)| path).collect())
}
// Optimized metadata reading with caching
pub async fn read_metadata_cached(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
read_metadata_cached(path).await
}
// Smart prefetching for related files
pub async fn read_version_with_prefetch(
&self,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
let file_path = self.get_object_path(volume, path)?;
// Async prefetch related files, don't block current read
if let Some(parent) = file_path.parent() {
prefetch_metadata_patterns(parent, &[super::STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await;
}
// Main read logic
let file_dir = self.get_bucket_path(volume)?;
let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?;
get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data })
.await
.map_err(|_e| DiskError::Unexpected)
}
// Batch metadata reading for multiple objects
pub async fn read_metadata_batch(&self, requests: Vec<(String, String)>) -> Result<Vec<Option<Arc<FileMeta>>>> {
let paths: Vec<PathBuf> = requests
.iter()
.map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, super::STORAGE_FORMAT_FILE)))
.collect::<Result<Vec<_>>>()?;
let cache = get_global_file_cache();
let results = cache.get_metadata_batch(paths).await;
Ok(results.into_iter().map(|r| r.ok()).collect())
}
// /// Write to the filesystem atomically.
@@ -549,7 +715,15 @@ impl LocalDisk {
}
async fn read_metadata(&self, file_path: impl AsRef<Path>) -> Result<Vec<u8>> {
// TODO: support timeout
// Try to use cached file content reading for better performance, with safe fallback
let path = file_path.as_ref().to_path_buf();
// First, try the cache
if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await {
return Ok(bytes.to_vec());
}
// Fallback to direct read if cache fails
let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?;
Ok(data)
}

View File

@@ -668,7 +668,7 @@ pub struct VolumeInfo {
pub created: Option<OffsetDateTime>,
}
#[derive(Deserialize, Serialize, Debug, Default)]
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
pub struct ReadOptions {
pub incl_free_versions: bool,
pub read_data: bool,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, get_host_ip_async, is_local_host};
use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host};
use tracing::{error, instrument, warn};
use crate::{
@@ -248,8 +248,7 @@ impl PoolEndpointList {
Ok(ips) => ips,
Err(e) => {
error!("host {} not found, error:{}", host, e);
get_host_ip_async(host.clone())
.map_err(|e| Error::other(format!("host '{host}' cannot resolve: {e}")))?
return Err(Error::other(format!("host '{host}' cannot resolve: {e}")));
}
};
host_ip_cache.insert(host.clone(), ips);

View File

@@ -0,0 +1,332 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! High-performance file content and metadata caching using moka
//!
//! This module provides optimized caching for file operations to reduce
//! redundant I/O and improve overall system performance.
use super::disk::error::{Error, Result};
use bytes::Bytes;
use moka::future::Cache;
use rustfs_filemeta::FileMeta;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
/// High-performance combined cache for parsed file metadata and raw file
/// contents, backed by two async moka caches plus hit/miss counters for
/// monitoring. A process-wide instance is available via `get_global_file_cache()`.
pub struct OptimizedFileCache {
    // Use moka as high-performance async cache
    metadata_cache: Cache<PathBuf, Arc<FileMeta>>,
    file_content_cache: Cache<PathBuf, Bytes>,
    // Performance monitoring
    // NOTE(review): these counters are snapshotted (not shared) by the Clone
    // impl, so hits/misses recorded through a clone never reach the original.
    cache_hits: std::sync::atomic::AtomicU64,
    cache_misses: std::sync::atomic::AtomicU64,
}
impl OptimizedFileCache {
    /// Creates a cache with default sizing.
    ///
    /// Metadata: up to 2048 entries, 5-minute TTL, 1-minute idle timeout.
    /// File content: weighted by byte length, bounded at 256 MiB total
    /// weight, 2-minute TTL.
    pub fn new() -> Self {
        Self {
            metadata_cache: Cache::builder()
                .max_capacity(2048)
                .time_to_live(Duration::from_secs(300)) // 5 minutes TTL
                .time_to_idle(Duration::from_secs(60)) // 1 minute idle
                .build(),
            file_content_cache: Cache::builder()
                // With a weigher installed, moka interprets `max_capacity` as
                // the maximum *total weight* (bytes here), not an entry count.
                // The previous value of 512 capped the cache at 512 bytes,
                // effectively disabling content caching.
                .max_capacity(256 * 1024 * 1024) // 256 MiB of file content
                .time_to_live(Duration::from_secs(120))
                // Saturate instead of truncating for entries > u32::MAX bytes.
                .weigher(|_key: &PathBuf, value: &Bytes| value.len().try_into().unwrap_or(u32::MAX))
                .build(),
            cache_hits: std::sync::atomic::AtomicU64::new(0),
            cache_misses: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Returns the parsed `FileMeta` for `path`, reading from disk and
    /// populating the cache on a miss.
    ///
    /// # Errors
    /// Fails if the file cannot be read or does not unmarshal as `FileMeta`.
    pub async fn get_metadata(&self, path: PathBuf) -> Result<Arc<FileMeta>> {
        if let Some(cached) = self.metadata_cache.get(&path).await {
            self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Ok(cached);
        }
        self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Cache miss: read and parse, then insert for future readers.
        let data = tokio::fs::read(&path)
            .await
            .map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?;

        let mut meta = FileMeta::default();
        meta.unmarshal_msg(&data)?;

        let arc_meta = Arc::new(meta);
        self.metadata_cache.insert(path, arc_meta.clone()).await;
        Ok(arc_meta)
    }

    /// Returns the raw contents of `path`, reading from disk and populating
    /// the cache on a miss. `Bytes` clones are cheap (refcounted).
    pub async fn get_file_content(&self, path: PathBuf) -> Result<Bytes> {
        if let Some(cached) = self.file_content_cache.get(&path).await {
            self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Ok(cached);
        }
        self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let data = tokio::fs::read(&path)
            .await
            .map_err(|e| Error::other(format!("Read file failed: {}", e)))?;

        let bytes = Bytes::from(data);
        self.file_content_cache.insert(path, bytes.clone()).await;
        Ok(bytes)
    }

    /// Warms the metadata cache for `base_path` joined with each pattern.
    /// Prefetching runs on a detached task; this method does not wait for it
    /// and ignores all errors (best effort).
    pub async fn prefetch_related(&self, base_path: &Path, patterns: &[&str]) {
        let mut prefetch_tasks = Vec::new();

        for pattern in patterns {
            let path = base_path.join(pattern);
            // Only schedule paths that currently exist.
            if tokio::fs::metadata(&path).await.is_ok() {
                let cache = self.clone();
                let path_clone = path.clone();
                prefetch_tasks.push(async move {
                    let _ = cache.get_metadata(path_clone).await;
                });
            }
        }

        // Parallel prefetch, don't wait for completion
        if !prefetch_tasks.is_empty() {
            tokio::spawn(async move {
                futures::future::join_all(prefetch_tasks).await;
            });
        }
    }

    /// Reads metadata for many paths, serving from cache where possible.
    /// Results are returned in the same order as `paths`; per-path failures
    /// are reported in place rather than aborting the batch.
    pub async fn get_metadata_batch(
        &self,
        paths: Vec<PathBuf>,
    ) -> Vec<std::result::Result<Arc<FileMeta>, rustfs_filemeta::Error>> {
        let mut results = Vec::with_capacity(paths.len());
        let mut cache_futures = Vec::new();

        // First, attempt to serve each path from the cache.
        for (i, path) in paths.iter().enumerate() {
            if let Some(cached) = self.metadata_cache.get(path).await {
                results.push((i, Ok(cached)));
                self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            } else {
                cache_futures.push((i, path.clone()));
            }
        }

        // For cache misses, read from the filesystem sequentially.
        if !cache_futures.is_empty() {
            let mut fs_results = Vec::new();
            for (i, path) in cache_futures {
                self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match tokio::fs::read(&path).await {
                    Ok(data) => {
                        let mut meta = FileMeta::default();
                        match meta.unmarshal_msg(&data) {
                            Ok(_) => {
                                let arc_meta = Arc::new(meta);
                                self.metadata_cache.insert(path, arc_meta.clone()).await;
                                fs_results.push((i, Ok(arc_meta)));
                            }
                            Err(e) => {
                                fs_results.push((i, Err(e)));
                            }
                        }
                    }
                    Err(_e) => {
                        // I/O errors are collapsed into a generic error variant.
                        fs_results.push((i, Err(rustfs_filemeta::Error::Unexpected)));
                    }
                }
            }
            results.extend(fs_results);
        }

        // Restore the caller's original ordering.
        results.sort_by_key(|(i, _)| *i);
        results.into_iter().map(|(_, result)| result).collect()
    }

    /// Drops any cached metadata and content for `path` (e.g. after a write).
    pub async fn invalidate(&self, path: &Path) {
        self.metadata_cache.remove(path).await;
        self.file_content_cache.remove(path).await;
    }

    /// Returns a point-in-time snapshot of cache sizes and hit statistics.
    /// `entry_count` is moka's eventually-consistent estimate.
    pub fn get_stats(&self) -> FileCacheStats {
        let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed);
        let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed);
        let hit_rate = if hits + misses > 0 {
            (hits as f64 / (hits + misses) as f64) * 100.0
        } else {
            0.0
        };

        FileCacheStats {
            metadata_cache_size: self.metadata_cache.entry_count(),
            content_cache_size: self.file_content_cache.entry_count(),
            cache_hits: hits,
            cache_misses: misses,
            hit_rate,
            total_weight: 0, // Simplified for compatibility
        }
    }

    /// Empties both caches and waits for the invalidation to take effect.
    /// Hit/miss counters are intentionally left untouched.
    pub async fn clear(&self) {
        self.metadata_cache.invalidate_all();
        self.file_content_cache.invalidate_all();

        // Wait for invalidation to complete
        self.metadata_cache.run_pending_tasks().await;
        self.file_content_cache.run_pending_tasks().await;
    }
}
impl Clone for OptimizedFileCache {
    /// Clones the moka cache handles (which share the same underlying
    /// storage) but *snapshots* the hit/miss counters: increments recorded
    /// through a clone are NOT reflected in the original.
    /// NOTE(review): `prefetch_related` spawns tasks on a clone, so stats
    /// from prefetched reads are silently dropped — confirm whether shared
    /// `Arc<AtomicU64>` counters were intended instead.
    fn clone(&self) -> Self {
        Self {
            metadata_cache: self.metadata_cache.clone(),
            file_content_cache: self.file_content_cache.clone(),
            cache_hits: std::sync::atomic::AtomicU64::new(self.cache_hits.load(std::sync::atomic::Ordering::Relaxed)),
            cache_misses: std::sync::atomic::AtomicU64::new(self.cache_misses.load(std::sync::atomic::Ordering::Relaxed)),
        }
    }
}
/// Point-in-time snapshot of cache statistics, as returned by
/// `OptimizedFileCache::get_stats`. All fields are plain values, so the
/// struct is `Copy` for cheap pass-by-value reporting.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FileCacheStats {
    /// Approximate number of entries in the metadata cache.
    pub metadata_cache_size: u64,
    /// Approximate number of entries in the content cache.
    pub content_cache_size: u64,
    /// Total cache hits since process start (or last counter reset).
    pub cache_hits: u64,
    /// Total cache misses since process start.
    pub cache_misses: u64,
    /// Hit percentage in [0.0, 100.0]; 0.0 when no lookups have occurred.
    pub hit_rate: f64,
    /// Reserved; currently always 0.
    pub total_weight: u64,
}
impl Default for OptimizedFileCache {
    /// Equivalent to [`OptimizedFileCache::new`].
    fn default() -> Self {
        Self::new()
    }
}
// Global cache instance, lazily created on first access and shared by all
// callers for the lifetime of the process.
use std::sync::OnceLock;

static GLOBAL_FILE_CACHE: OnceLock<OptimizedFileCache> = OnceLock::new();

/// Returns the process-wide `OptimizedFileCache`, initializing it on first use.
pub fn get_global_file_cache() -> &'static OptimizedFileCache {
    GLOBAL_FILE_CACHE.get_or_init(OptimizedFileCache::new)
}
// Utility functions for common operations, all delegating to the global cache.

/// Reads (and caches) the parsed `FileMeta` at `path` via the global cache.
pub async fn read_metadata_cached(path: PathBuf) -> Result<Arc<FileMeta>> {
    get_global_file_cache().get_metadata(path).await
}

/// Reads (and caches) the raw contents of `path` via the global cache.
pub async fn read_file_content_cached(path: PathBuf) -> Result<Bytes> {
    get_global_file_cache().get_file_content(path).await
}

/// Best-effort warm-up of the global metadata cache for `base_path` joined
/// with each of `patterns`; errors are ignored.
pub async fn prefetch_metadata_patterns(base_path: &Path, patterns: &[&str]) {
    get_global_file_cache().prefetch_related(base_path, patterns).await;
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::tempdir;

    /// Writes `contents` to a file named `name` under `dir`, returning its path.
    fn write_temp(dir: &tempfile::TempDir, name: &str, contents: &str) -> PathBuf {
        let path = dir.path().join(name);
        let mut f = std::fs::File::create(&path).unwrap();
        write!(f, "{}", contents).unwrap();
        path
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let cache = OptimizedFileCache::new();
        let dir = tempdir().unwrap();
        let path = write_temp(&dir, "test.txt", "test content\n");

        // First read misses and populates the cache.
        let first = cache.get_file_content(path.clone()).await.unwrap();
        assert_eq!(first, Bytes::from("test content\n"));

        // Second read is served from the cache.
        let second = cache.get_file_content(path.clone()).await.unwrap();
        assert_eq!(second, first);

        let stats = cache.get_stats();
        assert!(stats.cache_hits > 0);
        assert!(stats.cache_misses > 0);
    }

    #[tokio::test]
    async fn test_metadata_batch_read() {
        let cache = OptimizedFileCache::new();
        let dir = tempdir().unwrap();

        let paths: Vec<PathBuf> = (0..5)
            .map(|i| write_temp(&dir, &format!("test_{}.txt", i), &format!("content {}\n", i)))
            .collect();

        // Plain text files are not valid FileMeta, so entries come back as
        // errors; here we only verify the batch covers every input.
        let results = cache.get_metadata_batch(paths).await;
        assert_eq!(results.len(), 5);
    }

    #[tokio::test]
    async fn test_cache_invalidation() {
        let cache = OptimizedFileCache::new();
        let dir = tempdir().unwrap();
        let path = write_temp(&dir, "test.txt", "test content\n");

        // Populate, invalidate, then re-read: two misses expected in total.
        let _ = cache.get_file_content(path.clone()).await.unwrap();
        cache.invalidate(&path).await;
        let _ = cache.get_file_content(path.clone()).await.unwrap();

        assert!(cache.get_stats().cache_misses >= 2);
    }
}

View File

@@ -16,6 +16,7 @@
extern crate core;
pub mod admin_server_info;
pub mod batch_processor;
pub mod bitrot;
pub mod bucket;
pub mod cache_value;
@@ -29,6 +30,7 @@ pub mod disks_layout;
pub mod endpoints;
pub mod erasure_coding;
pub mod error;
pub mod file_cache;
pub mod global;
pub mod lock_utils;
pub mod metrics_realtime;

View File

@@ -15,6 +15,7 @@
#![allow(unused_imports)]
#![allow(unused_variables)]
use crate::batch_processor::{AsyncBatchProcessor, get_global_processors};
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE;
use crate::bucket::versioning::VersioningApi;
@@ -232,7 +233,10 @@ impl SetDisks {
});
}
let results = join_all(futures).await;
// Use optimized batch processor for disk info retrieval
let processor = get_global_processors().metadata_processor();
let results = processor.execute_batch(futures).await;
for result in results {
match result {
Ok(res) => {
@@ -507,21 +511,28 @@ impl SetDisks {
#[tracing::instrument(skip(disks))]
async fn cleanup_multipart_path(disks: &[Option<DiskStore>], paths: &[String]) {
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
futures.push(async move {
if let Some(disk) = disk {
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await
} else {
Err(DiskError::DiskNotFound)
// Use improved simple batch processor instead of join_all for better performance
let processor = get_global_processors().write_processor();
let tasks: Vec<_> = disks
.iter()
.map(|disk| {
let disk = disk.clone();
let paths = paths.to_vec();
async move {
if let Some(disk) = disk {
disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, &paths).await
} else {
Err(DiskError::DiskNotFound)
}
}
})
}
.collect();
let results = join_all(futures).await;
let results = processor.execute_batch(tasks).await;
for result in results {
match result {
Ok(_) => {
@@ -545,21 +556,32 @@ impl SetDisks {
part_numbers: &[usize],
read_quorum: usize,
) -> disk::error::Result<Vec<ObjectPartInfo>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, disk) in disks.iter().enumerate() {
futures.push(async move {
if let Some(disk) = disk {
disk.read_parts(bucket, part_meta_paths).await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let mut errs = Vec::with_capacity(disks.len());
let mut object_parts = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
// Use batch processor for better performance
let processor = get_global_processors().read_processor();
let bucket = bucket.to_string();
let part_meta_paths = part_meta_paths.to_vec();
let tasks: Vec<_> = disks
.iter()
.map(|disk| {
let disk = disk.clone();
let bucket = bucket.clone();
let part_meta_paths = part_meta_paths.clone();
async move {
if let Some(disk) = disk {
disk.read_parts(&bucket, &part_meta_paths).await
} else {
Err(DiskError::DiskNotFound)
}
}
})
.collect();
let results = processor.execute_batch(tasks).await;
for result in results {
match result {
Ok(res) => {
@@ -1369,22 +1391,71 @@ impl SetDisks {
})
});
// Wait for all tasks to complete
let results = join_all(futures).await;
for result in results {
match result? {
Ok(res) => {
ress.push(res);
errors.push(None);
}
Err(e) => {
match result {
Ok(res) => match res {
Ok(file_info) => {
ress.push(file_info);
errors.push(None);
}
Err(e) => {
ress.push(FileInfo::default());
errors.push(Some(e));
}
},
Err(_) => {
ress.push(FileInfo::default());
errors.push(Some(e));
errors.push(Some(DiskError::Unexpected));
}
}
}
Ok((ress, errors))
}
// Optimized version using batch processor with quorum support
pub async fn read_version_optimized(
&self,
bucket: &str,
object: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<Vec<rustfs_filemeta::FileInfo>> {
// Use existing disk selection logic
let disks = self.disks.read().await;
let required_reads = self.format.erasure.sets.len();
// Clone parameters outside the closure to avoid lifetime issues
let bucket = bucket.to_string();
let object = object.to_string();
let version_id = version_id.to_string();
let opts = opts.clone();
let processor = get_global_processors().read_processor();
let tasks: Vec<_> = disks
.iter()
.take(required_reads + 2) // Read a few extra for reliability
.filter_map(|disk| {
disk.as_ref().map(|d| {
let disk = d.clone();
let bucket = bucket.clone();
let object = object.clone();
let version_id = version_id.clone();
let opts = opts.clone();
async move { disk.read_version(&bucket, &bucket, &object, &version_id, &opts).await }
})
})
.collect();
match processor.execute_batch_with_quorum(tasks, required_reads).await {
Ok(results) => Ok(results),
Err(_) => Err(DiskError::FileNotFound.into()), // Use existing error type
}
}
async fn read_all_xl(
disks: &[Option<DiskStore>],
bucket: &str,
@@ -1403,10 +1474,11 @@ impl SetDisks {
object: &str,
read_data: bool,
) -> (Vec<Option<RawFileInfo>>, Vec<Option<DiskError>>) {
let mut futures = Vec::with_capacity(disks.len());
let mut ress = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let mut futures = Vec::with_capacity(disks.len());
for disk in disks.iter() {
futures.push(async move {
if let Some(disk) = disk {

View File

@@ -302,17 +302,19 @@ impl TierConfigMgr {
}
pub async fn get_driver<'a>(&'a mut self, tier_name: &str) -> std::result::Result<&'a WarmBackendImpl, AdminError> {
Ok(match self.driver_cache.entry(tier_name.to_string()) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
let t = self.tiers.get(tier_name);
if t.is_none() {
return Err(ERR_TIER_NOT_FOUND.clone());
}
let d = new_warm_backend(t.expect("err"), false).await?;
e.insert(d)
}
})
// Return cached driver if present
if self.driver_cache.contains_key(tier_name) {
return Ok(self.driver_cache.get(tier_name).unwrap());
}
// Get tier configuration and create new driver
let tier_config = self.tiers.get(tier_name).ok_or_else(|| ERR_TIER_NOT_FOUND.clone())?;
let driver = new_warm_backend(tier_config, false).await?;
// Insert and return reference
self.driver_cache.insert(tier_name.to_string(), driver);
Ok(self.driver_cache.get(tier_name).unwrap())
}
pub async fn reload(&mut self, api: Arc<ECStore>) -> std::result::Result<(), std::io::Error> {

View File

@@ -47,3 +47,4 @@ smallvec = "1.11"
smartstring = "1.0"
crossbeam-queue = "0.3"
heapless = "0.8"
pprof.workspace = true

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, de::Error};
use time::OffsetDateTime;
use super::Policy;
@@ -59,15 +59,17 @@ impl TryFrom<Vec<u8>> for PolicyDoc {
type Error = serde_json::Error;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
match serde_json::from_slice::<PolicyDoc>(&value) {
Ok(res) => Ok(res),
Err(err) => match serde_json::from_slice::<Policy>(&value) {
Ok(res2) => Ok(Self {
policy: res2,
..Default::default()
}),
Err(_) => Err(err),
},
// Try to parse as PolicyDoc first
if let Ok(policy_doc) = serde_json::from_slice::<PolicyDoc>(&value) {
return Ok(policy_doc);
}
// Fall back to parsing as Policy and wrap in PolicyDoc
serde_json::from_slice::<Policy>(&value)
.map(|policy| Self {
policy,
..Default::default()
})
.map_err(|_| serde_json::Error::custom("Failed to parse as PolicyDoc or Policy".to_string()))
}
}

View File

@@ -16,17 +16,40 @@ use bytes::Bytes;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use std::net::Ipv6Addr;
use std::sync::LazyLock;
use std::sync::{LazyLock, Mutex};
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
fmt::Display,
net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
time::{Duration, Instant},
};
use transform_stream::AsyncTryStream;
use url::{Host, Url};
static LOCAL_IPS: LazyLock<Vec<IpAddr>> = LazyLock::new(|| must_get_local_ips().unwrap());
/// A resolved address set together with the instant it was resolved, used as
/// the value type of the process-wide DNS cache.
#[derive(Debug, Clone)]
struct DnsCacheEntry {
    ips: HashSet<IpAddr>,
    cached_at: Instant,
}

impl DnsCacheEntry {
    /// Wraps `ips`, stamping the entry with the current time.
    fn new(ips: HashSet<IpAddr>) -> Self {
        let cached_at = Instant::now();
        DnsCacheEntry { ips, cached_at }
    }

    /// Returns `true` once the entry's age strictly exceeds `ttl`.
    fn is_expired(&self, ttl: Duration) -> bool {
        Instant::now().duration_since(self.cached_at) > ttl
    }
}
// Process-wide DNS cache mapping a domain to its resolved IPs plus a
// timestamp. Guarded by a std Mutex; callers treat a poisoned lock as a
// cache miss rather than panicking.
static DNS_CACHE: LazyLock<Mutex<HashMap<String, DnsCacheEntry>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
// How long a cached resolution remains valid before re-resolving.
const DNS_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes
/// helper for validating if the provided arg is an ip address.
pub fn is_socket_addr(addr: &str) -> bool {
// TODO IPv6 zone information?
@@ -87,19 +110,19 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R
}
/// returns IP address of given host using layered DNS resolution.
pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
///
/// This is the async version of `get_host_ip()` that provides enhanced DNS resolution
/// with Kubernetes support when the "net" feature is enabled.
pub async fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => {
#[cfg(feature = "net")]
{
use crate::dns_resolver::resolve_domain;
let handle = tokio::runtime::Handle::current();
handle.block_on(async {
match resolve_domain(domain).await {
Ok(ips) => Ok(ips.into_iter().collect()),
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
}
})
match resolve_domain(domain).await {
Ok(ips) => Ok(ips.into_iter().collect()),
Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))),
}
}
#[cfg(not(feature = "net"))]
{
@@ -128,17 +151,41 @@ pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
/// returns IP address of given host using standard resolution.
///
/// **Note**: This function uses standard library DNS resolution.
/// **Note**: This function uses standard library DNS resolution with caching.
/// For enhanced DNS resolution with Kubernetes support, use `get_host_ip_async()`.
pub fn get_host_ip(host: Host<&str>) -> std::io::Result<HashSet<IpAddr>> {
match host {
Host::Domain(domain) => match (domain, 0)
.to_socket_addrs()
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
{
Ok(ips) => Ok(ips),
Err(err) => Err(std::io::Error::other(err)),
},
Host::Domain(domain) => {
// Check cache first
if let Ok(mut cache) = DNS_CACHE.lock() {
if let Some(entry) = cache.get(domain) {
if !entry.is_expired(DNS_CACHE_TTL) {
return Ok(entry.ips.clone());
}
// Remove expired entry
cache.remove(domain);
}
}
// Perform DNS resolution
match (domain, 0)
.to_socket_addrs()
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
{
Ok(ips) => {
// Cache the result
if let Ok(mut cache) = DNS_CACHE.lock() {
cache.insert(domain.to_string(), DnsCacheEntry::new(ips.clone()));
// Limit cache size to prevent memory bloat
if cache.len() > 1000 {
cache.retain(|_, v| !v.is_expired(DNS_CACHE_TTL));
}
}
Ok(ips)
}
Err(err) => Err(std::io::Error::other(err)),
}
}
Host::Ipv4(ip) => {
let mut set = HashSet::with_capacity(1);
set.insert(IpAddr::V4(ip));

326
docs/PERFORMANCE_TESTING.md Normal file
View File

@@ -0,0 +1,326 @@
# RustFS 性能测试指南
本文档提供了对 RustFS 进行性能测试和性能分析的完整方法和工具。
## 概览
RustFS 提供了多种性能测试和分析工具:
1. **性能分析Profiling** - 使用内置的 pprof 接口收集 CPU 性能数据
2. **负载测试Load Testing** - 使用多种客户端工具模拟高并发请求
3. **监控和分析** - 查看性能指标和识别性能瓶颈
## 前置条件
### 1. 启用性能分析
在启动 RustFS 时,需要设置环境变量启用性能分析功能:
```bash
export RUSTFS_ENABLE_PROFILING=true
./rustfs
```
### 2. 安装依赖工具
确保系统中安装了以下工具:
```bash
# 基础工具
curl # HTTP 请求
jq # JSON 处理 (可选)
# 分析工具
go # Go pprof 工具 (可选,用于 protobuf 格式)
python3 # Python 负载测试脚本
# macOS 用户
brew install curl jq go python3
# Ubuntu/Debian 用户
sudo apt-get install curl jq golang-go python3
```
## 性能测试方法
### 方法 1使用专业脚本推荐
项目提供了完整的性能分析脚本:
```bash
# 查看脚本帮助
./scripts/profile_rustfs.sh help
# 检查性能分析状态
./scripts/profile_rustfs.sh status
# 收集火焰图30秒
./scripts/profile_rustfs.sh flamegraph
# 收集 protobuf 格式性能数据
./scripts/profile_rustfs.sh protobuf
# 收集两种格式的性能数据
./scripts/profile_rustfs.sh both
# 自定义参数
./scripts/profile_rustfs.sh -d 60 -u http://192.168.1.100:9000 both
```
### 方法 2使用 Python 综合测试
Python 脚本提供了负载测试和性能分析的一体化解决方案:
```bash
# 运行综合性能分析
python3 test_load.py
```
此脚本会:
1. 启动后台负载测试(多线程 S3 操作)
2. 并行收集性能分析数据
3. 生成火焰图用于分析
### 方法 3使用简单负载测试
对于快速测试,可以使用 bash 脚本:
```bash
# 运行简单负载测试
./simple_load_test.sh
```
## 性能分析输出格式
### 1. 火焰图SVG 格式)
- **用途**: 可视化 CPU 使用情况
- **文件**: `rustfs_profile_TIMESTAMP.svg`
- **查看方式**: 使用浏览器打开 SVG 文件
- **分析要点**:
- 宽度表示 CPU 使用时间
- 高度表示调用栈深度
- 点击可以放大特定函数
```bash
# 在浏览器中打开
open profiles/rustfs_profile_20240911_143000.svg
```
### 2. Protobuf 格式
- **用途**: 使用 Go pprof 工具进行详细分析
- **文件**: `rustfs_profile_TIMESTAMP.pb`
- **分析工具**: `go tool pprof`
```bash
# 使用 Go pprof 分析
go tool pprof profiles/rustfs_profile_20240911_143000.pb
# pprof 常用命令
(pprof) top # 显示 CPU 使用率最高的函数
(pprof) list func # 显示指定函数的源代码
(pprof) web # 生成 web 界面(需要 graphviz
(pprof) png # 生成 PNG 图片
(pprof) help # 查看所有命令
```
## API 接口使用
### 检查性能分析状态
```bash
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/status"
```
返回示例:
```json
{
"enabled": "true",
"sampling_rate": "100"
}
```
### 收集性能数据
```bash
# 收集 30 秒的火焰图
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=flamegraph" \
-o profile.svg
# 收集 protobuf 格式数据
curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=protobuf" \
-o profile.pb
```
**参数说明**:
- `seconds`: 收集时长1-300 秒)
- `format`: 输出格式(`flamegraph`/`svg``protobuf`/`pb`
## 负载测试场景
### 1. S3 API 负载测试
使用 Python 脚本进行完整的 S3 操作负载测试:
```python
# 基本配置
tester = S3LoadTester(
endpoint="http://127.0.0.1:9000",
access_key="rustfsadmin",
secret_key="rustfsadmin"
)
# 运行负载测试
# 4 个线程,每个线程执行 10 次操作
tester.run_load_test(num_threads=4, operations_per_thread=10)
```
每次操作包括:
1. 上传 1MB 对象
2. 下载对象
3. 删除对象
### 2. 自定义负载测试
```bash
# 创建测试桶
curl -X PUT "http://127.0.0.1:9000/test-bucket"
# 并发上传测试
for i in {1..10}; do
echo "test data $i" | curl -X PUT "http://127.0.0.1:9000/test-bucket/object-$i" -d @- &
done
wait
# 并发下载测试
for i in {1..10}; do
curl "http://127.0.0.1:9000/test-bucket/object-$i" > /dev/null &
done
wait
```
## 性能分析最佳实践
### 1. 测试环境准备
- 确保 RustFS 已启用性能分析: `RUSTFS_ENABLE_PROFILING=true`
- 使用独立的测试环境,避免其他程序干扰
- 确保有足够的磁盘空间存储分析文件
### 2. 数据收集建议
- **预热阶段**: 先运行 5-10 分钟的轻量负载
- **数据收集**: 在稳定负载下收集 30-60 秒的性能数据
- **多次采样**: 收集多个样本进行对比分析
### 3. 分析重点
在火焰图中重点关注:
1. **宽度最大的函数** - CPU 使用时间最长
2. **平顶函数** - 可能的性能瓶颈
3. **深度调用栈** - 可能的递归或复杂逻辑
4. **意外的系统调用** - I/O 或内存分配问题
### 4. 常见性能问题
- **锁竞争**: 查找 `std::sync` 相关函数
- **内存分配**: 查找 `alloc` 相关函数
- **I/O 等待**: 查找文件系统或网络 I/O 函数
- **序列化开销**: 查找 JSON/XML 解析函数
## 故障排除
### 1. 性能分析未启用
错误信息:`{"enabled":"false"}`
解决方案:
```bash
export RUSTFS_ENABLE_PROFILING=true
# 重启 RustFS
```
### 2. 连接被拒绝
错误信息:`Connection refused`
检查项:
- RustFS 是否正在运行
- 端口是否正确(默认 9000
- 防火墙设置
### 3. 分析文件过大
如果生成的分析文件过大:
- 减少收集时间(如 15-30 秒)
- 降低负载测试的并发度
- 使用 protobuf 格式而非 SVG
## 配置参数
### 环境变量
| 变量 | 默认值 | 描述 |
|------|--------|------|
| `RUSTFS_ENABLE_PROFILING` | `false` | 启用性能分析 |
| `RUSTFS_URL` | `http://127.0.0.1:9000` | RustFS 服务器地址 |
| `PROFILE_DURATION` | `30` | 性能数据收集时长(秒) |
| `OUTPUT_DIR` | `./profiles` | 输出文件目录 |
### 脚本参数
```bash
./scripts/profile_rustfs.sh [OPTIONS] [COMMAND]
OPTIONS:
-u, --url URL RustFS URL
-d, --duration SECONDS Profile duration
-o, --output DIR Output directory
COMMANDS:
status 检查状态
flamegraph 收集火焰图
protobuf 收集 protobuf 数据
both 收集两种格式(默认)
```
## 输出文件位置
- **脚本输出**: `./profiles/` 目录
- **Python 脚本**: `/tmp/rustfs_profiles/` 目录
- **文件命名**: `rustfs_profile_TIMESTAMP.{svg|pb}`
## 示例工作流程
1. **启动 RustFS**:
```bash
RUSTFS_ENABLE_PROFILING=true ./rustfs
```
2. **验证性能分析可用**:
```bash
./scripts/profile_rustfs.sh status
```
3. **开始负载测试**:
```bash
python3 test_load.py &
```
4. **收集性能数据**:
```bash
./scripts/profile_rustfs.sh -d 60 both
```
5. **分析结果**:
```bash
# 查看火焰图
open profiles/rustfs_profile_*.svg
# 或使用 pprof 分析
go tool pprof profiles/rustfs_profile_*.pb
```
通过这个完整的性能测试流程,你可以系统地分析 RustFS 的性能特征,识别瓶颈,并进行有针对性的优化。

View File

@@ -107,6 +107,7 @@ url = { workspace = true }
urlencoding = { workspace = true }
uuid = { workspace = true }
zip = { workspace = true }
pprof.workspace = true
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
sysctl = { workspace = true }

View File

@@ -81,6 +81,7 @@ pub mod sts;
pub mod tier;
pub mod trace;
pub mod user;
use pprof::protos::Message;
use urlencoding::decode;
#[allow(dead_code)]
@@ -1233,6 +1234,149 @@ async fn count_bucket_objects(
}
}
pub struct ProfileHandler {}
#[async_trait::async_trait]
impl Operation for ProfileHandler {
async fn call(&self, req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
use crate::profiling;
if !profiling::is_profiler_enabled() {
return Ok(S3Response::new((
StatusCode::SERVICE_UNAVAILABLE,
Body::from("Profiler not enabled. Set RUSTFS_ENABLE_PROFILING=true to enable profiling".to_string()),
)));
}
let queries = extract_query_params(&req.uri);
let seconds = queries.get("seconds").and_then(|s| s.parse::<u64>().ok()).unwrap_or(30);
let format = queries.get("format").cloned().unwrap_or_else(|| "protobuf".to_string());
if seconds > 300 {
return Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Profile duration cannot exceed 300 seconds".to_string()),
)));
}
let guard = match profiling::get_profiler_guard() {
Some(guard) => guard,
None => {
return Ok(S3Response::new((
StatusCode::SERVICE_UNAVAILABLE,
Body::from("Profiler not initialized".to_string()),
)));
}
};
info!("Starting CPU profile collection for {} seconds", seconds);
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
let guard_lock = match guard.lock() {
Ok(guard) => guard,
Err(_) => {
error!("Failed to acquire profiler guard lock");
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from("Failed to acquire profiler lock".to_string()),
)));
}
};
let report = match guard_lock.report().build() {
Ok(report) => report,
Err(e) => {
error!("Failed to build profiler report: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to build profile report: {}", e)),
)));
}
};
info!("CPU profile collection completed");
match format.as_str() {
"protobuf" | "pb" => {
let profile = report.pprof().unwrap();
let mut body = Vec::new();
if let Err(e) = profile.write_to_vec(&mut body) {
error!("Failed to serialize protobuf profile: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from("Failed to serialize profile".to_string()),
)));
}
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), headers))
}
"flamegraph" | "svg" => {
let mut flamegraph_buf = Vec::new();
match report.flamegraph(&mut flamegraph_buf) {
Ok(()) => (),
Err(e) => {
error!("Failed to generate flamegraph: {}", e);
return Ok(S3Response::new((
StatusCode::INTERNAL_SERVER_ERROR,
Body::from(format!("Failed to generate flamegraph: {}", e)),
)));
}
};
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap());
Ok(S3Response::with_headers((StatusCode::OK, Body::from(flamegraph_buf)), headers))
}
_ => Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Unsupported format. Use 'protobuf' or 'flamegraph'".to_string()),
))),
}
}
}
/// Admin handler reporting whether CPU profiling is currently enabled.
pub struct ProfileStatusHandler {}

#[async_trait::async_trait]
impl Operation for ProfileStatusHandler {
    /// Returns a small JSON object describing profiler availability, the
    /// supported output formats, and the collection endpoint.
    async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        use crate::profiling;
        use std::collections::HashMap;

        // Table-driven status payload: pick the pair list, then collect.
        let entries: &[(&str, &str)] = if profiling::is_profiler_enabled() {
            &[
                ("enabled", "true"),
                ("status", "running"),
                ("supported_formats", "protobuf, flamegraph"),
                ("max_duration_seconds", "300"),
                ("endpoint", "/rustfs/admin/debug/pprof/profile"),
            ]
        } else {
            &[
                ("enabled", "false"),
                ("status", "disabled"),
                ("message", "Set RUSTFS_ENABLE_PROFILING=true to enable profiling"),
            ]
        };
        let status: HashMap<&str, &str> = entries.iter().copied().collect();

        match serde_json::to_string(&status) {
            Ok(json) => {
                let mut headers = HeaderMap::new();
                headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
                Ok(S3Response::with_headers((StatusCode::OK, Body::from(json)), headers))
            }
            Err(e) => {
                error!("Failed to serialize status: {}", e);
                Ok(S3Response::new((
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Body::from("Failed to serialize status".to_string()),
                )))
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -214,6 +214,18 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result<impl S3Route>
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/profile").as_str(),
AdminOperation(&handlers::ProfileHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/debug/pprof/status").as_str(),
AdminOperation(&handlers::ProfileStatusHandler {}),
)?;
Ok(r)
}

View File

@@ -18,6 +18,7 @@ mod config;
mod error;
// mod grpc;
pub mod license;
mod profiling;
mod server;
mod storage;
mod update;
@@ -94,6 +95,9 @@ async fn main() -> Result<()> {
// Store in global storage
set_global_guard(guard).map_err(Error::other)?;
// Initialize performance profiling if enabled
profiling::start_profiling_if_enabled();
// Run parameters
run(opt).await
}
@@ -102,10 +106,12 @@ async fn main() -> Result<()> {
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Initialize global DNS resolver early for enhanced DNS resolution
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
// Initialize global DNS resolver early for enhanced DNS resolution (concurrent)
let dns_init = tokio::spawn(async {
if let Err(e) = init_global_dns_resolver().await {
warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e);
}
});
if let Some(region) = &opt.region {
rustfs_ecstore::global::set_global_region(region.clone());
@@ -176,6 +182,9 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize event notifier
init_event_notifier().await;
// Wait for DNS initialization to complete before network-heavy operations
dns_init.await.map_err(Error::other)?;
let buckets_list = store
.list_bucket(&BucketOptions {
no_metadata: true,
@@ -187,14 +196,31 @@ async fn run(opt: config::Opt) -> Result<()> {
// Collect bucket names into a vector
let buckets: Vec<String> = buckets_list.into_iter().map(|v| v.name).collect();
// Initialize the bucket metadata system
init_bucket_metadata_sys(store.clone(), buckets.clone()).await;
// Parallelize initialization tasks for better network performance
let bucket_metadata_task = tokio::spawn({
let store = store.clone();
let buckets = buckets.clone();
async move {
init_bucket_metadata_sys(store, buckets).await;
}
});
// Initialize the IAM system
init_iam_sys(store.clone()).await?;
let iam_init_task = tokio::spawn({
let store = store.clone();
async move { init_iam_sys(store).await }
});
// add bucket notification configuration
add_bucket_notification_configuration(buckets).await;
let notification_config_task = tokio::spawn({
let buckets = buckets.clone();
async move {
add_bucket_notification_configuration(buckets).await;
}
});
// Wait for all parallel initialization tasks to complete
bucket_metadata_task.await.map_err(Error::other)?;
iam_init_task.await.map_err(Error::other)??;
notification_config_task.await.map_err(Error::other)?;
// Initialize the global notification system
new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| {
@@ -239,12 +265,13 @@ async fn run(opt: config::Opt) -> Result<()> {
// initialize bucket replication pool
init_bucket_replication_pool().await;
// Async update check (optional)
// Async update check with timeout (optional)
tokio::spawn(async {
use crate::update::{UpdateCheckError, check_updates};
match check_updates().await {
Ok(result) => {
// Add timeout to prevent hanging network calls
match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await {
Ok(Ok(result)) => {
if result.update_available {
if let Some(latest) = &result.latest_version {
info!(
@@ -262,12 +289,15 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("✅ Version check: Current version is up to date: {}", result.current_version);
}
}
Err(UpdateCheckError::HttpError(e)) => {
Ok(Err(UpdateCheckError::HttpError(e))) => {
debug!("Version check: network error (this is normal): {}", e);
}
Err(e) => {
Ok(Err(e)) => {
debug!("Version check: failed (this is normal): {}", e);
}
Err(_) => {
debug!("Version check: timeout after 30 seconds (this is normal)");
}
}
});
@@ -364,14 +394,12 @@ async fn init_event_notifier() {
info!("Event notifier configuration found, proceeding with initialization.");
// 3. Initialize the notification system asynchronously with a global configuration
// Put it into a separate task to avoid blocking the main initialization process
tokio::spawn(async move {
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
});
// Use direct await for better error handling and faster initialization
if let Err(e) = rustfs_notify::initialize(server_config).await {
error!("Failed to initialize event notifier system: {}", e);
} else {
info!("Event notifier system initialized successfully.");
}
}
#[instrument(skip_all)]

63
rustfs/src/profiling.rs Normal file
View File

@@ -0,0 +1,63 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use pprof::ProfilerGuard;
use std::sync::{Arc, Mutex, OnceLock};
use tracing::info;
static PROFILER_GUARD: OnceLock<Arc<Mutex<ProfilerGuard<'static>>>> = OnceLock::new();
/// Build the global CPU profiler guard and install it into `PROFILER_GUARD`.
///
/// Samples at 1000 Hz and blocklists common runtime frames
/// (`libc`, `libgcc`, `pthread`, `vdso`) from capture.
///
/// # Errors
/// Returns an error if the pprof guard cannot be built, or if the
/// profiler was already initialized (the `OnceLock` is already set).
pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
    let builder = pprof::ProfilerGuardBuilder::default()
        .frequency(1000)
        .blocklist(&["libc", "libgcc", "pthread", "vdso"]);

    let guard = match builder.build() {
        Ok(g) => g,
        Err(e) => return Err(format!("Failed to build profiler guard: {}", e).into()),
    };

    // `set` fails only when another caller installed a guard first.
    if PROFILER_GUARD.set(Arc::new(Mutex::new(guard))).is_err() {
        return Err("Failed to set profiler guard (already initialized)".into());
    }

    info!("Performance profiler initialized");
    Ok(())
}
/// Report whether the global profiler guard has been installed.
pub fn is_profiler_enabled() -> bool {
    matches!(PROFILER_GUARD.get(), Some(_))
}
/// Return a clone of the shared profiler guard handle, or `None`
/// when the profiler has not been initialized.
pub fn get_profiler_guard() -> Option<Arc<Mutex<ProfilerGuard<'static>>>> {
    PROFILER_GUARD.get().map(Arc::clone)
}
/// Enable CPU profiling when the `RUSTFS_ENABLE_PROFILING` environment
/// variable is set to a truthy value.
///
/// Accepted truthy values (case-insensitive, surrounding whitespace
/// ignored): `true`, `1`, `yes`, `on`. An unset variable or any other
/// value leaves profiling disabled. Initialization failures are logged
/// and otherwise ignored so profiler problems never block startup.
pub fn start_profiling_if_enabled() {
    // Fix: `str::parse::<bool>()` accepts only the exact lowercase strings
    // "true"/"false", so settings like `RUSTFS_ENABLE_PROFILING=1` or `=TRUE`
    // silently disabled profiling. Normalize and match common truthy
    // spellings instead (still defaults to false, backward compatible).
    let enable_profiling = std::env::var("RUSTFS_ENABLE_PROFILING")
        .map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1" | "yes" | "on"))
        .unwrap_or(false);

    if enable_profiling {
        match init_profiler() {
            Ok(()) => {
                info!("Performance profiling enabled via RUSTFS_ENABLE_PROFILING environment variable");
            }
            Err(e) => {
                // Log the failure but keep the process running without profiling.
                tracing::error!("Failed to initialize profiler: {}", e);
                info!("Performance profiling disabled due to initialization error");
            }
        }
    } else {
        info!("Performance profiling disabled. Set RUSTFS_ENABLE_PROFILING=true to enable");
    }
}

View File

@@ -157,6 +157,8 @@ pub async fn start_http_server(
b.build()
};
// Server will be created per connection - this ensures isolation
tokio::spawn(async move {
// Record the PID-related metrics of the current process
let meter = opentelemetry::global::meter("system");