diff --git a/Cargo.lock b/Cargo.lock index b4e85ffe..a193bcc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" +dependencies = [ + "equator", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -1645,6 +1654,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpp_demangle" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d" +dependencies = [ + "cfg-if", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -2439,6 +2457,15 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "debugid" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef552e6f588e446098f6ba40d89ac146c8c7b64aade83c051ee00bb5d2bc18d" +dependencies = [ + "uuid", +] + [[package]] name = "deflate64" version = "0.1.9" @@ -2696,6 +2723,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "equator" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -2785,6 +2832,18 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "findshlibs" +version = "0.10.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64" +dependencies = [ + "cc", + "lazy_static", + "libc", + "winapi", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -2856,7 +2915,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "futures-core", "futures-sink", - "spin", + "spin 0.9.8", ] [[package]] @@ -3620,6 +3679,24 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "inferno" +version = "0.11.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" +dependencies = [ + "ahash", + "indexmap", + "is-terminal", + "itoa", + "log", + "num-format", + "once_cell", + "quick-xml 0.26.0", + "rgb", + "str_stack", +] + [[package]] name = "inlinable_string" version = "0.1.15" @@ -3709,6 +3786,17 @@ dependencies = [ "serde", ] +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "is_debug" version = "1.1.0" @@ -3815,7 +3903,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ - "spin", + "spin 0.9.8", ] [[package]] @@ -4115,6 +4203,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memmap2" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.1" @@ 
-4230,6 +4327,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + [[package]] name = "nix" version = "0.29.0" @@ -4393,6 +4501,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-format" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3" +dependencies = [ + "arrayvec", + "itoa", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -5044,6 +5162,30 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "pprof" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187" +dependencies = [ + "aligned-vec", + "backtrace", + "cfg-if", + "findshlibs", + "inferno", + "libc", + "log", + "nix 0.26.4", + "once_cell", + "protobuf", + "protobuf-codegen", + "smallvec", + "spin 0.10.0", + "symbolic-demangle", + "tempfile", + "thiserror 2.0.16", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -5181,6 +5323,57 @@ dependencies = [ "prost 0.14.1", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-codegen" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace" +dependencies = [ + "anyhow", + "once_cell", + "protobuf", + "protobuf-parse", + "regex", + "tempfile", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-parse" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" +dependencies = [ + "anyhow", + "indexmap", + "log", + "protobuf", + "protobuf-support", + "tempfile", + "thiserror 1.0.69", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "psm" version = "0.1.26" @@ -5210,6 +5403,15 @@ dependencies = [ "pulldown-cmark", ] +[[package]] +name = "quick-xml" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd" +dependencies = [ + "memchr", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -5592,6 +5794,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rgb" +version = "0.8.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6a884d2998352bb4daf0183589aec883f16a6da1f4dde84d8e2e9a5409a1ce" +dependencies = [ + "bytemuck", +] + [[package]] name = "ring" version = "0.17.14" @@ -5786,6 +5997,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "pin-project-lite", + "pprof", "reqwest", "rust-embed", "rustfs-ahm", @@ -5981,9 +6193,11 @@ dependencies = [ "hyper-util", "lazy_static", "md-5", + "moka", "nix 0.30.1", "num_cpus", "once_cell", + "parking_lot", "path-absolutize", "pin-project-lite", "quick-xml 0.38.3", @@ -6086,6 +6300,7 @@ dependencies = [ "heapless", "once_cell", "parking_lot", + "pprof", "rustfs-protos", "serde", "serde_json", @@ 
-7146,6 +7361,15 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.6.0" @@ -7223,6 +7447,12 @@ dependencies = [ "thiserror 2.0.16", ] +[[package]] +name = "str_stack" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" + [[package]] name = "strsim" version = "0.11.1" @@ -7334,6 +7564,29 @@ dependencies = [ "sval_nested", ] +[[package]] +name = "symbolic-common" +version = "12.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9da12f8fecbbeaa1ee62c1d50dc656407e007c3ee7b2a41afce4b5089eaef15e" +dependencies = [ + "debugid", + "memmap2", + "stable_deref_trait", + "uuid", +] + +[[package]] +name = "symbolic-demangle" +version = "12.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd35afe0ef9d35d3dcd41c67ddf882fc832a387221338153b7cd685a105495c" +dependencies = [ + "cpp_demangle", + "rustc-demangle", + "symbolic-common", +] + [[package]] name = "syn" version = "1.0.109" diff --git a/Cargo.toml b/Cargo.toml index cbda4991..624535c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -176,6 +176,7 @@ opentelemetry-semantic-conventions = { version = "0.30.0", features = [ parking_lot = "0.12.4" path-absolutize = "3.1.1" path-clean = "1.0.1" +pprof = { version = "0.15.0", features = ["flamegraph", "protobuf-codec"] } blake3 = { version = "1.8.2" } pbkdf2 = "0.12.2" percent-encoding = "2.3.2" diff --git a/crates/ecstore/Cargo.toml b/crates/ecstore/Cargo.toml index 37559a18..1ed2edbd 100644 --- a/crates/ecstore/Cargo.toml +++ b/crates/ecstore/Cargo.toml @@ -101,6 +101,8 @@ rustfs-signer.workspace = true rustfs-checksums.workspace = true 
futures-util.workspace = true async-recursion.workspace = true +parking_lot = "0.12" +moka = { version = "0.12", features = ["future"] } [target.'cfg(not(windows))'.dependencies] nix = { workspace = true } diff --git a/crates/ecstore/src/batch_processor.rs b/crates/ecstore/src/batch_processor.rs new file mode 100644 index 00000000..c3c3ee7a --- /dev/null +++ b/crates/ecstore/src/batch_processor.rs @@ -0,0 +1,231 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! High-performance batch processor using JoinSet +//! +//! This module provides optimized batching utilities to reduce async runtime overhead +//! and improve concurrent operation performance. 
+ +use crate::disk::error::{Error, Result}; +use std::future::Future; +use std::sync::Arc; +use tokio::task::JoinSet; + +/// Batch processor that executes tasks concurrently with a semaphore +pub struct AsyncBatchProcessor { + max_concurrent: usize, +} + +impl AsyncBatchProcessor { + pub fn new(max_concurrent: usize) -> Self { + Self { max_concurrent } + } + + /// Execute a batch of tasks concurrently with concurrency control + pub async fn execute_batch(&self, tasks: Vec) -> Vec> + where + T: Send + 'static, + F: Future> + Send + 'static, + { + if tasks.is_empty() { + return Vec::new(); + } + + let semaphore = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent)); + let mut join_set = JoinSet::new(); + let mut results = Vec::with_capacity(tasks.len()); + for _ in 0..tasks.len() { + results.push(Err(Error::other("Not completed"))); + } + + // Spawn all tasks with semaphore control + for (i, task) in tasks.into_iter().enumerate() { + let sem = semaphore.clone(); + join_set.spawn(async move { + let _permit = sem.acquire().await.map_err(|_| Error::other("Semaphore error"))?; + let result = task.await; + Ok::<(usize, Result), Error>((i, result)) + }); + } + + // Collect results + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok(Ok((index, task_result))) => { + if index < results.len() { + results[index] = task_result; + } + } + Ok(Err(e)) => { + // Semaphore or other system error - this is rare + tracing::warn!("Batch processor system error: {:?}", e); + } + Err(join_error) => { + // Task panicked - log but continue + tracing::warn!("Task panicked in batch processor: {:?}", join_error); + } + } + } + + results + } + + /// Execute batch with early termination when sufficient successful results are obtained + pub async fn execute_batch_with_quorum(&self, tasks: Vec, required_successes: usize) -> Result> + where + T: Send + 'static, + F: Future> + Send + 'static, + { + let results = self.execute_batch(tasks).await; + let mut 
successes = Vec::new(); + + for value in results.into_iter().flatten() { + successes.push(value); + if successes.len() >= required_successes { + return Ok(successes); + } + } + + if successes.len() >= required_successes { + Ok(successes) + } else { + Err(Error::other(format!( + "Insufficient successful results: got {}, needed {}", + successes.len(), + required_successes + ))) + } + } +} + +/// Global batch processor instances +pub struct GlobalBatchProcessors { + read_processor: AsyncBatchProcessor, + write_processor: AsyncBatchProcessor, + metadata_processor: AsyncBatchProcessor, +} + +impl GlobalBatchProcessors { + pub fn new() -> Self { + Self { + read_processor: AsyncBatchProcessor::new(16), // Higher concurrency for reads + write_processor: AsyncBatchProcessor::new(8), // Lower concurrency for writes + metadata_processor: AsyncBatchProcessor::new(12), // Medium concurrency for metadata + } + } + + pub fn read_processor(&self) -> &AsyncBatchProcessor { + &self.read_processor + } + + pub fn write_processor(&self) -> &AsyncBatchProcessor { + &self.write_processor + } + + pub fn metadata_processor(&self) -> &AsyncBatchProcessor { + &self.metadata_processor + } +} + +impl Default for GlobalBatchProcessors { + fn default() -> Self { + Self::new() + } +} + +// Global instance +use std::sync::OnceLock; + +static GLOBAL_PROCESSORS: OnceLock = OnceLock::new(); + +pub fn get_global_processors() -> &'static GlobalBatchProcessors { + GLOBAL_PROCESSORS.get_or_init(GlobalBatchProcessors::new) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn test_batch_processor_basic() { + let processor = AsyncBatchProcessor::new(4); + + let tasks: Vec<_> = (0..10) + .map(|i| async move { + tokio::time::sleep(Duration::from_millis(10)).await; + Ok::(i) + }) + .collect(); + + let results = processor.execute_batch(tasks).await; + assert_eq!(results.len(), 10); + + // All tasks should succeed + for (i, result) in results.iter().enumerate() 
{ + assert!(result.is_ok()); + assert_eq!(result.as_ref().unwrap(), &(i as i32)); + } + } + + #[tokio::test] + async fn test_batch_processor_with_errors() { + let processor = AsyncBatchProcessor::new(2); + + let tasks: Vec<_> = (0..5) + .map(|i| async move { + tokio::time::sleep(Duration::from_millis(10)).await; + if i % 2 == 0 { + Ok::(i) + } else { + Err(Error::other("Test error")) + } + }) + .collect(); + + let results = processor.execute_batch(tasks).await; + assert_eq!(results.len(), 5); + + // Check results pattern + for (i, result) in results.iter().enumerate() { + if i % 2 == 0 { + assert!(result.is_ok()); + assert_eq!(result.as_ref().unwrap(), &(i as i32)); + } else { + assert!(result.is_err()); + } + } + } + + #[tokio::test] + async fn test_batch_processor_quorum() { + let processor = AsyncBatchProcessor::new(4); + + let tasks: Vec<_> = (0..10) + .map(|i| async move { + tokio::time::sleep(Duration::from_millis(10)).await; + if i < 3 { + Ok::(i) + } else { + Err(Error::other("Test error")) + } + }) + .collect(); + + let results = processor.execute_batch_with_quorum(tasks, 2).await; + assert!(results.is_ok()); + let successes = results.unwrap(); + assert!(successes.len() >= 2); + } +} diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 1394e354..15c34b8a 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -321,7 +321,7 @@ impl ExpiryState { let mut state = GLOBAL_ExpiryState.write().await; while state.tasks_tx.len() < n { - let (tx, rx) = mpsc::channel(10000); + let (tx, rx) = mpsc::channel(1000); let api = api.clone(); let rx = Arc::new(tokio::sync::Mutex::new(rx)); state.tasks_tx.push(tx); @@ -432,7 +432,7 @@ pub struct TransitionState { impl TransitionState { #[allow(clippy::new_ret_no_self)] pub fn new() -> Arc { - let (tx1, rx1) = bounded(100000); + let (tx1, rx1) = 
bounded(1000); let (tx2, rx2) = bounded(1); Arc::new(Self { transition_tx: tx1, @@ -467,8 +467,12 @@ impl TransitionState { } pub async fn init(api: Arc) { - let mut n = 10; //globalAPIConfig.getTransitionWorkers(); - let tw = 10; //globalILMConfig.getTransitionWorkers(); + let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16)); + let mut n = max_workers; + let tw = 8; //globalILMConfig.getTransitionWorkers(); if tw > 0 { n = tw; } @@ -561,8 +565,18 @@ impl TransitionState { pub async fn update_workers_inner(api: Arc, n: i64) { let mut n = n; if n == 0 { - n = 100; + let max_workers = std::env::var("RUSTFS_MAX_TRANSITION_WORKERS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| std::cmp::min(num_cpus::get() as i64, 16)); + n = max_workers; } + // Allow environment override of maximum workers + let absolute_max = std::env::var("RUSTFS_ABSOLUTE_MAX_WORKERS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(32); + n = std::cmp::min(n, absolute_max); let mut num_workers = GLOBAL_TransitionState.num_workers.load(Ordering::SeqCst); while num_workers < n { @@ -585,7 +599,10 @@ impl TransitionState { } pub async fn init_background_expiry(api: Arc) { - let mut workers = num_cpus::get() / 2; + let mut workers = std::env::var("RUSTFS_MAX_EXPIRY_WORKERS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or_else(|| std::cmp::min(num_cpus::get(), 16)); //globalILMConfig.getExpirationWorkers() if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") { if let Ok(num_expirations) = env_expiration_workers.parse::() { @@ -594,7 +611,10 @@ pub async fn init_background_expiry(api: Arc) { } if workers == 0 { - workers = 100; + workers = std::env::var("RUSTFS_DEFAULT_EXPIRY_WORKERS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(8); } //let expiry_state = GLOBAL_ExpiryStSate.write().await; diff --git 
a/crates/ecstore/src/disk/fs.rs b/crates/ecstore/src/disk/fs.rs index 1d13ef96..df22eb7d 100644 --- a/crates/ecstore/src/disk/fs.rs +++ b/crates/ecstore/src/disk/fs.rs @@ -12,13 +12,45 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fs::Metadata, path::Path}; +use std::{ + fs::Metadata, + path::Path, + sync::{Arc, OnceLock}, +}; use tokio::{ fs::{self, File}, io, }; +static READONLY_OPTIONS: OnceLock> = OnceLock::new(); +static WRITEONLY_OPTIONS: OnceLock> = OnceLock::new(); +static READWRITE_OPTIONS: OnceLock> = OnceLock::new(); + +fn get_readonly_options() -> &'static Arc { + READONLY_OPTIONS.get_or_init(|| { + let mut opts = fs::OpenOptions::new(); + opts.read(true); + Arc::new(opts) + }) +} + +fn get_writeonly_options() -> &'static Arc { + WRITEONLY_OPTIONS.get_or_init(|| { + let mut opts = fs::OpenOptions::new(); + opts.write(true); + Arc::new(opts) + }) +} + +fn get_readwrite_options() -> &'static Arc { + READWRITE_OPTIONS.get_or_init(|| { + let mut opts = fs::OpenOptions::new(); + opts.read(true).write(true); + Arc::new(opts) + }) +} + #[cfg(not(windows))] pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool { use std::os::unix::fs::MetadataExt; @@ -84,35 +116,28 @@ pub const O_APPEND: FileMode = 0x00400; // create_new: bool, pub async fn open_file(path: impl AsRef, mode: FileMode) -> io::Result { - let mut opts = fs::OpenOptions::new(); - - match mode & (O_RDONLY | O_WRONLY | O_RDWR) { - O_RDONLY => { - opts.read(true); - } - O_WRONLY => { - opts.write(true); - } - O_RDWR => { - opts.read(true); - opts.write(true); - } - _ => (), + let base_opts = match mode & (O_RDONLY | O_WRONLY | O_RDWR) { + O_RDONLY => get_readonly_options(), + O_WRONLY => get_writeonly_options(), + O_RDWR => get_readwrite_options(), + _ => get_readonly_options(), }; - if mode & O_CREATE != 0 { - opts.create(true); + if (mode & (O_CREATE | O_APPEND | O_TRUNC)) != 0 { + let mut opts = (**base_opts).clone(); + if 
mode & O_CREATE != 0 { + opts.create(true); + } + if mode & O_APPEND != 0 { + opts.append(true); + } + if mode & O_TRUNC != 0 { + opts.truncate(true); + } + opts.open(path.as_ref()).await + } else { + base_opts.open(path.as_ref()).await } - - if mode & O_APPEND != 0 { - opts.append(true); - } - - if mode & O_TRUNC != 0 { - opts.truncate(true); - } - - opts.open(path.as_ref()).await } pub async fn access(path: impl AsRef) -> io::Result<()> { @@ -121,7 +146,7 @@ pub async fn access(path: impl AsRef) -> io::Result<()> { } pub fn access_std(path: impl AsRef) -> io::Result<()> { - tokio::task::block_in_place(|| std::fs::metadata(path))?; + std::fs::metadata(path)?; Ok(()) } @@ -130,7 +155,7 @@ pub async fn lstat(path: impl AsRef) -> io::Result { } pub fn lstat_std(path: impl AsRef) -> io::Result { - tokio::task::block_in_place(|| std::fs::metadata(path)) + std::fs::metadata(path) } pub async fn make_dir_all(path: impl AsRef) -> io::Result<()> { @@ -159,26 +184,22 @@ pub async fn remove_all(path: impl AsRef) -> io::Result<()> { #[tracing::instrument(level = "debug", skip_all)] pub fn remove_std(path: impl AsRef) -> io::Result<()> { let path = path.as_ref(); - tokio::task::block_in_place(|| { - let meta = std::fs::metadata(path)?; - if meta.is_dir() { - std::fs::remove_dir(path) - } else { - std::fs::remove_file(path) - } - }) + let meta = std::fs::metadata(path)?; + if meta.is_dir() { + std::fs::remove_dir(path) + } else { + std::fs::remove_file(path) + } } pub fn remove_all_std(path: impl AsRef) -> io::Result<()> { let path = path.as_ref(); - tokio::task::block_in_place(|| { - let meta = std::fs::metadata(path)?; - if meta.is_dir() { - std::fs::remove_dir_all(path) - } else { - std::fs::remove_file(path) - } - }) + let meta = std::fs::metadata(path)?; + if meta.is_dir() { + std::fs::remove_dir_all(path) + } else { + std::fs::remove_file(path) + } } pub async fn mkdir(path: impl AsRef) -> io::Result<()> { @@ -190,7 +211,7 @@ pub async fn rename(from: impl AsRef, to: impl 
AsRef) -> io::Result< } pub fn rename_std(from: impl AsRef, to: impl AsRef) -> io::Result<()> { - tokio::task::block_in_place(|| std::fs::rename(from, to)) + std::fs::rename(from, to) } #[tracing::instrument(level = "debug", skip_all)] diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index c363988c..859ea612 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -41,18 +41,21 @@ use tokio::time::interval; use crate::erasure_coding::bitrot_verify; use bytes::Bytes; -use path_absolutize::Absolutize; +// use path_absolutize::Absolutize; // Replaced with direct path operations for better performance +use crate::file_cache::{get_global_file_cache, prefetch_metadata_patterns, read_metadata_cached}; +use parking_lot::RwLock as ParkingLotRwLock; use rustfs_filemeta::{ Cache, FileInfo, FileInfoOpts, FileMeta, MetaCacheEntry, MetacacheWriter, ObjectPartInfo, Opts, RawFileInfo, UpdateFn, get_file_info, read_xl_meta_no_data, }; use rustfs_utils::HashAlgorithm; use rustfs_utils::os::get_info; +use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; use std::io::SeekFrom; -use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, OnceLock}; use std::time::Duration; use std::{ fs::Metadata, @@ -101,6 +104,9 @@ pub struct LocalDisk { pub major: u64, pub minor: u64, pub nrrequests: u64, + // Performance optimization fields + path_cache: Arc>>, + current_dir: Arc>, // pub id: Mutex>, // pub format_data: Mutex>, // pub format_file_info: Mutex>, @@ -130,8 +136,9 @@ impl Debug for LocalDisk { impl LocalDisk { pub async fn new(ep: &Endpoint, cleanup: bool) -> Result { debug!("Creating local disk"); - let root = match PathBuf::from(ep.get_file_path()).absolutize() { - Ok(path) => path.into_owned(), + // Use optimized path resolution instead of absolutize() for better performance + let root = match std::fs::canonicalize(ep.get_file_path()) { + Ok(path) => path, Err(e) 
=> { if e.kind() == ErrorKind::NotFound { return Err(DiskError::VolumeNotFound); @@ -144,10 +151,8 @@ impl LocalDisk { // TODO: 删除 tmp 数据 } - let format_path = Path::new(RUSTFS_META_BUCKET) - .join(Path::new(super::FORMAT_CONFIG_FILE)) - .absolutize_virtually(&root)? - .into_owned(); + // Use optimized path resolution instead of absolutize_virtually + let format_path = root.join(RUSTFS_META_BUCKET).join(super::FORMAT_CONFIG_FILE); debug!("format_path: {:?}", format_path); let (format_data, format_meta) = read_file_exists(&format_path).await?; @@ -227,6 +232,8 @@ impl LocalDisk { // format_file_info: Mutex::new(format_meta), // format_data: Mutex::new(format_data), // format_last_check: Mutex::new(format_last_check), + path_cache: Arc::new(ParkingLotRwLock::new(HashMap::with_capacity(2048))), + current_dir: Arc::new(OnceLock::new()), exit_signal: None, }; let (info, _root) = get_disk_info(root).await?; @@ -351,19 +358,178 @@ impl LocalDisk { self.make_volumes(defaults).await } + // Optimized path resolution with caching pub fn resolve_abs_path(&self, path: impl AsRef) -> Result { - Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned()) + let path_ref = path.as_ref(); + let path_str = path_ref.to_string_lossy(); + + // Fast cache read + { + let cache = self.path_cache.read(); + if let Some(cached_path) = cache.get(path_str.as_ref()) { + return Ok(cached_path.clone()); + } + } + + // Calculate absolute path without using path_absolutize for better performance + let abs_path = if path_ref.is_absolute() { + path_ref.to_path_buf() + } else { + self.root.join(path_ref) + }; + + // Normalize path components to avoid filesystem calls + let normalized = self.normalize_path_components(&abs_path); + + // Cache the result + { + let mut cache = self.path_cache.write(); + + // Simple cache size control + if cache.len() >= 4096 { + // Clear half the cache - simple eviction strategy + let keys_to_remove: Vec<_> = cache.keys().take(cache.len() / 2).cloned().collect(); + 
for key in keys_to_remove { + cache.remove(&key); + } + } + + cache.insert(path_str.into_owned(), normalized.clone()); + } + + Ok(normalized) } + // Lightweight path normalization without filesystem calls + fn normalize_path_components(&self, path: &Path) -> PathBuf { + let mut result = PathBuf::new(); + + for component in path.components() { + match component { + std::path::Component::Normal(name) => { + result.push(name); + } + std::path::Component::ParentDir => { + result.pop(); + } + std::path::Component::CurDir => { + // Ignore current directory components + } + std::path::Component::RootDir => { + result.push(component); + } + std::path::Component::Prefix(_prefix) => { + result.push(component); + } + } + } + + result + } + + // Highly optimized object path generation pub fn get_object_path(&self, bucket: &str, key: &str) -> Result { - let dir = Path::new(&bucket); - let file_path = Path::new(&key); - self.resolve_abs_path(dir.join(file_path)) + // For high-frequency paths, use faster string concatenation + let cache_key = if key.is_empty() { + bucket.to_string() + } else { + // Use with_capacity to pre-allocate, reducing memory reallocations + let mut path_str = String::with_capacity(bucket.len() + key.len() + 1); + path_str.push_str(bucket); + path_str.push('/'); + path_str.push_str(key); + path_str + }; + + // Fast path: directly calculate based on root, avoiding cache lookup overhead for simple cases + Ok(self.root.join(&cache_key)) } pub fn get_bucket_path(&self, bucket: &str) -> Result { - let dir = Path::new(&bucket); - self.resolve_abs_path(dir) + Ok(self.root.join(bucket)) + } + + // Batch path generation with single lock acquisition + pub fn get_object_paths_batch(&self, requests: &[(String, String)]) -> Result> { + let mut results = Vec::with_capacity(requests.len()); + let mut cache_misses = Vec::new(); + + // First attempt to get all paths from cache + { + let cache = self.path_cache.read(); + for (i, (bucket, key)) in requests.iter().enumerate() 
{ + let cache_key = format!("{}/{}", bucket, key); + if let Some(cached_path) = cache.get(&cache_key) { + results.push((i, cached_path.clone())); + } else { + cache_misses.push((i, bucket, key, cache_key)); + } + } + } + + // Handle cache misses + if !cache_misses.is_empty() { + let mut new_entries = Vec::new(); + for (i, _bucket, _key, cache_key) in cache_misses { + let path = self.root.join(&cache_key); + results.push((i, path.clone())); + new_entries.push((cache_key, path)); + } + + // Batch update cache + { + let mut cache = self.path_cache.write(); + for (key, path) in new_entries { + cache.insert(key, path); + } + } + } + + // Sort results back to original order + results.sort_by_key(|(i, _)| *i); + Ok(results.into_iter().map(|(_, path)| path).collect()) + } + + // Optimized metadata reading with caching + pub async fn read_metadata_cached(&self, path: PathBuf) -> Result> { + read_metadata_cached(path).await + } + + // Smart prefetching for related files + pub async fn read_version_with_prefetch( + &self, + volume: &str, + path: &str, + version_id: &str, + opts: &ReadOptions, + ) -> Result { + let file_path = self.get_object_path(volume, path)?; + + // Async prefetch related files, don't block current read + if let Some(parent) = file_path.parent() { + prefetch_metadata_patterns(parent, &[super::STORAGE_FORMAT_FILE, "part.1", "part.2", "part.meta"]).await; + } + + // Main read logic + let file_dir = self.get_bucket_path(volume)?; + let (data, _) = self.read_raw(volume, file_dir, file_path, opts.read_data).await?; + + get_file_info(&data, volume, path, version_id, FileInfoOpts { data: opts.read_data }) + .await + .map_err(|_e| DiskError::Unexpected) + } + + // Batch metadata reading for multiple objects + pub async fn read_metadata_batch(&self, requests: Vec<(String, String)>) -> Result>>> { + let paths: Vec = requests + .iter() + .map(|(bucket, key)| self.get_object_path(bucket, &format!("{}/{}", key, super::STORAGE_FORMAT_FILE))) + .collect::>>()?; + + let 
cache = get_global_file_cache(); + let results = cache.get_metadata_batch(paths).await; + + Ok(results.into_iter().map(|r| r.ok()).collect()) } // /// Write to the filesystem atomically. @@ -549,7 +715,15 @@ impl LocalDisk { } async fn read_metadata(&self, file_path: impl AsRef) -> Result> { - // TODO: support timeout + // Try to use cached file content reading for better performance, with safe fallback + let path = file_path.as_ref().to_path_buf(); + + // First, try the cache + if let Ok(bytes) = get_global_file_cache().get_file_content(path.clone()).await { + return Ok(bytes.to_vec()); + } + + // Fallback to direct read if cache fails let (data, _) = self.read_metadata_with_dmtime(file_path.as_ref()).await?; Ok(data) } diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs index f680a42d..63b3e49f 100644 --- a/crates/ecstore/src/disk/mod.rs +++ b/crates/ecstore/src/disk/mod.rs @@ -668,7 +668,7 @@ pub struct VolumeInfo { pub created: Option, } -#[derive(Deserialize, Serialize, Debug, Default)] +#[derive(Deserialize, Serialize, Debug, Default, Clone)] pub struct ReadOptions { pub incl_free_versions: bool, pub read_data: bool, diff --git a/crates/ecstore/src/endpoints.rs b/crates/ecstore/src/endpoints.rs index 79226111..d9e209ed 100644 --- a/crates/ecstore/src/endpoints.rs +++ b/crates/ecstore/src/endpoints.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, get_host_ip_async, is_local_host}; +use rustfs_utils::{XHost, check_local_server_addr, get_host_ip, is_local_host}; use tracing::{error, instrument, warn}; use crate::{ @@ -248,8 +248,7 @@ impl PoolEndpointList { Ok(ips) => ips, Err(e) => { error!("host {} not found, error:{}", host, e); - get_host_ip_async(host.clone()) - .map_err(|e| Error::other(format!("host '{host}' cannot resolve: {e}")))? 
+ return Err(Error::other(format!("host '{host}' cannot resolve: {e}"))); } }; host_ip_cache.insert(host.clone(), ips); diff --git a/crates/ecstore/src/file_cache.rs b/crates/ecstore/src/file_cache.rs new file mode 100644 index 00000000..ad47e505 --- /dev/null +++ b/crates/ecstore/src/file_cache.rs @@ -0,0 +1,332 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! High-performance file content and metadata caching using moka +//! +//! This module provides optimized caching for file operations to reduce +//! redundant I/O and improve overall system performance. 
+ +use super::disk::error::{Error, Result}; +use bytes::Bytes; +use moka::future::Cache; +use rustfs_filemeta::FileMeta; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; + +pub struct OptimizedFileCache { + // Use moka as high-performance async cache + metadata_cache: Cache>, + file_content_cache: Cache, + // Performance monitoring + cache_hits: std::sync::atomic::AtomicU64, + cache_misses: std::sync::atomic::AtomicU64, +} + +impl OptimizedFileCache { + pub fn new() -> Self { + Self { + metadata_cache: Cache::builder() + .max_capacity(2048) + .time_to_live(Duration::from_secs(300)) // 5 minutes TTL + .time_to_idle(Duration::from_secs(60)) // 1 minute idle + .build(), + + file_content_cache: Cache::builder() + .max_capacity(512) // Smaller file content cache + .time_to_live(Duration::from_secs(120)) + .weigher(|_key: &PathBuf, value: &Bytes| value.len() as u32) + .build(), + + cache_hits: std::sync::atomic::AtomicU64::new(0), + cache_misses: std::sync::atomic::AtomicU64::new(0), + } + } + + pub async fn get_metadata(&self, path: PathBuf) -> Result> { + if let Some(cached) = self.metadata_cache.get(&path).await { + self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return Ok(cached); + } + + self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + // Cache miss, read file + let data = tokio::fs::read(&path) + .await + .map_err(|e| Error::other(format!("Read metadata failed: {}", e)))?; + + let mut meta = FileMeta::default(); + meta.unmarshal_msg(&data)?; + + let arc_meta = Arc::new(meta); + self.metadata_cache.insert(path, arc_meta.clone()).await; + + Ok(arc_meta) + } + + pub async fn get_file_content(&self, path: PathBuf) -> Result { + if let Some(cached) = self.file_content_cache.get(&path).await { + self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return Ok(cached); + } + + self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let data = 
tokio::fs::read(&path) + .await + .map_err(|e| Error::other(format!("Read file failed: {}", e)))?; + + let bytes = Bytes::from(data); + self.file_content_cache.insert(path, bytes.clone()).await; + + Ok(bytes) + } + + // Prefetch related files + pub async fn prefetch_related(&self, base_path: &Path, patterns: &[&str]) { + let mut prefetch_tasks = Vec::new(); + + for pattern in patterns { + let path = base_path.join(pattern); + if tokio::fs::metadata(&path).await.is_ok() { + let cache = self.clone(); + let path_clone = path.clone(); + prefetch_tasks.push(async move { + let _ = cache.get_metadata(path_clone).await; + }); + } + } + + // Parallel prefetch, don't wait for completion + if !prefetch_tasks.is_empty() { + tokio::spawn(async move { + futures::future::join_all(prefetch_tasks).await; + }); + } + } + + // Batch metadata reading with deduplication + pub async fn get_metadata_batch( + &self, + paths: Vec, + ) -> Vec, rustfs_filemeta::Error>> { + let mut results = Vec::with_capacity(paths.len()); + let mut cache_futures = Vec::new(); + + // First, attempt to get from cache + for (i, path) in paths.iter().enumerate() { + if let Some(cached) = self.metadata_cache.get(path).await { + results.push((i, Ok(cached))); + self.cache_hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } else { + cache_futures.push((i, path.clone())); + } + } + + // For cache misses, read from filesystem + if !cache_futures.is_empty() { + let mut fs_results = Vec::new(); + + for (i, path) in cache_futures { + self.cache_misses.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + match tokio::fs::read(&path).await { + Ok(data) => { + let mut meta = FileMeta::default(); + match meta.unmarshal_msg(&data) { + Ok(_) => { + let arc_meta = Arc::new(meta); + self.metadata_cache.insert(path, arc_meta.clone()).await; + fs_results.push((i, Ok(arc_meta))); + } + Err(e) => { + fs_results.push((i, Err(e))); + } + } + } + Err(_e) => { + fs_results.push((i, 
Err(rustfs_filemeta::Error::Unexpected))); + } + } + } + + results.extend(fs_results); + } + + // Sort results back to original order + results.sort_by_key(|(i, _)| *i); + results.into_iter().map(|(_, result)| result).collect() + } + + // Invalidate cache entries for a path + pub async fn invalidate(&self, path: &Path) { + self.metadata_cache.remove(path).await; + self.file_content_cache.remove(path).await; + } + + // Get cache statistics + pub fn get_stats(&self) -> FileCacheStats { + let hits = self.cache_hits.load(std::sync::atomic::Ordering::Relaxed); + let misses = self.cache_misses.load(std::sync::atomic::Ordering::Relaxed); + let hit_rate = if hits + misses > 0 { + (hits as f64 / (hits + misses) as f64) * 100.0 + } else { + 0.0 + }; + + FileCacheStats { + metadata_cache_size: self.metadata_cache.entry_count(), + content_cache_size: self.file_content_cache.entry_count(), + cache_hits: hits, + cache_misses: misses, + hit_rate, + total_weight: 0, // Simplified for compatibility + } + } + + // Clear all caches + pub async fn clear(&self) { + self.metadata_cache.invalidate_all(); + self.file_content_cache.invalidate_all(); + + // Wait for invalidation to complete + self.metadata_cache.run_pending_tasks().await; + self.file_content_cache.run_pending_tasks().await; + } +} + +impl Clone for OptimizedFileCache { + fn clone(&self) -> Self { + Self { + metadata_cache: self.metadata_cache.clone(), + file_content_cache: self.file_content_cache.clone(), + cache_hits: std::sync::atomic::AtomicU64::new(self.cache_hits.load(std::sync::atomic::Ordering::Relaxed)), + cache_misses: std::sync::atomic::AtomicU64::new(self.cache_misses.load(std::sync::atomic::Ordering::Relaxed)), + } + } +} + +#[derive(Debug)] +pub struct FileCacheStats { + pub metadata_cache_size: u64, + pub content_cache_size: u64, + pub cache_hits: u64, + pub cache_misses: u64, + pub hit_rate: f64, + pub total_weight: u64, +} + +impl Default for OptimizedFileCache { + fn default() -> Self { + Self::new() + } +} 
+ +// Global cache instance +use std::sync::OnceLock; + +static GLOBAL_FILE_CACHE: OnceLock = OnceLock::new(); + +pub fn get_global_file_cache() -> &'static OptimizedFileCache { + GLOBAL_FILE_CACHE.get_or_init(OptimizedFileCache::new) +} + +// Utility functions for common operations +pub async fn read_metadata_cached(path: PathBuf) -> Result> { + get_global_file_cache().get_metadata(path).await +} + +pub async fn read_file_content_cached(path: PathBuf) -> Result { + get_global_file_cache().get_file_content(path).await +} + +pub async fn prefetch_metadata_patterns(base_path: &Path, patterns: &[&str]) { + get_global_file_cache().prefetch_related(base_path, patterns).await; +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use tempfile::tempdir; + + #[tokio::test] + async fn test_file_cache_basic() { + let cache = OptimizedFileCache::new(); + + // Create a temporary file + let dir = tempdir().unwrap(); + let file_path = dir.path().join("test.txt"); + let mut file = std::fs::File::create(&file_path).unwrap(); + writeln!(file, "test content").unwrap(); + drop(file); + + // First read should be cache miss + let content1 = cache.get_file_content(file_path.clone()).await.unwrap(); + assert_eq!(content1, Bytes::from("test content\n")); + + // Second read should be cache hit + let content2 = cache.get_file_content(file_path.clone()).await.unwrap(); + assert_eq!(content2, content1); + + let stats = cache.get_stats(); + assert!(stats.cache_hits > 0); + assert!(stats.cache_misses > 0); + } + + #[tokio::test] + async fn test_metadata_batch_read() { + let cache = OptimizedFileCache::new(); + + // Create test files + let dir = tempdir().unwrap(); + let mut paths = Vec::new(); + + for i in 0..5 { + let file_path = dir.path().join(format!("test_{}.txt", i)); + let mut file = std::fs::File::create(&file_path).unwrap(); + writeln!(file, "content {}", i).unwrap(); + paths.push(file_path); + } + + // Note: This test would need actual FileMeta files to work properly 
+ // For now, we just test that the function runs without errors + let results = cache.get_metadata_batch(paths).await; + assert_eq!(results.len(), 5); + } + + #[tokio::test] + async fn test_cache_invalidation() { + let cache = OptimizedFileCache::new(); + + let dir = tempdir().unwrap(); + let file_path = dir.path().join("test.txt"); + let mut file = std::fs::File::create(&file_path).unwrap(); + writeln!(file, "test content").unwrap(); + drop(file); + + // Read file to populate cache + let _ = cache.get_file_content(file_path.clone()).await.unwrap(); + + // Invalidate cache + cache.invalidate(&file_path).await; + + // Next read should be cache miss again + let _ = cache.get_file_content(file_path.clone()).await.unwrap(); + + let stats = cache.get_stats(); + assert!(stats.cache_misses >= 2); + } +} diff --git a/crates/ecstore/src/lib.rs b/crates/ecstore/src/lib.rs index 770e0402..b99424f8 100644 --- a/crates/ecstore/src/lib.rs +++ b/crates/ecstore/src/lib.rs @@ -16,6 +16,7 @@ extern crate core; pub mod admin_server_info; +pub mod batch_processor; pub mod bitrot; pub mod bucket; pub mod cache_value; @@ -29,6 +30,7 @@ pub mod disks_layout; pub mod endpoints; pub mod erasure_coding; pub mod error; +pub mod file_cache; pub mod global; pub mod lock_utils; pub mod metrics_realtime; diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index db8f4d44..f17f6743 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -15,6 +15,7 @@ #![allow(unused_imports)] #![allow(unused_variables)] +use crate::batch_processor::{AsyncBatchProcessor, get_global_processors}; use crate::bitrot::{create_bitrot_reader, create_bitrot_writer}; use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE; use crate::bucket::versioning::VersioningApi; @@ -232,7 +233,10 @@ impl SetDisks { }); } - let results = join_all(futures).await; + // Use optimized batch processor for disk info retrieval + let processor = 
get_global_processors().metadata_processor(); + let results = processor.execute_batch(futures).await; + for result in results { match result { Ok(res) => { @@ -507,21 +511,28 @@ impl SetDisks { #[tracing::instrument(skip(disks))] async fn cleanup_multipart_path(disks: &[Option], paths: &[String]) { - let mut futures = Vec::with_capacity(disks.len()); - let mut errs = Vec::with_capacity(disks.len()); - for disk in disks.iter() { - futures.push(async move { - if let Some(disk) = disk { - disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, paths).await - } else { - Err(DiskError::DiskNotFound) + // Use improved simple batch processor instead of join_all for better performance + let processor = get_global_processors().write_processor(); + + let tasks: Vec<_> = disks + .iter() + .map(|disk| { + let disk = disk.clone(); + let paths = paths.to_vec(); + + async move { + if let Some(disk) = disk { + disk.delete_paths(RUSTFS_META_MULTIPART_BUCKET, &paths).await + } else { + Err(DiskError::DiskNotFound) + } } }) - } + .collect(); - let results = join_all(futures).await; + let results = processor.execute_batch(tasks).await; for result in results { match result { Ok(_) => { @@ -545,21 +556,32 @@ impl SetDisks { part_numbers: &[usize], read_quorum: usize, ) -> disk::error::Result> { - let mut futures = Vec::with_capacity(disks.len()); - for (i, disk) in disks.iter().enumerate() { - futures.push(async move { - if let Some(disk) = disk { - disk.read_parts(bucket, part_meta_paths).await - } else { - Err(DiskError::DiskNotFound) - } - }); - } - let mut errs = Vec::with_capacity(disks.len()); let mut object_parts = Vec::with_capacity(disks.len()); - let results = join_all(futures).await; + // Use batch processor for better performance + let processor = get_global_processors().read_processor(); + let bucket = bucket.to_string(); + let part_meta_paths = part_meta_paths.to_vec(); + + let tasks: Vec<_> = disks + .iter() + .map(|disk| { + let disk = disk.clone(); + let bucket = 
bucket.clone(); + let part_meta_paths = part_meta_paths.clone(); + + async move { + if let Some(disk) = disk { + disk.read_parts(&bucket, &part_meta_paths).await + } else { + Err(DiskError::DiskNotFound) + } + } + }) + .collect(); + + let results = processor.execute_batch(tasks).await; for result in results { match result { Ok(res) => { @@ -1369,22 +1391,71 @@ impl SetDisks { }) }); + // Wait for all tasks to complete let results = join_all(futures).await; + for result in results { - match result? { - Ok(res) => { - ress.push(res); - errors.push(None); - } - Err(e) => { + match result { + Ok(res) => match res { + Ok(file_info) => { + ress.push(file_info); + errors.push(None); + } + Err(e) => { + ress.push(FileInfo::default()); + errors.push(Some(e)); + } + }, + Err(_) => { ress.push(FileInfo::default()); - errors.push(Some(e)); + errors.push(Some(DiskError::Unexpected)); } } } Ok((ress, errors)) } + // Optimized version using batch processor with quorum support + pub async fn read_version_optimized( + &self, + bucket: &str, + object: &str, + version_id: &str, + opts: &ReadOptions, + ) -> Result> { + // Use existing disk selection logic + let disks = self.disks.read().await; + let required_reads = self.format.erasure.sets.len(); + + // Clone parameters outside the closure to avoid lifetime issues + let bucket = bucket.to_string(); + let object = object.to_string(); + let version_id = version_id.to_string(); + let opts = opts.clone(); + + let processor = get_global_processors().read_processor(); + let tasks: Vec<_> = disks + .iter() + .take(required_reads + 2) // Read a few extra for reliability + .filter_map(|disk| { + disk.as_ref().map(|d| { + let disk = d.clone(); + let bucket = bucket.clone(); + let object = object.clone(); + let version_id = version_id.clone(); + let opts = opts.clone(); + + async move { disk.read_version(&bucket, &bucket, &object, &version_id, &opts).await } + }) + }) + .collect(); + + match processor.execute_batch_with_quorum(tasks, 
required_reads).await { + Ok(results) => Ok(results), + Err(_) => Err(DiskError::FileNotFound.into()), // Use existing error type + } + } + async fn read_all_xl( disks: &[Option], bucket: &str, @@ -1403,10 +1474,11 @@ impl SetDisks { object: &str, read_data: bool, ) -> (Vec>, Vec>) { - let mut futures = Vec::with_capacity(disks.len()); let mut ress = Vec::with_capacity(disks.len()); let mut errors = Vec::with_capacity(disks.len()); + let mut futures = Vec::with_capacity(disks.len()); + for disk in disks.iter() { futures.push(async move { if let Some(disk) = disk { diff --git a/crates/ecstore/src/tier/tier.rs b/crates/ecstore/src/tier/tier.rs index fdfed498..a4080f62 100644 --- a/crates/ecstore/src/tier/tier.rs +++ b/crates/ecstore/src/tier/tier.rs @@ -302,17 +302,19 @@ impl TierConfigMgr { } pub async fn get_driver<'a>(&'a mut self, tier_name: &str) -> std::result::Result<&'a WarmBackendImpl, AdminError> { - Ok(match self.driver_cache.entry(tier_name.to_string()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - let t = self.tiers.get(tier_name); - if t.is_none() { - return Err(ERR_TIER_NOT_FOUND.clone()); - } - let d = new_warm_backend(t.expect("err"), false).await?; - e.insert(d) - } - }) + // Return cached driver if present + if self.driver_cache.contains_key(tier_name) { + return Ok(self.driver_cache.get(tier_name).unwrap()); + } + + // Get tier configuration and create new driver + let tier_config = self.tiers.get(tier_name).ok_or_else(|| ERR_TIER_NOT_FOUND.clone())?; + + let driver = new_warm_backend(tier_config, false).await?; + + // Insert and return reference + self.driver_cache.insert(tier_name.to_string(), driver); + Ok(self.driver_cache.get(tier_name).unwrap()) } pub async fn reload(&mut self, api: Arc) -> std::result::Result<(), std::io::Error> { diff --git a/crates/lock/Cargo.toml b/crates/lock/Cargo.toml index 3d65b815..c2ebd58b 100644 --- a/crates/lock/Cargo.toml +++ b/crates/lock/Cargo.toml @@ -47,3 +47,4 @@ smallvec = "1.11" 
smartstring = "1.0" crossbeam-queue = "0.3" heapless = "0.8" +pprof.workspace = true diff --git a/crates/policy/src/policy/doc.rs b/crates/policy/src/policy/doc.rs index cd9a0817..fde312b6 100644 --- a/crates/policy/src/policy/doc.rs +++ b/crates/policy/src/policy/doc.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, de::Error}; use time::OffsetDateTime; use super::Policy; @@ -59,15 +59,17 @@ impl TryFrom> for PolicyDoc { type Error = serde_json::Error; fn try_from(value: Vec) -> Result { - match serde_json::from_slice::(&value) { - Ok(res) => Ok(res), - Err(err) => match serde_json::from_slice::(&value) { - Ok(res2) => Ok(Self { - policy: res2, - ..Default::default() - }), - Err(_) => Err(err), - }, + // Try to parse as PolicyDoc first + if let Ok(policy_doc) = serde_json::from_slice::(&value) { + return Ok(policy_doc); } + + // Fall back to parsing as Policy and wrap in PolicyDoc + serde_json::from_slice::(&value) + .map(|policy| Self { + policy, + ..Default::default() + }) + .map_err(|_| serde_json::Error::custom("Failed to parse as PolicyDoc or Policy".to_string())) } } diff --git a/crates/utils/src/net.rs b/crates/utils/src/net.rs index 71018420..dd81578e 100644 --- a/crates/utils/src/net.rs +++ b/crates/utils/src/net.rs @@ -16,17 +16,40 @@ use bytes::Bytes; use futures::pin_mut; use futures::{Stream, StreamExt}; use std::net::Ipv6Addr; -use std::sync::LazyLock; +use std::sync::{LazyLock, Mutex}; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, fmt::Display, net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs}, + time::{Duration, Instant}, }; use transform_stream::AsyncTryStream; use url::{Host, Url}; static LOCAL_IPS: LazyLock> = LazyLock::new(|| must_get_local_ips().unwrap()); +#[derive(Debug, Clone)] +struct DnsCacheEntry { + ips: HashSet, + cached_at: Instant, +} + +impl DnsCacheEntry { 
+ fn new(ips: HashSet) -> Self { + Self { + ips, + cached_at: Instant::now(), + } + } + + fn is_expired(&self, ttl: Duration) -> bool { + self.cached_at.elapsed() > ttl + } +} + +static DNS_CACHE: LazyLock>> = LazyLock::new(|| Mutex::new(HashMap::new())); +const DNS_CACHE_TTL: Duration = Duration::from_secs(300); // 5 minutes + /// helper for validating if the provided arg is an ip address. pub fn is_socket_addr(addr: &str) -> bool { // TODO IPv6 zone information? @@ -87,19 +110,19 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> std::io::R } /// returns IP address of given host using layered DNS resolution. -pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result> { +/// +/// This is the async version of `get_host_ip()` that provides enhanced DNS resolution +/// with Kubernetes support when the "net" feature is enabled. +pub async fn get_host_ip_async(host: Host<&str>) -> std::io::Result> { match host { Host::Domain(domain) => { #[cfg(feature = "net")] { use crate::dns_resolver::resolve_domain; - let handle = tokio::runtime::Handle::current(); - handle.block_on(async { - match resolve_domain(domain).await { - Ok(ips) => Ok(ips.into_iter().collect()), - Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))), - } - }) + match resolve_domain(domain).await { + Ok(ips) => Ok(ips.into_iter().collect()), + Err(e) => Err(std::io::Error::other(format!("DNS resolution failed: {}", e))), + } } #[cfg(not(feature = "net"))] { @@ -128,17 +151,41 @@ pub fn get_host_ip_async(host: Host<&str>) -> std::io::Result> { /// returns IP address of given host using standard resolution. /// -/// **Note**: This function uses standard library DNS resolution. +/// **Note**: This function uses standard library DNS resolution with caching. /// For enhanced DNS resolution with Kubernetes support, use `get_host_ip_async()`. 
pub fn get_host_ip(host: Host<&str>) -> std::io::Result> { match host { - Host::Domain(domain) => match (domain, 0) - .to_socket_addrs() - .map(|v| v.map(|v| v.ip()).collect::>()) - { - Ok(ips) => Ok(ips), - Err(err) => Err(std::io::Error::other(err)), - }, + Host::Domain(domain) => { + // Check cache first + if let Ok(mut cache) = DNS_CACHE.lock() { + if let Some(entry) = cache.get(domain) { + if !entry.is_expired(DNS_CACHE_TTL) { + return Ok(entry.ips.clone()); + } + // Remove expired entry + cache.remove(domain); + } + } + + // Perform DNS resolution + match (domain, 0) + .to_socket_addrs() + .map(|v| v.map(|v| v.ip()).collect::>()) + { + Ok(ips) => { + // Cache the result + if let Ok(mut cache) = DNS_CACHE.lock() { + cache.insert(domain.to_string(), DnsCacheEntry::new(ips.clone())); + // Limit cache size to prevent memory bloat + if cache.len() > 1000 { + cache.retain(|_, v| !v.is_expired(DNS_CACHE_TTL)); + } + } + Ok(ips) + } + Err(err) => Err(std::io::Error::other(err)), + } + } Host::Ipv4(ip) => { let mut set = HashSet::with_capacity(1); set.insert(IpAddr::V4(ip)); diff --git a/docs/PERFORMANCE_TESTING.md b/docs/PERFORMANCE_TESTING.md new file mode 100644 index 00000000..a980cd04 --- /dev/null +++ b/docs/PERFORMANCE_TESTING.md @@ -0,0 +1,326 @@ +# RustFS 性能测试指南 + +本文档提供了对 RustFS 进行性能测试和性能分析的完整方法和工具。 + +## 概览 + +RustFS 提供了多种性能测试和分析工具: + +1. **性能分析(Profiling)** - 使用内置的 pprof 接口收集 CPU 性能数据 +2. **负载测试(Load Testing)** - 使用多种客户端工具模拟高并发请求 +3. **监控和分析** - 查看性能指标和识别性能瓶颈 + +## 前置条件 + +### 1. 启用性能分析 + +在启动 RustFS 时,需要设置环境变量启用性能分析功能: + +```bash +export RUSTFS_ENABLE_PROFILING=true +./rustfs +``` + +### 2. 
安装依赖工具 + +确保系统中安装了以下工具: + +```bash +# 基础工具 +curl # HTTP 请求 +jq # JSON 处理 (可选) + +# 分析工具 +go # Go pprof 工具 (可选,用于 protobuf 格式) +python3 # Python 负载测试脚本 + +# macOS 用户 +brew install curl jq go python3 + +# Ubuntu/Debian 用户 +sudo apt-get install curl jq golang-go python3 +``` + +## 性能测试方法 + +### 方法 1:使用专业脚本(推荐) + +项目提供了完整的性能分析脚本: + +```bash +# 查看脚本帮助 +./scripts/profile_rustfs.sh help + +# 检查性能分析状态 +./scripts/profile_rustfs.sh status + +# 收集火焰图(30秒) +./scripts/profile_rustfs.sh flamegraph + +# 收集 protobuf 格式性能数据 +./scripts/profile_rustfs.sh protobuf + +# 收集两种格式的性能数据 +./scripts/profile_rustfs.sh both + +# 自定义参数 +./scripts/profile_rustfs.sh -d 60 -u http://192.168.1.100:9000 both +``` + +### 方法 2:使用 Python 综合测试 + +Python 脚本提供了负载测试和性能分析的一体化解决方案: + +```bash +# 运行综合性能分析 +python3 test_load.py +``` + +此脚本会: +1. 启动后台负载测试(多线程 S3 操作) +2. 并行收集性能分析数据 +3. 生成火焰图用于分析 + +### 方法 3:使用简单负载测试 + +对于快速测试,可以使用 bash 脚本: + +```bash +# 运行简单负载测试 +./simple_load_test.sh +``` + +## 性能分析输出格式 + +### 1. 火焰图(SVG 格式) + +- **用途**: 可视化 CPU 使用情况 +- **文件**: `rustfs_profile_TIMESTAMP.svg` +- **查看方式**: 使用浏览器打开 SVG 文件 +- **分析要点**: + - 宽度表示 CPU 使用时间 + - 高度表示调用栈深度 + - 点击可以放大特定函数 + +```bash +# 在浏览器中打开 +open profiles/rustfs_profile_20240911_143000.svg +``` + +### 2. 
Protobuf 格式 + +- **用途**: 使用 Go pprof 工具进行详细分析 +- **文件**: `rustfs_profile_TIMESTAMP.pb` +- **分析工具**: `go tool pprof` + +```bash +# 使用 Go pprof 分析 +go tool pprof profiles/rustfs_profile_20240911_143000.pb + +# pprof 常用命令 +(pprof) top # 显示 CPU 使用率最高的函数 +(pprof) list func # 显示指定函数的源代码 +(pprof) web # 生成 web 界面(需要 graphviz) +(pprof) png # 生成 PNG 图片 +(pprof) help # 查看所有命令 +``` + +## API 接口使用 + +### 检查性能分析状态 + +```bash +curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/status" +``` + +返回示例: +```json +{ + "enabled": "true", + "sampling_rate": "100" +} +``` + +### 收集性能数据 + +```bash +# 收集 30 秒的火焰图 +curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=flamegraph" \ + -o profile.svg + +# 收集 protobuf 格式数据 +curl "http://127.0.0.1:9000/rustfs/admin/debug/pprof/profile?seconds=30&format=protobuf" \ + -o profile.pb +``` + +**参数说明**: +- `seconds`: 收集时长(1-300 秒) +- `format`: 输出格式(`flamegraph`/`svg` 或 `protobuf`/`pb`) + +## 负载测试场景 + +### 1. S3 API 负载测试 + +使用 Python 脚本进行完整的 S3 操作负载测试: + +```python +# 基本配置 +tester = S3LoadTester( + endpoint="http://127.0.0.1:9000", + access_key="rustfsadmin", + secret_key="rustfsadmin" +) + +# 运行负载测试 +# 4 个线程,每个线程执行 10 次操作 +tester.run_load_test(num_threads=4, operations_per_thread=10) +``` + +每次操作包括: +1. 上传 1MB 对象 +2. 下载对象 +3. 删除对象 + +### 2. 自定义负载测试 + +```bash +# 创建测试桶 +curl -X PUT "http://127.0.0.1:9000/test-bucket" + +# 并发上传测试 +for i in {1..10}; do + echo "test data $i" | curl -X PUT "http://127.0.0.1:9000/test-bucket/object-$i" -d @- & +done +wait + +# 并发下载测试 +for i in {1..10}; do + curl "http://127.0.0.1:9000/test-bucket/object-$i" > /dev/null & +done +wait +``` + +## 性能分析最佳实践 + +### 1. 测试环境准备 + +- 确保 RustFS 已启用性能分析: `RUSTFS_ENABLE_PROFILING=true` +- 使用独立的测试环境,避免其他程序干扰 +- 确保有足够的磁盘空间存储分析文件 + +### 2. 数据收集建议 + +- **预热阶段**: 先运行 5-10 分钟的轻量负载 +- **数据收集**: 在稳定负载下收集 30-60 秒的性能数据 +- **多次采样**: 收集多个样本进行对比分析 + +### 3. 分析重点 + +在火焰图中重点关注: + +1. **宽度最大的函数** - CPU 使用时间最长 +2. **平顶函数** - 可能的性能瓶颈 +3. **深度调用栈** - 可能的递归或复杂逻辑 +4. 
**意外的系统调用** - I/O 或内存分配问题 + +### 4. 常见性能问题 + +- **锁竞争**: 查找 `std::sync` 相关函数 +- **内存分配**: 查找 `alloc` 相关函数 +- **I/O 等待**: 查找文件系统或网络 I/O 函数 +- **序列化开销**: 查找 JSON/XML 解析函数 + +## 故障排除 + +### 1. 性能分析未启用 + +错误信息:`{"enabled":"false"}` + +解决方案: +```bash +export RUSTFS_ENABLE_PROFILING=true +# 重启 RustFS +``` + +### 2. 连接被拒绝 + +错误信息:`Connection refused` + +检查项: +- RustFS 是否正在运行 +- 端口是否正确(默认 9000) +- 防火墙设置 + +### 3. 分析文件过大 + +如果生成的分析文件过大: +- 减少收集时间(如 15-30 秒) +- 降低负载测试的并发度 +- 使用 protobuf 格式而非 SVG + +## 配置参数 + +### 环境变量 + +| 变量 | 默认值 | 描述 | +|------|--------|------| +| `RUSTFS_ENABLE_PROFILING` | `false` | 启用性能分析 | +| `RUSTFS_URL` | `http://127.0.0.1:9000` | RustFS 服务器地址 | +| `PROFILE_DURATION` | `30` | 性能数据收集时长(秒) | +| `OUTPUT_DIR` | `./profiles` | 输出文件目录 | + +### 脚本参数 + +```bash +./scripts/profile_rustfs.sh [OPTIONS] [COMMAND] + +OPTIONS: + -u, --url URL RustFS URL + -d, --duration SECONDS Profile duration + -o, --output DIR Output directory + +COMMANDS: + status 检查状态 + flamegraph 收集火焰图 + protobuf 收集 protobuf 数据 + both 收集两种格式(默认) +``` + +## 输出文件位置 + +- **脚本输出**: `./profiles/` 目录 +- **Python 脚本**: `/tmp/rustfs_profiles/` 目录 +- **文件命名**: `rustfs_profile_TIMESTAMP.{svg|pb}` + +## 示例工作流程 + +1. **启动 RustFS**: + ```bash + RUSTFS_ENABLE_PROFILING=true ./rustfs + ``` + +2. **验证性能分析可用**: + ```bash + ./scripts/profile_rustfs.sh status + ``` + +3. **开始负载测试**: + ```bash + python3 test_load.py & + ``` + +4. **收集性能数据**: + ```bash + ./scripts/profile_rustfs.sh -d 60 both + ``` + +5. 
**分析结果**: + ```bash + # 查看火焰图 + open profiles/rustfs_profile_*.svg + + # 或使用 pprof 分析 + go tool pprof profiles/rustfs_profile_*.pb + ``` + +通过这个完整的性能测试流程,你可以系统地分析 RustFS 的性能特征,识别瓶颈,并进行有针对性的优化。 \ No newline at end of file diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index b43ad74c..4e3c8205 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -107,6 +107,7 @@ url = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true } zip = { workspace = true } +pprof.workspace = true [target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies] sysctl = { workspace = true } diff --git a/rustfs/src/admin/handlers.rs b/rustfs/src/admin/handlers.rs index 0ac9cf3e..2898aec6 100644 --- a/rustfs/src/admin/handlers.rs +++ b/rustfs/src/admin/handlers.rs @@ -81,6 +81,7 @@ pub mod sts; pub mod tier; pub mod trace; pub mod user; +use pprof::protos::Message; use urlencoding::decode; #[allow(dead_code)] @@ -1233,6 +1234,149 @@ async fn count_bucket_objects( } } +pub struct ProfileHandler {} +#[async_trait::async_trait] +impl Operation for ProfileHandler { + async fn call(&self, req: S3Request, _params: Params<'_, '_>) -> S3Result> { + use crate::profiling; + + if !profiling::is_profiler_enabled() { + return Ok(S3Response::new(( + StatusCode::SERVICE_UNAVAILABLE, + Body::from("Profiler not enabled. 
Set RUSTFS_ENABLE_PROFILING=true to enable profiling".to_string()), + ))); + } + + let queries = extract_query_params(&req.uri); + let seconds = queries.get("seconds").and_then(|s| s.parse::().ok()).unwrap_or(30); + let format = queries.get("format").cloned().unwrap_or_else(|| "protobuf".to_string()); + + if seconds > 300 { + return Ok(S3Response::new(( + StatusCode::BAD_REQUEST, + Body::from("Profile duration cannot exceed 300 seconds".to_string()), + ))); + } + + let guard = match profiling::get_profiler_guard() { + Some(guard) => guard, + None => { + return Ok(S3Response::new(( + StatusCode::SERVICE_UNAVAILABLE, + Body::from("Profiler not initialized".to_string()), + ))); + } + }; + + info!("Starting CPU profile collection for {} seconds", seconds); + + tokio::time::sleep(std::time::Duration::from_secs(seconds)).await; + + let guard_lock = match guard.lock() { + Ok(guard) => guard, + Err(_) => { + error!("Failed to acquire profiler guard lock"); + return Ok(S3Response::new(( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from("Failed to acquire profiler lock".to_string()), + ))); + } + }; + + let report = match guard_lock.report().build() { + Ok(report) => report, + Err(e) => { + error!("Failed to build profiler report: {}", e); + return Ok(S3Response::new(( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from(format!("Failed to build profile report: {}", e)), + ))); + } + }; + + info!("CPU profile collection completed"); + + match format.as_str() { + "protobuf" | "pb" => { + let profile = report.pprof().unwrap(); + let mut body = Vec::new(); + if let Err(e) = profile.write_to_vec(&mut body) { + error!("Failed to serialize protobuf profile: {}", e); + return Ok(S3Response::new(( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from("Failed to serialize profile".to_string()), + ))); + } + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap()); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(body)), 
headers)) + } + "flamegraph" | "svg" => { + let mut flamegraph_buf = Vec::new(); + match report.flamegraph(&mut flamegraph_buf) { + Ok(()) => (), + Err(e) => { + error!("Failed to generate flamegraph: {}", e); + return Ok(S3Response::new(( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from(format!("Failed to generate flamegraph: {}", e)), + ))); + } + }; + + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, "image/svg+xml".parse().unwrap()); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(flamegraph_buf)), headers)) + } + _ => Ok(S3Response::new(( + StatusCode::BAD_REQUEST, + Body::from("Unsupported format. Use 'protobuf' or 'flamegraph'".to_string()), + ))), + } + } +} + +pub struct ProfileStatusHandler {} +#[async_trait::async_trait] +impl Operation for ProfileStatusHandler { + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + use crate::profiling; + use std::collections::HashMap; + + let status = if profiling::is_profiler_enabled() { + HashMap::from([ + ("enabled", "true"), + ("status", "running"), + ("supported_formats", "protobuf, flamegraph"), + ("max_duration_seconds", "300"), + ("endpoint", "/rustfs/admin/debug/pprof/profile"), + ]) + } else { + HashMap::from([ + ("enabled", "false"), + ("status", "disabled"), + ("message", "Set RUSTFS_ENABLE_PROFILING=true to enable profiling"), + ]) + }; + + match serde_json::to_string(&status) { + Ok(json) => { + let mut headers = HeaderMap::new(); + headers.insert(CONTENT_TYPE, "application/json".parse().unwrap()); + Ok(S3Response::with_headers((StatusCode::OK, Body::from(json)), headers)) + } + Err(e) => { + error!("Failed to serialize status: {}", e); + Ok(S3Response::new(( + StatusCode::INTERNAL_SERVER_ERROR, + Body::from("Failed to serialize status".to_string()), + ))) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 9ce6a2f3..da316dad 100644 --- a/rustfs/src/admin/mod.rs +++ 
b/rustfs/src/admin/mod.rs @@ -214,6 +214,18 @@ pub fn make_admin_route(console_enabled: bool) -> std::io::Result AdminOperation(&RemoveRemoteTargetHandler {}), )?; + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/debug/pprof/profile").as_str(), + AdminOperation(&handlers::ProfileHandler {}), + )?; + + r.insert( + Method::GET, + format!("{}{}", ADMIN_PREFIX, "/debug/pprof/status").as_str(), + AdminOperation(&handlers::ProfileStatusHandler {}), + )?; + Ok(r) } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 7b63c803..ecf0f3cb 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -18,6 +18,7 @@ mod config; mod error; // mod grpc; pub mod license; +mod profiling; mod server; mod storage; mod update; @@ -94,6 +95,9 @@ async fn main() -> Result<()> { // Store in global storage set_global_guard(guard).map_err(Error::other)?; + // Initialize performance profiling if enabled + profiling::start_profiling_if_enabled(); + // Run parameters run(opt).await } @@ -102,10 +106,12 @@ async fn main() -> Result<()> { async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); - // Initialize global DNS resolver early for enhanced DNS resolution - if let Err(e) = init_global_dns_resolver().await { - warn!("Failed to initialize global DNS resolver: {}. Using standard DNS resolution.", e); - } + // Initialize global DNS resolver early for enhanced DNS resolution (concurrent) + let dns_init = tokio::spawn(async { + if let Err(e) = init_global_dns_resolver().await { + warn!("Failed to initialize global DNS resolver: {}. 
Using standard DNS resolution.", e); + } + }); if let Some(region) = &opt.region { rustfs_ecstore::global::set_global_region(region.clone()); @@ -176,6 +182,9 @@ async fn run(opt: config::Opt) -> Result<()> { // Initialize event notifier init_event_notifier().await; + // Wait for DNS initialization to complete before network-heavy operations + dns_init.await.map_err(Error::other)?; + let buckets_list = store .list_bucket(&BucketOptions { no_metadata: true, @@ -187,14 +196,31 @@ async fn run(opt: config::Opt) -> Result<()> { // Collect bucket names into a vector let buckets: Vec = buckets_list.into_iter().map(|v| v.name).collect(); - // Initialize the bucket metadata system - init_bucket_metadata_sys(store.clone(), buckets.clone()).await; + // Parallelize initialization tasks for better network performance + let bucket_metadata_task = tokio::spawn({ + let store = store.clone(); + let buckets = buckets.clone(); + async move { + init_bucket_metadata_sys(store, buckets).await; + } + }); - // Initialize the IAM system - init_iam_sys(store.clone()).await?; + let iam_init_task = tokio::spawn({ + let store = store.clone(); + async move { init_iam_sys(store).await } + }); - // add bucket notification configuration - add_bucket_notification_configuration(buckets).await; + let notification_config_task = tokio::spawn({ + let buckets = buckets.clone(); + async move { + add_bucket_notification_configuration(buckets).await; + } + }); + + // Wait for all parallel initialization tasks to complete + bucket_metadata_task.await.map_err(Error::other)?; + iam_init_task.await.map_err(Error::other)??; + notification_config_task.await.map_err(Error::other)?; // Initialize the global notification system new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { @@ -239,12 +265,13 @@ async fn run(opt: config::Opt) -> Result<()> { // initialize bucket replication pool init_bucket_replication_pool().await; - // Async update check (optional) + // Async update check with timeout 
(optional) tokio::spawn(async { use crate::update::{UpdateCheckError, check_updates}; - match check_updates().await { - Ok(result) => { + // Add timeout to prevent hanging network calls + match tokio::time::timeout(std::time::Duration::from_secs(30), check_updates()).await { + Ok(Ok(result)) => { if result.update_available { if let Some(latest) = &result.latest_version { info!( @@ -262,12 +289,15 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("✅ Version check: Current version is up to date: {}", result.current_version); } } - Err(UpdateCheckError::HttpError(e)) => { + Ok(Err(UpdateCheckError::HttpError(e))) => { debug!("Version check: network error (this is normal): {}", e); } - Err(e) => { + Ok(Err(e)) => { debug!("Version check: failed (this is normal): {}", e); } + Err(_) => { + debug!("Version check: timeout after 30 seconds (this is normal)"); + } } }); @@ -364,14 +394,12 @@ async fn init_event_notifier() { info!("Event notifier configuration found, proceeding with initialization."); // 3. 
Initialize the notification system asynchronously with a global configuration - // Put it into a separate task to avoid blocking the main initialization process - tokio::spawn(async move { - if let Err(e) = rustfs_notify::initialize(server_config).await { - error!("Failed to initialize event notifier system: {}", e); - } else { - info!("Event notifier system initialized successfully."); - } - }); + // Use direct await for better error handling and faster initialization + if let Err(e) = rustfs_notify::initialize(server_config).await { + error!("Failed to initialize event notifier system: {}", e); + } else { + info!("Event notifier system initialized successfully."); + } } #[instrument(skip_all)] diff --git a/rustfs/src/profiling.rs b/rustfs/src/profiling.rs new file mode 100644 index 00000000..9990bcc5 --- /dev/null +++ b/rustfs/src/profiling.rs @@ -0,0 +1,63 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use pprof::ProfilerGuard;
+use std::sync::{Arc, Mutex, OnceLock};
+use tracing::info;
+
+static PROFILER_GUARD: OnceLock<Arc<Mutex<ProfilerGuard<'static>>>> = OnceLock::new();
+
+pub fn init_profiler() -> Result<(), Box<dyn std::error::Error>> {
+    let guard = pprof::ProfilerGuardBuilder::default()
+        .frequency(1000)
+        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
+        .build()
+        .map_err(|e| format!("Failed to build profiler guard: {}", e))?;
+
+    PROFILER_GUARD
+        .set(Arc::new(Mutex::new(guard)))
+        .map_err(|_| "Failed to set profiler guard (already initialized)")?;
+
+    info!("Performance profiler initialized");
+    Ok(())
+}
+
+pub fn is_profiler_enabled() -> bool {
+    PROFILER_GUARD.get().is_some()
+}
+
+pub fn get_profiler_guard() -> Option<Arc<Mutex<ProfilerGuard<'static>>>> {
+    PROFILER_GUARD.get().cloned()
+}
+
+pub fn start_profiling_if_enabled() {
+    let enable_profiling = std::env::var("RUSTFS_ENABLE_PROFILING")
+        .unwrap_or_else(|_| "false".to_string())
+        .parse::<bool>()
+        .unwrap_or(false);
+
+    if enable_profiling {
+        match init_profiler() {
+            Ok(()) => {
+                info!("Performance profiling enabled via RUSTFS_ENABLE_PROFILING environment variable");
+            }
+            Err(e) => {
+                tracing::error!("Failed to initialize profiler: {}", e);
+                info!("Performance profiling disabled due to initialization error");
+            }
+        }
+    } else {
+        info!("Performance profiling disabled. Set RUSTFS_ENABLE_PROFILING=true to enable");
+    }
+}
diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs
index 71bc37e0..b4edc2ed 100644
--- a/rustfs/src/server/http.rs
+++ b/rustfs/src/server/http.rs
@@ -157,6 +157,8 @@ pub async fn start_http_server(
         b.build()
     };
 
+    // Server will be created per connection - this ensures isolation
+
     tokio::spawn(async move {
         // Record the PID-related metrics of the current process
         let meter = opentelemetry::global::meter("system");