Merge pull request #41 from rustfs/feature/tls

Refactor(server): Encapsulate service creation within connection handler
This commit is contained in:
loverustfs
2025-07-04 08:15:01 +08:00
committed by GitHub
5 changed files with 210 additions and 263 deletions

28
Cargo.lock generated
View File

@@ -472,9 +472,9 @@ dependencies = [
[[package]]
name = "async-channel"
version = "2.3.1"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
checksum = "16c74e56284d2188cabb6ad99603d1ace887a5d7e7b695d01b728155ed9ed427"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
@@ -733,9 +733,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.95.0"
version = "1.96.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a316e3c4c38837084dfbf87c0fc6ea016b3dc3e1f867d9d7f5eddfe47e5cae37"
checksum = "6e25d24de44b34dcdd5182ac4e4c6f07bcec2661c505acef94c0d293b65505fe"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -2058,7 +2058,6 @@ dependencies = [
"ciborium",
"clap",
"criterion-plot",
"futures",
"is-terminal",
"itertools 0.10.5",
"num-traits",
@@ -2071,7 +2070,6 @@ dependencies = [
"serde_derive",
"serde_json",
"tinytemplate",
"tokio",
"walkdir",
]
@@ -4948,6 +4946,17 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "io-uring"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"libc",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -8212,7 +8221,6 @@ dependencies = [
"aes-gcm",
"bytes",
"crc32fast",
"criterion",
"futures",
"http 1.3.1",
"md-5",
@@ -9834,17 +9842,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.45.1"
version = "1.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
checksum = "1140bb80481756a8cbe10541f37433b459c5aa1e727b4c020fbfebdc25bf3ec4"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"tokio-macros",
"tracing",

View File

@@ -78,12 +78,12 @@ aes-gcm = { version = "0.10.3", features = ["std"] }
arc-swap = "1.7.1"
argon2 = { version = "0.5.3", features = ["std"] }
atoi = "2.0.0"
async-channel = "2.3.1"
async-channel = "2.4.0"
async-recursion = "1.1.1"
async-trait = "0.1.88"
async-compression = { version = "0.4.0" }
atomic_enum = "0.3.0"
aws-sdk-s3 = "1.95.0"
aws-sdk-s3 = "1.96.0"
axum = "0.8.4"
axum-extra = "0.10.1"
axum-server = { version = "0.7.2", features = ["tls-rustls"] }
@@ -107,7 +107,7 @@ dioxus = { version = "0.6.3", features = ["router"] }
dirs = "6.0.0"
enumset = "1.1.6"
flatbuffers = "25.2.10"
flate2 = "1.1.1"
flate2 = "1.1.2"
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
form_urlencoded = "1.2.1"
futures = "0.3.31"
@@ -124,7 +124,7 @@ hyper-util = { version = "0.1.14", features = [
"server-auto",
"server-graceful",
] }
hyper-rustls = "0.27.5"
hyper-rustls = "0.27.7"
http = "1.3.1"
http-body = "1.0.1"
humantime = "2.2.0"
@@ -195,7 +195,7 @@ rmp-serde = "1.3.0"
rsa = "0.9.8"
rumqttc = { version = "0.24" }
rust-embed = { version = "8.7.2" }
rust-i18n = { version = "3.1.4" }
rust-i18n = { version = "3.1.5" }
rustfs-rsc = "2025.506.1"
rustls = { version = "0.23.28" }
rustls-pki-types = "1.12.0"
@@ -225,7 +225,7 @@ time = { version = "0.3.41", features = [
"macros",
"serde",
] }
tokio = { version = "1.45.1", features = ["fs", "rt-multi-thread"] }
tokio = { version = "1.46.0", features = ["fs", "rt-multi-thread"] }
tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = { version = "0.1.17" }
tokio-tar = "0.3.1"
@@ -251,7 +251,7 @@ uuid = { version = "1.17.0", features = [
wildmatch = { version = "2.4.0", features = ["serde"] }
winapi = { version = "0.3.9" }
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
zip = "2.2.0"
zip = "2.4.2"
zstd = "0.13.3"
[profile.wasm-dev]

View File

@@ -40,5 +40,5 @@ serde_json.workspace = true
md-5 = { workspace = true }
[dev-dependencies]
criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] }
#criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] }
tokio-test = "0.4"

View File

@@ -30,13 +30,21 @@ workspace = true
[dependencies]
rustfs-zip = { workspace = true }
tokio-tar = { workspace = true }
rustfs-madmin = { workspace = true }
rustfs-s3select-api = { workspace = true }
rustfs-appauth = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-policy = { workspace = true }
rustfs-common = { workspace = true }
rustfs-iam = { workspace = true }
rustfs-filemeta.workspace = true
rustfs-rio.workspace = true
rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-protos.workspace = true
rustfs-s3select-query = { workspace = true }
atoi = { workspace = true }
atomic_enum = { workspace = true }
axum.workspace = true
@@ -53,19 +61,13 @@ hyper.workspace = true
hyper-util.workspace = true
http.workspace = true
http-body.workspace = true
rustfs-iam = { workspace = true }
lazy_static.workspace = true
matchit = { workspace = true }
mime_guess = { workspace = true }
opentelemetry = { workspace = true }
percent-encoding = { workspace = true }
pin-project-lite.workspace = true
rustfs-protos.workspace = true
rustfs-s3select-query = { workspace = true }
reqwest = { workspace = true }
rustfs-config = { workspace = true, features = ["constants", "notify"] }
rustfs-notify = { workspace = true }
rustfs-obs = { workspace = true }
rustfs-utils = { workspace = true, features = ["full"] }
rustls.workspace = true
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
@@ -84,9 +86,9 @@ tokio = { workspace = true, features = [
"net",
"signal",
] }
tokio-rustls.workspace = true
lazy_static.workspace = true
tokio-stream.workspace = true
tokio-rustls = { workspace = true, features = ["default"] }
tokio-tar = { workspace = true }
tonic = { workspace = true }
tower.workspace = true
tower-http = { workspace = true, features = [
@@ -98,10 +100,9 @@ tower-http = { workspace = true, features = [
] }
urlencoding = { workspace = true }
uuid = { workspace = true }
rustfs-filemeta.workspace = true
rustfs-rio.workspace = true
zip = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd.workspace = true

View File

@@ -44,7 +44,6 @@ use hyper_util::{
use license::init_license;
use rustfs_common::globals::set_global_addr;
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustfs_ecstore::StorageAPI;
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use rustfs_ecstore::config as ecconfig;
@@ -53,18 +52,16 @@ use rustfs_ecstore::heal::background_heal_ops::init_auto_heal;
use rustfs_ecstore::rpc::make_server;
use rustfs_ecstore::store_api::BucketOptions;
use rustfs_ecstore::{
endpoints::EndpointServerPools,
heal::data_scanner::init_data_scanner,
set_global_endpoints,
store::{ECStore, init_local_disks},
StorageAPI, endpoints::EndpointServerPools, global::set_global_rustfs_port, heal::data_scanner::init_data_scanner,
notification_sys::new_global_notification_sys, set_global_endpoints, store::ECStore, store::init_local_disks,
update_erasure_type,
};
use rustfs_ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
use rustfs_iam::init_iam_sys;
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_utils::net::parse_and_resolve_address;
use rustls::ServerConfig;
use s3s::service::S3Service;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use socket2::SockRef;
@@ -72,11 +69,12 @@ use std::io::{Error, Result};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};
use tokio_rustls::TlsAcceptor;
use tonic::{Request, Status, metadata::MetadataValue};
use tower::ServiceBuilder;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
@@ -129,6 +127,49 @@ async fn main() -> Result<()> {
run(opt).await
}
/// Sets up the TLS acceptor if certificates are available.
#[instrument(skip(tls_path))]
async fn setup_tls_acceptor(tls_path: &str) -> Result<Option<TlsAcceptor>> {
if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() {
debug!("TLS path is not provided or does not exist, starting with HTTP");
return Ok(None);
}
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates from the directory (multi-cert support)
if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) {
if !cert_key_pairs.is_empty() {
debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
}
// 2. Fallback to legacy single certificate mode
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
}
debug!("No valid TLS certificates found in the directory, starting with HTTP");
Ok(None)
}
#[instrument(skip(opt))]
async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
@@ -148,7 +189,6 @@ async fn run(opt: config::Opt) -> Result<()> {
let listener = TcpListener::bind(server_address.clone()).await?;
// Obtain the listener address
let local_addr: SocketAddr = listener.local_addr()?;
// let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
// For RPC
@@ -204,18 +244,14 @@ async fn run(opt: config::Opt) -> Result<()> {
// This project uses the S3S library to implement S3 services
let s3_service = {
let store = storage::ecfs::FS::new();
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
let mut b = S3ServiceBuilder::new(store.clone());
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
// Displays info information
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(IAMAuth::new(access_key, secret_key));
b.set_access(store.clone());
b.set_route(admin::make_admin_route()?);
if !opt.server_domains.is_empty() {
@@ -223,20 +259,6 @@ async fn run(opt: config::Opt) -> Result<()> {
b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?);
}
// // Enable parsing virtual-hosted-style requests
// if let Some(dm) = opt.domain_name {
// info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
// b.set_base_domain(dm);
// }
// if domain_name.is_some() {
// info!(
// "virtual-hosted-style requests are enabled use domain_name {}",
// domain_name.as_ref().unwrap()
// );
// b.set_base_domain(domain_name.unwrap());
// }
b.build()
};
@@ -254,57 +276,8 @@ async fn run(opt: config::Opt) -> Result<()> {
}
});
let tls_path = opt.tls_path.clone().unwrap_or_default();
let has_tls_certs = tokio::fs::metadata(&tls_path).await.is_ok();
let tls_acceptor = if has_tls_certs {
debug!("Found TLS directory, checking for certificates");
let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?;
// 1. Try to load all certificates directly (including root and subdirectories)
match rustfs_utils::load_all_certs_from_directory(&tls_path) {
Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => {
debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
// create a multi certificate configuration
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
}
_ => {
// 2. If the synthesis fails, fall back to the traditional document certificate mode (backward compatible)
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
let has_single_cert =
tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok();
if has_single_cert {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs =
rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let key = rustfs_utils::load_private_key(key_path.as_str())
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
} else {
debug!("No valid TLS certificates found, starting with HTTP");
None
}
}
}
} else {
debug!("TLS certificates not found, starting with HTTP");
None
};
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let state_manager = ServiceStateManager::new();
let worker_state_manager = state_manager.clone();
// Update service status to Starting
@@ -318,73 +291,11 @@ async fn run(opt: config::Opt) -> Result<()> {
#[cfg(unix)]
let (mut sigterm_inner, mut sigint_inner) = {
// Unix platform specific code
let sigterm_inner = match signal(SignalKind::terminate()) {
Ok(signal) => signal,
Err(e) => {
error!("Failed to create SIGTERM signal handler: {}", e);
return;
}
};
let sigint_inner = match signal(SignalKind::interrupt()) {
Ok(signal) => signal,
Err(e) => {
error!("Failed to create SIGINT signal handler: {}", e);
return;
}
};
let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");
let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler");
(sigterm_inner, sigint_inner)
};
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(CatchPanicLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
{
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
.layer(CorsLayer::permissive())
.service(hybrid(s3_service, rpc_service)),
);
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = Arc::new(GracefulShutdown::new());
@@ -392,42 +303,36 @@ async fn run(opt: config::Opt) -> Result<()> {
// service ready
worker_state_manager.update(ServiceState::Ready);
let value = hybrid_service.clone();
let tls_acceptor = tls_acceptor.map(Arc::new);
loop {
debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs);
// Wait for a connection
debug!("Waiting for new connection...");
let (socket, _) = {
#[cfg(unix)]
{
tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
},
Some(_) = sigint_inner.recv() => {
info!("SIGINT received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
},
Some(_) = sigterm_inner.recv() => {
info!("SIGTERM received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
@@ -437,22 +342,18 @@ async fn run(opt: config::Opt) -> Result<()> {
#[cfg(not(unix))]
{
tokio::select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
res = listener.accept() => match res {
Ok(conn) => conn,
Err(err) => {
error!("error accepting connection: {err}");
continue;
}
}
},
_ = ctrl_c.as_mut() => {
info!("Ctrl-C received in worker thread");
let _ = shutdown_tx_clone.send(());
break;
}
},
_ = shutdown_rx.recv() => {
info!("Shutdown signal received in worker thread");
break;
@@ -472,70 +373,12 @@ async fn run(opt: config::Opt) -> Result<()> {
warn!(?err, "Failed to set set_send_buffer_size");
}
if has_tls_certs {
debug!("TLS certificates found, starting with SIGINT");
let peer_addr_str = socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|e| {
warn!("Could not get peer address: {}", e);
"unknown".to_string()
});
let tls_socket = match tls_acceptor.as_ref() {
Some(acceptor) => match acceptor.accept(socket).await {
Ok(tls_socket) => {
info!("TLS handshake successful with peer: {}", peer_addr_str);
tls_socket
}
Err(err) => {
error!("TLS handshake with peer {} failed: {}", peer_addr_str, err);
continue;
}
},
None => {
error!(
"TLS acceptor is not available, but TLS is enabled. This is a bug. Dropping connection from {}",
peer_addr_str
);
continue;
}
};
let http_server_clone = http_server.clone();
let value_clone = value.clone();
let graceful_clone = graceful.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new()
.expect("Failed to create runtime")
.block_on(async move {
let conn = http_server_clone.serve_connection(TokioIo::new(tls_socket), value_clone);
let conn = graceful_clone.watch(conn);
if let Err(err) = conn.await {
// Handle hyper::Error and low-level IO errors at a more granular level
handle_connection_error(&*err);
}
});
});
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let http_server_clone = http_server.clone();
let value_clone = value.clone();
let graceful_clone = graceful.clone();
tokio::spawn(async move {
let conn = http_server_clone.serve_connection(TokioIo::new(socket), value_clone);
let conn = graceful_clone.watch(conn);
if let Err(err) = conn.await {
// Handle hyper::Error and low-level IO errors at a more granular level
handle_connection_error(&*err);
}
});
debug!("Http handshake success");
}
process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone());
}
worker_state_manager.update(ServiceState::Stopping);
match Arc::try_unwrap(graceful) {
Ok(g) => {
// Successfully obtaining unique ownership, you can call shutdown
tokio::select! {
() = g.shutdown() => {
debug!("Gracefully shutdown!");
@@ -546,9 +389,7 @@ async fn run(opt: config::Opt) -> Result<()> {
}
}
Err(arc_graceful) => {
// There are other references that cannot be obtained for unique ownership
error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful);
// In this case, we can only wait for the timeout
tokio::time::sleep(Duration::from_secs(10)).await;
debug!("Timeout reached, forcing shutdown");
}
@@ -591,14 +432,12 @@ async fn run(opt: config::Opt) -> Result<()> {
init_data_scanner().await;
// init auto heal
init_auto_heal().await;
// init console configuration
init_console_cfg(local_ip, server_port);
print_server_info();
init_bucket_replication_pool().await;
print_server_info();
// Async update check (optional)
tokio::spawn(async {
use crate::update_checker::{UpdateCheckError, check_updates};
@@ -666,6 +505,103 @@ async fn run(opt: config::Opt) -> Result<()> {
Ok(())
}
/// Process a single incoming TCP connection.
///
/// This function is executed in a new Tokio task and it will:
/// 1. If TLS is configured, perform TLS handshake.
/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware.
/// 3. Use Hyper to handle HTTP requests on this connection.
/// 4. Incorporate connections into the management of elegant closures.
#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))]
fn process_connection(
socket: TcpStream,
tls_acceptor: Option<Arc<TlsAcceptor>>,
http_server: Arc<ConnBuilder<TokioExecutor>>,
s3_service: S3Service,
graceful: Arc<GracefulShutdown>,
) {
tokio::spawn(async move {
// Build services inside each connected task to avoid passing complex service types across tasks,
// It also ensures that each connection has an independent service instance.
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let hybrid_service = ServiceBuilder::new()
.layer(CatchPanicLayer::new())
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &HttpRequest<_>| {
let span = tracing::info_span!("http-request",
status_code = tracing::field::Empty,
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
);
for (header_name, header_value) in request.headers() {
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" {
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
}
}
span
})
.on_request(|request: &HttpRequest<_>, _span: &Span| {
info!(
counter.rustfs_api_requests_total = 1_u64,
key_request_method = %request.method().to_string(),
key_request_uri_path = %request.uri().path().to_owned(),
"handle request api total",
);
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
})
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
_span.record("http response status_code", tracing::field::display(response.status()));
debug!("http response generated in {:?}", latency)
})
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
})
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
debug!("http stream closed after {:?}", stream_duration)
})
.on_failure(|_error, latency: Duration, _span: &Span| {
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
debug!("http request failure error: {:?} in {:?}", _error, latency)
}),
)
.layer(CorsLayer::permissive())
.service(hybrid(s3_service, rpc_service));
let hybrid_service = TowerToHyperService::new(hybrid_service);
// Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor
if let Some(acceptor) = tls_acceptor {
debug!("TLS handshake start");
match acceptor.accept(socket).await {
Ok(tls_socket) => {
debug!("TLS handshake successful");
let stream = TokioIo::new(tls_socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
}
Err(err) => {
error!(?err, "TLS handshake failed");
return; // Failed to end the task directly
}
}
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let stream = TokioIo::new(socket);
let conn = http_server.serve_connection(stream, hybrid_service);
if let Err(err) = graceful.watch(conn).await {
handle_connection_error(&*err);
}
debug!("Http handshake success");
};
});
}
/// Handles the shutdown process of the server
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
info!("Shutdown signal received in main thread");