mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
This commit is contained in:
@@ -32,7 +32,16 @@ use crate::{EtagResolvable, HashReaderDetector, HashReaderMut};
|
||||
fn get_http_client() -> Client {
|
||||
// Reuse the HTTP connection pool in the global `reqwest::Client` instance
|
||||
// TODO: interact with load balancing?
|
||||
static CLIENT: LazyLock<Client> = LazyLock::new(Client::new);
|
||||
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
|
||||
Client::builder()
|
||||
.connect_timeout(std::time::Duration::from_secs(5))
|
||||
.tcp_keepalive(std::time::Duration::from_secs(10))
|
||||
.http2_keep_alive_interval(std::time::Duration::from_secs(5))
|
||||
.http2_keep_alive_timeout(std::time::Duration::from_secs(3))
|
||||
.http2_keep_alive_while_idle(true)
|
||||
.build()
|
||||
.expect("Failed to create global HTTP client")
|
||||
});
|
||||
CLIENT.clone()
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,21 @@ To resolve this, we needed to transform the passive failure detection (waiting f
|
||||
## 3. Implemented Solution
|
||||
We modified the internal gRPC client configuration in `crates/protos/src/lib.rs` to implement a multi-layered health check strategy.
|
||||
|
||||
### Solution Overview
|
||||
The fix implements a multi-layered detection strategy covering both Control Plane (RPC) and Data Plane (Streaming):
|
||||
|
||||
1. **Control Plane (gRPC)**:
|
||||
* Enabled `http2_keep_alive_interval` (5s) and `keep_alive_timeout` (3s) in `tonic` clients.
|
||||
* Enforced `tcp_keepalive` (10s) on underlying transport.
|
||||
* Context: Ensures cluster metadata operations (raft, status checks) fail fast if a node dies.
|
||||
|
||||
2. **Data Plane (File Uploads/Downloads)**:
|
||||
* **Client (Rio)**: Updated the `reqwest` client builder in `crates/rio` to set a 5s connect timeout, TCP keepalive (10s), and HTTP/2 keep-alive pings (5s interval, 3s timeout, sent even while the connection is idle). This prevents hangs during large file streaming (e.g., 1GB uploads).
|
||||
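* **Server**: Enabled TCP keepalive (`SO_KEEPALIVE`; 10s idle, 5s probe interval, 3 retries where the platform supports a retry count) on all incoming TCP connections in `rustfs/src/server/http.rs` so that sockets held by dead clients are detected and closed.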
|
||||
|
||||
3. **Cross-Platform Build Stability**:
|
||||
* Guarded Linux-specific profiling code (`jemalloc_pprof`) with `#[cfg(target_os = "linux")]` to fix build failures on macOS/AArch64.
|
||||
|
||||
### Configuration Changes
|
||||
|
||||
```rust
|
||||
|
||||
@@ -12,272 +12,291 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use chrono::Utc;
|
||||
use jemalloc_pprof::PROF_CTL;
|
||||
use pprof::protos::Message;
|
||||
use rustfs_config::{
|
||||
DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING,
|
||||
DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ,
|
||||
ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR,
|
||||
};
|
||||
use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize};
|
||||
use std::fs::{File, create_dir_all};
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info, warn};
|
||||
/// Non-Linux stand-in for the profiling initializer: does nothing.
#[cfg(not(target_os = "linux"))]
pub async fn init_from_env() {
    // Intentionally empty on non-Linux targets.
}
|
||||
|
||||
static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new();
|
||||
|
||||
/// CPU profiling mode
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum CpuMode {
|
||||
Off,
|
||||
Continuous,
|
||||
Periodic,
|
||||
/// Non-Linux stand-in for the CPU profile dump.
///
/// The requested duration is ignored.
///
/// # Errors
///
/// Always returns `Err("CPU profiling is only supported on Linux")`.
#[cfg(not(target_os = "linux"))]
pub async fn dump_cpu_pprof_for(_duration: std::time::Duration) -> Result<std::path::PathBuf, String> {
    Err(String::from("CPU profiling is only supported on Linux"))
}
|
||||
|
||||
/// Get or create output directory
|
||||
fn output_dir() -> PathBuf {
|
||||
let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR);
|
||||
let p = PathBuf::from(dir);
|
||||
if let Err(e) = create_dir_all(&p) {
|
||||
warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e);
|
||||
return PathBuf::from(".");
|
||||
/// Non-Linux stand-in for the memory profile dump.
///
/// # Errors
///
/// Always returns `Err("Memory profiling is only supported on Linux")`.
#[cfg(not(target_os = "linux"))]
pub async fn dump_memory_pprof_now() -> Result<std::path::PathBuf, String> {
    Err(String::from("Memory profiling is only supported on Linux"))
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
mod linux_impl {
|
||||
use chrono::Utc;
|
||||
use jemalloc_pprof::PROF_CTL;
|
||||
use pprof::protos::Message;
|
||||
use rustfs_config::{
|
||||
DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING,
|
||||
DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ,
|
||||
ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR,
|
||||
};
|
||||
use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize};
|
||||
use std::fs::{File, create_dir_all};
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
static CPU_CONT_GUARD: OnceLock<Arc<Mutex<Option<pprof::ProfilerGuard<'static>>>>> = OnceLock::new();
|
||||
|
||||
/// Selects how CPU profiling is run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CpuMode {
    /// CPU profiling is not started.
    Off,
    /// A single profiler is started and kept running (see `start_cpu_continuous`).
    Continuous,
    /// Profiles are taken in a repeating loop (see `start_cpu_periodic`).
    Periodic,
}
|
||||
p
|
||||
}
|
||||
|
||||
/// Read CPU profiling mode from env
|
||||
fn read_cpu_mode() -> CpuMode {
|
||||
match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() {
|
||||
"continuous" => CpuMode::Continuous,
|
||||
"periodic" => CpuMode::Periodic,
|
||||
_ => CpuMode::Off,
|
||||
/// Get or create output directory
|
||||
fn output_dir() -> PathBuf {
|
||||
let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR);
|
||||
let p = PathBuf::from(dir);
|
||||
if let Err(e) = create_dir_all(&p) {
|
||||
warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e);
|
||||
return PathBuf::from(".");
|
||||
}
|
||||
p
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate timestamp string for filenames
|
||||
fn ts() -> String {
|
||||
Utc::now().format("%Y%m%dT%H%M%S").to_string()
|
||||
}
|
||||
|
||||
/// Write pprof report to file in protobuf format
|
||||
fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> {
|
||||
let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?;
|
||||
let mut buf = Vec::with_capacity(512 * 1024);
|
||||
profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?;
|
||||
let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?;
|
||||
f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Internal: dump CPU pprof from existing guard
|
||||
async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> {
|
||||
let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?;
|
||||
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
|
||||
write_pprof_report_pb(&report, &out)?;
|
||||
info!("CPU profile exported: {}", out.display());
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately.
|
||||
pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> {
|
||||
if let Some(cell) = CPU_CONT_GUARD.get() {
|
||||
let guard_slot = cell.lock().await;
|
||||
if let Some(ref guard) = *guard_slot {
|
||||
debug!("profiling: using continuous profiler guard for CPU dump");
|
||||
return dump_cpu_with_guard(guard).await;
|
||||
/// Read CPU profiling mode from env
|
||||
fn read_cpu_mode() -> CpuMode {
|
||||
match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() {
|
||||
"continuous" => CpuMode::Continuous,
|
||||
"periodic" => CpuMode::Periodic,
|
||||
_ => CpuMode::Off,
|
||||
}
|
||||
}
|
||||
|
||||
let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
|
||||
let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?;
|
||||
sleep(duration).await;
|
||||
|
||||
dump_cpu_with_guard(&guard).await
|
||||
}
|
||||
|
||||
// Public API: dump memory pprof now (jemalloc)
|
||||
pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> {
|
||||
let out = output_dir().join(format!("mem_profile_{}.pb", ts()));
|
||||
let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?;
|
||||
|
||||
let prof_ctl_cell = PROF_CTL
|
||||
.as_ref()
|
||||
.ok_or_else(|| "jemalloc profiling control not available".to_string())?;
|
||||
let mut prof_ctl = prof_ctl_cell.lock().await;
|
||||
|
||||
if !prof_ctl.activated() {
|
||||
return Err("jemalloc profiling is not active".to_string());
|
||||
/// Generate timestamp string for filenames
|
||||
fn ts() -> String {
|
||||
Utc::now().format("%Y%m%dT%H%M%S").to_string()
|
||||
}
|
||||
|
||||
let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?;
|
||||
f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?;
|
||||
info!("Memory profile exported: {}", out.display());
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// Jemalloc status check (No forced placement, only status observation)
|
||||
pub async fn check_jemalloc_profiling() {
|
||||
use tikv_jemalloc_ctl::{config, epoch, stats};
|
||||
|
||||
if let Err(e) = epoch::advance() {
|
||||
warn!("jemalloc epoch advance failed: {e}");
|
||||
/// Write pprof report to file in protobuf format
|
||||
fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> {
|
||||
let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?;
|
||||
let mut buf = Vec::with_capacity(512 * 1024);
|
||||
profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?;
|
||||
let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?;
|
||||
f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
match config::malloc_conf::read() {
|
||||
Ok(conf) => debug!("jemalloc malloc_conf: {}", conf),
|
||||
Err(e) => debug!("jemalloc read malloc_conf failed: {e}"),
|
||||
/// Internal: dump CPU pprof from existing guard
|
||||
async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result<PathBuf, String> {
|
||||
let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?;
|
||||
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
|
||||
write_pprof_report_pb(&report, &out)?;
|
||||
info!("CPU profile exported: {}", out.display());
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
match std::env::var("MALLOC_CONF") {
|
||||
Ok(v) => debug!("MALLOC_CONF={}", v),
|
||||
Err(_) => debug!("MALLOC_CONF is not set"),
|
||||
}
|
||||
|
||||
if let Some(lock) = PROF_CTL.as_ref() {
|
||||
let ctl = lock.lock().await;
|
||||
info!(activated = ctl.activated(), "jemalloc profiling status");
|
||||
} else {
|
||||
info!("jemalloc profiling controller is NOT available");
|
||||
}
|
||||
|
||||
let _ = epoch::advance();
|
||||
macro_rules! show {
|
||||
($name:literal, $reader:expr) => {
|
||||
match $reader {
|
||||
Ok(v) => debug!(concat!($name, "={}"), v),
|
||||
Err(e) => debug!(concat!($name, " read failed: {}"), e),
|
||||
// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately.
|
||||
pub async fn dump_cpu_pprof_for(duration: Duration) -> Result<PathBuf, String> {
|
||||
if let Some(cell) = CPU_CONT_GUARD.get() {
|
||||
let guard_slot = cell.lock().await;
|
||||
if let Some(ref guard) = *guard_slot {
|
||||
debug!("profiling: using continuous profiler guard for CPU dump");
|
||||
return dump_cpu_with_guard(guard).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
show!("allocated", stats::allocated::read());
|
||||
show!("resident", stats::resident::read());
|
||||
show!("mapped", stats::mapped::read());
|
||||
show!("metadata", stats::metadata::read());
|
||||
show!("active", stats::active::read());
|
||||
}
|
||||
|
||||
// Internal: start continuous CPU profiling
|
||||
async fn start_cpu_continuous(freq_hz: i32) {
|
||||
let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone();
|
||||
let mut slot = cell.lock().await;
|
||||
if slot.is_some() {
|
||||
warn!("profiling: continuous CPU guard already running");
|
||||
return;
|
||||
}
|
||||
match pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(freq_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()
|
||||
{
|
||||
Ok(guard) => {
|
||||
*slot = Some(guard);
|
||||
info!(freq = freq_hz, "start continuous CPU profiling");
|
||||
}
|
||||
Err(e) => warn!("start continuous CPU profiling failed: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
// Internal: start periodic CPU sampling loop
|
||||
async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) {
|
||||
info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling");
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
sleep(interval).await;
|
||||
let guard = match pprof::ProfilerGuard::new(freq_hz) {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
warn!("periodic CPU profiler create failed: {e}");
|
||||
continue;
|
||||
let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
|
||||
let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?;
|
||||
sleep(duration).await;
|
||||
|
||||
dump_cpu_with_guard(&guard).await
|
||||
}
|
||||
|
||||
// Public API: dump memory pprof now (jemalloc)
|
||||
pub async fn dump_memory_pprof_now() -> Result<PathBuf, String> {
|
||||
let out = output_dir().join(format!("mem_profile_{}.pb", ts()));
|
||||
let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?;
|
||||
|
||||
let prof_ctl_cell = PROF_CTL
|
||||
.as_ref()
|
||||
.ok_or_else(|| "jemalloc profiling control not available".to_string())?;
|
||||
let mut prof_ctl = prof_ctl_cell.lock().await;
|
||||
|
||||
if !prof_ctl.activated() {
|
||||
return Err("jemalloc profiling is not active".to_string());
|
||||
}
|
||||
|
||||
let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?;
|
||||
f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?;
|
||||
info!("Memory profile exported: {}", out.display());
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// Jemalloc status check (No forced placement, only status observation)
|
||||
pub async fn check_jemalloc_profiling() {
|
||||
use tikv_jemalloc_ctl::{config, epoch, stats};
|
||||
|
||||
if let Err(e) = epoch::advance() {
|
||||
warn!("jemalloc epoch advance failed: {e}");
|
||||
}
|
||||
|
||||
match config::malloc_conf::read() {
|
||||
Ok(conf) => debug!("jemalloc malloc_conf: {}", conf),
|
||||
Err(e) => debug!("jemalloc read malloc_conf failed: {e}"),
|
||||
}
|
||||
|
||||
match std::env::var("MALLOC_CONF") {
|
||||
Ok(v) => debug!("MALLOC_CONF={}", v),
|
||||
Err(_) => debug!("MALLOC_CONF is not set"),
|
||||
}
|
||||
|
||||
if let Some(lock) = PROF_CTL.as_ref() {
|
||||
let ctl = lock.lock().await;
|
||||
info!(activated = ctl.activated(), "jemalloc profiling status");
|
||||
} else {
|
||||
info!("jemalloc profiling controller is NOT available");
|
||||
}
|
||||
|
||||
let _ = epoch::advance();
|
||||
macro_rules! show {
|
||||
($name:literal, $reader:expr) => {
|
||||
match $reader {
|
||||
Ok(v) => debug!(concat!($name, "={}"), v),
|
||||
Err(e) => debug!(concat!($name, " read failed: {}"), e),
|
||||
}
|
||||
};
|
||||
sleep(duration).await;
|
||||
match guard.report().build() {
|
||||
Ok(report) => {
|
||||
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
|
||||
if let Err(e) = write_pprof_report_pb(&report, &out) {
|
||||
warn!("write periodic CPU pprof failed: {e}");
|
||||
} else {
|
||||
info!("periodic CPU profile exported: {}", out.display());
|
||||
}
|
||||
show!("allocated", stats::allocated::read());
|
||||
show!("resident", stats::resident::read());
|
||||
show!("mapped", stats::mapped::read());
|
||||
show!("metadata", stats::metadata::read());
|
||||
show!("active", stats::active::read());
|
||||
}
|
||||
|
||||
// Internal: start continuous CPU profiling
|
||||
async fn start_cpu_continuous(freq_hz: i32) {
|
||||
let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone();
|
||||
let mut slot = cell.lock().await;
|
||||
if slot.is_some() {
|
||||
warn!("profiling: continuous CPU guard already running");
|
||||
return;
|
||||
}
|
||||
match pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(freq_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()
|
||||
{
|
||||
Ok(guard) => {
|
||||
*slot = Some(guard);
|
||||
info!(freq = freq_hz, "start continuous CPU profiling");
|
||||
}
|
||||
Err(e) => warn!("start continuous CPU profiling failed: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
// Internal: start periodic CPU sampling loop
|
||||
async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) {
|
||||
info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling");
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
sleep(interval).await;
|
||||
let guard = match pprof::ProfilerGuard::new(freq_hz) {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
warn!("periodic CPU profiler create failed: {e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => warn!("periodic CPU report build failed: {e}"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Internal: start periodic memory dump when jemalloc profiling is active
|
||||
async fn start_memory_periodic(interval: Duration) {
|
||||
info!(?interval, "start periodic memory pprof dump");
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
sleep(interval).await;
|
||||
|
||||
let Some(lock) = PROF_CTL.as_ref() else {
|
||||
debug!("skip memory dump: PROF_CTL not available");
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut ctl = lock.lock().await;
|
||||
if !ctl.activated() {
|
||||
debug!("skip memory dump: jemalloc profiling not active");
|
||||
continue;
|
||||
}
|
||||
|
||||
let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts()));
|
||||
match File::create(&out) {
|
||||
Err(e) => {
|
||||
error!("periodic mem dump create file failed: {}", e);
|
||||
continue;
|
||||
}
|
||||
Ok(mut f) => match ctl.dump_pprof() {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = f.write_all(&bytes) {
|
||||
error!("periodic mem dump write failed: {}", e);
|
||||
};
|
||||
sleep(duration).await;
|
||||
match guard.report().build() {
|
||||
Ok(report) => {
|
||||
let out = output_dir().join(format!("cpu_profile_{}.pb", ts()));
|
||||
if let Err(e) = write_pprof_report_pb(&report, &out) {
|
||||
warn!("write periodic CPU pprof failed: {e}");
|
||||
} else {
|
||||
info!("periodic memory profile dumped to {}", out.display());
|
||||
info!("periodic CPU profile exported: {}", out.display());
|
||||
}
|
||||
}
|
||||
Err(e) => error!("periodic mem dump failed: {}", e),
|
||||
},
|
||||
Err(e) => warn!("periodic CPU report build failed: {e}"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Internal: start periodic memory dump when jemalloc profiling is active
|
||||
async fn start_memory_periodic(interval: Duration) {
|
||||
info!(?interval, "start periodic memory pprof dump");
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
sleep(interval).await;
|
||||
|
||||
let Some(lock) = PROF_CTL.as_ref() else {
|
||||
debug!("skip memory dump: PROF_CTL not available");
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut ctl = lock.lock().await;
|
||||
if !ctl.activated() {
|
||||
debug!("skip memory dump: jemalloc profiling not active");
|
||||
continue;
|
||||
}
|
||||
|
||||
let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts()));
|
||||
match File::create(&out) {
|
||||
Err(e) => {
|
||||
error!("periodic mem dump create file failed: {}", e);
|
||||
continue;
|
||||
}
|
||||
Ok(mut f) => match ctl.dump_pprof() {
|
||||
Ok(bytes) => {
|
||||
if let Err(e) = f.write_all(&bytes) {
|
||||
error!("periodic mem dump write failed: {}", e);
|
||||
} else {
|
||||
info!("periodic memory profile dumped to {}", out.display());
|
||||
}
|
||||
}
|
||||
Err(e) => error!("periodic mem dump failed: {}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Public: unified init entry, avoid duplication/conflict
|
||||
pub async fn init_from_env() {
|
||||
let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING);
|
||||
if !enabled {
|
||||
debug!("profiling: disabled by env");
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Public: unified init entry, avoid duplication/conflict
|
||||
pub async fn init_from_env() {
|
||||
let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING);
|
||||
if !enabled {
|
||||
debug!("profiling: disabled by env");
|
||||
return;
|
||||
}
|
||||
// Jemalloc state check once (no dump)
|
||||
check_jemalloc_profiling().await;
|
||||
|
||||
// Jemalloc state check once (no dump)
|
||||
check_jemalloc_profiling().await;
|
||||
// CPU
|
||||
let cpu_mode = read_cpu_mode();
|
||||
let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
|
||||
let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS));
|
||||
let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS));
|
||||
|
||||
// CPU
|
||||
let cpu_mode = read_cpu_mode();
|
||||
let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32;
|
||||
let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS));
|
||||
let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS));
|
||||
match cpu_mode {
|
||||
CpuMode::Off => debug!("profiling: CPU mode off"),
|
||||
CpuMode::Continuous => start_cpu_continuous(cpu_freq).await,
|
||||
CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await,
|
||||
}
|
||||
|
||||
match cpu_mode {
|
||||
CpuMode::Off => debug!("profiling: CPU mode off"),
|
||||
CpuMode::Continuous => start_cpu_continuous(cpu_freq).await,
|
||||
CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await,
|
||||
}
|
||||
|
||||
// Memory
|
||||
let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC);
|
||||
let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS));
|
||||
if mem_periodic {
|
||||
start_memory_periodic(mem_interval).await;
|
||||
// Memory
|
||||
let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC);
|
||||
let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS));
|
||||
if mem_periodic {
|
||||
start_memory_periodic(mem_interval).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub use linux_impl::{dump_cpu_pprof_for, dump_memory_pprof_now, init_from_env};
|
||||
|
||||
@@ -33,7 +33,7 @@ use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServ
|
||||
use rustfs_utils::net::parse_and_resolve_address;
|
||||
use rustls::ServerConfig;
|
||||
use s3s::{host::MultiDomain, service::S3Service, service::S3ServiceBuilder};
|
||||
use socket2::SockRef;
|
||||
use socket2::{SockRef, TcpKeepalive};
|
||||
use std::io::{Error, Result};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -371,6 +371,20 @@ pub async fn start_http_server(
|
||||
};
|
||||
|
||||
let socket_ref = SockRef::from(&socket);
|
||||
|
||||
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
|
||||
// Idle: 10s, Interval: 5s, Retries: 3
|
||||
let ka = TcpKeepalive::new()
|
||||
.with_time(Duration::from_secs(10))
|
||||
.with_interval(Duration::from_secs(5));
|
||||
|
||||
#[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))]
|
||||
let ka = ka.with_retries(3);
|
||||
|
||||
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
|
||||
warn!(?err, "Failed to set TCP_KEEPALIVE");
|
||||
}
|
||||
|
||||
if let Err(err) = socket_ref.set_tcp_nodelay(true) {
|
||||
warn!(?err, "Failed to set TCP_NODELAY");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user