diff --git a/crates/rio/src/http_reader.rs b/crates/rio/src/http_reader.rs index af6a01a5..a2b8d33a 100644 --- a/crates/rio/src/http_reader.rs +++ b/crates/rio/src/http_reader.rs @@ -32,7 +32,16 @@ use crate::{EtagResolvable, HashReaderDetector, HashReaderMut}; fn get_http_client() -> Client { // Reuse the HTTP connection pool in the global `reqwest::Client` instance // TODO: interact with load balancing? - static CLIENT: LazyLock = LazyLock::new(Client::new); + static CLIENT: LazyLock = LazyLock::new(|| { + Client::builder() + .connect_timeout(std::time::Duration::from_secs(5)) + .tcp_keepalive(std::time::Duration::from_secs(10)) + .http2_keep_alive_interval(std::time::Duration::from_secs(5)) + .http2_keep_alive_timeout(std::time::Duration::from_secs(3)) + .http2_keep_alive_while_idle(true) + .build() + .expect("Failed to create global HTTP client") + }); CLIENT.clone() } diff --git a/docs/cluster_recovery.md b/docs/cluster_recovery.md index 6c339af6..21c6b500 100644 --- a/docs/cluster_recovery.md +++ b/docs/cluster_recovery.md @@ -25,6 +25,21 @@ To resolve this, we needed to transform the passive failure detection (waiting f ## 3. Implemented Solution We modified the internal gRPC client configuration in `crates/protos/src/lib.rs` to implement a multi-layered health check strategy. +### Solution Overview +The fix implements a multi-layered detection strategy covering both Control Plane (RPC) and Data Plane (Streaming): + +1. **Control Plane (gRPC)**: + * Enabled `http2_keep_alive_interval` (5s) and `keep_alive_timeout` (3s) in `tonic` clients. + * Enforced `tcp_keepalive` (10s) on underlying transport. + * Context: Ensures cluster metadata operations (raft, status checks) fail fast if a node dies. + +2. **Data Plane (File Uploads/Downloads)**: + * **Client (Rio)**: Updated `reqwest` client builder in `crates/rio` to enable TCP Keepalive (10s) and HTTP/2 Keepalive (5s). This prevents hangs during large file streaming (e.g., 1GB uploads). + * **Server**: Enabled `SO_KEEPALIVE` on all incoming TCP connections in `rustfs/src/server/http.rs` to forcefully close sockets from dead clients. + +3. **Cross-Platform Build Stability**: + * Guarded Linux-specific profiling code (`jemalloc_pprof`) with `#[cfg(target_os = "linux")]` to fix build failures on macOS/AArch64. + ### Configuration Changes ```rust diff --git a/rustfs/src/profiling.rs b/rustfs/src/profiling.rs index 1b219aab..11f69c10 100644 --- a/rustfs/src/profiling.rs +++ b/rustfs/src/profiling.rs @@ -12,272 +12,291 @@ // See the License for the specific language governing permissions and // limitations under the License. -use chrono::Utc; -use jemalloc_pprof::PROF_CTL; -use pprof::protos::Message; -use rustfs_config::{ - DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING, - DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ, - ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR, -}; -use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize}; -use std::fs::{File, create_dir_all}; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; -use std::time::Duration; -use tokio::sync::Mutex; -use tokio::time::sleep; -use tracing::{debug, error, info, warn}; +#[cfg(not(target_os = "linux"))] +pub async fn init_from_env() {} -static CPU_CONT_GUARD: OnceLock>>>> = OnceLock::new(); - -/// CPU profiling mode -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum CpuMode { - Off, - Continuous, - Periodic, +#[cfg(not(target_os = "linux"))] +pub async fn dump_cpu_pprof_for(_duration: std::time::Duration) -> Result { + Err("CPU profiling is only supported on Linux".to_string()) } -/// Get or create output directory -fn output_dir() -> PathBuf { - let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR); - let p = PathBuf::from(dir); - if let Err(e) = create_dir_all(&p) { - warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e); - return PathBuf::from("."); +#[cfg(not(target_os = "linux"))] +pub async fn dump_memory_pprof_now() -> Result { + Err("Memory profiling is only supported on Linux".to_string()) +} + +#[cfg(target_os = "linux")] +mod linux_impl { + use chrono::Utc; + use jemalloc_pprof::PROF_CTL; + use pprof::protos::Message; + use rustfs_config::{ + DEFAULT_CPU_DURATION_SECS, DEFAULT_CPU_FREQ, DEFAULT_CPU_INTERVAL_SECS, DEFAULT_CPU_MODE, DEFAULT_ENABLE_PROFILING, + DEFAULT_MEM_INTERVAL_SECS, DEFAULT_MEM_PERIODIC, DEFAULT_OUTPUT_DIR, ENV_CPU_DURATION_SECS, ENV_CPU_FREQ, + ENV_CPU_INTERVAL_SECS, ENV_CPU_MODE, ENV_ENABLE_PROFILING, ENV_MEM_INTERVAL_SECS, ENV_MEM_PERIODIC, ENV_OUTPUT_DIR, + }; + use rustfs_utils::{get_env_bool, get_env_str, get_env_u64, get_env_usize}; + use std::fs::{File, create_dir_all}; + use std::io::Write; + use std::path::{Path, PathBuf}; + use std::sync::{Arc, OnceLock}; + use std::time::Duration; + use tokio::sync::Mutex; + use tokio::time::sleep; + use tracing::{debug, error, info, warn}; + + static CPU_CONT_GUARD: OnceLock>>>> = OnceLock::new(); + + /// CPU profiling mode + #[derive(Clone, Copy, Debug, Eq, PartialEq)] + enum CpuMode { + Off, + Continuous, + Periodic, } - p -} -/// Read CPU profiling mode from env -fn read_cpu_mode() -> CpuMode { - match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() { - "continuous" => CpuMode::Continuous, - "periodic" => CpuMode::Periodic, - _ => CpuMode::Off, + /// Get or create output directory + fn output_dir() -> PathBuf { + let dir = get_env_str(ENV_OUTPUT_DIR, DEFAULT_OUTPUT_DIR); + let p = PathBuf::from(dir); + if let Err(e) = create_dir_all(&p) { + warn!("profiling: create output dir {} failed: {}, fallback to current dir", p.display(), e); + return PathBuf::from("."); + } + p } -} -/// Generate timestamp string for filenames -fn ts() -> String { - Utc::now().format("%Y%m%dT%H%M%S").to_string() -} - -/// Write pprof report to file in protobuf format -fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> { - let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?; - let mut buf = Vec::with_capacity(512 * 1024); - profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?; - let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?; - f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?; - Ok(()) -} - -/// Internal: dump CPU pprof from existing guard -async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result { - let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?; - let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); - write_pprof_report_pb(&report, &out)?; - info!("CPU profile exported: {}", out.display()); - Ok(out) -} - -// Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately. -pub async fn dump_cpu_pprof_for(duration: Duration) -> Result { - if let Some(cell) = CPU_CONT_GUARD.get() { - let guard_slot = cell.lock().await; - if let Some(ref guard) = *guard_slot { - debug!("profiling: using continuous profiler guard for CPU dump"); - return dump_cpu_with_guard(guard).await; + /// Read CPU profiling mode from env + fn read_cpu_mode() -> CpuMode { + match get_env_str(ENV_CPU_MODE, DEFAULT_CPU_MODE).to_lowercase().as_str() { + "continuous" => CpuMode::Continuous, + "periodic" => CpuMode::Periodic, + _ => CpuMode::Off, } } - let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; - let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?; - sleep(duration).await; - - dump_cpu_with_guard(&guard).await -} - -// Public API: dump memory pprof now (jemalloc) -pub async fn dump_memory_pprof_now() -> Result { - let out = output_dir().join(format!("mem_profile_{}.pb", ts())); - let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?; - - let prof_ctl_cell = PROF_CTL - .as_ref() - .ok_or_else(|| "jemalloc profiling control not available".to_string())?; - let mut prof_ctl = prof_ctl_cell.lock().await; - - if !prof_ctl.activated() { - return Err("jemalloc profiling is not active".to_string()); + /// Generate timestamp string for filenames + fn ts() -> String { + Utc::now().format("%Y%m%dT%H%M%S").to_string() } - let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?; - f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?; - info!("Memory profile exported: {}", out.display()); - Ok(out) -} - -// Jemalloc status check (No forced placement, only status observation) -pub async fn check_jemalloc_profiling() { - use tikv_jemalloc_ctl::{config, epoch, stats}; - - if let Err(e) = epoch::advance() { - warn!("jemalloc epoch advance failed: {e}"); + /// Write pprof report to file in protobuf format + fn write_pprof_report_pb(report: &pprof::Report, path: &Path) -> Result<(), String> { + let profile = report.pprof().map_err(|e| format!("pprof() failed: {e}"))?; + let mut buf = Vec::with_capacity(512 * 1024); + profile.write_to_vec(&mut buf).map_err(|e| format!("encode failed: {e}"))?; + let mut f = File::create(path).map_err(|e| format!("create file failed: {e}"))?; + f.write_all(&buf).map_err(|e| format!("write file failed: {e}"))?; + Ok(()) } - match config::malloc_conf::read() { - Ok(conf) => debug!("jemalloc malloc_conf: {}", conf), - Err(e) => debug!("jemalloc read malloc_conf failed: {e}"), + /// Internal: dump CPU pprof from existing guard + async fn dump_cpu_with_guard(guard: &pprof::ProfilerGuard<'_>) -> Result { + let report = guard.report().build().map_err(|e| format!("build report failed: {e}"))?; + let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); + write_pprof_report_pb(&report, &out)?; + info!("CPU profile exported: {}", out.display()); + Ok(out) } - match std::env::var("MALLOC_CONF") { - Ok(v) => debug!("MALLOC_CONF={}", v), - Err(_) => debug!("MALLOC_CONF is not set"), - } - - if let Some(lock) = PROF_CTL.as_ref() { - let ctl = lock.lock().await; - info!(activated = ctl.activated(), "jemalloc profiling status"); - } else { - info!("jemalloc profiling controller is NOT available"); - } - - let _ = epoch::advance(); - macro_rules! show { - ($name:literal, $reader:expr) => { - match $reader { - Ok(v) => debug!(concat!($name, "={}"), v), - Err(e) => debug!(concat!($name, " read failed: {}"), e), + // Public API: dump CPU for a duration; if continuous guard exists, snapshot immediately. + pub async fn dump_cpu_pprof_for(duration: Duration) -> Result { + if let Some(cell) = CPU_CONT_GUARD.get() { + let guard_slot = cell.lock().await; + if let Some(ref guard) = *guard_slot { + debug!("profiling: using continuous profiler guard for CPU dump"); + return dump_cpu_with_guard(guard).await; } - }; - } - show!("allocated", stats::allocated::read()); - show!("resident", stats::resident::read()); - show!("mapped", stats::mapped::read()); - show!("metadata", stats::metadata::read()); - show!("active", stats::active::read()); -} - -// Internal: start continuous CPU profiling -async fn start_cpu_continuous(freq_hz: i32) { - let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone(); - let mut slot = cell.lock().await; - if slot.is_some() { - warn!("profiling: continuous CPU guard already running"); - return; - } - match pprof::ProfilerGuardBuilder::default() - .frequency(freq_hz) - .blocklist(&["libc", "libgcc", "pthread", "vdso"]) - .build() - { - Ok(guard) => { - *slot = Some(guard); - info!(freq = freq_hz, "start continuous CPU profiling"); } - Err(e) => warn!("start continuous CPU profiling failed: {e}"), - } -} -// Internal: start periodic CPU sampling loop -async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) { - info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling"); - tokio::spawn(async move { - loop { - sleep(interval).await; - let guard = match pprof::ProfilerGuard::new(freq_hz) { - Ok(g) => g, - Err(e) => { - warn!("periodic CPU profiler create failed: {e}"); - continue; + let freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; + let guard = pprof::ProfilerGuard::new(freq).map_err(|e| format!("create profiler failed: {e}"))?; + sleep(duration).await; + + dump_cpu_with_guard(&guard).await + } + + // Public API: dump memory pprof now (jemalloc) + pub async fn dump_memory_pprof_now() -> Result { + let out = output_dir().join(format!("mem_profile_{}.pb", ts())); + let mut f = File::create(&out).map_err(|e| format!("create file failed: {e}"))?; + + let prof_ctl_cell = PROF_CTL + .as_ref() + .ok_or_else(|| "jemalloc profiling control not available".to_string())?; + let mut prof_ctl = prof_ctl_cell.lock().await; + + if !prof_ctl.activated() { + return Err("jemalloc profiling is not active".to_string()); + } + + let bytes = prof_ctl.dump_pprof().map_err(|e| format!("dump pprof failed: {e}"))?; + f.write_all(&bytes).map_err(|e| format!("write file failed: {e}"))?; + info!("Memory profile exported: {}", out.display()); + Ok(out) + } + + // Jemalloc status check (No forced placement, only status observation) + pub async fn check_jemalloc_profiling() { + use tikv_jemalloc_ctl::{config, epoch, stats}; + + if let Err(e) = epoch::advance() { + warn!("jemalloc epoch advance failed: {e}"); + } + + match config::malloc_conf::read() { + Ok(conf) => debug!("jemalloc malloc_conf: {}", conf), + Err(e) => debug!("jemalloc read malloc_conf failed: {e}"), + } + + match std::env::var("MALLOC_CONF") { + Ok(v) => debug!("MALLOC_CONF={}", v), + Err(_) => debug!("MALLOC_CONF is not set"), + } + + if let Some(lock) = PROF_CTL.as_ref() { + let ctl = lock.lock().await; + info!(activated = ctl.activated(), "jemalloc profiling status"); + } else { + info!("jemalloc profiling controller is NOT available"); + } + + let _ = epoch::advance(); + macro_rules! show { + ($name:literal, $reader:expr) => { + match $reader { + Ok(v) => debug!(concat!($name, "={}"), v), + Err(e) => debug!(concat!($name, " read failed: {}"), e), } }; - sleep(duration).await; - match guard.report().build() { - Ok(report) => { - let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); - if let Err(e) = write_pprof_report_pb(&report, &out) { - warn!("write periodic CPU pprof failed: {e}"); - } else { - info!("periodic CPU profile exported: {}", out.display()); + } + show!("allocated", stats::allocated::read()); + show!("resident", stats::resident::read()); + show!("mapped", stats::mapped::read()); + show!("metadata", stats::metadata::read()); + show!("active", stats::active::read()); + } + + // Internal: start continuous CPU profiling + async fn start_cpu_continuous(freq_hz: i32) { + let cell = CPU_CONT_GUARD.get_or_init(|| Arc::new(Mutex::new(None))).clone(); + let mut slot = cell.lock().await; + if slot.is_some() { + warn!("profiling: continuous CPU guard already running"); + return; + } + match pprof::ProfilerGuardBuilder::default() + .frequency(freq_hz) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build() + { + Ok(guard) => { + *slot = Some(guard); + info!(freq = freq_hz, "start continuous CPU profiling"); + } + Err(e) => warn!("start continuous CPU profiling failed: {e}"), + } + } + + // Internal: start periodic CPU sampling loop + async fn start_cpu_periodic(freq_hz: i32, interval: Duration, duration: Duration) { + info!(freq = freq_hz, ?interval, ?duration, "start periodic CPU profiling"); + tokio::spawn(async move { + loop { + sleep(interval).await; + let guard = match pprof::ProfilerGuard::new(freq_hz) { + Ok(g) => g, + Err(e) => { + warn!("periodic CPU profiler create failed: {e}"); + continue; } - } - Err(e) => warn!("periodic CPU report build failed: {e}"), - } - } - }); -} - -// Internal: start periodic memory dump when jemalloc profiling is active -async fn start_memory_periodic(interval: Duration) { - info!(?interval, "start periodic memory pprof dump"); - tokio::spawn(async move { - loop { - sleep(interval).await; - - let Some(lock) = PROF_CTL.as_ref() else { - debug!("skip memory dump: PROF_CTL not available"); - continue; - }; - - let mut ctl = lock.lock().await; - if !ctl.activated() { - debug!("skip memory dump: jemalloc profiling not active"); - continue; - } - - let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts())); - match File::create(&out) { - Err(e) => { - error!("periodic mem dump create file failed: {}", e); - continue; - } - Ok(mut f) => match ctl.dump_pprof() { - Ok(bytes) => { - if let Err(e) = f.write_all(&bytes) { - error!("periodic mem dump write failed: {}", e); + }; + sleep(duration).await; + match guard.report().build() { + Ok(report) => { + let out = output_dir().join(format!("cpu_profile_{}.pb", ts())); + if let Err(e) = write_pprof_report_pb(&report, &out) { + warn!("write periodic CPU pprof failed: {e}"); } else { - info!("periodic memory profile dumped to {}", out.display()); + info!("periodic CPU profile exported: {}", out.display()); } } - Err(e) => error!("periodic mem dump failed: {}", e), - }, + Err(e) => warn!("periodic CPU report build failed: {e}"), + } } + }); + } + + // Internal: start periodic memory dump when jemalloc profiling is active + async fn start_memory_periodic(interval: Duration) { + info!(?interval, "start periodic memory pprof dump"); + tokio::spawn(async move { + loop { + sleep(interval).await; + + let Some(lock) = PROF_CTL.as_ref() else { + debug!("skip memory dump: PROF_CTL not available"); + continue; + }; + + let mut ctl = lock.lock().await; + if !ctl.activated() { + debug!("skip memory dump: jemalloc profiling not active"); + continue; + } + + let out = output_dir().join(format!("mem_profile_periodic_{}.pb", ts())); + match File::create(&out) { + Err(e) => { + error!("periodic mem dump create file failed: {}", e); + continue; + } + Ok(mut f) => match ctl.dump_pprof() { + Ok(bytes) => { + if let Err(e) = f.write_all(&bytes) { + error!("periodic mem dump write failed: {}", e); + } else { + info!("periodic memory profile dumped to {}", out.display()); + } + } + Err(e) => error!("periodic mem dump failed: {}", e), + }, + } + } + }); + } + + // Public: unified init entry, avoid duplication/conflict + pub async fn init_from_env() { + let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING); + if !enabled { + debug!("profiling: disabled by env"); + return; } - }); -} -// Public: unified init entry, avoid duplication/conflict -pub async fn init_from_env() { - let enabled = get_env_bool(ENV_ENABLE_PROFILING, DEFAULT_ENABLE_PROFILING); - if !enabled { - debug!("profiling: disabled by env"); - return; - } + // Jemalloc state check once (no dump) + check_jemalloc_profiling().await; - // Jemalloc state check once (no dump) - check_jemalloc_profiling().await; + // CPU + let cpu_mode = read_cpu_mode(); + let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; + let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS)); + let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS)); - // CPU - let cpu_mode = read_cpu_mode(); - let cpu_freq = get_env_usize(ENV_CPU_FREQ, DEFAULT_CPU_FREQ) as i32; - let cpu_interval = Duration::from_secs(get_env_u64(ENV_CPU_INTERVAL_SECS, DEFAULT_CPU_INTERVAL_SECS)); - let cpu_duration = Duration::from_secs(get_env_u64(ENV_CPU_DURATION_SECS, DEFAULT_CPU_DURATION_SECS)); + match cpu_mode { + CpuMode::Off => debug!("profiling: CPU mode off"), + CpuMode::Continuous => start_cpu_continuous(cpu_freq).await, + CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await, + } - match cpu_mode { - CpuMode::Off => debug!("profiling: CPU mode off"), - CpuMode::Continuous => start_cpu_continuous(cpu_freq).await, - CpuMode::Periodic => start_cpu_periodic(cpu_freq, cpu_interval, cpu_duration).await, - } - - // Memory - let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC); - let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS)); - if mem_periodic { - start_memory_periodic(mem_interval).await; + // Memory + let mem_periodic = get_env_bool(ENV_MEM_PERIODIC, DEFAULT_MEM_PERIODIC); + let mem_interval = Duration::from_secs(get_env_u64(ENV_MEM_INTERVAL_SECS, DEFAULT_MEM_INTERVAL_SECS)); + if mem_periodic { + start_memory_periodic(mem_interval).await; + } } } + +#[cfg(target_os = "linux")] +pub use linux_impl::{dump_cpu_pprof_for, dump_memory_pprof_now, init_from_env}; diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index edc5dd52..521c2b06 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -33,7 +33,7 @@ use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServ use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3Service, service::S3ServiceBuilder}; -use socket2::SockRef; +use socket2::{SockRef, TcpKeepalive}; use std::io::{Error, Result}; use std::net::SocketAddr; use std::sync::Arc; @@ -371,6 +371,20 @@ pub async fn start_http_server( }; let socket_ref = SockRef::from(&socket); + + // Enable TCP Keepalive to detect dead clients (e.g. power loss) + // Idle: 10s, Interval: 5s, Retries: 3 + let ka = TcpKeepalive::new() + .with_time(Duration::from_secs(10)) + .with_interval(Duration::from_secs(5)); + + #[cfg(not(any(target_os = "openbsd", target_os = "netbsd")))] + let ka = ka.with_retries(3); + + if let Err(err) = socket_ref.set_tcp_keepalive(&ka) { + warn!(?err, "Failed to set TCP_KEEPALIVE"); + } + if let Err(err) = socket_ref.set_tcp_nodelay(true) { warn!(?err, "Failed to set TCP_NODELAY"); }