feat(server): optimize http transport and socket configuration for S3… (#1537)

Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
heihutu
2026-01-17 02:53:24 +08:00
committed by GitHub
parent 2ab6f8c029
commit 76fa86fdc5
5 changed files with 159 additions and 90 deletions

View File

@@ -159,8 +159,8 @@ jobs:
uses: taiki-e/cache-cargo-install-action@v2
with:
tool: s3s-e2e
git: https://github.com/Nugine/s3s.git
rev: 9e41304ed549b89cfb03ede98e9c0d2ac7522051
git: https://github.com/s3s-project/s3s.git
rev: 4a04a670cf41274d9be9ab65dc36f4aa3f92fbad
- name: Build debug binary
run: |

143
Cargo.lock generated
View File

@@ -710,9 +710,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.5.17"
version = "1.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d81b5b2898f6798ad58f484856768bca817e3cd9de0974c24ae0f1113fe88f1b"
checksum = "959dab27ce613e6c9658eb3621064d0e2027e5f2acb65bc526a43577facea557"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@@ -735,9 +735,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.119.0"
version = "1.120.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c"
checksum = "06673901e961f20fa8d7da907da48f7ad6c1b383e3726c22bd418900f015abe1"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -747,6 +747,7 @@ dependencies = [
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-observability",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -769,15 +770,16 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "1.91.0"
version = "1.92.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee6402a36f27b52fe67661c6732d684b2635152b676aa2babbfb5204f99115d"
checksum = "b7d63bd2bdeeb49aa3f9b00c15e18583503b778b2e792fc06284d54e7d5b6566"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-observability",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -791,15 +793,16 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
version = "1.93.0"
version = "1.94.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a45a7f750bbd170ee3677671ad782d90b894548f4e4ae168302c57ec9de5cb3e"
checksum = "532d93574bf731f311bafb761366f9ece345a0416dbcc273d81d6d1a1205239b"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-observability",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
@@ -813,15 +816,16 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "1.95.0"
version = "1.96.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55542378e419558e6b1f398ca70adb0b2088077e79ad9f14eb09441f2f7b2164"
checksum = "357e9a029c7524db6a0099cd77fbd5da165540339e7296cca603531bc783b56c"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-observability",
"aws-smithy-query",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
@@ -1631,9 +1635,9 @@ dependencies = [
[[package]]
name = "cmov"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360a5d5b750cd7fb97d5ead6e6e0ef0b288d3c2464a189f04f38670e268842ed"
checksum = "b1339d398d44e506d9b72c1af2f6f51a41c9c64f9a0738eb9aedede47ed1f683"
[[package]]
name = "colorchoice"
@@ -1812,6 +1816,17 @@ dependencies = [
"rand 0.9.2",
]
[[package]]
name = "core_affinity"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342"
dependencies = [
"libc",
"num_cpus",
"winapi",
]
[[package]]
name = "cpp_demangle"
version = "0.4.5"
@@ -3597,11 +3612,12 @@ dependencies = [
[[package]]
name = "flexi_logger"
version = "0.31.7"
version = "0.31.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e5335674a3a259527f97e9176a3767dcc9b220b8e29d643daeb2d6c72caf8b"
checksum = "aea7feddba9b4e83022270d49a58d4a1b3fdad04b34f78cf1ce471f698e42672"
dependencies = [
"chrono",
"core_affinity",
"crossbeam-channel",
"crossbeam-queue",
"flate2",
@@ -4198,8 +4214,6 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.1.5",
]
@@ -5297,11 +5311,11 @@ dependencies = [
[[package]]
name = "lru"
version = "0.12.5"
version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
dependencies = [
"hashbrown 0.15.5",
"hashbrown 0.16.1",
]
[[package]]
@@ -5641,9 +5655,9 @@ dependencies = [
[[package]]
name = "notify-debouncer-mini"
version = "0.6.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a689eb4262184d9a1727f9087cd03883ea716682ab03ed24efec57d7716dccb8"
checksum = "17849edfaabd9a5fef1c606d99cfc615a8e99f7ac4366406d86c7942a3184cf2"
dependencies = [
"log",
"notify",
@@ -7326,9 +7340,9 @@ dependencies = [
[[package]]
name = "rmcp"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528d42f8176e6e5e71ea69182b17d1d0a19a6b3b894b564678b74cd7cab13cfa"
checksum = "d1815dbc06c414d720f8bc1951eccd66bc99efc6376331f1e7093a119b3eb508"
dependencies = [
"async-trait",
"base64",
@@ -7348,9 +7362,9 @@ dependencies = [
[[package]]
name = "rmcp-macros"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3f81daaa494eb8e985c9462f7d6ce1ab05e5299f48aafd76cdd3d8b060e6f59"
checksum = "11f0bc7008fa102e771a76c6d2c9b253be3f2baa5964e060464d038ae1cbc573"
dependencies = [
"darling 0.23.0",
"proc-macro2",
@@ -7558,9 +7572,9 @@ dependencies = [
[[package]]
name = "rustc-demangle"
version = "0.1.26"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d"
[[package]]
name = "rustc-hash"
@@ -8390,9 +8404,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
version = "1.13.2"
version = "1.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282"
checksum = "4910321ebe4151be888e35fe062169554e74aad01beafed60410131420ceffbc"
dependencies = [
"web-time",
"zeroize",
@@ -8739,11 +8753,11 @@ dependencies = [
[[package]]
name = "serde_spanned"
version = "0.6.9"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3"
checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776"
dependencies = [
"serde",
"serde_core",
]
[[package]]
@@ -9881,44 +9895,42 @@ dependencies = [
[[package]]
name = "toml"
version = "0.8.23"
version = "0.9.11+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.22.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46"
dependencies = [
"indexmap 2.13.0",
"serde",
"serde_core",
"serde_spanned",
"toml_datetime",
"toml_write",
"toml_parser",
"toml_writer",
"winnow",
]
[[package]]
name = "toml_write"
version = "0.1.2"
name = "toml_datetime"
version = "0.7.5+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
dependencies = [
"serde_core",
]
[[package]]
name = "toml_parser"
version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
dependencies = [
"winnow",
]
[[package]]
name = "toml_writer"
version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607"
[[package]]
name = "tonic"
@@ -10438,9 +10450,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
version = "1.0.2+wasi-0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5"
dependencies = [
"wit-bindgen",
]
@@ -10978,15 +10990,12 @@ name = "winnow"
version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829"
dependencies = [
"memchr",
]
[[package]]
name = "wit-bindgen"
version = "0.46.0"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
[[package]]
name = "wrapcenum-derive"

View File

@@ -132,7 +132,7 @@ flatbuffers = "25.12.19"
form_urlencoded = "1.2.2"
prost = "0.14.3"
quick-xml = "0.39.0"
rmcp = { version = "0.12.0" }
rmcp = { version = "0.13.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
@@ -171,7 +171,7 @@ atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.12" }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-sdk-s3 = { version = "1.120.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-types = { version = "1.3.6" }
base64 = "0.22.1"
base64-simd = "0.8.0"
@@ -188,7 +188,7 @@ dunce = "1.0.5"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.8"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
flexi_logger = { version = "0.31.8", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
glob = "0.3.3"
google-cloud-storage = "1.6.0"
google-cloud-auth = "1.4.0"

View File

@@ -168,7 +168,7 @@ pub const DEFAULT_OBS_LOG_STDOUT_ENABLED: bool = false;
pub const KI_B: usize = 1024;
/// Constant representing 1 Mebibyte (1024 * 1024 bytes)
/// Default value: 1048576
pub const MI_B: usize = 1024 * 1024;
pub const MI_B: usize = 1024 * KI_B;
#[cfg(test)]
mod tests {

View File

@@ -27,7 +27,7 @@ use crate::storage::tonic_service::make_server;
use bytes::Bytes;
use http::{HeaderMap, Method, Request as HttpRequest, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
rt::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder as ConnBuilder,
server::graceful::GracefulShutdown,
service::TowerToHyperService,
@@ -84,6 +84,7 @@ pub async fn start_http_server(
};
// If address is IPv6 try to enable dual-stack; on failure, switch to IPv4 socket.
#[cfg(not(target_os = "openbsd"))]
if server_addr.is_ipv6()
&& let Err(e) = socket.set_only_v6(false)
{
@@ -99,6 +100,18 @@ pub async fn start_http_server(
// Set the socket to non-blocking before passing it to Tokio.
socket.set_nonblocking(true)?;
// 1. Disable Nagle algorithm: Critical for 4KB Payload, achieving ultra-low latency
socket.set_tcp_nodelay(true)?;
// 3. Set system-level TCP KeepAlive to protect long connections
// Note: This sets keepalive on the LISTENING socket, which is inherited by accepted sockets on some platforms (e.g. Linux).
// However, we also explicitly set it on accepted sockets in the loop below to be safe and cross-platform.
let keepalive = get_default_tcp_keepalive();
socket.set_tcp_keepalive(&keepalive)?;
// 4. Increase receive buffer to support BDP at GB-level throughput
socket.set_recv_buffer_size(4 * rustfs_config::MI_B)?;
// Attempt bind; if bind fails for IPv6, try IPv4 fallback once more.
if let Err(bind_err) = socket.bind(&server_addr.into()) {
warn!("Failed to bind to {}: {}.", server_addr, bind_err);
@@ -109,6 +122,9 @@ pub async fn start_http_server(
socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.set_tcp_nodelay(true)?;
socket.set_tcp_keepalive(&keepalive)?;
socket.set_recv_buffer_size(4 * rustfs_config::MI_B)?;
socket.bind(&server_addr.into())?;
// [FIX] Ensure fallback socket is moved to listening state as well.
socket.listen(backlog)?;
@@ -160,7 +176,7 @@ pub async fn start_http_server(
println!("Console WebUI Start Time: {now_time}");
println!("Console WebUI available at: {protocol}://{local_ip_str}:{server_port}/rustfs/console/index.html");
println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html",);
println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html");
} else {
info!(target: "rustfs::main::startup","RustFS API: {api_endpoints} {localhost_endpoint}");
println!("RustFS Http API: {api_endpoints} {localhost_endpoint}");
@@ -244,7 +260,34 @@ pub async fn start_http_server(
(sigterm_inner, sigint_inner)
};
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
// RustFS Transport Layer Configuration Constants - Optimized for S3 Workloads
const H2_INITIAL_STREAM_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MB: Optimize large file throughput
const H2_INITIAL_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 4; // 4MB: Link-level flow control
const H2_MAX_FRAME_SIZE: u32 = 16384; // 16KB: Reduce framing overhead
let mut conn_builder = ConnBuilder::new(TokioExecutor::new());
// Optimize for HTTP/1.1 (S3 small files/management plane)
conn_builder
.http1()
.timer(TokioTimer::new())
.keep_alive(true)
.header_read_timeout(Duration::from_secs(5))
.max_buf_size(64 * 1024)
.writev(true);
// Optimize for HTTP/2 (AI/Data Lake high concurrency synchronization)
conn_builder
.http2()
.timer(TokioTimer::new())
.initial_stream_window_size(H2_INITIAL_STREAM_WINDOW_SIZE)
.initial_connection_window_size(H2_INITIAL_CONN_WINDOW_SIZE)
.max_frame_size(H2_MAX_FRAME_SIZE)
.max_concurrent_streams(Some(2048))
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(10));
let http_server = Arc::new(conn_builder);
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
@@ -253,6 +296,9 @@ pub async fn start_http_server(
worker_state_manager.update(ServiceState::Ready);
let tls_acceptor = tls_acceptor.map(Arc::new);
// Initialize keepalive configuration once to avoid recreation in the loop
let keepalive_conf = get_default_tcp_keepalive();
loop {
debug!("Waiting for new connection...");
let (socket, _) = {
@@ -313,29 +359,19 @@ pub async fn start_http_server(
let socket_ref = SockRef::from(&socket);
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
// Idle: 10s, Interval: 5s, Retries: 3
#[cfg(target_os = "openbsd")]
let ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
#[cfg(not(target_os = "openbsd"))]
let ka = TcpKeepalive::new()
.with_time(Duration::from_secs(10))
.with_interval(Duration::from_secs(5))
.with_retries(3);
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
if let Err(err) = socket_ref.set_tcp_keepalive(&keepalive_conf) {
warn!(?err, "Failed to set TCP_KEEPALIVE");
}
// 1. Disable Nagle algorithm: Critical for 4KB Payload, achieving ultra-low latency
if let Err(err) = socket_ref.set_tcp_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
#[cfg(not(target_os = "openbsd"))]
// 4. Increase receive buffer to support BDP at GB-level throughput
if let Err(err) = socket_ref.set_recv_buffer_size(4 * rustfs_config::MI_B) {
warn!(?err, "Failed to set set_recv_buffer_size");
}
#[cfg(not(target_os = "openbsd"))]
if let Err(err) = socket_ref.set_send_buffer_size(4 * rustfs_config::MI_B) {
warn!(?err, "Failed to set set_send_buffer_size");
}
@@ -646,6 +682,13 @@ fn process_connection(
/// Handles connection errors by logging them with appropriate severity
fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
let s = err.to_string();
if s.contains("connection reset") || s.contains("broken pipe") {
warn!("The connection was reset by the peer or broken pipe: {}", s);
// Ignore common non-fatal errors
return;
}
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {
warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err);
@@ -657,6 +700,8 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
error!("HTTP user-custom error:{}", hyper_err);
} else if hyper_err.is_canceled() {
warn!("The HTTP connection is canceled:{}", hyper_err);
} else if format!("{:?}", hyper_err).contains("HeaderTimeout") {
warn!("The HTTP connection timed out (HeaderTimeout): {}", hyper_err);
} else {
error!("Unknown hyper error:{:?}", hyper_err);
}
@@ -704,7 +749,7 @@ fn get_listen_backlog() -> i32 {
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
let mut name = [libc::CTL_KERN, libc::KERN_IPC, libc::KIPC_SOMAXCONN];
let mut buf = [0; 1];
let mut buf_len = size_of_val(&buf);
let mut buf_len = std::mem::size_of_val(&buf);
if unsafe {
libc::sysctl(
@@ -729,3 +774,18 @@ fn get_listen_backlog() -> i32 {
const DEFAULT_BACKLOG: i32 = 1024;
DEFAULT_BACKLOG
}
fn get_default_tcp_keepalive() -> TcpKeepalive {
#[cfg(target_os = "openbsd")]
{
TcpKeepalive::new().with_time(Duration::from_secs(60))
}
#[cfg(not(target_os = "openbsd"))]
{
TcpKeepalive::new()
.with_time(Duration::from_secs(60))
.with_interval(Duration::from_secs(5))
.with_retries(3)
}
}