Merge branch 'main' into dada/decom

This commit is contained in:
weisd
2025-04-18 16:02:05 +08:00
8 changed files with 145 additions and 68 deletions

33
Cargo.lock generated
View File

@@ -525,6 +525,7 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c"
dependencies = [
"brotli",
"bzip2",
"flate2",
"futures-core",
@@ -3090,6 +3091,7 @@ dependencies = [
"serde_json",
"sha2 0.11.0-pre.5",
"siphasher 1.0.1",
"smallvec",
"tempfile",
"thiserror 2.0.12",
"time",
@@ -7207,6 +7209,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"shadow-rs",
"tikv-jemallocator",
"time",
"tokio",
"tokio-rustls",
@@ -7357,7 +7360,7 @@ checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "s3s"
version = "0.12.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3"
source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10"
dependencies = [
"arrayvec",
"async-trait",
@@ -7407,7 +7410,7 @@ dependencies = [
[[package]]
name = "s3s-policy"
version = "0.12.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=3ad13ace7af703c3c8afc99cf19f4c18c82603a3#3ad13ace7af703c3c8afc99cf19f4c18c82603a3"
source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10"
dependencies = [
"indexmap 2.9.0",
"serde",
@@ -8317,6 +8320,26 @@ dependencies = [
"ordered-float",
]
[[package]]
name = "tikv-jemalloc-sys"
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "tikv-jemallocator"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
dependencies = [
"libc",
"tikv-jemalloc-sys",
]
[[package]]
name = "time"
version = "0.3.41"
@@ -8621,12 +8644,18 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
dependencies = [
"async-compression",
"bitflags 2.9.0",
"bytes",
"futures-core",
"http",
"http-body",
"pin-project-lite",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]

View File

@@ -102,8 +102,8 @@ rust-embed = "8.7.0"
rustls = { version = "0.23.26" }
rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "3ad13ace7af703c3c8afc99cf19f4c18c82603a3" }
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
shadow-rs = { version = "0.38.1", default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"

View File

@@ -68,6 +68,7 @@ madmin.workspace = true
workers.workspace = true
reqwest = { workspace = true }
urlencoding = "2.1.3"
smallvec = "1.15.0"
[target.'cfg(not(windows))'.dependencies]

View File

@@ -1,9 +1,11 @@
use crate::bitrot::{BitrotReader, BitrotWriter};
use crate::error::clone_err;
use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs};
use bytes::{Bytes, BytesMut};
use common::error::{Error, Result};
use futures::future::join_all;
use reed_solomon_erasure::galois_8::ReedSolomon;
use smallvec::SmallVec;
use std::any::Any;
use std::io::ErrorKind;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -66,6 +68,7 @@ impl Erasure {
// );
let mut total: usize = 0;
let mut blocks = <SmallVec<[Bytes; 16]>>::new();
loop {
if total_size > 0 {
@@ -97,7 +100,7 @@ impl Erasure {
total += self.buf.len();
}
let blocks = self.encode_data(&self.buf)?;
self.encode_data(&self.buf, &mut blocks)?;
let mut errs = Vec::new();
// TODO: 并发写入
@@ -355,20 +358,20 @@ impl Erasure {
self.data_shards + self.parity_shards
}
pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>> {
#[tracing::instrument(level = "debug", skip_all, fields(data_len=data.len()))]
pub fn encode_data(&self, data: &[u8], shards: &mut SmallVec<[Bytes; 16]>) -> Result<()> {
let (shard_size, total_size) = self.need_size(data.len());
// 生成一个新的 所需的所有分片数据长度
let mut data_buffer = vec![0u8; total_size];
{
// 复制源数据
let (left, _) = data_buffer.split_at_mut(data.len());
left.copy_from_slice(data);
}
let mut data_buffer = BytesMut::with_capacity(total_size);
// 复制源数据
data_buffer.extend_from_slice(data);
data_buffer.resize(total_size, 0u8);
{
// ec encode, 结果会写进 data_buffer
let data_slices: Vec<&mut [u8]> = data_buffer.chunks_mut(shard_size).collect();
let data_slices: SmallVec<[&mut [u8]; 16]> = data_buffer.chunks_exact_mut(shard_size).collect();
// partiy 数量大于0 才ec
if self.parity_shards > 0 {
@@ -376,16 +379,16 @@ impl Erasure {
}
}
// 分片
let mut shards = Vec::with_capacity(self.total_shard_count());
let slices: Vec<&[u8]> = data_buffer.chunks(shard_size).collect();
for &d in slices.iter() {
shards.push(d.to_vec());
// 零拷贝分片,所有 shard 引用 data_buffer
let mut data_buffer = data_buffer.freeze();
shards.clear();
shards.reserve(self.total_shard_count());
for _ in 0..self.total_shard_count() {
let shard = data_buffer.split_to(shard_size);
shards.push(shard);
}
Ok(shards)
Ok(())
}
pub fn decode_data(&self, shards: &mut [Option<Vec<u8>>]) -> Result<()> {
@@ -625,12 +628,13 @@ mod test {
let parity_shards = 2;
let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
let ec = Erasure::new(data_shards, parity_shards, 1);
let shards = ec.encode_data(data).unwrap();
let mut shards = SmallVec::new();
ec.encode_data(data, &mut shards).unwrap();
println!("shards:{:?}", shards);
let mut s: Vec<_> = shards
.iter()
.map(|d| if d.is_empty() { None } else { Some(d.clone()) })
.map(|d| if d.is_empty() { None } else { Some(d.to_vec()) })
.collect();
// let mut s = shards_to_option_shards(&shards);

View File

@@ -75,12 +75,15 @@ tokio-stream.workspace = true
tonic = { workspace = true }
tower.workspace = true
transform-stream.workspace = true
tower-http.workspace = true
tower-http = { workspace = true, features = ["trace", "compression-full", "cors"] }
uuid = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
libsystemd.workspace = true
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
tikv-jemallocator = "0.6"
[build-dependencies]
prost-build.workspace = true
tonic-build.workspace = true

View File

@@ -8,17 +8,21 @@ use axum::{
Router,
};
use axum_extra::extract::Host;
use std::io;
use axum::response::Redirect;
use axum_server::tls_rustls::RustlsConfig;
use http::Uri;
use http::{header, Uri};
use mime_guess::from_path;
use rust_embed::RustEmbed;
use serde::Serialize;
use shadow_rs::shadow;
use std::net::{Ipv4Addr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::OnceLock;
use std::time::Duration;
use tokio::signal;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, instrument};
shadow!(build);
@@ -204,7 +208,7 @@ async fn config_handler(uri: Uri, Host(host): Host) -> impl IntoResponse {
let url = format!("{}://{}:{}", scheme, host, cfg.port);
// // 如果指定入口, 直接使用
// // 如果指定入口直接使用
// let url = if let Some(endpoint) = &config::get_config().console_fs_endpoint {
// debug!("axum Using rustfs endpoint address: {}", endpoint);
// endpoint.clone()
@@ -256,11 +260,19 @@ pub async fn start_static_file_server(
secret_key: &str,
tls_path: Option<String>,
) {
// 配置 CORS
let cors = CorsLayer::new()
.allow_origin(Any) // 生产环境建议指定具体域名
.allow_methods([http::Method::GET, http::Method::POST])
.allow_headers([header::CONTENT_TYPE]);
// Create a route
let app = Router::new()
.route("/license", get(license_handler))
.route("/config.json", get(config_handler))
.fallback_service(get(static_handler));
.fallback_service(get(static_handler))
.layer(cors)
.layer(tower_http::compression::CompressionLayer::new())
.layer(TraceLayer::new_for_http());
let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address");
info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port());
info!(" RootUser: {}", access_key);
@@ -272,58 +284,78 @@ pub async fn start_static_file_server(
Err(e) => error!("Server error: {}", e),
}
}
async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option<String>, app: Router) -> std::io::Result<()> {
async fn start_server(addrs: &str, local_addr: SocketAddr, tls_path: Option<String>, app: Router) -> io::Result<()> {
let tls_path = tls_path.unwrap_or_default();
let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT);
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok();
debug!("Console TLS certs: {:?}", has_tls_certs);
if has_tls_certs {
debug!("Found TLS certificates, starting with HTTPS");
match tokio::try_join!(tokio::fs::read(&key_path), tokio::fs::read(&cert_path)) {
Ok((key_data, cert_data)) => {
match RustlsConfig::from_pem(cert_data, key_data).await {
Ok(config) => {
let handle = axum_server::Handle::new();
// create a signal off listening task
let handle_clone = handle.clone();
tokio::spawn(async move {
shutdown_signal().await;
handle_clone.graceful_shutdown(Some(Duration::from_secs(10)));
});
info!("Starting HTTPS server...");
axum_server::bind_rustls(local_addr, config)
.handle(handle.clone())
.serve(app.into_make_service())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(())
}
Err(e) => {
error!("Failed to create TLS config: {}", e);
start_http_server(addrs, app).await
}
}
let addr = addrs
.parse::<SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid address: {}", e)))?;
let handle = axum_server::Handle::new();
// create a signal off listening task
let handle_clone = handle.clone();
tokio::spawn(async move {
shutdown_signal().await;
info!("Initiating graceful shutdown...");
handle_clone.graceful_shutdown(Some(Duration::from_secs(10)));
});
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(&key_path), tokio::fs::metadata(&cert_path)).is_ok();
info!("Console TLS certs: {:?}", has_tls_certs);
if has_tls_certs {
info!("Found TLS certificates, starting with HTTPS");
match RustlsConfig::from_pem_file(cert_path, key_path).await {
Ok(config) => {
info!("Starting HTTPS server...");
axum_server::bind_rustls(local_addr, config)
.handle(handle.clone())
.serve(app.into_make_service())
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
info!("HTTPS server running on https://{}", addr);
Ok(())
}
Err(e) => {
error!("Failed to read TLS certificates: {}", e);
start_http_server(addrs, app).await
error!("Failed to create TLS config: {}", e);
start_http_server(addr, app, handle).await
}
}
} else {
debug!("TLS certificates not found at {} and {}", key_path, cert_path);
start_http_server(addrs, app).await
info!("TLS certificates not found at {} and {}", key_path, cert_path);
start_http_server(addr, app, handle).await
}
}
async fn start_http_server(addrs: &str, app: Router) -> std::io::Result<()> {
#[allow(dead_code)]
/// HTTP 到 HTTPS 的 308 重定向
fn redirect_to_https(https_port: u16) -> Router {
Router::new().route(
"/*path",
get({
move |uri: Uri, req: http::Request<Body>| async move {
let host = req
.headers()
.get("host")
.map_or("localhost", |h| h.to_str().unwrap_or("localhost"));
let path = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("");
let https_url = format!("https://{}:{}{}", host, https_port, path);
Redirect::permanent(&https_url)
}
}),
)
}
async fn start_http_server(addr: SocketAddr, app: Router, handle: axum_server::Handle) -> io::Result<()> {
debug!("Starting HTTP server...");
let listener = tokio::net::TcpListener::bind(addrs).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
axum_server::bind(addr)
.handle(handle)
.serve(app.into_make_service())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
async fn shutdown_signal() {

View File

@@ -59,6 +59,10 @@ use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tracing::{debug, error, info, info_span, warn};
#[cfg(all(target_os = "linux", target_env = "gnu"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
fn check_auth(req: Request<()>) -> Result<Request<()>, Status> {
let token: MetadataValue<_> = "rustfs rpc".parse().unwrap();
@@ -325,6 +329,10 @@ async fn run(opt: config::Opt) -> Result<()> {
}
};
if let Err(err) = socket.set_nodelay(true) {
warn!(?err, "Failed to set TCP_NODELAY");
}
if has_tls_certs {
debug!("TLS certificates found, starting with SIGINT");
let tls_socket = match tls_acceptor

View File

@@ -28,9 +28,9 @@ fi
export RUSTFS_VOLUMES="./target/volume/test{0...4}"
# export RUSTFS_VOLUMES="./target/volume/test"
export RUSTFS_ADDRESS="0.0.0.0:9000"
export RUSTFS_ADDRESS="[::]:9000"
export RUSTFS_CONSOLE_ENABLE=true
export RUSTFS_CONSOLE_ADDRESS="0.0.0.0:9002"
export RUSTFS_CONSOLE_ADDRESS="[::]:9002"
# export RUSTFS_SERVER_DOMAINS="localhost:9000"
# HTTPS 证书目录
# export RUSTFS_TLS_PATH="./deploy/certs"