diff --git a/Cargo.lock b/Cargo.lock index d4cce293..ee080ea1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -458,45 +458,6 @@ dependencies = [ "zbus 5.7.1", ] -[[package]] -name = "asn1-rs" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" -dependencies = [ - "asn1-rs-derive", - "asn1-rs-impl", - "displaydoc", - "nom 7.1.3", - "num-traits", - "rusticata-macros", - "thiserror 2.0.12", - "time", -] - -[[package]] -name = "asn1-rs-derive" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", - "synstructure", -] - -[[package]] -name = "asn1-rs-impl" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "async-broadcast" version = "0.7.2" @@ -2097,7 +2058,6 @@ dependencies = [ "ciborium", "clap", "criterion-plot", - "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -2110,7 +2070,6 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", - "tokio", "walkdir", ] @@ -2870,20 +2829,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "der-parser" -version = "10.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" -dependencies = [ - "asn1-rs", - "displaydoc", - "nom 7.1.3", - "num-bigint", - "num-traits", - "rusticata-macros", -] - [[package]] name = "deranged" version = "0.4.0" @@ -6394,15 +6339,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "oid-registry" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f40cff3dde1b6087cc5d5f5d4d65712f34016a03ed60e9c08dcc392736b5b7" -dependencies = [ - "asn1-rs", -] - [[package]] name = "once_cell" version = "1.21.3" @@ -7947,7 +7883,6 @@ dependencies = [ "rustfs-utils", "rustfs-zip", "rustls 0.23.28", - "rustls-pki-types", "s3s", "serde", "serde_json", @@ -7968,7 +7903,6 @@ dependencies = [ "tracing", "urlencoding", "uuid", - "x509-parser", "zip", ] @@ -8287,7 +8221,6 @@ dependencies = [ "aes-gcm", "bytes", "crc32fast", - "criterion", "futures", "http 1.3.1", "md-5", @@ -8447,15 +8380,6 @@ dependencies = [ "tokio-tar", ] -[[package]] -name = "rusticata-macros" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" -dependencies = [ - "nom 7.1.3", -] - [[package]] name = "rustix" version = "0.38.44" @@ -11527,24 +11451,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "x509-parser" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460" -dependencies = [ - "asn1-rs", - "data-encoding", - "der-parser", - "lazy_static", - "nom 7.1.3", - "oid-registry", - "ring", - "rusticata-macros", - "thiserror 2.0.12", - "time", -] - [[package]] name = "xattr" version = "1.5.0" diff --git a/Cargo.toml b/Cargo.toml index f9855c51..a7591bdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,12 +78,12 @@ aes-gcm = { version = "0.10.3", features = ["std"] } arc-swap = "1.7.1" argon2 = { version = "0.5.3", features = ["std"] } atoi = "2.0.0" -async-channel = "2.3.1" +async-channel = "2.4.0" async-recursion = "1.1.1" async-trait = "0.1.88" -async-compression = { version = "0.4.0" } +async-compression = { version = "0.4.25" } atomic_enum = "0.3.0" -aws-sdk-s3 = "1.95.0" +aws-sdk-s3 = "1.96.0" axum = "0.8.4" axum-extra = "0.10.1" axum-server = { version = "0.7.2", features = ["tls-rustls"] } @@ -107,7 +107,7 @@ dioxus = { version = "0.6.3", features = ["router"] } dirs = "6.0.0" enumset = "1.1.6" flatbuffers = "25.2.10" -flate2 = "1.1.1" +flate2 = "1.1.2" flexi_logger = { version = "0.31.2", features = ["trc", "dont_minimize_extra_stacks"] } form_urlencoded = "1.2.1" futures = "0.3.31" @@ -124,7 +124,7 @@ hyper-util = { version = "0.1.14", features = [ "server-auto", "server-graceful", ] } -hyper-rustls = "0.27.5" +hyper-rustls = "0.27.7" http = "1.3.1" http-body = "1.0.1" humantime = "2.2.0" @@ -195,7 +195,7 @@ rmp-serde = "1.3.0" rsa = "0.9.8" rumqttc = { version = "0.24" } rust-embed = { version = "8.7.2" } -rust-i18n = { version = "3.1.4" } +rust-i18n = { version = "3.1.5" } rustfs-rsc = "2025.506.1" rustls = { version = "0.23.28" } rustls-pki-types = "1.12.0" @@ -252,7 +252,7 @@ wildmatch = { version = "2.4.0", features = ["serde"] } winapi = { version = "0.3.9" } x509-parser = "0.17.0" xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] } -zip = "2.2.0" +zip = "2.4.2" zstd = "0.13.3" [profile.wasm-dev] diff --git a/crates/rio/Cargo.toml b/crates/rio/Cargo.toml index 1c3d806c..36c9cdea 100644 --- a/crates/rio/Cargo.toml +++ b/crates/rio/Cargo.toml @@ -40,5 +40,5 @@ serde_json.workspace = true md-5 = { workspace = true } [dev-dependencies] -criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] } +#criterion = { version = "0.5.1", features = ["async", "async_tokio", "tokio"] } tokio-test = "0.4" diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 3f19adac..ddf79d30 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -69,7 +69,6 @@ percent-encoding = { workspace = true } pin-project-lite.workspace = true reqwest = { workspace = true } rustls.workspace = true -rustls-pki-types = { workspace = true } rust-embed = { workspace = true, features = ["interpolate-folder-path"] } s3s.workspace = true serde.workspace = true @@ -102,7 +101,6 @@ tower-http = { workspace = true, features = [ urlencoding = { workspace = true } uuid = { workspace = true } zip = { workspace = true } -x509-parser = { workspace = true, features = ["verify", "validate"] } [target.'cfg(target_os = "linux")'.dependencies] diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 1e974ef6..ecb76259 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -44,7 +44,6 @@ use hyper_util::{ use license::init_license; use rustfs_common::globals::set_global_addr; use rustfs_config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; -use rustfs_ecstore::StorageAPI; use rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys; use rustfs_ecstore::cmd::bucket_replication::init_bucket_replication_pool; use rustfs_ecstore::config as ecconfig; @@ -53,18 +52,16 @@ use rustfs_ecstore::heal::background_heal_ops::init_auto_heal; use rustfs_ecstore::rpc::make_server; use rustfs_ecstore::store_api::BucketOptions; use rustfs_ecstore::{ - endpoints::EndpointServerPools, - heal::data_scanner::init_data_scanner, - set_global_endpoints, - store::{ECStore, init_local_disks}, + StorageAPI, endpoints::EndpointServerPools, global::set_global_rustfs_port, heal::data_scanner::init_data_scanner, + notification_sys::new_global_notification_sys, set_global_endpoints, store::ECStore, store::init_local_disks, update_erasure_type, }; -use rustfs_ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; use rustfs_iam::init_iam_sys; use rustfs_obs::{SystemObserver, init_obs, set_global_guard}; use rustfs_protos::proto_gen::node_service::node_service_server::NodeServiceServer; use rustfs_utils::net::parse_and_resolve_address; use rustls::ServerConfig; +use s3s::service::S3Service; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; use socket2::SockRef; @@ -72,11 +69,12 @@ use std::io::{Error, Result}; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use tokio::net::TcpListener; +use tokio::net::{TcpListener, TcpStream}; #[cfg(unix)] use tokio::signal::unix::{SignalKind, signal}; use tokio_rustls::TlsAcceptor; use tonic::{Request, Status, metadata::MetadataValue}; +use tower::ServiceBuilder; use tower_http::catch_panic::CatchPanicLayer; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; @@ -129,6 +127,49 @@ async fn main() -> Result<()> { run(opt).await } +/// Sets up the TLS acceptor if certificates are available. +#[instrument(skip(tls_path))] +async fn setup_tls_acceptor(tls_path: &str) -> Result> { + if tls_path.is_empty() || tokio::fs::metadata(tls_path).await.is_err() { + debug!("TLS path is not provided or does not exist, starting with HTTP"); + return Ok(None); + } + + debug!("Found TLS directory, checking for certificates"); + + // 1. Try to load all certificates from the directory (multi-cert support) + if let Ok(cert_key_pairs) = rustfs_utils::load_all_certs_from_directory(tls_path) { + if !cert_key_pairs.is_empty() { + debug!("Found {} certificates, creating multi-cert resolver", cert_key_pairs.len()); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?)); + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); + } + } + + // 2. Fallback to legacy single certificate mode + let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); + let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); + if tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok() { + debug!("Found legacy single TLS certificate, starting with HTTPS"); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let certs = rustfs_utils::load_certs(&cert_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + let key = rustfs_utils::load_private_key(&key_path).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + return Ok(Some(TlsAcceptor::from(Arc::new(server_config)))); + } + + debug!("No valid TLS certificates found in the directory, starting with HTTP"); + Ok(None) +} + #[instrument(skip(opt))] async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); @@ -148,7 +189,6 @@ async fn run(opt: config::Opt) -> Result<()> { let listener = TcpListener::bind(server_address.clone()).await?; // Obtain the listener address let local_addr: SocketAddr = listener.local_addr()?; - // let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); let local_ip = rustfs_utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); // For RPC @@ -204,18 +244,14 @@ async fn run(opt: config::Opt) -> Result<()> { // This project uses the S3S library to implement S3 services let s3_service = { let store = storage::ecfs::FS::new(); - // let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?); let mut b = S3ServiceBuilder::new(store.clone()); let access_key = opt.access_key.clone(); let secret_key = opt.secret_key.clone(); - // Displays info information debug!("authentication is enabled {}, {}", &access_key, &secret_key); b.set_auth(IAMAuth::new(access_key, secret_key)); - b.set_access(store.clone()); - b.set_route(admin::make_admin_route()?); if !opt.server_domains.is_empty() { @@ -223,20 +259,6 @@ async fn run(opt: config::Opt) -> Result<()> { b.set_host(MultiDomain::new(&opt.server_domains).map_err(Error::other)?); } - // // Enable parsing virtual-hosted-style requests - // if let Some(dm) = opt.domain_name { - // info!("virtual-hosted-style requests are enabled use domain_name {}", &dm); - // b.set_base_domain(dm); - // } - - // if domain_name.is_some() { - // info!( - // "virtual-hosted-style requests are enabled use domain_name {}", - // domain_name.as_ref().unwrap() - // ); - // b.set_base_domain(domain_name.unwrap()); - // } - b.build() }; @@ -254,57 +276,8 @@ async fn run(opt: config::Opt) -> Result<()> { } }); - let tls_path = opt.tls_path.clone().unwrap_or_default(); - let has_tls_certs = tokio::fs::metadata(&tls_path).await.is_ok(); - let tls_acceptor = if has_tls_certs { - debug!("Found TLS directory, checking for certificates"); + let tls_acceptor = setup_tls_acceptor(opt.tls_path.as_deref().unwrap_or_default()).await?; - // 1. Try to load all certificates directly (including root and subdirectories) - match rustfs_utils::load_all_certs_from_directory(&tls_path) { - Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => { - debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len()); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - - // create a multi certificate configuration - let mut server_config = ServerConfig::builder() - .with_no_client_auth() - .with_cert_resolver(Arc::new(rustfs_utils::create_multi_cert_resolver(cert_key_pairs)?)); - - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - Some(TlsAcceptor::from(Arc::new(server_config))) - } - _ => { - // 2. If the synthesis fails, fall back to the traditional document certificate mode (backward compatible) - let key_path = format!("{tls_path}/{RUSTFS_TLS_KEY}"); - let cert_path = format!("{tls_path}/{RUSTFS_TLS_CERT}"); - let has_single_cert = - tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok(); - - if has_single_cert { - debug!("Found legacy single TLS certificate, starting with HTTPS"); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let certs = - rustfs_utils::load_certs(cert_path.as_str()).map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - let key = rustfs_utils::load_private_key(key_path.as_str()) - .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - let mut server_config = ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, key) - .map_err(|e| rustfs_utils::certs_error(e.to_string()))?; - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - Some(TlsAcceptor::from(Arc::new(server_config))) - } else { - debug!("No valid TLS certificates found, starting with HTTP"); - None - } - } - } - } else { - debug!("TLS certificates not found, starting with HTTP"); - None - }; - - let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); let state_manager = ServiceStateManager::new(); let worker_state_manager = state_manager.clone(); // Update service status to Starting @@ -318,73 +291,11 @@ async fn run(opt: config::Opt) -> Result<()> { #[cfg(unix)] let (mut sigterm_inner, mut sigint_inner) = { // Unix platform specific code - let sigterm_inner = match signal(SignalKind::terminate()) { - Ok(signal) => signal, - Err(e) => { - error!("Failed to create SIGTERM signal handler: {}", e); - return; - } - }; - let sigint_inner = match signal(SignalKind::interrupt()) { - Ok(signal) => signal, - Err(e) => { - error!("Failed to create SIGINT signal handler: {}", e); - return; - } - }; + let sigterm_inner = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler"); + let sigint_inner = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal handler"); (sigterm_inner, sigint_inner) }; - let hybrid_service = TowerToHyperService::new( - tower::ServiceBuilder::new() - .layer(CatchPanicLayer::new()) - .layer( - TraceLayer::new_for_http() - .make_span_with(|request: &HttpRequest<_>| { - let span = tracing::info_span!("http-request", - status_code = tracing::field::Empty, - method = %request.method(), - uri = %request.uri(), - version = ?request.version(), - ); - for (header_name, header_value) in request.headers() { - if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" - { - span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid")); - } - } - - span - }) - .on_request(|request: &HttpRequest<_>, _span: &Span| { - info!( - counter.rustfs_api_requests_total = 1_u64, - key_request_method = %request.method().to_string(), - key_request_uri_path = %request.uri().path().to_owned(), - "handle request api total", - ); - debug!("http started method: {}, url path: {}", request.method(), request.uri().path()) - }) - .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { - _span.record("http response status_code", tracing::field::display(response.status())); - debug!("http response generated in {:?}", latency) - }) - .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { - info!(histogram.request.body.len = chunk.len(), "histogram request body length",); - debug!("http body sending {} bytes in {:?}", chunk.len(), latency) - }) - .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { - debug!("http stream closed after {:?}", stream_duration) - }) - .on_failure(|_error, latency: Duration, _span: &Span| { - info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total"); - debug!("http request failure error: {:?} in {:?}", _error, latency) - }), - ) - .layer(CorsLayer::permissive()) - .service(hybrid(s3_service, rpc_service)), - ); - let http_server = Arc::new(ConnBuilder::new(TokioExecutor::new())); let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); let graceful = Arc::new(GracefulShutdown::new()); @@ -392,42 +303,36 @@ async fn run(opt: config::Opt) -> Result<()> { // service ready worker_state_manager.update(ServiceState::Ready); - let value = hybrid_service.clone(); + let tls_acceptor = tls_acceptor.map(Arc::new); + loop { - debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs); - // Wait for a connection + debug!("Waiting for new connection..."); let (socket, _) = { #[cfg(unix)] { tokio::select! { - res = listener.accept() => { - match res { - Ok(conn) => conn, - Err(err) => { - error!("error accepting connection: {err}"); - continue; - } + res = listener.accept() => match res { + Ok(conn) => conn, + Err(err) => { + error!("error accepting connection: {err}"); + continue; } - } - + }, _ = ctrl_c.as_mut() => { info!("Ctrl-C received in worker thread"); let _ = shutdown_tx_clone.send(()); break; - } - + }, Some(_) = sigint_inner.recv() => { info!("SIGINT received in worker thread"); let _ = shutdown_tx_clone.send(()); break; - } - + }, Some(_) = sigterm_inner.recv() => { info!("SIGTERM received in worker thread"); let _ = shutdown_tx_clone.send(()); break; - } - + }, _ = shutdown_rx.recv() => { info!("Shutdown signal received in worker thread"); break; @@ -437,22 +342,18 @@ async fn run(opt: config::Opt) -> Result<()> { #[cfg(not(unix))] { tokio::select! { - res = listener.accept() => { - match res { - Ok(conn) => conn, - Err(err) => { - error!("error accepting connection: {err}"); - continue; - } + res = listener.accept() => match res { + Ok(conn) => conn, + Err(err) => { + error!("error accepting connection: {err}"); + continue; } - } - + }, _ = ctrl_c.as_mut() => { info!("Ctrl-C received in worker thread"); let _ = shutdown_tx_clone.send(()); break; - } - + }, _ = shutdown_rx.recv() => { info!("Shutdown signal received in worker thread"); break; @@ -472,70 +373,12 @@ async fn run(opt: config::Opt) -> Result<()> { warn!(?err, "Failed to set set_send_buffer_size"); } - if has_tls_certs { - debug!("TLS certificates found, starting with SIGINT"); - let peer_addr_str = socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|e| { - warn!("Could not get peer address: {}", e); - "unknown".to_string() - }); - let tls_socket = match tls_acceptor.as_ref() { - Some(acceptor) => match acceptor.accept(socket).await { - Ok(tls_socket) => { - info!("TLS handshake successful with peer: {}", peer_addr_str); - tls_socket - } - Err(err) => { - error!("TLS handshake with peer {} failed: {}", peer_addr_str, err); - continue; - } - }, - None => { - error!( - "TLS acceptor is not available, but TLS is enabled. This is a bug. Dropping connection from {}", - peer_addr_str - ); - continue; - } - }; - - let http_server_clone = http_server.clone(); - let value_clone = value.clone(); - let graceful_clone = graceful.clone(); - - tokio::task::spawn_blocking(move || { - tokio::runtime::Runtime::new() - .expect("Failed to create runtime") - .block_on(async move { - let conn = http_server_clone.serve_connection(TokioIo::new(tls_socket), value_clone); - let conn = graceful_clone.watch(conn); - if let Err(err) = conn.await { - // Handle hyper::Error and low-level IO errors at a more granular level - handle_connection_error(&*err); - } - }); - }); - debug!("TLS handshake success"); - } else { - debug!("Http handshake start"); - - let http_server_clone = http_server.clone(); - let value_clone = value.clone(); - let graceful_clone = graceful.clone(); - tokio::spawn(async move { - let conn = http_server_clone.serve_connection(TokioIo::new(socket), value_clone); - let conn = graceful_clone.watch(conn); - if let Err(err) = conn.await { - // Handle hyper::Error and low-level IO errors at a more granular level - handle_connection_error(&*err); - } - }); - debug!("Http handshake success"); - } + process_connection(socket, tls_acceptor.clone(), http_server.clone(), s3_service.clone(), graceful.clone()); } + worker_state_manager.update(ServiceState::Stopping); match Arc::try_unwrap(graceful) { Ok(g) => { - // Successfully obtaining unique ownership, you can call shutdown tokio::select! { () = g.shutdown() => { debug!("Gracefully shutdown!"); @@ -546,9 +389,7 @@ async fn run(opt: config::Opt) -> Result<()> { } } Err(arc_graceful) => { - // There are other references that cannot be obtained for unique ownership error!("Cannot perform graceful shutdown, other references exist err: {:?}", arc_graceful); - // In this case, we can only wait for the timeout tokio::time::sleep(Duration::from_secs(10)).await; debug!("Timeout reached, forcing shutdown"); } @@ -591,14 +432,12 @@ async fn run(opt: config::Opt) -> Result<()> { init_data_scanner().await; // init auto heal init_auto_heal().await; - + // init console configuration init_console_cfg(local_ip, server_port); print_server_info(); init_bucket_replication_pool().await; - print_server_info(); - // Async update check (optional) tokio::spawn(async { use crate::update_checker::{UpdateCheckError, check_updates}; @@ -666,6 +505,103 @@ async fn run(opt: config::Opt) -> Result<()> { Ok(()) } +/// Process a single incoming TCP connection. +/// +/// This function is executed in a new Tokio task and it will: +/// 1. If TLS is configured, perform TLS handshake. +/// 2. Build a complete service stack for this connection, including S3, RPC services, and all middleware. +/// 3. Use Hyper to handle HTTP requests on this connection. +/// 4. Incorporate connections into the management of elegant closures. +#[instrument(skip_all, fields(peer_addr = %socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "unknown".to_string())))] +fn process_connection( + socket: TcpStream, + tls_acceptor: Option>, + http_server: Arc>, + s3_service: S3Service, + graceful: Arc, +) { + tokio::spawn(async move { + // Build services inside each connected task to avoid passing complex service types across tasks, + // It also ensures that each connection has an independent service instance. + let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); + let hybrid_service = ServiceBuilder::new() + .layer(CatchPanicLayer::new()) + .layer( + TraceLayer::new_for_http() + .make_span_with(|request: &HttpRequest<_>| { + let span = tracing::info_span!("http-request", + status_code = tracing::field::Empty, + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + ); + for (header_name, header_value) in request.headers() { + if header_name == "user-agent" || header_name == "content-type" || header_name == "content-length" { + span.record(header_name.as_str(), header_value.to_str().unwrap_or("invalid")); + } + } + + span + }) + .on_request(|request: &HttpRequest<_>, _span: &Span| { + info!( + counter.rustfs_api_requests_total = 1_u64, + key_request_method = %request.method().to_string(), + key_request_uri_path = %request.uri().path().to_owned(), + "handle request api total", + ); + debug!("http started method: {}, url path: {}", request.method(), request.uri().path()) + }) + .on_response(|response: &Response<_>, latency: Duration, _span: &Span| { + _span.record("http response status_code", tracing::field::display(response.status())); + debug!("http response generated in {:?}", latency) + }) + .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| { + info!(histogram.request.body.len = chunk.len(), "histogram request body length",); + debug!("http body sending {} bytes in {:?}", chunk.len(), latency) + }) + .on_eos(|_trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| { + debug!("http stream closed after {:?}", stream_duration) + }) + .on_failure(|_error, latency: Duration, _span: &Span| { + info!(counter.rustfs_api_requests_failure_total = 1_u64, "handle request api failure total"); + debug!("http request failure error: {:?} in {:?}", _error, latency) + }), + ) + .layer(CorsLayer::permissive()) + .service(hybrid(s3_service, rpc_service)); + let hybrid_service = TowerToHyperService::new(hybrid_service); + + // Decide whether to handle HTTPS or HTTP connections based on the existence of TLS Acceptor + if let Some(acceptor) = tls_acceptor { + debug!("TLS handshake start"); + match acceptor.accept(socket).await { + Ok(tls_socket) => { + debug!("TLS handshake successful"); + let stream = TokioIo::new(tls_socket); + let conn = http_server.serve_connection(stream, hybrid_service); + if let Err(err) = graceful.watch(conn).await { + handle_connection_error(&*err); + } + } + Err(err) => { + error!(?err, "TLS handshake failed"); + return; // Failed to end the task directly + } + } + debug!("TLS handshake success"); + } else { + debug!("Http handshake start"); + let stream = TokioIo::new(socket); + let conn = http_server.serve_connection(stream, hybrid_service); + if let Err(err) = graceful.watch(conn).await { + handle_connection_error(&*err); + } + debug!("Http handshake success"); + }; + }); +} + /// Handles the shutdown process of the server async fn handle_shutdown(state_manager: &ServiceStateManager, shutdown_tx: &tokio::sync::broadcast::Sender<()>) { info!("Shutdown signal received in main thread");