diff --git a/Cargo.toml b/Cargo.toml index b9eb8dfd..59c48cc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,6 @@ opentelemetry-otlp = { version = "0.29.0" } opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } parking_lot = "0.12.3" pin-project-lite = "0.2.16" -prometheus = "0.14.0" # pin-utils = "0.1.0" prost = "0.13.5" prost-build = "0.13.5" diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index e892dce3..bcd2c80d 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -3,7 +3,7 @@ use lazy_static::lazy_static; use std::{ collections::HashSet, fmt::Display, - net::{IpAddr, SocketAddr, TcpListener, ToSocketAddrs}, + net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs}, }; use url::Host; @@ -141,6 +141,33 @@ impl TryFrom for XHost { } } +/// parses the address string, process the ":port" format for double-stack binding, +/// and resolve the host name or IP address. If the port is 0, an available port is assigned. +pub fn parse_and_resolve_address(addr_str: &str) -> Result { + let resolved_addr: SocketAddr = if let Some(port) = addr_str.strip_prefix(":") { + // Process the ":port" format for double stack binding + let port_str = port; + let port: u16 = port_str + .parse() + .map_err(|e| Error::from_string(format!("Invalid port format: {}, err:{:?}", addr_str, e)))?; + let final_port = if port == 0 { + get_available_port() // assume get_available_port is available here + } else { + port + }; + // Using IPv6 without address specified [::], it should handle both IPv4 and IPv6 + SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), final_port) + } else { + // Use existing logic to handle regular address formats + let mut addr = check_local_server_addr(addr_str)?; // assume check_local_server_addr is available here + if addr.port() == 0 { + addr.set_port(get_available_port()); + } + addr + }; + Ok(resolved_addr) +} + #[cfg(test)] mod test { use std::net::Ipv4Addr; diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index e877ebad..7a7bdc4c 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -29,11 +29,11 @@ pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml"; /// Default TLS key for rustfs /// This is the default key for TLS. -pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_private.key"; +pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_key.pem"; /// Default TLS cert for rustfs /// This is the default cert for TLS. -pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_public.crt"; +pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_cert.pem"; #[allow(clippy::const_is_empty)] const SHORT_VERSION: &str = { diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs index 1125b24d..c3edbaaa 100644 --- a/rustfs/src/console.rs +++ b/rustfs/src/console.rs @@ -207,42 +207,6 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse { let mut cfg = CONSOLE_CONFIG.get().unwrap().clone(); let url = format!("{}://{}:{}", scheme, host, cfg.port); - - // // 如果指定入口,直接使用 - // let url = if let Some(endpoint) = &config::get_config().console_fs_endpoint { - // debug!("axum Using rustfs endpoint address: {}", endpoint); - // endpoint.clone() - // } else { - // let host_with_port = if host.contains(':') { - // host.clone() - // } else { - // format!("{}:80", host) - // }; - // // 尝试解析为 socket address,但不强制要求一定要是 IP 地址 - // let socket_addr = host_with_port.to_socket_addrs().ok().and_then(|mut addrs| addrs.next()); - // debug!("axum Using host with port: {}, Socket address: {:?}", host_with_port, socket_addr); - // match socket_addr { - // Some(addr) if addr.ip().is_ipv4() => { - // let ipv4 = addr.ip().to_string(); - // // 如果是私有 IP、环回地址或未指定地址,保留原始域名 - // if is_private_ip(addr.ip()) || addr.ip().is_loopback() || addr.ip().is_unspecified() { - // let (host, _) = host_with_port.split_once(':').unwrap_or((&host, "80")); - // debug!("axum Using private IPv4 address: {}", host); - // format!("http://{}:{}", host, cfg.port) - // } else { - // debug!("axum Using public IPv4 address"); - // format!("http://{}:{}", ipv4, cfg.port) - // } - // } - // _ => { - // // 如果不是有效的 IPv4 地址,保留原始域名 - // let (host, _) = host_with_port.split_once(':').unwrap_or((&host, "80")); - // debug!("axum Using domain address: {}", host); - // format!("http://{}:{}", host, cfg.port) - // } - // } - // }; - cfg.api.base_url = format!("{}{}", url, RUSTFS_ADMIN_PREFIX); cfg.s3.endpoint = url; @@ -273,26 +237,29 @@ pub async fn start_static_file_server( .layer(cors) .layer(tower_http::compression::CompressionLayer::new().gzip(true).deflate(true)) .layer(TraceLayer::new_for_http()); - let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address"); - info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port()); + + use ecstore::utils::net; + let server_addr = net::parse_and_resolve_address(addrs).expect("Failed to parse socket address"); + let server_port = server_addr.port(); + let server_address = server_addr.to_string(); + + info!( + "WebUI: http://{}:{} http://127.0.0.1:{} http://{}", + local_ip, server_port, server_port, server_address + ); info!(" RootUser: {}", access_key); info!(" RootPass: {}", secret_key); // Check and start the HTTPS/HTTP server - match start_server(addrs, local_addr, tls_path, app.clone()).await { + match start_server(server_addr, tls_path, app.clone()).await { Ok(_) => info!("Server shutdown gracefully"), Err(e) => error!("Server error: {}", e), } } -async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option, app: Router) -> io::Result<()> { +async fn start_server(server_addr: SocketAddr, tls_path: Option, app: Router) -> io::Result<()> { let tls_path = tls_path.unwrap_or_default(); let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); - - let addr = addrs - .parse::() - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid address: {}", e)))?; - let handle = axum_server::Handle::new(); // create a signal off listening task let handle_clone = handle.clone(); @@ -309,24 +276,24 @@ async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option { info!("Starting HTTPS server..."); - axum_server::bind_rustls(local_addr, config) + axum_server::bind_rustls(server_addr, config) .handle(handle.clone()) .serve(app.into_make_service()) .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - info!("HTTPS server running on https://{}", addr); + info!("HTTPS server running on https://{}", server_addr); Ok(()) } Err(e) => { error!("Failed to create TLS config: {}", e); - start_http_server(addr, app, handle).await + start_http_server(server_addr, app, handle).await } } } else { info!("TLS certificates not found at {} and {}", key_path, cert_path); - start_http_server(addr, app, handle).await + start_http_server(server_addr, app, handle).await } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index ca73521f..7871fba5 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -27,7 +27,7 @@ use ecstore::config as ecconfig; use ecstore::config::GLOBAL_ConfigSys; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::store_api::BucketOptions; -use ecstore::utils::net::{self, get_available_port}; +use ecstore::utils::net; use ecstore::StorageAPI; use ecstore::{ endpoints::EndpointServerPools, @@ -136,14 +136,8 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); - let mut server_addr = net::check_local_server_addr(opt.address.as_str())?; - - if server_addr.port() == 0 { - server_addr.set_port(get_available_port()); - } - + let server_addr = net::parse_and_resolve_address(opt.address.as_str())?; let server_port = server_addr.port(); - let server_address = server_addr.to_string(); debug!("server_address {}", &server_address); @@ -263,32 +257,58 @@ async fn run(opt: config::Opt) -> Result<()> { } }); - let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); - let tls_path = opt.tls_path.clone().unwrap_or_default(); - let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); - let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); - let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok(); - debug!("Main TLS certs: {:?}", has_tls_certs); + let has_tls_certs = tokio::fs::metadata(&tls_path).await.is_ok(); let tls_acceptor = if has_tls_certs { - debug!("Found TLS certificates, starting with HTTPS"); - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?; - let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?; - let mut server_config = ServerConfig::builder() - .with_no_client_auth() - .with_single_cert(certs, key) - .map_err(|e| error(e.to_string()))?; - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - Some(TlsAcceptor::from(Arc::new(server_config))) + debug!("Found TLS directory, checking for certificates"); + + // 1. Try to load all certificates directly (including root and subdirectories) + match utils::load_all_certs_from_directory(&tls_path) { + Ok(cert_key_pairs) if !cert_key_pairs.is_empty() => { + debug!("Found {} certificates, starting with HTTPS", cert_key_pairs.len()); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + // create a multi certificate configuration + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(utils::create_multi_cert_resolver(cert_key_pairs)?)); + + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + Some(TlsAcceptor::from(Arc::new(server_config))) + } + _ => { + // 2. If the synthesis fails, fall back to the traditional document certificate mode (backward compatible) + let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); + let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); + let has_single_cert = + tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok(); + + if has_single_cert { + debug!("Found legacy single TLS certificate, starting with HTTPS"); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?; + let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?; + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .map_err(|e| error(e.to_string()))?; + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + Some(TlsAcceptor::from(Arc::new(server_config))) + } else { + debug!("No valid TLS certificates found, starting with HTTP"); + None + } + } + } } else { debug!("TLS certificates not found, starting with HTTP"); None }; + let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); let state_manager = ServiceStateManager::new(); let worker_state_manager = state_manager.clone(); - // 更新服务状态为启动中 + // Update service status to Starting state_manager.update(ServiceState::Starting); // Create shutdown channel @@ -296,7 +316,7 @@ async fn run(opt: config::Opt) -> Result<()> { let shutdown_tx_clone = shutdown_tx.clone(); tokio::spawn(async move { - // 错误处理改进 + // error handling improvements let sigterm_inner = match signal(SignalKind::terminate()) { Ok(signal) => signal, Err(e) => { @@ -367,7 +387,7 @@ async fn run(opt: config::Opt) -> Result<()> { let graceful = Arc::new(GracefulShutdown::new()); debug!("graceful initiated"); - // 服务准备就绪 + // service ready worker_state_manager.update(ServiceState::Ready); let value = hybrid_service.clone(); loop { diff --git a/rustfs/src/utils/mod.rs b/rustfs/src/utils/mod.rs index 2d12f64d..fad2718c 100644 --- a/rustfs/src/utils/mod.rs +++ b/rustfs/src/utils/mod.rs @@ -1,8 +1,19 @@ +use crate::config::{RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; +use rustls::server::{ClientHello, ResolvesServerCert, ResolvesServerCertUsingSni}; +use rustls::sign::CertifiedKey; use rustls_pemfile::{certs, private_key}; use rustls_pki_types::{CertificateDer, PrivateKeyDer}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::io::Error; use std::net::IpAddr; +use std::path::Path; +use std::sync::Arc; use std::{fs, io}; +use tracing::{debug, warn}; +/// Get the local IP address. +/// This function retrieves the local IP address of the machine. pub(crate) fn get_local_ip() -> Option { match local_ip_address::local_ip() { Ok(IpAddr::V4(ip)) => Some(ip), @@ -12,17 +23,24 @@ pub(crate) fn get_local_ip() -> Option { } /// Load public certificate from file. +/// This function loads a public certificate from the specified file. pub(crate) fn load_certs(filename: &str) -> io::Result>> { // Open certificate file. let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; let mut reader = io::BufReader::new(cert_file); // Load and return certificate. - let certs = certs(&mut reader).collect::, _>>()?; + let certs = certs(&mut reader) + .collect::, _>>() + .map_err(|_| error(format!("certificate file {} format error", filename)))?; + if certs.is_empty() { + return Err(error(format!("No valid certificate was found in the certificate file {}", filename))); + } Ok(certs) } /// Load private key from file. +/// This function loads a private key from the specified file. pub(crate) fn load_private_key(filename: &str) -> io::Result> { // Open keyfile. let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; @@ -32,6 +50,143 @@ pub(crate) fn load_private_key(filename: &str) -> io::Result io::Error { - io::Error::new(io::ErrorKind::Other, err) +/// error function +pub(crate) fn error(err: String) -> Error { + Error::new(io::ErrorKind::Other, err) +} + +/// Load all certificates and private keys in the directory +/// This function loads all certificate and private key pairs from the specified directory. +/// It looks for files named `rustfs_cert.pem` and `rustfs_key.pem` in each subdirectory. +/// The root directory can also contain a default certificate/private key pair. +pub(crate) fn load_all_certs_from_directory( + dir_path: &str, +) -> io::Result>, PrivateKeyDer<'static>)>> { + let mut cert_key_pairs = HashMap::new(); + let dir = Path::new(dir_path); + + if !dir.exists() || !dir.is_dir() { + return Err(error(format!( + "The certificate directory does not exist or is not a directory: {}", + dir_path + ))); + } + + // 1. First check whether there is a certificate/private key pair in the root directory + let root_cert_path = dir.join(RUSTFS_TLS_CERT); + let root_key_path = dir.join(RUSTFS_TLS_KEY); + + if root_cert_path.exists() && root_key_path.exists() { + debug!("find the root directory certificate: {:?}", root_cert_path); + let root_cert_str = root_cert_path + .to_str() + .ok_or_else(|| error(format!("Invalid UTF-8 in root certificate path: {:?}", root_cert_path)))?; + let root_key_str = root_key_path + .to_str() + .ok_or_else(|| error(format!("Invalid UTF-8 in root key path: {:?}", root_key_path)))?; + match load_cert_key_pair(root_cert_str, root_key_str) { + Ok((certs, key)) => { + // The root directory certificate is used as the default certificate and is stored using special keys. + cert_key_pairs.insert("default".to_string(), (certs, key)); + } + Err(e) => { + warn!("unable to load root directory certificate: {}", e); + } + } + } + + // 2.iterate through all folders in the directory + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + + if path.is_dir() { + let domain_name = path + .file_name() + .and_then(|name| name.to_str()) + .ok_or_else(|| error(format!("invalid domain name directory:{:?}", path)))?; + + // find certificate and private key files + let cert_path = path.join(RUSTFS_TLS_CERT); // e.g., rustfs_cert.pem + let key_path = path.join(RUSTFS_TLS_KEY); // e.g., rustfs_key.pem + + if cert_path.exists() && key_path.exists() { + debug!("find the domain name certificate: {} in {:?}", domain_name, cert_path); + match load_cert_key_pair(cert_path.to_str().unwrap(), key_path.to_str().unwrap()) { + Ok((certs, key)) => { + cert_key_pairs.insert(domain_name.to_string(), (certs, key)); + } + Err(e) => { + warn!("unable to load the certificate for {} domain name: {}", domain_name, e); + } + } + } + } + } + + if cert_key_pairs.is_empty() { + return Err(error(format!("No valid certificate/private key pair found in directory {}", dir_path))); + } + + Ok(cert_key_pairs) +} + +/// loading a single certificate private key pair +/// This function loads a certificate and private key from the specified paths. +/// It returns a tuple containing the certificate and private key. +fn load_cert_key_pair(cert_path: &str, key_path: &str) -> io::Result<(Vec>, PrivateKeyDer<'static>)> { + let certs = load_certs(cert_path)?; + let key = load_private_key(key_path)?; + Ok((certs, key)) +} + +/// Create a multi-cert resolver +/// This function loads all certificates and private keys from the specified directory. +/// It uses the first certificate/private key pair found in the root directory as the default certificate. +/// The rest of the certificates/private keys are used for SNI resolution. +/// +pub fn create_multi_cert_resolver( + cert_key_pairs: HashMap>, PrivateKeyDer<'static>)>, +) -> io::Result { + #[derive(Debug)] + struct MultiCertResolver { + cert_resolver: ResolvesServerCertUsingSni, + default_cert: Option>, + } + impl ResolvesServerCert for MultiCertResolver { + fn resolve(&self, client_hello: ClientHello) -> Option> { + // try matching certificates with sni + if let Some(cert) = self.cert_resolver.resolve(client_hello) { + return Some(cert); + } + + // If there is no matching SNI certificate, use the default certificate + self.default_cert.clone() + } + } + + let mut resolver = ResolvesServerCertUsingSni::new(); + let mut default_cert = None; + + for (domain, (certs, key)) in cert_key_pairs { + // create a signature + let signing_key = rustls::crypto::aws_lc_rs::sign::any_supported_type(&key) + .map_err(|_| error(format!("unsupported private key types:{}", domain)))?; + + // create a CertifiedKey + let certified_key = CertifiedKey::new(certs, signing_key); + if domain == "default" { + default_cert = Some(Arc::new(certified_key.clone())); + } else { + // add certificate to resolver + resolver + .add(&domain, certified_key) + .map_err(|e| error(format!("failed to add a domain name certificate:{},err: {:?}", domain, e)))?; + } + } + + Ok(MultiCertResolver { + cert_resolver: resolver, + default_cert, + }) } diff --git a/scripts/run.sh b/scripts/run.sh index 7717a371..690ac851 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -28,12 +28,12 @@ fi export RUSTFS_VOLUMES="./target/volume/test{0...4}" # export RUSTFS_VOLUMES="./target/volume/test" -export RUSTFS_ADDRESS="[::]:9000" +export RUSTFS_ADDRESS=":9000" export RUSTFS_CONSOLE_ENABLE=true -export RUSTFS_CONSOLE_ADDRESS="[::]:9002" +export RUSTFS_CONSOLE_ADDRESS=":9002" # export RUSTFS_SERVER_DOMAINS="localhost:9000" # HTTPS 证书目录 -# export RUSTFS_TLS_PATH="./deploy/certs" + export RUSTFS_TLS_PATH="./deploy/certs" # 具体路径修改为配置文件真实路径,obs.example.toml 仅供参考 其中`RUSTFS_OBS_CONFIG` 和下面变量二选一 export RUSTFS_OBS_CONFIG="./deploy/config/obs.example.toml" @@ -58,6 +58,11 @@ export RUSTFS__SINKS__KAFKA__BOOTSTRAP_SERVERS="" export RUSTFS__SINKS__KAFKA__TOPIC="" export RUSTFS__LOGGER__QUEUE_CAPACITY=10 +export OTEL_INSTRUMENTATION_NAME="rustfs" +export OTEL_INSTRUMENTATION_VERSION="0.1.1" +export OTEL_INSTRUMENTATION_SCHEMA_URL="https://opentelemetry.io/schemas/1.31.0" +export OTEL_INSTRUMENTATION_ATTRIBUTES="env=production" + # 事件消息配置 export RUSTFS_EVENT_CONFIG="./deploy/config/event.example.toml"