Merge branch 'main' into fix/kms_encrypt_vault

This commit is contained in:
loverustfs
2026-01-17 20:08:04 +08:00
committed by GitHub
7 changed files with 96 additions and 33 deletions

View File

@@ -159,8 +159,8 @@ jobs:
uses: taiki-e/cache-cargo-install-action@v2
with:
tool: s3s-e2e
git: https://github.com/Nugine/s3s.git
rev: 9e41304ed549b89cfb03ede98e9c0d2ac7522051
git: https://github.com/s3s-project/s3s.git
rev: 4a04a670cf41274d9be9ab65dc36f4aa3f92fbad
- name: Build debug binary
run: |

8
Cargo.lock generated
View File

@@ -7341,9 +7341,9 @@ dependencies = [
[[package]]
name = "rmcp"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528d42f8176e6e5e71ea69182b17d1d0a19a6b3b894b564678b74cd7cab13cfa"
checksum = "d1815dbc06c414d720f8bc1951eccd66bc99efc6376331f1e7093a119b3eb508"
dependencies = [
"async-trait",
"base64",
@@ -7363,9 +7363,9 @@ dependencies = [
[[package]]
name = "rmcp-macros"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3f81daaa494eb8e985c9462f7d6ce1ab05e5299f48aafd76cdd3d8b060e6f59"
checksum = "11f0bc7008fa102e771a76c6d2c9b253be3f2baa5964e060464d038ae1cbc573"
dependencies = [
"darling 0.23.0",
"proc-macro2",

View File

@@ -132,7 +132,7 @@ flatbuffers = "25.12.19"
form_urlencoded = "1.2.2"
prost = "0.14.3"
quick-xml = "0.39.0"
rmcp = { version = "0.12.0" }
rmcp = { version = "0.13.0" }
rmp = { version = "0.8.15" }
rmp-serde = { version = "1.3.1" }
serde = { version = "1.0.228", features = ["derive"] }
@@ -171,7 +171,7 @@ atoi = "2.0.0"
atomic_enum = "0.3.0"
aws-config = { version = "1.8.12" }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3 = { version = "1.119.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-sdk-s3 = { version = "1.120.0", default-features = false, features = ["sigv4a", "default-https-client", "rt-tokio"] }
aws-smithy-types = { version = "1.3.6" }
base64 = "0.22.1"
base64-simd = "0.8.0"
@@ -188,7 +188,7 @@ dunce = "1.0.5"
enumset = "1.1.10"
faster-hex = "0.10.0"
flate2 = "1.1.8"
flexi_logger = { version = "0.31.7", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
flexi_logger = { version = "0.31.8", features = ["trc", "dont_minimize_extra_stacks", "compress", "kv", "json"] }
glob = "0.3.3"
google-cloud-storage = "1.6.0"
google-cloud-auth = "1.4.0"

View File

@@ -168,7 +168,7 @@ pub const DEFAULT_OBS_LOG_STDOUT_ENABLED: bool = false;
pub const KI_B: usize = 1024;
/// Constant representing 1 Mebibyte (1024 * 1024 bytes)
/// Default value: 1048576
pub const MI_B: usize = 1024 * 1024;
pub const MI_B: usize = 1024 * KI_B;
#[cfg(test)]
mod tests {

View File

@@ -161,6 +161,8 @@ pin_project! {
}
impl HashReader {
/// Used for transformation layers (compression/encryption)
pub const SIZE_PRESERVE_LAYER: i64 = -1;
pub fn new(
mut inner: Box<dyn Reader>,
size: i64,
@@ -169,7 +171,8 @@ impl HashReader {
sha256hex: Option<String>,
diskable_md5: bool,
) -> std::io::Result<Self> {
if size >= 0
// Get the innermost HashReader
if size != Self::SIZE_PRESERVE_LAYER
&& let Some(existing_hash_reader) = inner.as_hash_reader_mut()
{
if existing_hash_reader.bytes_read() > 0 {

View File

@@ -27,7 +27,7 @@ use crate::storage::tonic_service::make_server;
use bytes::Bytes;
use http::{HeaderMap, Method, Request as HttpRequest, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
rt::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder as ConnBuilder,
server::graceful::GracefulShutdown,
service::TowerToHyperService,
@@ -84,6 +84,7 @@ pub async fn start_http_server(
};
// If address is IPv6 try to enable dual-stack; on failure, switch to IPv4 socket.
#[cfg(not(target_os = "openbsd"))]
if server_addr.is_ipv6()
&& let Err(e) = socket.set_only_v6(false)
{
@@ -99,6 +100,18 @@ pub async fn start_http_server(
// Set the socket to non-blocking before passing it to Tokio.
socket.set_nonblocking(true)?;
// 1. Disable Nagle algorithm: Critical for 4KB Payload, achieving ultra-low latency
socket.set_tcp_nodelay(true)?;
// 3. Set system-level TCP KeepAlive to protect long connections
// Note: This sets keepalive on the LISTENING socket, which is inherited by accepted sockets on some platforms (e.g. Linux).
// However, we also explicitly set it on accepted sockets in the loop below to be safe and cross-platform.
let keepalive = get_default_tcp_keepalive();
socket.set_tcp_keepalive(&keepalive)?;
// 4. Increase receive buffer to support BDP at GB-level throughput
socket.set_recv_buffer_size(4 * rustfs_config::MI_B)?;
// Attempt bind; if bind fails for IPv6, try IPv4 fallback once more.
if let Err(bind_err) = socket.bind(&server_addr.into()) {
warn!("Failed to bind to {}: {}.", server_addr, bind_err);
@@ -109,6 +122,9 @@ pub async fn start_http_server(
socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.set_tcp_nodelay(true)?;
socket.set_tcp_keepalive(&keepalive)?;
socket.set_recv_buffer_size(4 * rustfs_config::MI_B)?;
socket.bind(&server_addr.into())?;
// [FIX] Ensure fallback socket is moved to listening state as well.
socket.listen(backlog)?;
@@ -160,7 +176,7 @@ pub async fn start_http_server(
println!("Console WebUI Start Time: {now_time}");
println!("Console WebUI available at: {protocol}://{local_ip_str}:{server_port}/rustfs/console/index.html");
println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html",);
println!("Console WebUI (localhost): {protocol}://127.0.0.1:{server_port}/rustfs/console/index.html");
} else {
info!(target: "rustfs::main::startup","RustFS API: {api_endpoints} {localhost_endpoint}");
println!("RustFS Http API: {api_endpoints} {localhost_endpoint}");
@@ -244,7 +260,34 @@ pub async fn start_http_server(
(sigterm_inner, sigint_inner)
};
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
// RustFS Transport Layer Configuration Constants - Optimized for S3 Workloads
const H2_INITIAL_STREAM_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MB: Optimize large file throughput
const H2_INITIAL_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 4; // 4MB: Link-level flow control
const H2_MAX_FRAME_SIZE: u32 = 16384; // 16KB: Reduce framing overhead
let mut conn_builder = ConnBuilder::new(TokioExecutor::new());
// Optimize for HTTP/1.1 (S3 small files/management plane)
conn_builder
.http1()
.timer(TokioTimer::new())
.keep_alive(true)
.header_read_timeout(Duration::from_secs(5))
.max_buf_size(64 * 1024)
.writev(true);
// Optimize for HTTP/2 (AI/Data Lake high concurrency synchronization)
conn_builder
.http2()
.timer(TokioTimer::new())
.initial_stream_window_size(H2_INITIAL_STREAM_WINDOW_SIZE)
.initial_connection_window_size(H2_INITIAL_CONN_WINDOW_SIZE)
.max_frame_size(H2_MAX_FRAME_SIZE)
.max_concurrent_streams(Some(2048))
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(10));
let http_server = Arc::new(conn_builder);
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
@@ -253,6 +296,9 @@ pub async fn start_http_server(
worker_state_manager.update(ServiceState::Ready);
let tls_acceptor = tls_acceptor.map(Arc::new);
// Initialize keepalive configuration once to avoid recreation in the loop
let keepalive_conf = get_default_tcp_keepalive();
loop {
debug!("Waiting for new connection...");
let (socket, _) = {
@@ -313,29 +359,19 @@ pub async fn start_http_server(
let socket_ref = SockRef::from(&socket);
// Enable TCP Keepalive to detect dead clients (e.g. power loss)
// Idle: 10s, Interval: 5s, Retries: 3
#[cfg(target_os = "openbsd")]
let ka = TcpKeepalive::new().with_time(Duration::from_secs(10));
#[cfg(not(target_os = "openbsd"))]
let ka = TcpKeepalive::new()
.with_time(Duration::from_secs(10))
.with_interval(Duration::from_secs(5))
.with_retries(3);
if let Err(err) = socket_ref.set_tcp_keepalive(&ka) {
if let Err(err) = socket_ref.set_tcp_keepalive(&keepalive_conf) {
warn!(?err, "Failed to set TCP_KEEPALIVE");
}
// 1. Disable Nagle algorithm: Critical for 4KB Payload, achieving ultra-low latency
if let Err(err) = socket_ref.set_tcp_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
#[cfg(not(target_os = "openbsd"))]
// 4. Increase receive buffer to support BDP at GB-level throughput
if let Err(err) = socket_ref.set_recv_buffer_size(4 * rustfs_config::MI_B) {
warn!(?err, "Failed to set set_recv_buffer_size");
}
#[cfg(not(target_os = "openbsd"))]
if let Err(err) = socket_ref.set_send_buffer_size(4 * rustfs_config::MI_B) {
warn!(?err, "Failed to set set_send_buffer_size");
}
@@ -646,6 +682,13 @@ fn process_connection(
/// Handles connection errors by logging them with appropriate severity
fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
let s = err.to_string();
if s.contains("connection reset") || s.contains("broken pipe") {
warn!("The connection was reset by the peer or broken pipe: {}", s);
// Ignore common non-fatal errors
return;
}
if let Some(hyper_err) = err.downcast_ref::<hyper::Error>() {
if hyper_err.is_incomplete_message() {
warn!("The HTTP connection is closed prematurely and the message is not completed:{}", hyper_err);
@@ -657,6 +700,8 @@ fn handle_connection_error(err: &(dyn std::error::Error + 'static)) {
error!("HTTP user-custom error:{}", hyper_err);
} else if hyper_err.is_canceled() {
warn!("The HTTP connection is canceled:{}", hyper_err);
} else if format!("{:?}", hyper_err).contains("HeaderTimeout") {
warn!("The HTTP connection timed out (HeaderTimeout): {}", hyper_err);
} else {
error!("Unknown hyper error:{:?}", hyper_err);
}
@@ -704,7 +749,7 @@ fn get_listen_backlog() -> i32 {
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
let mut name = [libc::CTL_KERN, libc::KERN_IPC, libc::KIPC_SOMAXCONN];
let mut buf = [0; 1];
let mut buf_len = size_of_val(&buf);
let mut buf_len = std::mem::size_of_val(&buf);
if unsafe {
libc::sysctl(
@@ -729,3 +774,18 @@ fn get_listen_backlog() -> i32 {
const DEFAULT_BACKLOG: i32 = 1024;
DEFAULT_BACKLOG
}
/// Builds the TCP keepalive configuration shared by the listening socket and
/// every accepted connection socket.
///
/// Idle time before the first probe is 60s on all platforms. The probe
/// interval (5s) and retry count (3) are applied only where `socket2`
/// exposes those knobs — OpenBSD supports neither, hence the cfg gate.
fn get_default_tcp_keepalive() -> TcpKeepalive {
    let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(60));

    // Shadow with the richer configuration on platforms that support it.
    #[cfg(not(target_os = "openbsd"))]
    let keepalive = keepalive
        .with_interval(Duration::from_secs(5))
        .with_retries(3);

    keepalive
}

View File

@@ -466,7 +466,7 @@ impl FS {
let hrd = HashReader::new(reader, size, actual_size, None, None, false).map_err(ApiError::from)?;
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
size = -1;
size = HashReader::SIZE_PRESERVE_LAYER;
}
let hrd = HashReader::new(reader, size, actual_size, None, None, false).map_err(ApiError::from)?;
@@ -1031,7 +1031,7 @@ impl S3 for FS {
// let hrd = HashReader::new(reader, length, actual_size, None, false).map_err(ApiError::from)?;
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
length = -1;
length = HashReader::SIZE_PRESERVE_LAYER;
} else {
src_info
.user_defined
@@ -3301,7 +3301,7 @@ impl S3 for FS {
opts.want_checksum = hrd.checksum();
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
size = -1;
size = HashReader::SIZE_PRESERVE_LAYER;
md5hex = None;
sha256hex = None;
}
@@ -3740,7 +3740,7 @@ impl S3 for FS {
let compress_reader = CompressReader::new(hrd, CompressionAlgorithm::default());
reader = Box::new(compress_reader);
size = -1;
size = HashReader::SIZE_PRESERVE_LAYER;
md5hex = None;
sha256hex = None;
}
@@ -3995,7 +3995,7 @@ impl S3 for FS {
if is_compressible {
let hrd = HashReader::new(reader, size, actual_size, None, None, false).map_err(ApiError::from)?;
reader = Box::new(CompressReader::new(hrd, CompressionAlgorithm::default()));
size = -1;
size = HashReader::SIZE_PRESERVE_LAYER;
}
let mut reader = HashReader::new(reader, size, actual_size, None, None, false).map_err(ApiError::from)?;