From 9bdc96de8c9c510d82323d4a31073922f92fb99e Mon Sep 17 00:00:00 2001 From: junxiang Mu <1948535941@qq.com> Date: Tue, 1 Apr 2025 03:52:12 +0000 Subject: [PATCH 01/10] support func Signed-off-by: junxiang Mu <1948535941@qq.com> --- s3select/api/src/query/function.rs | 11 ++-- s3select/query/src/dispatcher/manager.rs | 2 +- .../query/src/function/simple_func_manager.rs | 60 ++++++++++++++--- s3select/query/src/instance.rs | 64 ++++++++++++++----- s3select/query/src/metadata/mod.rs | 6 +- 5 files changed, 110 insertions(+), 33 deletions(-) diff --git a/s3select/api/src/query/function.rs b/s3select/api/src/query/function.rs index af207fc1..3a8c6761 100644 --- a/s3select/api/src/query/function.rs +++ b/s3select/api/src/query/function.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::sync::Arc; use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF}; @@ -7,11 +6,11 @@ use crate::QueryResult; pub type FuncMetaManagerRef = Arc; pub trait FunctionMetadataManager { - fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()>; + fn register_udf(&mut self, udf: Arc) -> QueryResult<()>; - fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()>; + fn register_udaf(&mut self, udaf: Arc) -> QueryResult<()>; - fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()>; + fn register_udwf(&mut self, udwf: Arc) -> QueryResult<()>; fn udf(&self, name: &str) -> QueryResult>; @@ -19,5 +18,7 @@ pub trait FunctionMetadataManager { fn udwf(&self, name: &str) -> QueryResult>; - fn udfs(&self) -> HashSet; + fn udfs(&self) -> Vec; + fn udafs(&self) -> Vec; + fn udwfs(&self) -> Vec; } diff --git a/s3select/query/src/dispatcher/manager.rs b/s3select/query/src/dispatcher/manager.rs index 4abc4cec..e85e7a54 100644 --- a/s3select/query/src/dispatcher/manager.rs +++ b/s3select/query/src/dispatcher/manager.rs @@ -139,7 +139,7 @@ impl SimpleQueryDispatcher { let path = format!("s3://{}/{}", self.input.bucket, self.input.key); let table_path = ListingTableUrl::parse(path)?; let listing_options = if self.input.request.input_serialization.csv.is_some() { - let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false)); + let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(true)); ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv") } else if self.input.request.input_serialization.parquet.is_some() { let file_format = ParquetFormat::new(); diff --git a/s3select/query/src/function/simple_func_manager.rs b/s3select/query/src/function/simple_func_manager.rs index 129efacf..bb3d3852 100644 --- a/s3select/query/src/function/simple_func_manager.rs +++ b/s3select/query/src/function/simple_func_manager.rs @@ -1,13 +1,15 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use api::query::function::FunctionMetadataManager; use api::{QueryError, QueryResult}; +use datafusion::execution::SessionStateDefaults; use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use tracing::debug; pub type SimpleFunctionMetadataManagerRef = Arc; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SimpleFunctionMetadataManager { /// Scalar functions that are registered with the context pub scalar_functions: HashMap>, @@ -17,19 +19,53 @@ pub struct SimpleFunctionMetadataManager { pub window_functions: HashMap>, } +impl Default for SimpleFunctionMetadataManager { + fn default() -> Self { + let mut func_meta_manager = Self { + scalar_functions: Default::default(), + aggregate_functions: Default::default(), + window_functions: Default::default(), + }; + SessionStateDefaults::default_scalar_functions().into_iter().for_each(|udf| { + let existing_udf = func_meta_manager.register_udf(udf.clone()); + if let Ok(()) = existing_udf { + debug!("Overwrote an existing UDF: {}", udf.name()); + } + }); + + SessionStateDefaults::default_aggregate_functions() + .into_iter() + .for_each(|udaf| { + let existing_udaf = func_meta_manager.register_udaf(udaf.clone()); + if let Ok(()) = existing_udaf { + debug!("Overwrote an existing UDAF: {}", udaf.name()); + } + }); + + SessionStateDefaults::default_window_functions().into_iter().for_each(|udwf| { + let existing_udwf = func_meta_manager.register_udwf(udwf.clone()); + if let Ok(()) = existing_udwf { + debug!("Overwrote an existing UDWF: {}", udwf.name()); + } + }); + + func_meta_manager + } +} + impl FunctionMetadataManager for SimpleFunctionMetadataManager { - fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> { - self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + fn register_udf(&mut self, f: Arc) -> QueryResult<()> { + self.scalar_functions.insert(f.inner().name().to_uppercase(), f); Ok(()) } - fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> { - self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + fn register_udaf(&mut self, f: Arc) -> QueryResult<()> { + self.aggregate_functions.insert(f.inner().name().to_uppercase(), f); Ok(()) } - fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> { - self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f)); + fn register_udwf(&mut self, f: Arc) -> QueryResult<()> { + self.window_functions.insert(f.inner().name().to_uppercase(), f); Ok(()) } @@ -57,7 +93,13 @@ impl FunctionMetadataManager for SimpleFunctionMetadataManager { .ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() }) } - fn udfs(&self) -> HashSet { + fn udfs(&self) -> Vec { self.scalar_functions.keys().cloned().collect() } + fn udafs(&self) -> Vec { + self.aggregate_functions.keys().cloned().collect() + } + fn udwfs(&self) -> Vec { + self.window_functions.keys().cloned().collect() + } } diff --git a/s3select/query/src/instance.rs b/s3select/query/src/instance.rs index 03cc7b03..3ad8941a 100644 --- a/s3select/query/src/instance.rs +++ b/s3select/query/src/instance.rs @@ -141,24 +141,58 @@ mod tests { let results = result.result().chunk_result().await.unwrap().to_vec(); let expected = [ - "+----------------+----------+----------+------------+----------+", - "| column_1 | column_2 | column_3 | column_4 | column_5 |", - "+----------------+----------+----------+------------+----------+", - "| id | name | age | department | salary |", - "| 1 | Alice | 25 | HR | 5000 |", - "| 2 | Bob | 30 | IT | 6000 |", - "| 3 | Charlie | 35 | Finance | 7000 |", - "| 4 | Diana | 22 | Marketing | 4500 |", - "| 5 | Eve | 28 | IT | 5500 |", - "| 6 | Frank | 40 | Finance | 8000 |", - "| 7 | Grace | 26 | HR | 5200 |", - "| 8 | Henry | 32 | IT | 6200 |", - "| 9 | Ivy | 24 | Marketing | 4800 |", - "| 10 | Jack | 38 | Finance | 7500 |", - "+----------------+----------+----------+------------+----------+", + "+----------------+---------+-----+------------+--------+", + "| id | name | age | department | salary |", + "+----------------+---------+-----+------------+--------+", + "| 1 | Alice | 25 | HR | 5000 |", + "| 2 | Bob | 30 | IT | 6000 |", + "| 3 | Charlie | 35 | Finance | 7000 |", + "| 4 | Diana | 22 | Marketing | 4500 |", + "| 5 | Eve | 28 | IT | 5500 |", + "| 6 | Frank | 40 | Finance | 8000 |", + "| 7 | Grace | 26 | HR | 5200 |", + "| 8 | Henry | 32 | IT | 6200 |", + "| 9 | Ivy | 24 | Marketing | 4800 |", + "| 10 | Jack | 38 | Finance | 7500 |", + "+----------------+---------+-----+------------+--------+", ]; assert_batches_eq!(expected, &results); pretty::print_batches(&results).unwrap(); } + + #[tokio::test] + #[ignore] + async fn test_func_sql() { + let sql = "select count(s.id) from S3Object as s"; + let input = SelectObjectContentInput { + bucket: "dandan".to_string(), + expected_bucket_owner: None, + key: "test.csv".to_string(), + sse_customer_algorithm: None, + sse_customer_key: None, + sse_customer_key_md5: None, + request: SelectObjectContentRequest { + expression: sql.to_string(), + expression_type: ExpressionType::from_static("SQL"), + input_serialization: InputSerialization { + csv: Some(CSVInput::default()), + ..Default::default() + }, + output_serialization: OutputSerialization { + csv: Some(CSVOutput::default()), + ..Default::default() + }, + request_progress: None, + scan_range: None, + }, + }; + let db = make_rustfsms(input.clone(), true).await.unwrap(); + let query = Query::new(Context { input }, sql.to_string()); + + let result = db.execute(&query).await.unwrap(); + + let results = result.result().chunk_result().await.unwrap().to_vec(); + pretty::print_batches(&results).unwrap(); + } } diff --git a/s3select/query/src/metadata/mod.rs b/s3select/query/src/metadata/mod.rs index 78c79e36..04a71d4e 100644 --- a/s3select/query/src/metadata/mod.rs +++ b/s3select/query/src/metadata/mod.rs @@ -113,14 +113,14 @@ impl ContextProvider for MetadataProvider { } fn udf_names(&self) -> Vec { - todo!() + self.func_manager.udfs() } fn udaf_names(&self) -> Vec { - todo!() + self.func_manager.udafs() } fn udwf_names(&self) -> Vec { - todo!() + self.func_manager.udwfs() } } From 19943025744b245c6aa59dc3616bec78123a7842 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 1 Apr 2025 22:06:47 +0800 Subject: [PATCH 02/10] improve tls for console --- .gitignore | 5 +- Cargo.lock | 224 +++++++++++++++++++++++++++++++++++++-- Cargo.toml | 6 +- rustfs/Cargo.toml | 4 +- rustfs/src/config/mod.rs | 12 +++ rustfs/src/console.rs | 109 +++++++++++++++++-- rustfs/src/main.rs | 22 ++-- 7 files changed, 352 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 83b9ef43..e9172ab1 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,7 @@ .devcontainer rustfs/static/* vendor -cli/rustfs-gui/embedded-rustfs/rustfs \ No newline at end of file +cli/rustfs-gui/embedded-rustfs/rustfs +.log +config/obs.toml +config/certs/* \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 8dbf614d..fea149c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -679,6 +679,29 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-lc-rs" +version = "1.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dabb68eb3a7aa08b46fddfd59a3d55c978243557a90ab804769f7e20e67d2b01" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77926887776171ced7d662120a75998e444d3750c951abfe07f90da130514b1f" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.7.9" @@ -734,6 +757,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-server" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ad46c3ec4e12f4a4b6835e173ba21c25e484c9d02b49770bf006ce5367c036" +dependencies = [ + "arc-swap", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "pin-project-lite", + "rustls 0.21.12", + "rustls-pemfile", + "tokio", + "tokio-rustls 0.24.1", + "tower 0.4.13", + "tower-service", +] + [[package]] name = "backon" version = "1.4.0" @@ -795,6 +841,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "bitflags 2.9.0", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "lazy_static", + "lazycell", + "log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.98", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -1002,6 +1071,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfb" version = "0.7.3" @@ -1133,6 +1211,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading 0.8.6", +] + [[package]] name = "clap" version = "4.5.31" @@ -1173,6 +1262,15 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "cocoa" version = "0.25.0" @@ -3116,6 +3214,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futf" version = "0.1.5" @@ -3853,10 +3957,12 @@ dependencies = [ "http", "hyper", "hyper-util", - "rustls", + "log", + "rustls 0.23.23", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", "webpki-roots", ] @@ -4172,6 +4278,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -4331,6 +4446,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "lexical-core" version = "1.0.5" @@ -5281,6 +5402,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "option-ext" version = "0.2.0" @@ -6034,7 +6161,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.23", "socket2", "thiserror 2.0.11", "tokio", @@ -6052,7 +6179,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustc-hash 2.1.1", - "rustls", + "rustls 0.23.23", "rustls-pki-types", "slab", "thiserror 2.0.11", @@ -6354,7 +6481,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.23", "rustls-pemfile", "rustls-pki-types", "serde", @@ -6363,7 +6490,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-util", "tower 0.5.2", "tower-service", @@ -6529,6 +6656,7 @@ dependencies = [ "async-trait", "atoi", "axum", + "axum-server", "bytes", "chrono", "clap", @@ -6545,6 +6673,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-util", "iam", "jsonwebtoken", @@ -6623,21 +6752,46 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47796c98c480fce5406ef69d1c76378375492c3b0a0de587be0c1d9feb12f395" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.2.0", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -6656,12 +6810,23 @@ dependencies = [ "web-time", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -6746,6 +6911,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -6758,6 +6932,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -7722,13 +7906,23 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.23", "tokio", ] @@ -7838,7 +8032,7 @@ dependencies = [ "rustls-pemfile", "socket2", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -8594,6 +8788,18 @@ dependencies = [ "windows-core 0.58.0", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index adae8650..ed7abe71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,9 +47,10 @@ flatbuffers = "24.12.23" futures = "0.3.31" futures-util = "0.3.31" common = { path = "./common/common" } -policy = {path = "./policy"} +policy = { path = "./policy" } hex = "0.4.3" hyper = "1.6.0" +hyper-rustls = { version = "0.27.5", features = ["http2"] } hyper-util = { version = "0.1.10", features = [ "tokio", "server-auto", @@ -72,7 +73,7 @@ prost-types = "0.13.4" protobuf = "3.7" protos = { path = "./common/protos" } rand = "0.8.5" -reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream","blocking"] } +reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "blocking"] } rfd = { version = "0.15.2", default-features = false, features = ["xdg-portal", "tokio"] } rmp = "0.8.14" rmp-serde = "1.3.0" @@ -114,6 +115,7 @@ uuid = { version = "1.15.1", features = [ ] } log = "0.4.25" axum = "0.7.9" +axum-server = { version = "0.6", features = ["tls-rustls"] } md-5 = "0.10.6" workers = { path = "./common/workers" } test-case = "3.3.1" diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 493971d0..4c506863 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -24,12 +24,13 @@ csv = "1.3.1" datafusion = { workspace = true } common.workspace = true ecstore.workspace = true -policy.workspace =true +policy.workspace = true flatbuffers.workspace = true futures.workspace = true futures-util.workspace = true h2 = "0.4.7" hyper.workspace = true +hyper-rustls.workspace = true hyper-util.workspace = true http.workspace = true http-body.workspace = true @@ -65,6 +66,7 @@ transform-stream.workspace = true uuid = "1.15.1" url.workspace = true axum.workspace = true +axum-server = { workspace = true } matchit = "0.8.6" shadow-rs.workspace = true const-str = { version = "0.6.1", features = ["std", "proc"] } diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index cd8f238f..b6c3d370 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -7,6 +7,14 @@ shadow_rs::shadow!(build); pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin"; pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; +/// Default TLS key for rustfs +/// This is the default key for TLS. +pub const RUSTFS_TLS_KEY: &str = "rustfs_tls_key.pem"; + +/// Default TLS cert for rustfs +/// This is the default cert for TLS. +pub const RUSTFS_TLS_CERT: &str = "rustfs_tls_cert.pem"; + #[allow(clippy::const_is_empty)] const SHORT_VERSION: &str = { if !build::TAG.is_empty() { @@ -62,4 +70,8 @@ pub struct Opt { #[arg(long, default_value_t = format!("127.0.0.1:{}", 9002), env = "RUSTFS_CONSOLE_ADDRESS")] pub console_address: String, + + /// tls path for rustfs api and console. + #[arg(long, env = "RUSTFS_TLS_PATH")] + pub tls_path: Option, } diff --git a/rustfs/src/console.rs b/rustfs/src/console.rs index 8ccf0fb8..602997b6 100644 --- a/rustfs/src/console.rs +++ b/rustfs/src/console.rs @@ -6,19 +6,24 @@ use axum::{ routing::get, Router, }; - +use axum_server::tls_rustls::RustlsConfig; use mime_guess::from_path; use rust_embed::RustEmbed; use serde::Serialize; use shadow_rs::shadow; use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs}; use std::sync::OnceLock; -use tracing::info; +use std::time::Duration; +use tokio::signal; +use tracing::{debug, error, info}; shadow!(build); const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3"; +const RUSTFS_CONSOLE_TLS_KEY: &str = "rustfs_console_tls_key.pem"; +const RUSTFS_CONSOLE_TLS_CERT: &str = "rustfs_console_tls_cert.pem"; + #[derive(RustEmbed)] #[folder = "$CARGO_MANIFEST_DIR/static"] struct StaticFiles; @@ -189,18 +194,104 @@ async fn config_handler(Host(host): Host) -> impl IntoResponse { .unwrap() } -pub async fn start_static_file_server(addrs: &str, local_ip: Ipv4Addr, access_key: &str, secret_key: &str) { - // 创建路由 +pub async fn start_static_file_server( + addrs: &str, + local_ip: Ipv4Addr, + access_key: &str, + secret_key: &str, + tls_path: Option, +) { + // Create a route let app = Router::new() .route("/config.json", get(config_handler)) .nest_service("/", get(static_handler)); - - let listener = tokio::net::TcpListener::bind(addrs).await.unwrap(); - let local_addr = listener.local_addr().unwrap(); - + let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address"); info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port()); info!(" RootUser: {}", access_key); info!(" RootPass: {}", secret_key); - axum::serve(listener, app).await.unwrap(); + let tls_path = tls_path.unwrap_or_default(); + let key_path = format!("{}/{}", tls_path, RUSTFS_CONSOLE_TLS_KEY); + let cert_path = format!("{}/{}", tls_path, RUSTFS_CONSOLE_TLS_CERT); + // Check and start the HTTPS/HTTP server + match start_server(addrs, local_addr, &key_path, &cert_path, app.clone()).await { + Ok(_) => info!("Server shutdown gracefully"), + Err(e) => error!("Server error: {}", e), + } +} +async fn start_server(addrs: &str, local_addr: SocketAddr, key_path: &str, cert_path: &str, app: Router) -> std::io::Result<()> { + let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path), tokio::fs::metadata(cert_path)).is_ok(); + + if has_tls_certs { + debug!("Found TLS certificates, starting with HTTPS"); + match tokio::try_join!(tokio::fs::read(key_path), tokio::fs::read(cert_path)) { + Ok((key_data, cert_data)) => { + match RustlsConfig::from_pem(cert_data, key_data).await { + Ok(config) => { + let handle = axum_server::Handle::new(); + // create a signal off listening task + let handle_clone = handle.clone(); + tokio::spawn(async move { + shutdown_signal().await; + handle_clone.graceful_shutdown(Some(Duration::from_secs(10))); + }); + debug!("Starting HTTPS server..."); + axum_server::bind_rustls(local_addr, config) + .handle(handle.clone()) + .serve(app.into_make_service()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + + Ok(()) + } + Err(e) => { + error!("Failed to create TLS config: {}", e); + start_http_server(addrs, app).await + } + } + } + Err(e) => { + error!("Failed to read TLS certificates: {}", e); + start_http_server(addrs, app).await + } + } + } else { + debug!("TLS certificates not found at {} and {}", key_path, cert_path); + start_http_server(addrs, app).await + } +} + +async fn start_http_server(addrs: &str, app: Router) -> std::io::Result<()> { + debug!("Starting HTTP server..."); + let listener = tokio::net::TcpListener::bind(addrs).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) +} + +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c().await.expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + info!("shutdown_signal ctrl_c") + }, + _ = terminate => { + info!("shutdown_signal terminate") + }, + } } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 35187ba9..cd4f5c62 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -87,7 +87,7 @@ fn main() -> Result<()> { //解析获得到的参数 let opt = config::Opt::parse(); - //设置trace + //设置 trace setup_tracing(); //运行参数 @@ -110,18 +110,17 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("server_address {}", &server_address); - //设置AK和SK - + //设置 AK 和 SK iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone())).unwrap(); set_global_rustfs_port(server_port); - //监听地址,端口从参数中获取 + //监听地址,端口从参数中获取 let listener = TcpListener::bind(server_address.clone()).await?; //获取监听地址 let local_addr: SocketAddr = listener.local_addr()?; let local_ip = utils::get_local_ip().ok_or(local_addr.ip()).unwrap(); - // 用于rpc + // 用于 rpc let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(server_address.clone().as_str(), opt.volumes.clone()) .map_err(|err| Error::from_string(err.to_string()))?; @@ -172,7 +171,7 @@ async fn run(opt: config::Opt) -> Result<()> { .map_err(|err| Error::from_string(err.to_string()))?; // Setup S3 service - // 本项目使用s3s库来实现s3服务 + // 本项目使用 s3s 库来实现 s3 服务 let service = { let store = storage::ecfs::FS::new(); // let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(server_address.clone(), endpoint_pools).await?); @@ -180,7 +179,7 @@ async fn run(opt: config::Opt) -> Result<()> { let access_key = opt.access_key.clone(); let secret_key = opt.secret_key.clone(); - //显示info信息 + //显示 info 信息 debug!("authentication is enabled {}, {}", &access_key, &secret_key); b.set_auth(IAMAuth::new(access_key, secret_key)); @@ -294,8 +293,15 @@ async fn run(opt: config::Opt) -> Result<()> { let access_key = opt.access_key.clone(); let secret_key = opt.secret_key.clone(); let console_address = opt.console_address.clone(); + let tls_path = opt.tls_path.clone(); + + if console_address.is_empty() { + error!("console_address is empty"); + return Err(Error::from_string("console_address is empty".to_string())); + } + tokio::spawn(async move { - console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key).await; + console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key, tls_path).await; }); } From 28edca1b63dc1c4aeb3d947884c6f161c6b2924e Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 1 Apr 2025 23:09:47 +0800 Subject: [PATCH 03/10] add rustls --- Cargo.lock | 4 ++++ Cargo.toml | 4 ++++ rustfs/Cargo.toml | 4 ++++ rustfs/src/config/mod.rs | 4 ++-- rustfs/src/main.rs | 31 ++++++++++++++++++++++++++----- rustfs/src/utils.rs | 27 +++++++++++++++++++++++++++ 6 files changed, 67 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fea149c5..147bd87d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6696,6 +6696,9 @@ dependencies = [ "query", "rmp-serde", "rust-embed", + "rustls 0.23.23", + "rustls-pemfile", + "rustls-pki-types", "s3s", "serde", "serde_json", @@ -6703,6 +6706,7 @@ dependencies = [ "shadow-rs", "time", "tokio", + "tokio-rustls 0.26.2", "tokio-stream", "tokio-util", "tonic", diff --git a/Cargo.toml b/Cargo.toml index ed7abe71..8e71d2a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,9 @@ rfd = { version = "0.15.2", default-features = false, features = ["xdg-portal", rmp = "0.8.14" rmp-serde = "1.3.0" rust-embed = "8.6.0" +rustls = { version = "0.23" } +rustls-pki-types = "1.11.0" +rustls-pemfile = "2.2.0" s3s = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779", default-features = true, features = [ "tower", ] } @@ -99,6 +102,7 @@ tokio = { version = "1.43.0", features = ["fs", "rt-multi-thread"] } tonic = { version = "0.12.3", features = ["gzip"] } tonic-build = "0.12.3" tonic-reflection = "0.12" +tokio-rustls = { version = "0.26", default-features = false } tokio-stream = "0.1.17" tokio-util = { version = "0.7.13", features = ["io", "compat"] } tower = { version = "0.5.2", features = ["timeout"] } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 4c506863..da0f2ddf 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -43,6 +43,9 @@ prost-types.workspace = true protos.workspace = true protobuf.workspace = true rmp-serde.workspace = true +rustls.workspace = true +rustls-pemfile.workspace = true +rustls-pki-types.workspace = true s3s.workspace = true serde.workspace = true serde_json.workspace = true @@ -55,6 +58,7 @@ tokio = { workspace = true, features = [ "net", "signal", ] } +tokio-rustls.workspace = true lazy_static.workspace = true tokio-stream.workspace = true tonic = { version = "0.12.3", features = ["gzip"] } diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index b6c3d370..ada5307e 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -9,11 +9,11 @@ pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin"; /// Default TLS key for rustfs /// This is the default key for TLS. -pub const RUSTFS_TLS_KEY: &str = "rustfs_tls_key.pem"; +pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_tls_key.pem"; /// Default TLS cert for rustfs /// This is the default cert for TLS. -pub const RUSTFS_TLS_CERT: &str = "rustfs_tls_cert.pem"; +pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_tls_cert.pem"; #[allow(clippy::const_is_empty)] const SHORT_VERSION: &str = { diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index cd4f5c62..a34b182d 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -9,13 +9,14 @@ mod utils; use crate::auth::IAMAuth; use crate::console::{init_console_cfg, CONSOLE_CONFIG}; +use crate::utils::error; use chrono::Datelike; use clap::Parser; use common::{ error::{Error, Result}, globals::set_global_addr, }; -use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY}; +use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY}; use ecstore::heal::background_heal_ops::init_auto_heal; use ecstore::utils::net::{self, get_available_port}; use ecstore::{ @@ -26,6 +27,7 @@ use ecstore::{ update_erasure_type, }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; +use futures_util::TryFutureExt; use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -34,10 +36,13 @@ use hyper_util::{ }; use iam::init_iam_sys; use protos::proto_gen::node_service::node_service_server::NodeServiceServer; +use rustls::ServerConfig; use s3s::{host::MultiDomain, service::S3ServiceBuilder}; use service::hybrid; +use std::sync::Arc; use std::{io::IsTerminal, net::SocketAddr}; use tokio::net::TcpListener; +use tokio_rustls::TlsAcceptor; use tonic::{metadata::MetadataValue, Request, Status}; use tower_http::cors::CorsLayer; use tracing::{debug, error, info, warn}; @@ -211,6 +216,22 @@ async fn run(opt: config::Opt) -> Result<()> { }; let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); + let tls_path = opt.tls_path.clone().unwrap_or_default(); + let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); + let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); + + let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok(); + + if has_tls_certs { + let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?; + let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?; + let mut server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .map_err(|e| error(e.to_string()))?; + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + let tls_acceptor = TlsAcceptor::from(Arc::new(server_config)); + }; tokio::spawn(async move { let hyper_service = service.into_shared(); @@ -250,10 +271,10 @@ async fn run(opt: config::Opt) -> Result<()> { tokio::select! { () = graceful.shutdown() => { - tracing::debug!("Gracefully shutdown!"); + debug!("Gracefully shutdown!"); }, () = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - tracing::debug!("Waited 10 seconds for graceful shutdown, aborting..."); + debug!("Waited 10 seconds for graceful shutdown, aborting..."); } } }); @@ -267,7 +288,7 @@ async fn run(opt: config::Opt) -> Result<()> { })?; ECStore::init(store.clone()).await.map_err(|err| { - error!("ECStore init faild {:?}", &err); + error!("ECStore init failed {:?}", &err); Error::from_string(err.to_string()) })?; debug!("init store success!"); @@ -275,7 +296,7 @@ async fn run(opt: config::Opt) -> Result<()> { init_iam_sys(store.clone()).await.unwrap(); new_global_notification_sys(endpoint_pools.clone()).await.map_err(|err| { - error!("new_global_notification_sys faild {:?}", &err); + error!("new_global_notification_sys failed {:?}", &err); Error::from_string(err.to_string()) })?; diff --git a/rustfs/src/utils.rs b/rustfs/src/utils.rs index 1496bd3f..b90d942b 100644 --- a/rustfs/src/utils.rs +++ b/rustfs/src/utils.rs @@ -1,4 +1,7 @@ +use rustls_pemfile::{certs, private_key}; +use rustls_pki_types::{CertificateDer, PrivateKeyDer}; use std::net::IpAddr; +use std::{fs, io}; pub(crate) fn get_local_ip() -> Option { match local_ip_address::local_ip() { @@ -7,3 +10,27 @@ pub(crate) fn get_local_ip() -> Option { Ok(IpAddr::V6(_)) => todo!(), } } + +/// Load public certificate from file. +pub(crate) fn load_certs(filename: &str) -> io::Result>> { + // Open certificate file. + let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; + let mut reader = io::BufReader::new(cert_file); + + // Load and return certificate. + certs(&mut reader).collect() +} + +/// Load private key from file. +pub(crate) fn load_private_key(filename: &str) -> io::Result> { + // Open keyfile. + let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?; + let mut reader = io::BufReader::new(keyfile); + + // Load and return a single private key. + private_key(&mut reader).map(|key| key.unwrap()) +} + +pub(crate) fn error(err: String) -> io::Error { + io::Error::new(io::ErrorKind::Other, err) +} From de0e9bee20537dcaae3f7cdc253e72aae03000e9 Mon Sep 17 00:00:00 2001 From: houseme Date: Tue, 1 Apr 2025 23:27:48 +0800 Subject: [PATCH 04/10] Log records uniformly use `tracing` --- Cargo.lock | 1 - rustfs/Cargo.toml | 2 +- rustfs/src/storage/ecfs.rs | 42 +++++++++++++++++++------------------- 3 files changed, 22 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 147bd87d..b453c2bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6680,7 +6680,6 @@ dependencies = [ "lazy_static", "local-ip-address", "lock", - "log", "madmin", "matchit 0.8.6", "mime", diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index da0f2ddf..9ae6b580 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -16,7 +16,7 @@ workspace = true [dependencies] madmin.workspace = true -log.workspace = true +#log.workspace = true async-trait.workspace = true bytes.workspace = true clap.workspace = true diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 1fc410cb..3cee79f2 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -46,7 +46,6 @@ use futures::pin_mut; use futures::{Stream, StreamExt}; use http::HeaderMap; use lazy_static::lazy_static; -use log::warn; use policy::auth; use policy::policy::action::Action; use policy::policy::action::S3Action; @@ -70,6 +69,7 @@ use tokio_util::io::StreamReader; use tracing::debug; use tracing::error; use tracing::info; +use tracing::warn; use transform_stream::AsyncTryStream; use uuid::Uuid; @@ -230,7 +230,7 @@ impl S3 for FS { #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; - // TODO: DeleteBucketInput 没有force参数? + // TODO: DeleteBucketInput 没有 force 参数? let Some(store) = new_object_layer_fn() else { return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); }; @@ -586,8 +586,8 @@ impl S3 for FS { .await .is_ok() || authorize_request(&mut req, Action::S3Action(S3Action::GetBucketLocationAction)) - .await - .is_ok() + .await + .is_ok() }) }); } @@ -1256,7 +1256,7 @@ impl S3 for FS { conditions: &conditions, object: "", }) - .await; + .await; let write_olny = PolicySys::is_allowed(&BucketPolicyArgs { bucket: &bucket, @@ -1267,7 +1267,7 @@ impl S3 for FS { conditions: &conditions, object: "", }) - .await; + .await; let is_public = read_olny && write_olny; @@ -1680,11 +1680,11 @@ impl S3 for FS { // TODO: valid target list if let Some(NotificationConfiguration { - event_bridge_configuration, - lambda_function_configurations, - queue_configurations, - topic_configurations, - }) = has_notification_config + event_bridge_configuration, + lambda_function_configurations, + queue_configurations, + topic_configurations, + }) = has_notification_config { Ok(S3Response::new(GetBucketNotificationConfigurationOutput { event_bridge_configuration, @@ -1785,10 +1785,10 @@ impl S3 for FS { // !gs.is_empty() && gs.first().is_some_and(|g| { - g.to_owned() - .permission - .is_some_and(|p| p.as_str() == Permission::FULL_CONTROL) - }) + g.to_owned() + .permission + .is_some_and(|p| p.as_str() == Permission::FULL_CONTROL) + }) }) }); @@ -1855,10 +1855,10 @@ impl S3 for FS { // !gs.is_empty() && gs.first().is_some_and(|g| { - g.to_owned() - .permission - .is_some_and(|p| p.as_str() == Permission::FULL_CONTROL) - }) + g.to_owned() + .permission + .is_some_and(|p| p.as_str() == Permission::FULL_CONTROL) + }) }) }); @@ -1934,9 +1934,9 @@ impl S3 for FS { } #[allow(dead_code)] -pub fn bytes_stream(stream: S, content_length: usize) -> impl Stream> + Send + 'static +pub fn bytes_stream(stream: S, content_length: usize) -> impl Stream> + Send + 'static where - S: Stream> + Send + 'static, + S: Stream> + Send + 'static, E: Send + 'static, { AsyncTryStream::::new(|mut y| async move { From d017409f5bd98f2a34a52fa09f17239f30510ed2 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 00:48:08 +0800 Subject: [PATCH 05/10] add rustfs tls --- Cargo.lock | 30 ------------------------- Cargo.toml | 2 +- rustfs/Cargo.toml | 2 +- rustfs/src/main.rs | 56 +++++++++++++++++++++++++++++++++++----------- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b453c2bf..26424790 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3957,9 +3957,7 @@ dependencies = [ "http", "hyper", "hyper-util", - "log", "rustls 0.23.23", - "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls 0.26.2", @@ -5402,12 +5400,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl-probe" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" - [[package]] name = "option-ext" version = "0.2.0" @@ -6673,7 +6665,6 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls", "hyper-util", "iam", "jsonwebtoken", @@ -6783,18 +6774,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" -dependencies = [ - "openssl-probe", - "rustls-pki-types", - "schannel", - "security-framework 3.2.0", -] - [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -6914,15 +6893,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "schannel" -version = "0.1.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "scoped-tls" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 8e71d2a4..2e97d0eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ common = { path = "./common/common" } policy = { path = "./policy" } hex = "0.4.3" hyper = "1.6.0" -hyper-rustls = { version = "0.27.5", features = ["http2"] } +#hyper-rustls = { version = "0.27.5", features = ["http2"] } hyper-util = { version = "0.1.10", features = [ "tokio", "server-auto", diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 9ae6b580..c1a5d071 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -30,7 +30,7 @@ futures.workspace = true futures-util.workspace = true h2 = "0.4.7" hyper.workspace = true -hyper-rustls.workspace = true +#hyper-rustls.workspace = true hyper-util.workspace = true http.workspace = true http-body.workspace = true diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index a34b182d..8edc9e72 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -27,7 +27,6 @@ use ecstore::{ update_erasure_type, }; use ecstore::{global::set_global_rustfs_port, notification_sys::new_global_notification_sys}; -use futures_util::TryFutureExt; use grpc::make_server; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -216,13 +215,14 @@ async fn run(opt: config::Opt) -> Result<()> { }; let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth); + let tls_path = opt.tls_path.clone().unwrap_or_default(); let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY); let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT); - let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok(); - - if has_tls_certs { + let tls_acceptor = if has_tls_certs { + debug!("Found TLS certificates, starting with HTTPS"); + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?; let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?; let mut server_config = ServerConfig::builder() @@ -230,12 +230,14 @@ async fn run(opt: config::Opt) -> Result<()> { .with_single_cert(certs, key) .map_err(|e| error(e.to_string()))?; server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; - let tls_acceptor = TlsAcceptor::from(Arc::new(server_config)); + Some(TlsAcceptor::from(Arc::new(server_config))) + } else { + debug!("TLS certificates not found, starting with HTTP"); + None }; tokio::spawn(async move { let hyper_service = service.into_shared(); - let hybrid_service = TowerToHyperService::new( tower::ServiceBuilder::new() .layer(CorsLayer::permissive()) @@ -245,8 +247,10 @@ async fn run(opt: config::Opt) -> Result<()> { let http_server = ConnBuilder::new(TokioExecutor::new()); let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); let graceful = hyper_util::server::graceful::GracefulShutdown::new(); - + debug!("graceful initiated"); loop { + debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs); + // Wait for a connection let (socket, _) = tokio::select! { res = listener.accept() => { match res { @@ -261,12 +265,38 @@ async fn run(opt: config::Opt) -> Result<()> { break; } }; - - let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone()); - let conn = graceful.watch(conn.into_owned()); - tokio::spawn(async move { - let _ = conn.await; - }); + if has_tls_certs { + debug!("TLS certificates found, starting with SIGINT"); + let tls_socket = match tls_acceptor.as_ref().ok_or_else(|| error("TLS not configured".to_string())).unwrap().accept(socket).await { + Ok(tls_socket) => tls_socket, + Err(err) => { + error!("TLS handshake failed {}", err); + continue; + } + }; + let conn = http_server.serve_connection(TokioIo::new(tls_socket), hybrid_service.clone()); + let conn = graceful.watch(conn.into_owned()); + tokio::task::spawn_blocking(move || { + tokio::runtime::Runtime::new() + .expect("Failed to create runtime") + .block_on(async move { + if let Err(err) = conn.await { + error!("Https Connection error: {}", err); + } + }); + }); + debug!("TLS handshake success"); + } else { + debug!("Http handshake start"); + let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone()); + let conn = graceful.watch(conn.into_owned()); + tokio::spawn(async move { + if let Err(err) = conn.await { + error!("Http Connection error: {}", err); + } + }); + debug!("Http handshake success"); + } } tokio::select! { From 15efeb572fde6f6bac775846b6d29328970169e6 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 00:51:59 +0800 Subject: [PATCH 06/10] improve crate and remove log crate --- Cargo.lock | 3 --- Cargo.toml | 3 +-- cli/rustfs-gui/Cargo.toml | 1 - iam/Cargo.toml | 1 - iam/src/cache.rs | 12 ++++++------ iam/src/lib.rs | 2 +- iam/src/manager.rs | 2 +- policy/Cargo.toml | 1 - rustfs/Cargo.toml | 6 ++---- 9 files changed, 11 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26424790..1db4030e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,7 +4012,6 @@ dependencies = [ "itertools 0.14.0", "jsonwebtoken", "lazy_static", - "log", "madmin", "policy", "rand 0.8.5", @@ -5845,7 +5844,6 @@ dependencies = [ "itertools 0.14.0", "jsonwebtoken", "lazy_static", - "log", "madmin", "rand 0.8.5", "regex", @@ -6719,7 +6717,6 @@ dependencies = [ "chrono", "dioxus", "dirs 6.0.0", - "futures-util", "hex", "keyring", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 2e97d0eb..20f6a009 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ tokio-rustls = { version = "0.26", default-features = false } tokio-stream = "0.1.17" tokio-util = { version = "0.7.13", features = ["io", "compat"] } tower = { version = "0.5.2", features = ["timeout"] } +tower-http = { version = "0.6.2", features = ["cors"] } tracing = "0.1.41" tracing-error = "0.2.1" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] } @@ -117,13 +118,11 @@ uuid = { version = "1.15.1", features = [ "fast-rng", "macro-diagnostics", ] } -log = "0.4.25" axum = "0.7.9" axum-server = { version = "0.6", features = ["tls-rustls"] } md-5 = "0.10.6" workers = { path = "./common/workers" } test-case = "3.3.1" -zip = "2.2.3" snafu = "0.8.5" diff --git a/cli/rustfs-gui/Cargo.toml b/cli/rustfs-gui/Cargo.toml index c9af2fc3..faca71ee 100644 --- a/cli/rustfs-gui/Cargo.toml +++ b/cli/rustfs-gui/Cargo.toml @@ -10,7 +10,6 @@ version.workspace = true chrono = { workspace = true } dioxus = { workspace = true, features = ["router"] } dirs = { workspace = true } -futures-util = { workspace = true } hex = { workspace = true } keyring = { workspace = true } lazy_static = { workspace = true } diff --git a/iam/Cargo.toml b/iam/Cargo.toml index 5afc93c5..fce678b6 100644 --- a/iam/Cargo.toml +++ b/iam/Cargo.toml @@ -11,7 +11,6 @@ workspace = true [dependencies] tokio.workspace = true -log.workspace = true time = { workspace = true, features = ["serde-human-readable"] } serde = { workspace = true, features = ["derive", "rc"] } ecstore = { path = "../ecstore" } diff --git a/iam/src/cache.rs b/iam/src/cache.rs index f1692116..b5dd20c3 100644 --- a/iam/src/cache.rs +++ b/iam/src/cache.rs @@ -6,12 +6,12 @@ use std::{ }; use arc_swap::{ArcSwap, AsRaw, Guard}; -use log::warn; use policy::{ auth::UserIdentity, policy::{Args, PolicyDoc}, }; use time::OffsetDateTime; +use tracing::warn; use crate::store::{GroupInfo, MappedPolicy}; @@ -63,7 +63,7 @@ impl Cache { let mut new = CacheEntity::clone(&cur); op(&mut new); - // 使用cas原子替换内容 + // 使用 cas 原子替换内容 let prev = target.compare_and_swap(&*cur, Arc::new(new)); let swapped = Self::ptr_eq(&*cur, &*prev); if swapped { @@ -112,8 +112,8 @@ impl CacheInner { // todo!() // } - // /// 如果是临时用户,返回Ok(Some(partent_name))) - // /// 如果不是临时用户,返回Ok(None) + // /// 如果是临时用户,返回 Ok(Some(partent_name))) + // /// 如果不是临时用户,返回 Ok(None) // fn is_temp_user(&self, user_name: &str) -> crate::Result> { // let user = self // .get_user(user_name) @@ -126,8 +126,8 @@ impl CacheInner { // } // } - // /// 如果是临时用户,返回Ok(Some(partent_name))) - // /// 如果不是临时用户,返回Ok(None) + // /// 如果是临时用户,返回 Ok(Some(partent_name))) + // /// 如果不是临时用户,返回 Ok(None) // fn is_service_account(&self, user_name: &str) -> crate::Result> { // let user = self // .get_user(user_name) diff --git a/iam/src/lib.rs b/iam/src/lib.rs index 88392bb9..d88da755 100644 --- a/iam/src/lib.rs +++ b/iam/src/lib.rs @@ -1,12 +1,12 @@ use common::error::{Error, Result}; use ecstore::store::ECStore; use error::Error as IamError; -use log::debug; use manager::IamCache; use policy::auth::Credentials; use std::sync::{Arc, OnceLock}; use store::object::ObjectStore; use sys::IamSys; +use tracing::debug; pub mod cache; pub mod error; diff --git a/iam/src/manager.rs b/iam/src/manager.rs index f4fc076c..73994e22 100644 --- a/iam/src/manager.rs +++ b/iam/src/manager.rs @@ -11,7 +11,6 @@ use crate::{ use common::error::{Error, Result}; use ecstore::config::error::is_err_config_not_found; use ecstore::utils::{crypto::base64_encode, path::path_join_buf}; -use log::{debug, warn}; use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc}; use policy::{ arn::ARN, @@ -40,6 +39,7 @@ use tokio::{ }, }; use tracing::error; +use tracing::{debug, warn}; const IAM_FORMAT_FILE: &str = "format.json"; const IAM_FORMAT_VERSION_1: i32 = 1; diff --git a/policy/Cargo.toml b/policy/Cargo.toml index da2c4636..56e33d72 100644 --- a/policy/Cargo.toml +++ b/policy/Cargo.toml @@ -11,7 +11,6 @@ workspace = true [dependencies] tokio.workspace = true -log.workspace = true time = { workspace = true, features = ["serde-human-readable"] } serde = { workspace = true, features = ["derive", "rc"] } serde_json.workspace = true diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index c1a5d071..80540621 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -16,7 +16,6 @@ workspace = true [dependencies] madmin.workspace = true -#log.workspace = true async-trait.workspace = true bytes.workspace = true clap.workspace = true @@ -30,7 +29,6 @@ futures.workspace = true futures-util.workspace = true h2 = "0.4.7" hyper.workspace = true -#hyper-rustls.workspace = true hyper-util.workspace = true http.workspace = true http-body.workspace = true @@ -61,7 +59,7 @@ tokio = { workspace = true, features = [ tokio-rustls.workspace = true lazy_static.workspace = true tokio-stream.workspace = true -tonic = { version = "0.12.3", features = ["gzip"] } +tonic.workspace = true tonic-reflection.workspace = true tower.workspace = true tracing-error.workspace = true @@ -81,7 +79,7 @@ query = { path = "../s3select/query" } api = { path = "../s3select/api" } iam = { path = "../iam" } jsonwebtoken = "9.3.0" -tower-http = { version = "0.6.2", features = ["cors"] } +tower-http.workspace = true mime_guess = "2.0.5" rust-embed = { workspace = true, features = ["interpolate-folder-path"] } local-ip-address = { workspace = true } From b365aab902cbcab0dc2a717ee813b51feeef089a Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 01:12:43 +0800 Subject: [PATCH 07/10] Update rustfs/src/utils.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rustfs/src/utils.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rustfs/src/utils.rs b/rustfs/src/utils.rs index b90d942b..8f72e5f7 100644 --- a/rustfs/src/utils.rs +++ b/rustfs/src/utils.rs @@ -28,7 +28,10 @@ pub(crate) fn load_private_key(filename: &str) -> io::Result Ok(key), + None => Err(error(format!("no private key found in {}", filename))), + } } pub(crate) fn error(err: String) -> io::Error { From f47a417319b0a580e4af4ca9e07badcbfedaaab7 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 01:12:54 +0800 Subject: [PATCH 08/10] Update rustfs/src/utils.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- rustfs/src/utils.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rustfs/src/utils.rs b/rustfs/src/utils.rs index 8f72e5f7..b0ccb228 100644 --- a/rustfs/src/utils.rs +++ b/rustfs/src/utils.rs @@ -18,7 +18,8 @@ pub(crate) fn load_certs(filename: &str) -> io::Result, _>>()?; + Ok(certs) } /// Load private key from file. From 2a90b3bb70338666708c210ad4cafd7cfde5e346 Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 01:18:44 +0800 Subject: [PATCH 09/10] improve code --- rustfs/src/utils.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rustfs/src/utils.rs b/rustfs/src/utils.rs index b0ccb228..2d12f64d 100644 --- a/rustfs/src/utils.rs +++ b/rustfs/src/utils.rs @@ -29,10 +29,7 @@ pub(crate) fn load_private_key(filename: &str) -> io::Result Ok(key), - None => Err(error(format!("no private key found in {}", filename))), - } + private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename))) } pub(crate) fn error(err: String) -> io::Error { From 8d4c3dfa0e01cd14fe78ebc22cd7a8912f56effd Mon Sep 17 00:00:00 2001 From: houseme Date: Wed, 2 Apr 2025 08:37:06 +0800 Subject: [PATCH 10/10] add example certs readme.md --- config/certs/README.md | 44 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 config/certs/README.md diff --git a/config/certs/README.md b/config/certs/README.md new file mode 100644 index 00000000..ce3dbe75 --- /dev/null +++ b/config/certs/README.md @@ -0,0 +1,44 @@ +## Certs + +### Generate a self-signed certificate + +```bash +openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes +``` + +### Generate a self-signed certificate with a specific subject + +```bash +openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \ + -subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com" +``` + +### Generate a self-signed certificate with a specific subject and SAN + +```bash +openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \ + -subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com" \ + -addext "subjectAltName=DNS:mydomain.com,DNS:www.mydomain.com" +``` + +### Generate a self-signed certificate with a specific subject and SAN (multiple SANs) + +```bash +openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \ + -subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com" \ + -addext "subjectAltName=DNS:mydomain.com,DNS:www.mydomain.com,DNS:api.mydomain.com" +``` + +### TLS File + +```text + + rustfs_tls_cert.pem api cert.pem + + rustfs_tls_key.pem api key.pem + + rustfs_console_tls_cert.pem console cert.pem + + rustfs_console_tls_key.pem console key.pem + +``` \ No newline at end of file