mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Refactor(server): Encapsulate service creation within connection handler
Move the construction of the hybrid service stack, including all middleware and the RPC service, from the main `run` function into the `process_connection` function. This change ensures that each incoming connection gets its own isolated service instance. This improves modularity by making the connection handling logic more self-contained and simplifies the main server loop. Key changes: - The `hybrid_service` and `rpc_service` are now created inside `process_connection`. - The `run` function's responsibility is reduced to accepting TCP connections and spawning tasks for `process_connection`.
This commit is contained in:
94
Cargo.lock
generated
94
Cargo.lock
generated
@@ -458,45 +458,6 @@ dependencies = [
|
||||
"zbus 5.7.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60"
|
||||
dependencies = [
|
||||
"asn1-rs-derive",
|
||||
"asn1-rs-impl",
|
||||
"displaydoc",
|
||||
"nom 7.1.3",
|
||||
"num-traits",
|
||||
"rusticata-macros",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs-derive"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.101",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs-impl"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-broadcast"
|
||||
version = "0.7.2"
|
||||
@@ -2097,7 +2058,6 @@ dependencies = [
|
||||
"ciborium",
|
||||
"clap",
|
||||
"criterion-plot",
|
||||
"futures",
|
||||
"is-terminal",
|
||||
"itertools 0.10.5",
|
||||
"num-traits",
|
||||
@@ -2110,7 +2070,6 @@ dependencies = [
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"tinytemplate",
|
||||
"tokio",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
@@ -2870,20 +2829,6 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "der-parser"
|
||||
version = "10.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6"
|
||||
dependencies = [
|
||||
"asn1-rs",
|
||||
"displaydoc",
|
||||
"nom 7.1.3",
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"rusticata-macros",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.4.0"
|
||||
@@ -6394,15 +6339,6 @@ dependencies = [
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "oid-registry"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7"
|
||||
dependencies = [
|
||||
"asn1-rs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.21.3"
|
||||
@@ -7947,7 +7883,6 @@ dependencies = [
|
||||
"rustfs-utils",
|
||||
"rustfs-zip",
|
||||
"rustls 0.23.28",
|
||||
"rustls-pki-types",
|
||||
"s3s",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -7968,7 +7903,6 @@ dependencies = [
|
||||
"tracing",
|
||||
"urlencoding",
|
||||
"uuid",
|
||||
"x509-parser",
|
||||
"zip",
|
||||
]
|
||||
|
||||
@@ -8287,7 +8221,6 @@ dependencies = [
|
||||
"aes-gcm",
|
||||
"bytes",
|
||||
"crc32fast",
|
||||
"criterion",
|
||||
"futures",
|
||||
"http 1.3.1",
|
||||
"md-5",
|
||||
@@ -8447,15 +8380,6 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rusticata-macros"
|
||||
version = "4.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
|
||||
dependencies = [
|
||||
"nom 7.1.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.38.44"
|
||||
@@ -11527,24 +11451,6 @@ dependencies = [
|
||||
"pkg-config",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "x509-parser"
|
||||
version = "0.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460"
|
||||
dependencies = [
|
||||
"asn1-rs",
|
||||
"data-encoding",
|
||||
"der-parser",
|
||||
"lazy_static",
|
||||
"nom 7.1.3",
|
||||
"oid-registry",
|
||||
"ring",
|
||||
"rusticata-macros",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xattr"
|
||||
version = "1.5.0"
|
||||
|
||||
14
Cargo.toml
14
Cargo.toml
@@ -78,12 +78,12 @@ aes-gcm = { version = "0.10.3", features = ["std"] }
|
||||
arc-swap = "1.7.1"
|
||||
argon2 = { version = "0.5.3", features = ["std"] }
|
||||
atoi = "2.0.0"
|
||||
async-channel = "2.3.1"
|
||||
async-channel = "2.4.0"
|
||||
async-recursion = "1.1.1"
|
||||
async-trait = "0.1.88"
|
||||
async-compression = { version = "0.4.0" }
|
||||
async-compression = { version = "0.4.25" }
|
||||
atomic_enum = "0.3.0"
|
||||
aws-sdk-s3 = "1.95.0"
|
||||
aws-sdk-s3 = "1.96.0"
|
||||
axum = "0.8.4"
|
||||
axum-extra = "0.10.1"
|
||||
axum-server = { version = "0.7.2", features = ["tls-rustls"] }
|
||||
@@ -107,7 +107,7 @@ dioxus = { version = "0.6.3", features = ["router"] }
|
||||
dirs = "6.0.0"
|
||||
enumset = "1.1.6"
|
||||
flatbuffers = "25.2.10"
|
||||
flate2 = "1.1.1"
|
||||
flate2 = "1.1.2"
|
||||
flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] }
|
||||
form_urlencoded = "1.2.1"
|
||||
futures = "0.3.31"
|
||||
@@ -124,7 +124,7 @@ hyper-util = { version = "0.1.14", features = [
|
||||
"server-auto",
|
||||
"server-graceful",
|
||||
] }
|
||||
hyper-rustls = "0.27.5"
|
||||
hyper-rustls = "0.27.7"
|
||||
http = "1.3.1"
|
||||
http-body = "1.0.1"
|
||||
humantime = "2.2.0"
|
||||
@@ -195,7 +195,7 @@ rmp-serde = "1.3.0"
|
||||
rsa = "0.9.8"
|
||||
rumqttc = { version = "0.24" }
|
||||
rust-embed = { version = "8.7.2" }
|
||||
rust-i18n = { version = "3.1.4" }
|
||||
rust-i18n = { version = "3.1.5" }
|
||||
rustfs-rsc = "2025.506.1"
|
||||
rustls = { version = "0.23.28" }
|
||||
rustls-pki-types = "1.12.0"
|
||||
@@ -252,7 +252,7 @@ wildmatch = { version = "2.4.0", features = ["serde"] }
|
||||
winapi = { version = "0.3.9" }
|
||||
x509-parser = "0.17.0"
|
||||
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
|
||||
zip = "2.2.0"
|
||||
zip = "2.4.2"
|
||||
zstd = "0.13.3"
|
||||
|
||||
[profile.wasm-dev]
|
||||
|
||||
@@ -40,5 +40,5 @@ serde_json.workspace = true
|
||||
md-5 = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] }
|
||||
#criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] }
|
||||
tokio-test = "0.4"
|
||||
|
||||
@@ -69,7 +69,6 @@ percent-encoding = { workspace = true }
|
||||
pin-project-lite.workspace = true
|
||||
reqwest = { workspace = true }
|
||||
rustls.workspace = true
|
||||
rustls-pki-types = { workspace = true }
|
||||
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
|
||||
s3s.workspace = true
|
||||
serde.workspace = true
|
||||
@@ -102,7 +101,6 @@ tower-http = { workspace = true, features = [
|
||||
urlencoding = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
zip = { workspace = true }
|
||||
x509-parser = { workspace = true, features = ["verify", "validate"] }
|
||||
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -44,7 +44,6 @@ use hyper_util::{
|
||||
use license::init_license;
|
||||
use rustfs_common::globals::set_global_addr;
|
||||
use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
|
||||
use rustfs_ecstore::StorageAPI;
|
||||
use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
|
||||
use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool;
|
||||
use rustfs_ecstore::config as ecconfig;
|
||||
@@ -53,18 +52,16 @@ use rustfs_ecstore::heal::background_heal_ops::init_auto_heal;
|
||||
use rustfs_ecstore::rpc::make_server;
|
||||
use rustfs_ecstore::store_api::BucketOptions;
|
||||
use rustfs_ecstore::{
|
||||
endpoints::EndpointServerPools,
|
||||
heal::data_scanner::init_data_scanner,
|
||||
set_global_endpoints,
|
||||
store::{ECStore, init_local_disks},
|
||||
StorageAPI, endpoints::EndpointServerPools, global::set_global_rustfs_port, heal::data_scanner::init_data_scanner,
|
||||
notification_sys::new_global_notification_sys, set_global_endpoints, store::ECStore, store::init_local_disks,
|
||||
update_erasure_type,
|
||||
};
|
||||
use rustfs_ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys};
|
||||
use rustfs_iam::init_iam_sys;
|
||||
use rustfs_obs::{SystemObserver, init_obs, set_global_guard};
|
||||
use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer;
|
||||
use rustfs_utils::net::parse_and_resolve_address;
|
||||
use rustls::ServerConfig;
|
||||
use s3s::service::S3Service;
|
||||
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
|
||||
use service::hybrid;
|
||||
use socket2::SockRef;
|
||||
@@ -72,11 +69,12 @@ use std::io::{Error, Result};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
#[cfg(unix)]
|
||||
use tokio::signal::unix::{SignalKind, signal};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tonic::{Request, Status, metadata::MetadataValue};
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::catch_panic::CatchPanicLayer;
|
||||
use tower_http::cors::CorsLayer;
|
||||
use tower_http::trace::TraceLayer;
|
||||
@@ -129,6 +127,49 @@ async fn main() -> Result<()> {
|
||||
run(opt).await
|
||||
}
|
||||
|
||||
/// Sets up the TLS acceptor if certificates are available.
|
||||
#[instrument(skip(tls_path))]
|
||||
async fn setup_tls_acceptor(tls_path: &str) -> Result<Option<TlsAcceptor>> {
|
||||
if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() {
|
||||
debug!("TLS path is not provided or does not exist, starting with HTTP");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
debug!("Found TLS directory, checking for certificates");
|
||||
|
||||
// 1. Try to load all certificates from the directory (multi-cert support)
|
||||
if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) {
|
||||
if !cert_key_pairs.is_empty() {
|
||||
debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len());
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
let mut server_config = ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
|
||||
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Fallback to legacy single certificate mode
|
||||
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
|
||||
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
|
||||
if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() {
|
||||
debug!("Found legacy single TLS certificate, starting with HTTPS");
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
let mut server_config = ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)
|
||||
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
|
||||
return Ok(Some(TlsAcceptor::from(Arc::new(server_config))));
|
||||
}
|
||||
|
||||
debug!("No valid TLS certificates found in the directory, starting with HTTP");
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[instrument(skip(opt))]
|
||||
async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("opt: {:?}", &opt);
|
||||
@@ -148,7 +189,6 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
let listener = TcpListener::bind(server_address.clone()).await?;
|
||||
// Obtain the listener address
|
||||
let local_addr: SocketAddr = listener.local_addr()?;
|
||||
// let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
|
||||
let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap();
|
||||
|
||||
// For RPC
|
||||
@@ -204,18 +244,14 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
// This project uses the S3S library to implement S3 services
|
||||
let s3_service = {
|
||||
let store = storage::ecfs::FS::new();
|
||||
// let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?);
|
||||
let mut b = S3ServiceBuilder::new(store.clone());
|
||||
|
||||
let access_key = opt.access_key.clone();
|
||||
let secret_key = opt.secret_key.clone();
|
||||
// Displays info information
|
||||
debug!("authentication is enabled {}, {}", &access_key, &secret_key);
|
||||
|
||||
b.set_auth(IAMAuth::new(access_key, secret_key));
|
||||
|
||||
b.set_access(store.clone());
|
||||
|
||||
b.set_route(admin::make_admin_route()?);
|
||||
|
||||
if !opt.server_domains.is_empty() {
|
||||
@@ -223,20 +259,6 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?);
|
||||
}
|
||||
|
||||
// // Enable parsing virtual-hosted-style requests
|
||||
// if let Some(dm) = opt.domain_name {
|
||||
// info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
|
||||
// b.set_base_domain(dm);
|
||||
// }
|
||||
|
||||
// if domain_name.is_some() {
|
||||
// info!(
|
||||
// "virtual-hosted-style requests are enabled use domain_name {}",
|
||||
// domain_name.as_ref().unwrap()
|
||||
// );
|
||||
// b.set_base_domain(domain_name.unwrap());
|
||||
// }
|
||||
|
||||
b.build()
|
||||
};
|
||||
|
||||
@@ -254,57 +276,8 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
}
|
||||
});
|
||||
|
||||
let tls_path = opt.tls_path.clone().unwrap_or_default();
|
||||
let has_tls_certs = tokio::fs::metadata(&tls_path).await.is_ok();
|
||||
let tls_acceptor = if has_tls_certs {
|
||||
debug!("Found TLS directory, checking for certificates");
|
||||
let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?;
|
||||
|
||||
// 1. Try to load all certificates directly (including root and subdirectories)
|
||||
match rustfs_utils::load_all_certs_from_directory(&tls_path) {
|
||||
Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => {
|
||||
debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len());
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
|
||||
// create a multi certificate configuration
|
||||
let mut server_config = ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?));
|
||||
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
|
||||
Some(TlsAcceptor::from(Arc::new(server_config)))
|
||||
}
|
||||
_ => {
|
||||
// 2. If the synthesis fails, fall back to the traditional document certificate mode (backward compatible)
|
||||
let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}");
|
||||
let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}");
|
||||
let has_single_cert =
|
||||
tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok();
|
||||
|
||||
if has_single_cert {
|
||||
debug!("Found legacy single TLS certificate, starting with HTTPS");
|
||||
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
|
||||
let certs =
|
||||
rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
let key = rustfs_utils::load_private_key(key_path.as_str())
|
||||
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
let mut server_config = ServerConfig::builder()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)
|
||||
.map_err(|e| rustfs_utils::certs_error(e.to_string()))?;
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
|
||||
Some(TlsAcceptor::from(Arc::new(server_config)))
|
||||
} else {
|
||||
debug!("No valid TLS certificates found, starting with HTTP");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("TLS certificates not found, starting with HTTP");
|
||||
None
|
||||
};
|
||||
|
||||
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
|
||||
let state_manager = ServiceStateManager::new();
|
||||
let worker_state_manager = state_manager.clone();
|
||||
// Update service status to Starting
|
||||
@@ -318,73 +291,11 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
#[cfg(unix)]
|
||||
let (mut sigterm_inner, mut sigint_inner) = {
|
||||
// Unix platform specific code
|
||||
let sigterm_inner = match signal(SignalKind::terminate()) {
|
||||
Ok(signal) => signal,
|
||||
Err(e) => {
|
||||
error!("Failed to create SIGTERM signal handler: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let sigint_inner = match signal(SignalKind::interrupt()) {
|
||||
Ok(signal) => signal,
|
||||
Err(e) => {
|
||||
error!("Failed to create SIGINT signal handler: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");
|
||||
let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler");
|
||||
(sigterm_inner, sigint_inner)
|
||||
};
|
||||
|
||||
let hybrid_service = TowerToHyperService::new(
|
||||
tower::ServiceBuilder::new()
|
||||
.layer(CatchPanicLayer::new())
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(|request: &HttpRequest<_>| {
|
||||
let span = tracing::info_span!("http-request",
|
||||
status_code = tracing::field::Empty,
|
||||
method = %request.method(),
|
||||
uri = %request.uri(),
|
||||
version = ?request.version(),
|
||||
);
|
||||
for (header_name, header_value) in request.headers() {
|
||||
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length"
|
||||
{
|
||||
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
|
||||
}
|
||||
}
|
||||
|
||||
span
|
||||
})
|
||||
.on_request(|request: &HttpRequest<_>, _span: &Span| {
|
||||
info!(
|
||||
counter.rustfs_api_requests_total = 1_u64,
|
||||
key_request_method = %request.method().to_string(),
|
||||
key_request_uri_path = %request.uri().path().to_owned(),
|
||||
"handle request api total",
|
||||
);
|
||||
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
|
||||
})
|
||||
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
|
||||
_span.record("http response status_code", tracing::field::display(response.status()));
|
||||
debug!("http response generated in {:?}", latency)
|
||||
})
|
||||
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
|
||||
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
|
||||
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
|
||||
})
|
||||
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
|
||||
debug!("http stream closed after {:?}", stream_duration)
|
||||
})
|
||||
.on_failure(|_error, latency: Duration, _span: &Span| {
|
||||
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
|
||||
debug!("http request failure error: {:?} in {:?}", _error, latency)
|
||||
}),
|
||||
)
|
||||
.layer(CorsLayer::permissive())
|
||||
.service(hybrid(s3_service, rpc_service)),
|
||||
);
|
||||
|
||||
let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new()));
|
||||
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
|
||||
let graceful = Arc::new(GracefulShutdown::new());
|
||||
@@ -392,42 +303,36 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
|
||||
// service ready
|
||||
worker_state_manager.update(ServiceState::Ready);
|
||||
let value = hybrid_service.clone();
|
||||
let tls_acceptor = tls_acceptor.map(Arc::new);
|
||||
|
||||
loop {
|
||||
debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs);
|
||||
// Wait for a connection
|
||||
debug!("Waiting for new connection...");
|
||||
let (socket, _) = {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
tokio::select! {
|
||||
res = listener.accept() => {
|
||||
match res {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("error accepting connection: {err}");
|
||||
continue;
|
||||
}
|
||||
res = listener.accept() => match res {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("error accepting connection: {err}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
_ = ctrl_c.as_mut() => {
|
||||
info!("Ctrl-C received in worker thread");
|
||||
let _ = shutdown_tx_clone.send(());
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
Some(_) = sigint_inner.recv() => {
|
||||
info!("SIGINT received in worker thread");
|
||||
let _ = shutdown_tx_clone.send(());
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
Some(_) = sigterm_inner.recv() => {
|
||||
info!("SIGTERM received in worker thread");
|
||||
let _ = shutdown_tx_clone.send(());
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Shutdown signal received in worker thread");
|
||||
break;
|
||||
@@ -437,22 +342,18 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
tokio::select! {
|
||||
res = listener.accept() => {
|
||||
match res {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("error accepting connection: {err}");
|
||||
continue;
|
||||
}
|
||||
res = listener.accept() => match res {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
error!("error accepting connection: {err}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
_ = ctrl_c.as_mut() => {
|
||||
info!("Ctrl-C received in worker thread");
|
||||
let _ = shutdown_tx_clone.send(());
|
||||
break;
|
||||
}
|
||||
|
||||
},
|
||||
_ = shutdown_rx.recv() => {
|
||||
info!("Shutdown signal received in worker thread");
|
||||
break;
|
||||
@@ -472,70 +373,12 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
warn!(?err, "Failed to set set_send_buffer_size");
|
||||
}
|
||||
|
||||
if has_tls_certs {
|
||||
debug!("TLS certificates found, starting with SIGINT");
|
||||
let peer_addr_str = socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|e| {
|
||||
warn!("Could not get peer address: {}", e);
|
||||
"unknown".to_string()
|
||||
});
|
||||
let tls_socket = match tls_acceptor.as_ref() {
|
||||
Some(acceptor) => match acceptor.accept(socket).await {
|
||||
Ok(tls_socket) => {
|
||||
info!("TLS handshake successful with peer: {}", peer_addr_str);
|
||||
tls_socket
|
||||
}
|
||||
Err(err) => {
|
||||
error!("TLS handshake with peer {} failed: {}", peer_addr_str, err);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
error!(
|
||||
"TLS acceptor is not available, but TLS is enabled. This is a bug. Dropping connection from {}",
|
||||
peer_addr_str
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let http_server_clone = http_server.clone();
|
||||
let value_clone = value.clone();
|
||||
let graceful_clone = graceful.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Runtime::new()
|
||||
.expect("Failed to create runtime")
|
||||
.block_on(async move {
|
||||
let conn = http_server_clone.serve_connection(TokioIo::new(tls_socket), value_clone);
|
||||
let conn = graceful_clone.watch(conn);
|
||||
if let Err(err) = conn.await {
|
||||
// Handle hyper::Error and low-level IO errors at a more granular level
|
||||
handle_connection_error(&*err);
|
||||
}
|
||||
});
|
||||
});
|
||||
debug!("TLS handshake success");
|
||||
} else {
|
||||
debug!("Http handshake start");
|
||||
|
||||
let http_server_clone = http_server.clone();
|
||||
let value_clone = value.clone();
|
||||
let graceful_clone = graceful.clone();
|
||||
tokio::spawn(async move {
|
||||
let conn = http_server_clone.serve_connection(TokioIo::new(socket), value_clone);
|
||||
let conn = graceful_clone.watch(conn);
|
||||
if let Err(err) = conn.await {
|
||||
// Handle hyper::Error and low-level IO errors at a more granular level
|
||||
handle_connection_error(&*err);
|
||||
}
|
||||
});
|
||||
debug!("Http handshake success");
|
||||
}
|
||||
process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone());
|
||||
}
|
||||
|
||||
worker_state_manager.update(ServiceState::Stopping);
|
||||
match Arc::try_unwrap(graceful) {
|
||||
Ok(g) => {
|
||||
// Successfully obtaining unique ownership, you can call shutdown
|
||||
tokio::select! {
|
||||
() = g.shutdown() => {
|
||||
debug!("Gracefully shutdown!");
|
||||
@@ -546,9 +389,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
}
|
||||
}
|
||||
Err(arc_graceful) => {
|
||||
// There are other references that cannot be obtained for unique ownership
|
||||
error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful);
|
||||
// In this case, we can only wait for the timeout
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
debug!("Timeout reached, forcing shutdown");
|
||||
}
|
||||
@@ -591,14 +432,12 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
init_data_scanner().await;
|
||||
// init auto heal
|
||||
init_auto_heal().await;
|
||||
|
||||
// init console configuration
|
||||
init_console_cfg(local_ip, server_port);
|
||||
|
||||
print_server_info();
|
||||
init_bucket_replication_pool().await;
|
||||
|
||||
print_server_info();
|
||||
|
||||
// Async update check (optional)
|
||||
tokio::spawn(async {
|
||||
use crate::update_checker::{UpdateCheckError, check_updates};
|
||||
@@ -666,6 +505,103 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process a single incoming TCP connection.
|
||||
///
|
||||
/// This function is executed in a new Tokio task and it will:
|
||||
/// 1. If TLS is configured, perform TLS handshake.
|
||||
/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware.
|
||||
/// 3. Use Hyper to handle HTTP requests on this connection.
|
||||
/// 4. Incorporate connections into the management of elegant closures.
|
||||
#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))]
|
||||
fn process_connection(
|
||||
socket: TcpStream,
|
||||
tls_acceptor: Option<Arc<TlsAcceptor>>,
|
||||
http_server: Arc<ConnBuilder<TokioExecutor>>,
|
||||
s3_service: S3Service,
|
||||
graceful: Arc<GracefulShutdown>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
// Build services inside each connected task to avoid passing complex service types across tasks,
|
||||
// It also ensures that each connection has an independent service instance.
|
||||
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
|
||||
let hybrid_service = ServiceBuilder::new()
|
||||
.layer(CatchPanicLayer::new())
|
||||
.layer(
|
||||
TraceLayer::new_for_http()
|
||||
.make_span_with(|request: &HttpRequest<_>| {
|
||||
let span = tracing::info_span!("http-request",
|
||||
status_code = tracing::field::Empty,
|
||||
method = %request.method(),
|
||||
uri = %request.uri(),
|
||||
version = ?request.version(),
|
||||
);
|
||||
for (header_name, header_value) in request.headers() {
|
||||
if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" {
|
||||
span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid"));
|
||||
}
|
||||
}
|
||||
|
||||
span
|
||||
})
|
||||
.on_request(|request: &HttpRequest<_>, _span: &Span| {
|
||||
info!(
|
||||
counter.rustfs_api_requests_total = 1_u64,
|
||||
key_request_method = %request.method().to_string(),
|
||||
key_request_uri_path = %request.uri().path().to_owned(),
|
||||
"handle request api total",
|
||||
);
|
||||
debug!("http started method: {}, url path: {}", request.method(), request.uri().path())
|
||||
})
|
||||
.on_response(|response: &Response<_>, latency: Duration, _span: &Span| {
|
||||
_span.record("http response status_code", tracing::field::display(response.status()));
|
||||
debug!("http response generated in {:?}", latency)
|
||||
})
|
||||
.on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
|
||||
info!(histogram.request.body.len = chunk.len(), "histogram request body length",);
|
||||
debug!("http body sending {} bytes in {:?}", chunk.len(), latency)
|
||||
})
|
||||
.on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
|
||||
debug!("http stream closed after {:?}", stream_duration)
|
||||
})
|
||||
.on_failure(|_error, latency: Duration, _span: &Span| {
|
||||
info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total");
|
||||
debug!("http request failure error: {:?} in {:?}", _error, latency)
|
||||
}),
|
||||
)
|
||||
.layer(CorsLayer::permissive())
|
||||
.service(hybrid(s3_service, rpc_service));
|
||||
let hybrid_service = TowerToHyperService::new(hybrid_service);
|
||||
|
||||
// Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor
|
||||
if let Some(acceptor) = tls_acceptor {
|
||||
debug!("TLS handshake start");
|
||||
match acceptor.accept(socket).await {
|
||||
Ok(tls_socket) => {
|
||||
debug!("TLS handshake successful");
|
||||
let stream = TokioIo::new(tls_socket);
|
||||
let conn = http_server.serve_connection(stream, hybrid_service);
|
||||
if let Err(err) = graceful.watch(conn).await {
|
||||
handle_connection_error(&*err);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(?err, "TLS handshake failed");
|
||||
return; // Failed to end the task directly
|
||||
}
|
||||
}
|
||||
debug!("TLS handshake success");
|
||||
} else {
|
||||
debug!("Http handshake start");
|
||||
let stream = TokioIo::new(socket);
|
||||
let conn = http_server.serve_connection(stream, hybrid_service);
|
||||
if let Err(err) = graceful.watch(conn).await {
|
||||
handle_connection_error(&*err);
|
||||
}
|
||||
debug!("Http handshake success");
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
/// Handles the shutdown process of the server
|
||||
async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) {
|
||||
info!("Shutdown signal received in main thread");
|
||||
|
||||
Reference in New Issue
Block a user