feat: improve address binding and port handling mechanism (#366)

* feat: improve address binding and port handling mechanism

1. Add support for ":port" format to enable dual-stack binding (IPv4/IPv6)
2. Implement automatic port allocation when port 0 is specified
3. Optimize server startup process with unified address resolution
4. Enhance error handling and logging for address resolution
5. Improve graceful shutdown with signal listening
6. Clean up commented code in console.rs

Files:
- ecstore/src/utils/net.rs
- rustfs/src/console.rs
- rustfs/src/main.rs

Branch: feature/server-and-console-port

* improve code for console

* improve code

* improve code for console and net.rs

* Update rustfs/src/main.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update rustfs/src/utils/mod.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
houseme
2025-04-27 23:44:26 +08:00
committed by GitHub
parent e8f8a0872d
commit e9d6e2ca95
7 changed files with 259 additions and 86 deletions

View File

@@ -106,7 +106,6 @@ opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
parking_lot = "0.12.3"
pin-project-lite = "0.2.16"
prometheus = "0.14.0"
# pin-utils = "0.1.0"
prost = "0.13.5"
prost-build = "0.13.5"

View File

@@ -3,7 +3,7 @@ use lazy_static::lazy_static;
use std::{
collections::HashSet,
fmt::Display,
net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs},
net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs},
};
use url::Host;
@@ -141,6 +141,33 @@ impl TryFrom<String> for XHost {
}
}
/// parses the address string, process the ":port" format for double-stack binding,
/// and resolve the host name or IP address. If the port is 0, an available port is assigned.
pub fn parse_and_resolve_address(addr_str: &str) -> Result<SocketAddr> {
let resolved_addr: SocketAddr = if let Some(port) = addr_str.strip_prefix(":") {
// Process the ":port" format for double stack binding
let port_str = port;
let port: u16 = port_str
.parse()
.map_err(|e| Error::from_string(format!("Invalid port format: {}, err:{:?}", addr_str, e)))?;
let final_port = if port == 0 {
get_available_port() // assume get_available_port is available here
} else {
port
};
// Using IPv6 without address specified [::], it should handle both IPv4 and IPv6
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), final_port)
} else {
// Use existing logic to handle regular address formats
let mut addr = check_local_server_addr(addr_str)?; // assume check_local_server_addr is available here
if addr.port() == 0 {
addr.set_port(get_available_port());
}
addr
};
Ok(resolved_addr)
}
#[cfg(test)]
mod test {
use std::net::Ipv4Addr;

View File

@@ -29,11 +29,11 @@ pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_private.key";
pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_public.crt";
pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem";
#[allow(clippy::const_is_empty)]
const SHORT_VERSION: &str = {

View File

@@ -207,42 +207,6 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
let mut cfg = CONSOLE_CONFIG.get().unwrap().clone();
let url = format!("{}://{}:{}", scheme, host, cfg.port);
// // 如果指定入口,直接使用
// let url = if let Some(endpoint) = &config::get_config().console_fs_endpoint {
// debug!("axum Using rustfs endpoint address: {}", endpoint);
// endpoint.clone()
// } else {
// let host_with_port = if host.contains(':') {
// host.clone()
// } else {
// format!("{}:80", host)
// };
// // 尝试解析为 socket address但不强制要求一定要是 IP 地址
// let socket_addr = host_with_port.to_socket_addrs().ok().and_then(|mut addrs| addrs.next());
// debug!("axum Using host with port: {}, Socket address: {:?}", host_with_port, socket_addr);
// match socket_addr {
// Some(addr) if addr.ip().is_ipv4() => {
// let ipv4 = addr.ip().to_string();
// // 如果是私有 IP、环回地址或未指定地址保留原始域名
// if is_private_ip(addr.ip()) || addr.ip().is_loopback() || addr.ip().is_unspecified() {
// let (host, _) = host_with_port.split_once(':').unwrap_or((&host, "80"));
// debug!("axum Using private IPv4 address: {}", host);
// format!("http://{}:{}", host, cfg.port)
// } else {
// debug!("axum Using public IPv4 address");
// format!("http://{}:{}", ipv4, cfg.port)
// }
// }
// _ => {
// // 如果不是有效的 IPv4 地址,保留原始域名
// let (host, _) = host_with_port.split_once(':').unwrap_or((&host, "80"));
// debug!("axum Using domain address: {}", host);
// format!("http://{}:{}", host, cfg.port)
// }
// }
// };
cfg.api.base_url = format!("{}{}", url, RUSTFS_ADMIN_PREFIX);
cfg.s3.endpoint = url;
@@ -273,26 +237,29 @@ pub async fn start_static_file_server(
.layer(cors)
.layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true))
.layer(TraceLayer::new_for_http());
let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address");
info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port());
use ecstore::utils::net;
let server_addr = net::parse_and_resolve_address(addrs).expect("Failed to parse socket address");
let server_port = server_addr.port();
let server_address = server_addr.to_string();
info!(
"WebUI: http://{}:{} http://127.0.0.1:{} http://{}",
local_ip, server_port, server_port, server_address
);
info!(" RootUser: {}", access_key);
info!(" RootPass: {}", secret_key);
// Check and start the HTTPS/HTTP server
match start_server(addrs, local_addr, tls_path, app.clone()).await {
match start_server(server_addr, tls_path, app.clone()).await {
Ok(_) => info!("Server shutdown gracefully"),
Err(e) => error!("Server error: {}", e),
}
}
async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option<String>, app: Router) -> io::Result<()> {
async fn start_server(server_addr: SocketAddr, tls_path: Option<String>, app: Router) -> io::Result<()> {
let tls_path = tls_path.unwrap_or_default();
let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT);
let addr = addrs
.parse::<SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid address: {}", e)))?;
let handle = axum_server::Handle::new();
// create a signal off listening task
let handle_clone = handle.clone();
@@ -309,24 +276,24 @@ async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option<Stri
match RustlsConfig::from_pem_file(cert_path, key_path).await {
Ok(config) => {
info!("Starting HTTPS server...");
axum_server::bind_rustls(local_addr, config)
axum_server::bind_rustls(server_addr, config)
.handle(handle.clone())
.serve(app.into_make_service())
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
info!("HTTPS server running on https://{}", addr);
info!("HTTPS server running on https://{}", server_addr);
Ok(())
}
Err(e) => {
error!("Failed to create TLS config: {}", e);
start_http_server(addr, app, handle).await
start_http_server(server_addr, app, handle).await
}
}
} else {
info!("TLS certificates not found at {} and {}", key_path, cert_path);
start_http_server(addr, app, handle).await
start_http_server(server_addr, app, handle).await
}
}

