diff --git a/Cargo.lock b/Cargo.lock index 2d5492c4..f69232ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -525,6 +525,7 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" dependencies = [ + "brotli", "bzip2", "flate2", "futures-core", @@ -3090,6 +3091,7 @@ dependencies = [ "serde_json", "sha2 0.11.0-pre.5", "siphasher 1.0.1", + "smallvec", "tempfile", "thiserror 2.0.12", "time", @@ -7207,6 +7209,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadow-rs", + "tikv-jemallocator", "time", "tokio", "tokio-rustls", @@ -7357,7 +7360,7 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" [[package]] name = "s3s" version = "0.12.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3" +source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10" dependencies = [ "arrayvec", "async-trait", @@ -7407,7 +7410,7 @@ dependencies = [ [[package]] name = "s3s-policy" version = "0.12.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3" +source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10" dependencies = [ "indexmap 2.9.0", "serde", @@ -8317,6 +8320,26 @@ dependencies = [ "ordered-float", ] +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.41" @@ -8621,12 +8644,18 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ + "async-compression", "bitflags 2.9.0", "bytes", + "futures-core", "http", + "http-body", "pin-project-lite", + "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0dc72ad9..381305c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,8 +102,8 @@ rust-embed = "8.7.0" rustls = { version = "0.23.26" } rustls-pki-types = "1.11.0" rustls-pemfile = "2.2.0" -s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" } -s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" } +s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" } +s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" } shadow-rs = { version = "0.38.1", default-features = false } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index a57bdcd5..13c8edc5 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -68,6 +68,7 @@ madmin.workspace = true workers.workspace = true reqwest = { workspace = true } urlencoding = "2.1.3" +smallvec = "1.15.0" [target.'cfg(not(windows))'.dependencies] diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index d4f4f1af..d99c7eea 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,9 +1,11 @@ use crate::bitrot::{BitrotReader, BitrotWriter}; use crate::error::clone_err; use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs}; +use bytes::{Bytes, BytesMut}; use common::error::{Error, Result}; use futures::future::join_all; use reed_solomon_erasure::galois_8::ReedSolomon; +use smallvec::SmallVec; use std::any::Any; use std::io::ErrorKind; use tokio::io::{AsyncRead, AsyncWrite}; @@ -66,6 +68,7 @@ impl Erasure { // ); let mut total: usize = 0; + let mut blocks = >::new(); loop { if total_size > 0 { @@ -97,7 +100,7 @@ impl Erasure { total += self.buf.len(); } - let blocks = self.encode_data(&self.buf)?; + self.encode_data(&self.buf, &mut blocks)?; let mut errs = Vec::new(); // TODO: 并发写入 @@ -355,20 +358,20 @@ impl Erasure { self.data_shards + self.parity_shards } - pub fn encode_data(&self, data: &[u8]) -> Result>> { + #[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))] + pub fn encode_data(&self, data: &[u8], shards: &mut SmallVec<[Bytes; 16]>) -> Result<()> { let (shard_size, total_size) = self.need_size(data.len()); // 生成一个新的 所需的所有分片数据长度 - let mut data_buffer = vec![0u8; total_size]; - { - // 复制源数据 - let (left, _) = data_buffer.split_at_mut(data.len()); - left.copy_from_slice(data); - } + let mut data_buffer = BytesMut::with_capacity(total_size); + + // 复制源数据 + data_buffer.extend_from_slice(data); + data_buffer.resize(total_size, 0u8); { // ec encode, 结果会写进 data_buffer - let data_slices: Vec<&mut [u8]> = data_buffer.chunks_mut(shard_size).collect(); + let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect(); // partiy 数量大于0 才ec if self.parity_shards > 0 { @@ -376,16 +379,16 @@ impl Erasure { } } - // 分片 - let mut shards = Vec::with_capacity(self.total_shard_count()); - - let slices: Vec<&[u8]> = data_buffer.chunks(shard_size).collect(); - - for &d in slices.iter() { - shards.push(d.to_vec()); + // 零拷贝分片,所有 shard 引用 data_buffer + let mut data_buffer = data_buffer.freeze(); + shards.clear(); + shards.reserve(self.total_shard_count()); + for _ in 0..self.total_shard_count() { + let shard = data_buffer.split_to(shard_size); + shards.push(shard); } - Ok(shards) + Ok(()) } pub fn decode_data(&self, shards: &mut [Option>]) -> Result<()> { @@ -625,12 +628,13 @@ mod test { let parity_shards = 2; let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; let ec = Erasure::new(data_shards, parity_shards, 1); - let shards = ec.encode_data(data).unwrap(); + let mut shards = SmallVec::new(); + ec.encode_data(data, &mut shards).unwrap(); println!("shards:{:?}", shards); let mut s: Vec<_> = shards .iter() - .map(|d| if d.is_empty() { None } else { Some(d.clone()) }) + .map(|d| if d.is_empty() { None } else { Some(d.to_vec()) }) .collect(); // let mut s = shards_to_option_shards(&shards); diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index cf59da3e..ff8b5971 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -75,12 +75,15 @@ tokio-stream.workspace = true tonic = { workspace = true } tower.workspace = true transform-stream.workspace = true -tower-http.workspace = true +tower-http = { workspace = true, features = ["trace", "compression-full", "cors"] } uuid = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] libsystemd.workspace = true +[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies] +tikv-jemallocator = "0.6" + [build-dependencies] prost-build.workspace = true tonic-build.workspace = true diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs index df6ffaee..f2eda099 100644 --- a/rustfs/src/console.rs +++ b/rustfs/src/console.rs @@ -8,17 +8,21 @@ use axum::{ Router, }; use axum_extra::extract::Host; +use std::io; +use axum::response::Redirect; use axum_server::tls_rustls::RustlsConfig; -use http::Uri; +use http::{header, Uri}; use mime_guess::from_path; use rust_embed::RustEmbed; use serde::Serialize; use shadow_rs::shadow; -use std::net::{Ipv4Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::OnceLock; use std::time::Duration; use tokio::signal; +use tower_http::cors::{Any, CorsLayer}; +use tower_http::trace::TraceLayer; use tracing::{debug, error, info, instrument}; shadow!(build); @@ -204,7 +208,7 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse { let url = format!("{}://{}:{}", scheme, host, cfg.port); - // // 如果指定入口, 直接使用 + // // 如果指定入口,直接使用 // let url = if let Some(endpoint) = &config::get_config().console_fs_endpoint { // debug!("axum Using rustfs endpoint address: {}", endpoint); // endpoint.clone() @@ -256,11 +260,19 @@ pub async fn start_static_file_server( secret_key: &str, tls_path: Option, ) { + // 配置 CORS + let cors = CorsLayer::new() + .allow_origin(Any) // 生产环境建议指定具体域名 + .allow_methods([http::Method::GET, http::Method::POST]) + .allow_headers([header::CONTENT_TYPE]); // Create a route let app = Router::new() .route("/license", get(license_handler)) .route("/config.json", get(config_handler)) - .fallback_service(get(static_handler)); + .fallback_service(get(static_handler)) + .layer(cors) + .layer(tower_http::compression::CompressionLayer::new()) + .layer(TraceLayer::new_for_http()); let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address"); info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port()); info!(" RootUser: {}", access_key); @@ -272,58 +284,78 @@ pub async fn start_static_file_server( Err(e) => error!("Server error: {}", e), } } -async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option, app: Router) -> std::io::Result<()> { +async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option, app: Router) -> io::Result<()> { let tls_path = tls_path.unwrap_or_default(); let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); - let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok(); - debug!("Console TLS certs: {:?}", has_tls_certs); - if has_tls_certs { - debug!("Found TLS certificates, starting with HTTPS"); - match tokio::try_join!(tokio::fs::read(&key_path), tokio::fs::read(&cert_path)) { - Ok((key_data, cert_data)) => { - match RustlsConfig::from_pem(cert_data, key_data).await { - Ok(config) => { - let handle = axum_server::Handle::new(); - // create a signal off listening task - let handle_clone = handle.clone(); - tokio::spawn(async move { - shutdown_signal().await; - handle_clone.graceful_shutdown(Some(Duration::from_secs(10))); - }); - info!("Starting HTTPS server..."); - axum_server::bind_rustls(local_addr, config) - .handle(handle.clone()) - .serve(app.into_make_service()) - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - Ok(()) - } - Err(e) => { - error!("Failed to create TLS config: {}", e); - start_http_server(addrs, app).await - } - } + let addr = addrs + .parse::() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid address: {}", e)))?; + + let handle = axum_server::Handle::new(); + // create a signal off listening task + let handle_clone = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + info!("Initiating graceful shutdown..."); + handle_clone.graceful_shutdown(Some(Duration::from_secs(10))); + }); + + let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok(); + info!("Console TLS certs: {:?}", has_tls_certs); + if has_tls_certs { + info!("Found TLS certificates, starting with HTTPS"); + match RustlsConfig::from_pem_file(cert_path, key_path).await { + Ok(config) => { + info!("Starting HTTPS server..."); + axum_server::bind_rustls(local_addr, config) + .handle(handle.clone()) + .serve(app.into_make_service()) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + info!("HTTPS server running on https://{}", addr); + + Ok(()) } Err(e) => { - error!("Failed to read TLS certificates: {}", e); - start_http_server(addrs, app).await + error!("Failed to create TLS config: {}", e); + start_http_server(addr, app, handle).await } } } else { - debug!("TLS certificates not found at {} and {}", key_path, cert_path); - start_http_server(addrs, app).await + info!("TLS certificates not found at {} and {}", key_path, cert_path); + start_http_server(addr, app, handle).await } } -async fn start_http_server(addrs: &str, app: Router) -> std::io::Result<()> { +#[allow(dead_code)] +/// HTTP 到 HTTPS 的 308 重定向 +fn redirect_to_https(https_port: u16) -> Router { + Router::new().route( + "/*path", + get({ + move |uri: Uri, req: http::Request| async move { + let host = req + .headers() + .get("host") + .map_or("localhost", |h| h.to_str().unwrap_or("localhost")); + let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or(""); + let https_url = format!("https://{}:{}{}", host, https_port, path); + Redirect::permanent(&https_url) + } + }), + ) +} + +async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> { debug!("Starting HTTP server..."); - let listener = tokio::net::TcpListener::bind(addrs).await?; - axum::serve(listener, app) - .with_graceful_shutdown(shutdown_signal()) + axum_server::bind(addr) + .handle(handle) + .serve(app.into_make_service()) .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } async fn shutdown_signal() { diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index f5f5ecc5..570c3482 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -59,6 +59,10 @@ use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; use tracing::{debug, error, info, info_span, warn}; +#[cfg(all(target_os = "linux", target_env = "gnu"))] +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + fn check_auth(req: Request<()>) -> Result, Status> { let token: MetadataValue<_> = "rustfs rpc".parse().unwrap(); @@ -325,6 +329,10 @@ async fn run(opt: config::Opt) -> Result<()> { } }; + if let Err(err) = socket.set_nodelay(true) { + warn!(?err, "Failed to set TCP_NODELAY"); + } + if has_tls_certs { debug!("TLS certificates found, starting with SIGINT"); let tls_socket = match tls_acceptor diff --git a/scripts/run.sh b/scripts/run.sh index 61f82aa8..497c2a68 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -28,9 +28,9 @@ fi export RUSTFS_VOLUMES="./target/volume/test{0...4}" # export RUSTFS_VOLUMES="./target/volume/test" -export RUSTFS_ADDRESS="0.0.0.0:9000" +export RUSTFS_ADDRESS="[::]:9000" export RUSTFS_CONSOLE_ENABLE=true -export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002" +export RUSTFS_CONSOLE_ADDRESS="[::]:9002" # export RUSTFS_SERVER_DOMAINS="localhost:9000" # HTTPS 证书目录 # export RUSTFS_TLS_PATH="./deploy/certs"