View File

@@ -27,7 +27,7 @@ use ecstore::config as ecconfig;
use ecstore::config::GLOBAL_ConfigSys;
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::store_api::BucketOptions;
use ecstore::utils::net::{self, get_available_port};
use ecstore::utils::net;
use ecstore::StorageAPI;
use ecstore::{
endpoints::EndpointServerPools,
@@ -136,14 +136,8 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
let mut server_addr = net::check_local_server_addr(opt.address.as_str())?;
if server_addr.port() == 0 {
server_addr.set_port(get_available_port());
}
let server_addr = net::parse_and_resolve_address(opt.address.as_str())?;
let server_port = server_addr.port();
let server_address = server_addr.to_string();
debug!("server_address {}", &server_address);
@@ -263,32 +257,58 @@ async fn run(opt: config::Opt) -> Result<()> {
}
});
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let tls_path = opt.tls_path.clone().unwrap_or_default();
let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT);
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok();
debug!("Main TLS certs: {:?}", has_tls_certs);
let has_tls_certs = tokio::fs::metadata(&tls_path).await.is_ok();
let tls_acceptor = if has_tls_certs {
debug!("Found TLS certificates, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?;
let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
debug!("Found TLS directory, checking for certificates");
// 1. Try to load all certificates directly (including root and subdirectories)
match utils::load_all_certs_from_directory(&tls_path) {
Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => {
debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len());
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
// create a multi certificate configuration
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(utils::create_multi_cert_resolver(cert_key_pairs)?));
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
}
_ => {
// 2. If the synthesis fails, fall back to the traditional document certificate mode (backward compatible)
let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT);
let has_single_cert =
tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok();
if has_single_cert {
debug!("Found legacy single TLS certificate, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?;
let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
} else {
debug!("No valid TLS certificates found, starting with HTTP");
None
}
}
}
} else {
debug!("TLS certificates not found, starting with HTTP");
None
};
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let state_manager = ServiceStateManager::new();
let worker_state_manager = state_manager.clone();
// 更新服务状态为启动中
// Update service status to Starting
state_manager.update(ServiceState::Starting);
// Create shutdown channel
@@ -296,7 +316,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
// 错误处理改进
// error handling improvements
let sigterm_inner = match signal(SignalKind::terminate()) {
Ok(signal) => signal,
Err(e) => {
@@ -367,7 +387,7 @@ async fn run(opt: config::Opt) -> Result<()> {
let graceful = Arc::new(GracefulShutdown::new());
debug!("graceful initiated");
// 服务准备就绪
// service ready
worker_state_manager.update(ServiceState::Ready);
let value = hybrid_service.clone();
loop {

View File

@@ -1,8 +1,19 @@
use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use rustls::server::{ClientHello, ResolvesServerCert, ResolvesServerCertUsingSni};
use rustls::sign::CertifiedKey;
use rustls_pemfile::{certs, private_key};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Error;
use std::net::IpAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, io};
use tracing::{debug, warn};
/// Get the local IP address.
/// This function retrieves the local IP address of the machine.
pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
match local_ip_address::local_ip() {
Ok(IpAddr::V4(ip)) => Some(ip),
@@ -12,17 +23,24 @@ pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
}
/// Load public certificate from file.
/// This function loads a public certificate from the specified file.
pub(crate) fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Open certificate file.
let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(cert_file);
// Load and return certificate.
let certs = certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
let certs = certs(&mut reader)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| error(format!("certificate file {} format error", filename)))?;
if certs.is_empty() {
return Err(error(format!("No valid certificate was found in the certificate file {}", filename)));
}
Ok(certs)
}
/// Load private key from file.
/// This function loads a private key from the specified file.
pub(crate) fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
// Open keyfile.
let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
@@ -32,6 +50,143 @@ pub(crate) fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'stat
private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename)))
}
pub(crate) fn error(err: String) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
/// error function
pub(crate) fn error(err: String) -> Error {
Error::new(io::ErrorKind::Other, err)
}
/// Load all certificates and private keys in the directory
/// This function loads all certificate and private key pairs from the specified directory.
/// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory.
/// The root directory can also contain a default certificate/private key pair.
pub(crate) fn load_all_certs_from_directory(
dir_path: &str,
) -> io::Result<HashMap<String, (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>> {
let mut cert_key_pairs = HashMap::new();
let dir = Path::new(dir_path);
if !dir.exists() || !dir.is_dir() {
return Err(error(format!(
"The certificate directory does not exist or is not a directory: {}",
dir_path
)));
}
// 1. First check whether there is a certificate/private key pair in the root directory
let root_cert_path = dir.join(RUSTFS_TLS_CERT);
let root_key_path = dir.join(RUSTFS_TLS_KEY);
if root_cert_path.exists() && root_key_path.exists() {
debug!("find the root directory certificate: {:?}", root_cert_path);
let root_cert_str = root_cert_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?;
let root_key_str = root_key_path
.to_str()
.ok_or_else(|| error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?;
match load_cert_key_pair(root_cert_str, root_key_str) {
Ok((certs, key)) => {
// The root directory certificate is used as the default certificate and is stored using special keys.
cert_key_pairs.insert("default".to_string(), (certs, key));
}
Err(e) => {
warn!("unable to load root directory certificate: {}", e);
}
}
}
// 2.iterate through all folders in the directory
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let domain_name = path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| error(format!("invalid domain name directory:{:?}", path)))?;
// find certificate and private key files
let cert_path = path.join(RUSTFS_TLS_CERT); // e.g., rustfs_cert.pem
let key_path = path.join(RUSTFS_TLS_KEY); // e.g., rustfs_key.pem
if cert_path.exists() && key_path.exists() {
debug!("find the domain name certificate: {} in {:?}", domain_name, cert_path);
match load_cert_key_pair(cert_path.to_str().unwrap(), key_path.to_str().unwrap()) {
Ok((certs, key)) => {
cert_key_pairs.insert(domain_name.to_string(), (certs, key));
}
Err(e) => {
warn!("unable to load the certificate for {} domain name: {}", domain_name, e);
}
}
}
}
}
if cert_key_pairs.is_empty() {
return Err(error(format!("No valid certificate/private key pair found in directory {}", dir_path)));
}
Ok(cert_key_pairs)
}
/// loading a single certificate private key pair
/// This function loads a certificate and private key from the specified paths.
/// It returns a tuple containing the certificate and private key.
fn load_cert_key_pair(cert_path: &str, key_path: &str) -> io::Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)> {
let certs = load_certs(cert_path)?;
let key = load_private_key(key_path)?;
Ok((certs, key))
}
/// Create a multi-cert resolver
/// This function loads all certificates and private keys from the specified directory.
/// It uses the first certificate/private key pair found in the root directory as the default certificate.
/// The rest of the certificates/private keys are used for SNI resolution.
///
pub fn create_multi_cert_resolver(
cert_key_pairs: HashMap<String, (Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
) -> io::Result<impl ResolvesServerCert> {
#[derive(Debug)]
struct MultiCertResolver {
cert_resolver: ResolvesServerCertUsingSni,
default_cert: Option<Arc<CertifiedKey>>,
}
impl ResolvesServerCert for MultiCertResolver {
fn resolve(&self, client_hello: ClientHello) -> Option<Arc<CertifiedKey>> {
// try matching certificates with sni
if let Some(cert) = self.cert_resolver.resolve(client_hello) {
return Some(cert);
}
// If there is no matching SNI certificate, use the default certificate
self.default_cert.clone()
}
}
let mut resolver = ResolvesServerCertUsingSni::new();
let mut default_cert = None;
for (domain, (certs, key)) in cert_key_pairs {
// create a signature
let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key)
.map_err(|_| error(format!("unsupported private key types:{}", domain)))?;
// create a CertifiedKey
let certified_key = CertifiedKey::new(certs, signing_key);
if domain == "default" {
default_cert = Some(Arc::new(certified_key.clone()));
} else {
// add certificate to resolver
resolver
.add(&domain, certified_key)
.map_err(|e| error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?;
}
}
Ok(MultiCertResolver {
cert_resolver: resolver,
default_cert,
})
}

View File

@@ -28,12 +28,12 @@ fi
export RUSTFS_VOLUMES="./target/volume/test{0...4}"
# export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS="[::]:9000"
export RUSTFS_ADDRESS=":9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="[::]:9002"
export RUSTFS_CONSOLE_ADDRESS=":9002"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# HTTPS 证书目录
# export RUSTFS_TLS_PATH="./deploy/certs"
export RUSTFS_TLS_PATH="./deploy/certs"
# 具体路径修改为配置文件真实路径obs.example.toml 仅供参考 其中`RUSTFS_OBS_CONFIG` 和下面变量二选一
export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml"
@@ -58,6 +58,11 @@ export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS=""
export RUSTFS__SINKS__KAFKA__TOPIC=""
export RUSTFS__LOGGER__QUEUE_CAPACITY=10
export OTEL_INSTRUMENTATION_NAME="rustfs"
export OTEL_INSTRUMENTATION_VERSION="0.1.1"
export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0"
export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production"
# 事件消息配置
export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"