Merge main branches

This commit is contained in:
houseme
2025-04-02 15:57:11 +08:00
20 changed files with 681 additions and 134 deletions

3
.gitignore vendored
View File

@@ -10,4 +10,5 @@ rustfs/static/*
vendor
cli/rustfs-gui/embedded-rustfs/rustfs
config/obs.toml
*.log
*.log
config/certs/*

314
Cargo.lock generated
View File

@@ -684,6 +684,29 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "aws-lc-rs"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b756939cb2f8dc900aa6dcd505e6e2428e9cae7ff7b028c49e3946efa70878"
dependencies = [
"aws-lc-sys",
"zeroize",
]
[[package]]
name = "aws-lc-sys"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f7720b74ed28ca77f90769a71fd8c637a0137f6fae4ae947e1050229cff57f"
dependencies = [
"bindgen",
"cc",
"cmake",
"dunce",
"fs_extra",
]
[[package]]
name = "axum"
version = "0.7.9"
@@ -739,6 +762,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-server"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ad46c3ec4e12f4a4b6835e173ba21c25e484c9d02b49770bf006ce5367c036"
dependencies = [
"arc-swap",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"pin-project-lite",
"rustls 0.21.12",
"rustls-pemfile",
"tokio",
"tokio-rustls 0.24.1",
"tower 0.4.13",
"tower-service",
]
[[package]]
name = "backon"
version = "1.4.1"
@@ -795,9 +841,9 @@ checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3"
[[package]]
name = "bigdecimal"
version = "0.4.7"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f31f3af01c5c65a07985c804d3366560e6fa7883d640a122819b14ec327482c"
checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013"
dependencies = [
"autocfg",
"libm",
@@ -806,6 +852,29 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bindgen"
version = "0.69.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088"
dependencies = [
"bitflags 2.9.0",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.100",
"which",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -1013,6 +1082,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfb"
version = "0.7.3"
@@ -1145,10 +1223,21 @@ dependencies = [
]
[[package]]
name = "clap"
version = "4.5.34"
name = "clang-sys"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e958897981290da2a852763fe9cdb89cd36977a5d729023127095fa94d95e2ff"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
"libloading 0.8.6",
]
[[package]]
name = "clap"
version = "4.5.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8aa86934b44c19c50f87cc2790e19f54f7a67aedb64101c2e1a2e5ecfb73944"
dependencies = [
"clap_builder",
"clap_derive",
@@ -1156,9 +1245,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.34"
version = "4.5.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83b0f35019843db2160b5bb19ae09b4e6411ac33fc6a712003c33e03090e2489"
checksum = "2414dbb2dd0695280da6ea9261e327479e9d37b0630f6b53ba2a11c60c679fd9"
dependencies = [
"anstream",
"anstyle",
@@ -1184,6 +1273,15 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]]
name = "cmake"
version = "0.1.54"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
dependencies = [
"cc",
]
[[package]]
name = "cocoa"
version = "0.25.0"
@@ -3100,9 +3198,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11faaf5a5236997af9848be0bef4db95824b1d534ebc64d0f0c6cf3e67bd38dc"
checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece"
dependencies = [
"crc32fast",
"miniz_oxide",
@@ -3156,6 +3254,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "futf"
version = "0.1.5"
@@ -3907,10 +4011,10 @@ dependencies = [
"http",
"hyper",
"hyper-util",
"rustls",
"rustls 0.23.25",
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tower-service",
"webpki-roots",
]
@@ -3930,9 +4034,9 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.10"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4"
checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2"
dependencies = [
"bytes",
"futures-channel",
@@ -3940,6 +4044,7 @@ dependencies = [
"http",
"http-body",
"hyper",
"libc",
"pin-project-lite",
"socket2",
"tokio",
@@ -3962,7 +4067,6 @@ dependencies = [
"itertools 0.14.0",
"jsonwebtoken",
"lazy_static",
"log",
"madmin",
"policy",
"rand 0.8.5",
@@ -3979,9 +4083,9 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.62"
version = "0.1.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2fd658b06e56721792c5df4475705b6cda790e9298d19d2f8af083457bcd127"
checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@@ -3989,7 +4093,7 @@ dependencies = [
"js-sys",
"log",
"wasm-bindgen",
"windows-core 0.52.0",
"windows-core 0.61.0",
]
[[package]]
@@ -4227,6 +4331,15 @@ version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
@@ -4304,10 +4417,11 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "jobserver"
version = "0.1.32"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0"
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
dependencies = [
"getrandom 0.3.2",
"libc",
]
@@ -4397,6 +4511,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lexical-core"
version = "1.0.5"
@@ -4481,7 +4601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9ec52138abedcc58dc17a7c6c0c00a2bdb4f3427c7f63fa97fd0d859155caf"
dependencies = [
"gtk-sys",
"libloading",
"libloading 0.7.4",
"once_cell",
]
@@ -4510,6 +4630,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "libloading"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
]
[[package]]
name = "libm"
version = "0.2.11"
@@ -5986,7 +6116,6 @@ dependencies = [
"itertools 0.14.0",
"jsonwebtoken",
"lazy_static",
"log",
"madmin",
"rand 0.8.5",
"regex",
@@ -6296,9 +6425,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.37.3"
version = "0.37.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf763ab1c7a3aa408be466efc86efe35ed1bd3dd74173ed39d6b0d0a6f0ba148"
checksum = "a4ce8c88de324ff838700f36fb6ab86c96df0e3c4ab6ef3a9b2044465cce1369"
dependencies = [
"memchr",
"serde",
@@ -6316,7 +6445,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash 2.1.1",
"rustls",
"rustls 0.23.25",
"socket2",
"thiserror 2.0.12",
"tokio",
@@ -6335,7 +6464,7 @@ dependencies = [
"rand 0.9.0",
"ring",
"rustc-hash 2.1.1",
"rustls",
"rustls 0.23.25",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
@@ -6673,7 +6802,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls 0.23.25",
"rustls-pemfile",
"rustls-pki-types",
"serde",
@@ -6682,7 +6811,7 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tokio-util",
"tower 0.5.2",
"tower-service",
@@ -6871,6 +7000,7 @@ dependencies = [
"async-trait",
"atoi",
"axum",
"axum-server",
"bytes",
"chrono",
"clap",
@@ -6909,6 +7039,9 @@ dependencies = [
"rmp-serde",
"rust-embed",
"rustfs-obs",
"rustls 0.23.25",
"rustls-pemfile",
"rustls-pki-types",
"s3s",
"serde",
"serde_json",
@@ -6916,6 +7049,7 @@ dependencies = [
"shadow-rs",
"time",
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tokio-util",
"tonic",
@@ -6939,7 +7073,6 @@ dependencies = [
"chrono",
"dioxus",
"dirs 6.0.0",
"futures-util",
"hex",
"keyring",
"lazy_static",
@@ -6997,9 +7130,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.0.3"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96"
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf"
dependencies = [
"bitflags 2.9.0",
"errno",
@@ -7008,17 +7141,30 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "rustls"
version = "0.21.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
dependencies = [
"log",
"ring",
"rustls-webpki 0.101.7",
"sct",
]
[[package]]
name = "rustls"
version = "0.23.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"rustls-webpki 0.103.1",
"subtle",
"zeroize",
]
@@ -7041,12 +7187,23 @@ dependencies = [
"web-time",
]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "rustls-webpki"
version = "0.103.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03"
dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
"untrusted",
@@ -7137,6 +7294,16 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.11.1"
@@ -7889,7 +8056,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.2",
"once_cell",
"rustix 1.0.3",
"rustix 1.0.5",
"windows-sys 0.59.0",
]
@@ -8101,13 +8268,23 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.12",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls",
"rustls 0.23.25",
"tokio",
]
@@ -8217,7 +8394,7 @@ dependencies = [
"rustls-pemfile",
"socket2",
"tokio",
"tokio-rustls",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower 0.4.13",
"tower-layer",
@@ -8923,8 +9100,8 @@ dependencies = [
"webview2-com-sys",
"windows",
"windows-core 0.58.0",
"windows-implement",
"windows-interface",
"windows-implement 0.58.0",
"windows-interface 0.58.0",
]
[[package]]
@@ -8949,6 +9126,18 @@ dependencies = [
"windows-core 0.58.0",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.44",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -8992,24 +9181,28 @@ dependencies = [
[[package]]
name = "windows-core"
version = "0.52.0"
version = "0.58.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99"
dependencies = [
"windows-implement 0.58.0",
"windows-interface 0.58.0",
"windows-result 0.2.0",
"windows-strings 0.1.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.58.0"
version = "0.61.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99"
checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
dependencies = [
"windows-implement",
"windows-interface",
"windows-result 0.2.0",
"windows-strings 0.1.0",
"windows-targets 0.52.6",
"windows-implement 0.60.0",
"windows-interface 0.59.1",
"windows-link",
"windows-result 0.3.2",
"windows-strings 0.4.0",
]
[[package]]
@@ -9023,6 +9216,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "windows-implement"
version = "0.60.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.58.0"
@@ -9034,6 +9238,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "windows-interface"
version = "0.59.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "windows-link"
version = "0.1.1"
@@ -9088,6 +9303,15 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.45.0"

View File

@@ -90,6 +90,9 @@ rmp = "0.8.14"
rmp-serde = "1.3.0"
rustfs-obs = { path = "packages/obs", version = "0.0.1" }
rust-embed = "8.6.0"
rustls = { version = "0.23" }
rustls-pki-types = "1.11.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "ab139f72fe768fb9d8cecfe36269451da1ca9779", default-features = true, features = [
"tower",
] }
@@ -111,9 +114,11 @@ tokio = { version = "1.44.0", features = ["fs", "rt-multi-thread"] }
tonic = { version = "0.12.3", features = ["gzip"] }
tonic-build = "0.12.3"
tonic-reflection = "0.12"
tokio-rustls = { version = "0.26", default-features = false }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.13", features = ["io", "compat"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.2", features = ["cors"] }
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-error = "0.2.1"
@@ -127,12 +132,11 @@ uuid = { version = "1.15.1", features = [
"fast-rng",
"macro-diagnostics",
] }
log = "0.4.25"
axum = "0.7.9"
axum-server = { version = "0.6", features = ["tls-rustls"] }
md-5 = "0.10.6"
workers = { path = "./common/workers" }
test-case = "3.3.1"
zip = "2.2.3"
snafu = "0.8.5"
[profile.wasm-dev]

View File

@@ -10,7 +10,6 @@ version.workspace = true
chrono = { workspace = true }
dioxus = { workspace = true, features = ["router"] }
dirs = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
keyring = { workspace = true }
lazy_static = { workspace = true }

44
config/certs/README.md Normal file
View File

@@ -0,0 +1,44 @@
## Certs
### Generate a self-signed certificate
```bash
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes
```
### Generate a self-signed certificate with a specific subject
```bash
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \
-subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com"
```
### Generate a self-signed certificate with a specific subject and SAN
```bash
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \
-subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com" \
-addext "subjectAltName=DNS:mydomain.com,DNS:www.mydomain.com"
```
### Generate a self-signed certificate with a specific subject and SAN (multiple SANs)
```bash
openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes \
-subj "/C=US/ST=California/L=San Francisco/O=My Company/CN=mydomain.com" \
-addext "subjectAltName=DNS:mydomain.com,DNS:www.mydomain.com,DNS:api.mydomain.com"
```
### TLS File
```text
rustfs_tls_cert.pem api cert.pem
rustfs_tls_key.pem api key.pem
rustfs_console_tls_cert.pem console cert.pem
rustfs_console_tls_key.pem console key.pem
```

View File

@@ -11,7 +11,6 @@ workspace = true
[dependencies]
tokio.workspace = true
log.workspace = true
time = { workspace = true, features = ["serde-human-readable"] }
serde = { workspace = true, features = ["derive", "rc"] }
ecstore = { path = "../ecstore" }

View File

@@ -6,12 +6,12 @@ use std::{
};
use arc_swap::{ArcSwap, AsRaw, Guard};
use log::warn;
use policy::{
auth::UserIdentity,
policy::{Args, PolicyDoc},
};
use time::OffsetDateTime;
use tracing::warn;
use crate::store::{GroupInfo, MappedPolicy};
@@ -63,7 +63,7 @@ impl Cache {
let mut new = CacheEntity::clone(&cur);
op(&mut new);
// 使用cas原子替换内容
// 使用 cas 原子替换内容
let prev = target.compare_and_swap(&*cur, Arc::new(new));
let swapped = Self::ptr_eq(&*cur, &*prev);
if swapped {
@@ -112,8 +112,8 @@ impl CacheInner {
// todo!()
// }
// /// 如果是临时用户返回Ok(Some(partent_name)))
// /// 如果不是临时用户返回Ok(None)
// /// 如果是临时用户,返回 Ok(Some(partent_name)))
// /// 如果不是临时用户,返回 Ok(None)
// fn is_temp_user(&self, user_name: &str) -> crate::Result<Option<&str>> {
// let user = self
// .get_user(user_name)
@@ -126,8 +126,8 @@ impl CacheInner {
// }
// }
// /// 如果是临时用户返回Ok(Some(partent_name)))
// /// 如果不是临时用户返回Ok(None)
// /// 如果是临时用户,返回 Ok(Some(partent_name)))
// /// 如果不是临时用户,返回 Ok(None)
// fn is_service_account(&self, user_name: &str) -> crate::Result<Option<&str>> {
// let user = self
// .get_user(user_name)

View File

@@ -11,7 +11,6 @@ use crate::{
use common::error::{Error, Result};
use ecstore::config::error::is_err_config_not_found;
use ecstore::utils::{crypto::base64_encode, path::path_join_buf};
use log::{debug, warn};
use madmin::{AccountStatus, AddOrUpdateUserReq, GroupDesc};
use policy::{
arn::ARN,
@@ -40,6 +39,7 @@ use tokio::{
},
};
use tracing::error;
use tracing::{debug, warn};
const IAM_FORMAT_FILE: &str = "format.json";
const IAM_FORMAT_VERSION_1: i32 = 1;

View File

@@ -11,7 +11,6 @@ workspace = true
[dependencies]
tokio.workspace = true
log.workspace = true
time = { workspace = true, features = ["serde-human-readable"] }
serde = { workspace = true, features = ["derive", "rc"] }
serde_json.workspace = true

View File

@@ -16,7 +16,6 @@ workspace = true
[dependencies]
madmin.workspace = true
#log.workspace = true
async-trait.workspace = true
bytes.workspace = true
clap.workspace = true
@@ -24,7 +23,7 @@ csv = "1.3.1"
datafusion = { workspace = true }
common.workspace = true
ecstore.workspace = true
policy.workspace =true
policy.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
@@ -43,6 +42,9 @@ prost-types.workspace = true
protos.workspace = true
protobuf.workspace = true
rmp-serde.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
rustls-pki-types.workspace = true
s3s.workspace = true
serde.workspace = true
serde_json.workspace = true
@@ -55,9 +57,10 @@ tokio = { workspace = true, features = [
"net",
"signal",
] }
tokio-rustls.workspace = true
lazy_static.workspace = true
tokio-stream.workspace = true
tonic = { version = "0.12.3", features = ["gzip"] }
tonic.workspace = true
tonic-reflection.workspace = true
tower.workspace = true
tracing-core = { workspace = true }
@@ -67,6 +70,7 @@ transform-stream.workspace = true
uuid = "1.15.1"
url.workspace = true
axum.workspace = true
axum-server = { workspace = true }
matchit = "0.8.6"
shadow-rs.workspace = true
const-str = { version = "0.6.1", features = ["std", "proc"] }
@@ -77,7 +81,7 @@ query = { path = "../s3select/query" }
api = { path = "../s3select/api" }
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.6.2", features = ["cors"] }
tower-http.workspace = true
mime_guess = "2.0.5"
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
local-ip-address = { workspace = true }

View File

@@ -28,6 +28,14 @@ pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Example: --obs-config /etc/rustfs/obs.toml
pub const DEFAULT_OBS_CONFIG: &str = "config/obs.toml";
/// Default TLS key for rustfs
/// This is the default key for TLS.
pub(crate) const RUSTFS_TLS_KEY: &str = "rustfs_tls_key.pem";
/// Default TLS cert for rustfs
/// This is the default cert for TLS.
pub(crate) const RUSTFS_TLS_CERT: &str = "rustfs_tls_cert.pem";
#[allow(clippy::const_is_empty)]
const SHORT_VERSION: &str = {
if !build::TAG.is_empty() {
@@ -88,4 +96,8 @@ pub struct Opt {
/// Default value: config/obs.toml
#[arg(long, default_value_t = DEFAULT_OBS_CONFIG.to_string(), env = "RUSTFS_OBS_CONFIG")]
pub obs_config: String,
/// tls path for rustfs api and console.
#[arg(long, env = "RUSTFS_TLS_PATH")]
pub tls_path: Option<String>,
}

View File

@@ -6,19 +6,24 @@ use axum::{
routing::get,
Router,
};
use axum_server::tls_rustls::RustlsConfig;
use mime_guess::from_path;
use rust_embed::RustEmbed;
use serde::Serialize;
use shadow_rs::shadow;
use std::net::{Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::sync::OnceLock;
use tracing::info;
use std::time::Duration;
use tokio::signal;
use tracing::{debug, error, info};
shadow!(build);
const RUSTFS_ADMIN_PREFIX: &str = "/rustfs/admin/v3";
const RUSTFS_CONSOLE_TLS_KEY: &str = "rustfs_console_tls_key.pem";
const RUSTFS_CONSOLE_TLS_CERT: &str = "rustfs_console_tls_cert.pem";
#[derive(RustEmbed)]
#[folder = "$CARGO_MANIFEST_DIR/static"]
struct StaticFiles;
@@ -189,18 +194,104 @@ async fn config_handler(Host(host): Host) -> impl IntoResponse {
.unwrap()
}
pub async fn start_static_file_server(addrs: &str, local_ip: Ipv4Addr, access_key: &str, secret_key: &str) {
// 创建路由
pub async fn start_static_file_server(
addrs: &str,
local_ip: Ipv4Addr,
access_key: &str,
secret_key: &str,
tls_path: Option<String>,
) {
// Create a route
let app = Router::new()
.route("/config.json", get(config_handler))
.nest_service("/", get(static_handler));
let listener = tokio::net::TcpListener::bind(addrs).await.unwrap();
let local_addr = listener.local_addr().unwrap();
let local_addr: SocketAddr = addrs.parse().expect("Failed to parse socket address");
info!("WebUI: http://{}:{} http://127.0.0.1:{}", local_ip, local_addr.port(), local_addr.port());
info!(" RootUser: {}", access_key);
info!(" RootPass: {}", secret_key);
axum::serve(listener, app).await.unwrap();
let tls_path = tls_path.unwrap_or_default();
let key_path = format!("{}/{}", tls_path, RUSTFS_CONSOLE_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_CONSOLE_TLS_CERT);
// Check and start the HTTPS/HTTP server
match start_server(addrs, local_addr, &key_path, &cert_path, app.clone()).await {
Ok(_) => info!("Server shutdown gracefully"),
Err(e) => error!("Server error: {}", e),
}
}
async fn start_server(addrs: &str, local_addr: SocketAddr, key_path: &str, cert_path: &str, app: Router) -> std::io::Result<()> {
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path), tokio::fs::metadata(cert_path)).is_ok();
if has_tls_certs {
debug!("Found TLS certificates, starting with HTTPS");
match tokio::try_join!(tokio::fs::read(key_path), tokio::fs::read(cert_path)) {
Ok((key_data, cert_data)) => {
match RustlsConfig::from_pem(cert_data, key_data).await {
Ok(config) => {
let handle = axum_server::Handle::new();
// create a signal off listening task
let handle_clone = handle.clone();
tokio::spawn(async move {
shutdown_signal().await;
handle_clone.graceful_shutdown(Some(Duration::from_secs(10)));
});
debug!("Starting HTTPS server...");
axum_server::bind_rustls(local_addr, config)
.handle(handle.clone())
.serve(app.into_make_service())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(())
}
Err(e) => {
error!("Failed to create TLS config: {}", e);
start_http_server(addrs, app).await
}
}
}
Err(e) => {
error!("Failed to read TLS certificates: {}", e);
start_http_server(addrs, app).await
}
}
} else {
debug!("TLS certificates not found at {} and {}", key_path, cert_path);
start_http_server(addrs, app).await
}
}
async fn start_http_server(addrs: &str, app: Router) -> std::io::Result<()> {
debug!("Starting HTTP server...");
let listener = tokio::net::TcpListener::bind(addrs).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
info!("shutdown_signal ctrl_c")
},
_ = terminate => {
info!("shutdown_signal terminate")
},
}
}

View File

@@ -10,13 +10,14 @@ mod utils;
use crate::auth::IAMAuth;
use crate::console::{init_console_cfg, CONSOLE_CONFIG};
use crate::utils::error;
use chrono::Datelike;
use clap::Parser;
use common::{
error::{Error, Result},
globals::set_global_addr,
};
use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY};
use config::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, RUSTFS_TLS_CERT, RUSTFS_TLS_KEY};
use ecstore::heal::background_heal_ops::init_auto_heal;
use ecstore::utils::net::{self, get_available_port};
use ecstore::{
@@ -36,10 +37,13 @@ use hyper_util::{
use iam::init_iam_sys;
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus};
use rustls::ServerConfig;
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
use service::hybrid;
use std::sync::Arc;
use std::{io::IsTerminal, net::SocketAddr};
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;
use tonic::{metadata::MetadataValue, Request, Status};
use tower_http::cors::CorsLayer;
use tracing::{debug, error, info, info_span, warn};
@@ -128,6 +132,7 @@ async fn run(opt: config::Opt) -> Result<()> {
//设置 AK 和 SK
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?;
set_global_rustfs_port(server_port);
//监听地址,端口从参数中获取
@@ -228,9 +233,28 @@ async fn run(opt: config::Opt) -> Result<()> {
let rpc_service = NodeServiceServer::with_interceptor(make_server(), check_auth);
let tls_path = opt.tls_path.clone().unwrap_or_default();
let key_path = format!("{}/{}", tls_path, RUSTFS_TLS_KEY);
let cert_path = format!("{}/{}", tls_path, RUSTFS_TLS_CERT);
let has_tls_certs = tokio::try_join!(tokio::fs::metadata(key_path.clone()), tokio::fs::metadata(cert_path.clone())).is_ok();
let tls_acceptor = if has_tls_certs {
debug!("Found TLS certificates, starting with HTTPS");
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let certs = utils::load_certs(cert_path.as_str()).map_err(|e| error(e.to_string()))?;
let key = utils::load_private_key(key_path.as_str()).map_err(|e| error(e.to_string()))?;
let mut server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| error(e.to_string()))?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
Some(TlsAcceptor::from(Arc::new(server_config)))
} else {
debug!("TLS certificates not found, starting with HTTP");
None
};
tokio::spawn(async move {
let hyper_service = service.into_shared();
let hybrid_service = TowerToHyperService::new(
tower::ServiceBuilder::new()
.layer(CorsLayer::permissive())
@@ -240,8 +264,10 @@ async fn run(opt: config::Opt) -> Result<()> {
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
debug!("graceful initiated");
loop {
debug!("waiting for SIGINT or SIGTERM has_tls_certs: {}", has_tls_certs);
// Wait for a connection
let (socket, _) = tokio::select! {
res = listener.accept() => {
match res {
@@ -256,12 +282,44 @@ async fn run(opt: config::Opt) -> Result<()> {
break;
}
};
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
let _ = conn.await;
});
if has_tls_certs {
debug!("TLS certificates found, starting with SIGINT");
let tls_socket = match tls_acceptor
.as_ref()
.ok_or_else(|| error("TLS not configured".to_string()))
.unwrap()
.accept(socket)
.await
{
Ok(tls_socket) => tls_socket,
Err(err) => {
error!("TLS handshake failed {}", err);
continue;
}
};
let conn = http_server.serve_connection(TokioIo::new(tls_socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new()
.expect("Failed to create runtime")
.block_on(async move {
if let Err(err) = conn.await {
error!("Https Connection error: {}", err);
}
});
});
debug!("TLS handshake success");
} else {
debug!("Http handshake start");
let conn = http_server.serve_connection(TokioIo::new(socket), hybrid_service.clone());
let conn = graceful.watch(conn.into_owned());
tokio::spawn(async move {
if let Err(err) = conn.await {
error!("Http Connection error: {}", err);
}
});
debug!("Http handshake success");
}
}
tokio::select! {
@@ -283,7 +341,7 @@ async fn run(opt: config::Opt) -> Result<()> {
})?;
ECStore::init(store.clone()).await.map_err(|err| {
error!("ECStore init faild {:?}", &err);
error!("ECStore init failed {:?}", &err);
Error::from_string(err.to_string())
})?;
debug!("init store success!");
@@ -309,8 +367,15 @@ async fn run(opt: config::Opt) -> Result<()> {
let access_key = opt.access_key.clone();
let secret_key = opt.secret_key.clone();
let console_address = opt.console_address.clone();
let tls_path = opt.tls_path.clone();
if console_address.is_empty() {
error!("console_address is empty");
return Err(Error::from_string("console_address is empty".to_string()));
}
tokio::spawn(async move {
console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key).await;
console::start_static_file_server(&console_address, local_ip, &access_key, &secret_key, tls_path).await;
});
}

View File

@@ -586,8 +586,8 @@ impl S3 for FS {
.await
.is_ok()
|| authorize_request(&mut req, Action::S3Action(S3Action::GetBucketLocationAction))
.await
.is_ok()
.await
.is_ok()
})
});
}
@@ -1256,7 +1256,7 @@ impl S3 for FS {
conditions: &conditions,
object: "",
})
.await;
.await;
let write_olny = PolicySys::is_allowed(&BucketPolicyArgs {
bucket: &bucket,
@@ -1267,7 +1267,7 @@ impl S3 for FS {
conditions: &conditions,
object: "",
})
.await;
.await;
let is_public = read_olny && write_olny;
@@ -1680,11 +1680,11 @@ impl S3 for FS {
// TODO: valid target list
if let Some(NotificationConfiguration {
event_bridge_configuration,
lambda_function_configurations,
queue_configurations,
topic_configurations,
}) = has_notification_config
event_bridge_configuration,
lambda_function_configurations,
queue_configurations,
topic_configurations,
}) = has_notification_config
{
Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
event_bridge_configuration,
@@ -1785,10 +1785,10 @@ impl S3 for FS {
//
!gs.is_empty()
&& gs.first().is_some_and(|g| {
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)
})
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)
})
})
});
@@ -1855,10 +1855,10 @@ impl S3 for FS {
//
!gs.is_empty()
&& gs.first().is_some_and(|g| {
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)
})
g.to_owned()
.permission
.is_some_and(|p| p.as_str() == Permission::FULL_CONTROL)
})
})
});
@@ -1934,9 +1934,9 @@ impl S3 for FS {
}
#[allow(dead_code)]
pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes, E>> + Send + 'static
pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item=Result<Bytes, E>> + Send + 'static
where
S: Stream<Item = Result<Bytes, E>> + Send + 'static,
S: Stream<Item=Result<Bytes, E>> + Send + 'static,
E: Send + 'static,
{
AsyncTryStream::<Bytes, E, _>::new(|mut y| async move {

View File

@@ -1,4 +1,7 @@
use rustls_pemfile::{certs, private_key};
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use std::net::IpAddr;
use std::{fs, io};
pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
match local_ip_address::local_ip() {
@@ -7,3 +10,28 @@ pub(crate) fn get_local_ip() -> Option<std::net::Ipv4Addr> {
Ok(IpAddr::V6(_)) => todo!(),
}
}
/// Load public certificate from file.
pub(crate) fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Open certificate file.
let cert_file = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(cert_file);
// Load and return certificate.
let certs = certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
Ok(certs)
}
/// Load private key from file.
pub(crate) fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
// Open keyfile.
let keyfile = fs::File::open(filename).map_err(|e| error(format!("failed to open {}: {}", filename, e)))?;
let mut reader = io::BufReader::new(keyfile);
// Load and return a single private key.
private_key(&mut reader)?.ok_or_else(|| error(format!("no private key found in {}", filename)))
}
pub(crate) fn error(err: String) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}

View File

@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
@@ -7,11 +6,11 @@ use crate::QueryResult;
pub type FuncMetaManagerRef = Arc<dyn FunctionMetadataManager + Send + Sync>;
pub trait FunctionMetadataManager {
fn register_udf(&mut self, udf: ScalarUDF) -> QueryResult<()>;
fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> QueryResult<()>;
fn register_udaf(&mut self, udaf: AggregateUDF) -> QueryResult<()>;
fn register_udaf(&mut self, udaf: Arc<AggregateUDF>) -> QueryResult<()>;
fn register_udwf(&mut self, udwf: WindowUDF) -> QueryResult<()>;
fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> QueryResult<()>;
fn udf(&self, name: &str) -> QueryResult<Arc<ScalarUDF>>;
@@ -19,5 +18,7 @@ pub trait FunctionMetadataManager {
fn udwf(&self, name: &str) -> QueryResult<Arc<WindowUDF>>;
fn udfs(&self) -> HashSet<String>;
fn udfs(&self) -> Vec<String>;
fn udafs(&self) -> Vec<String>;
fn udwfs(&self) -> Vec<String>;
}

View File

@@ -139,7 +139,7 @@ impl SimpleQueryDispatcher {
let path = format!("s3://{}/{}", self.input.bucket, self.input.key);
let table_path = ListingTableUrl::parse(path)?;
let listing_options = if self.input.request.input_serialization.csv.is_some() {
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(false));
let file_format = CsvFormat::default().with_options(CsvOptions::default().with_has_header(true));
ListingOptions::new(Arc::new(file_format)).with_file_extension(".csv")
} else if self.input.request.input_serialization.parquet.is_some() {
let file_format = ParquetFormat::new();

View File

@@ -1,13 +1,15 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use api::query::function::FunctionMetadataManager;
use api::{QueryError, QueryResult};
use datafusion::execution::SessionStateDefaults;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use tracing::debug;
pub type SimpleFunctionMetadataManagerRef = Arc<SimpleFunctionMetadataManager>;
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct SimpleFunctionMetadataManager {
/// Scalar functions that are registered with the context
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
@@ -17,19 +19,53 @@ pub struct SimpleFunctionMetadataManager {
pub window_functions: HashMap<String, Arc<WindowUDF>>,
}
impl Default for SimpleFunctionMetadataManager {
fn default() -> Self {
let mut func_meta_manager = Self {
scalar_functions: Default::default(),
aggregate_functions: Default::default(),
window_functions: Default::default(),
};
SessionStateDefaults::default_scalar_functions().into_iter().for_each(|udf| {
let existing_udf = func_meta_manager.register_udf(udf.clone());
if let Ok(()) = existing_udf {
debug!("Overwrote an existing UDF: {}", udf.name());
}
});
SessionStateDefaults::default_aggregate_functions()
.into_iter()
.for_each(|udaf| {
let existing_udaf = func_meta_manager.register_udaf(udaf.clone());
if let Ok(()) = existing_udaf {
debug!("Overwrote an existing UDAF: {}", udaf.name());
}
});
SessionStateDefaults::default_window_functions().into_iter().for_each(|udwf| {
let existing_udwf = func_meta_manager.register_udwf(udwf.clone());
if let Ok(()) = existing_udwf {
debug!("Overwrote an existing UDWF: {}", udwf.name());
}
});
func_meta_manager
}
}
impl FunctionMetadataManager for SimpleFunctionMetadataManager {
fn register_udf(&mut self, f: ScalarUDF) -> QueryResult<()> {
self.scalar_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
fn register_udf(&mut self, f: Arc<ScalarUDF>) -> QueryResult<()> {
self.scalar_functions.insert(f.inner().name().to_uppercase(), f);
Ok(())
}
fn register_udaf(&mut self, f: AggregateUDF) -> QueryResult<()> {
self.aggregate_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
fn register_udaf(&mut self, f: Arc<AggregateUDF>) -> QueryResult<()> {
self.aggregate_functions.insert(f.inner().name().to_uppercase(), f);
Ok(())
}
fn register_udwf(&mut self, f: WindowUDF) -> QueryResult<()> {
self.window_functions.insert(f.inner().name().to_uppercase(), Arc::new(f));
fn register_udwf(&mut self, f: Arc<WindowUDF>) -> QueryResult<()> {
self.window_functions.insert(f.inner().name().to_uppercase(), f);
Ok(())
}
@@ -57,7 +93,13 @@ impl FunctionMetadataManager for SimpleFunctionMetadataManager {
.ok_or_else(|| QueryError::FunctionNotExists { name: name.to_string() })
}
fn udfs(&self) -> HashSet<String> {
fn udfs(&self) -> Vec<String> {
self.scalar_functions.keys().cloned().collect()
}
fn udafs(&self) -> Vec<String> {
self.aggregate_functions.keys().cloned().collect()
}
fn udwfs(&self) -> Vec<String> {
self.window_functions.keys().cloned().collect()
}
}

View File

@@ -141,24 +141,58 @@ mod tests {
let results = result.result().chunk_result().await.unwrap().to_vec();
let expected = [
"+----------------+----------+----------+------------+----------+",
"| column_1 | column_2 | column_3 | column_4 | column_5 |",
"+----------------+----------+----------+------------+----------+",
"| id | name | age | department | salary |",
"| 1 | Alice | 25 | HR | 5000 |",
"| 2 | Bob | 30 | IT | 6000 |",
"| 3 | Charlie | 35 | Finance | 7000 |",
"| 4 | Diana | 22 | Marketing | 4500 |",
"| 5 | Eve | 28 | IT | 5500 |",
"| 6 | Frank | 40 | Finance | 8000 |",
"| 7 | Grace | 26 | HR | 5200 |",
"| 8 | Henry | 32 | IT | 6200 |",
"| 9 | Ivy | 24 | Marketing | 4800 |",
"| 10 | Jack | 38 | Finance | 7500 |",
"+----------------+----------+----------+------------+----------+",
"+----------------+---------+-----+------------+--------+",
"| id | name | age | department | salary |",
"+----------------+---------+-----+------------+--------+",
"| 1 | Alice | 25 | HR | 5000 |",
"| 2 | Bob | 30 | IT | 6000 |",
"| 3 | Charlie | 35 | Finance | 7000 |",
"| 4 | Diana | 22 | Marketing | 4500 |",
"| 5 | Eve | 28 | IT | 5500 |",
"| 6 | Frank | 40 | Finance | 8000 |",
"| 7 | Grace | 26 | HR | 5200 |",
"| 8 | Henry | 32 | IT | 6200 |",
"| 9 | Ivy | 24 | Marketing | 4800 |",
"| 10 | Jack | 38 | Finance | 7500 |",
"+----------------+---------+-----+------------+--------+",
];
assert_batches_eq!(expected, &results);
pretty::print_batches(&results).unwrap();
}
#[tokio::test]
#[ignore]
async fn test_func_sql() {
let sql = "select count(s.id) from S3Object as s";
let input = SelectObjectContentInput {
bucket: "dandan".to_string(),
expected_bucket_owner: None,
key: "test.csv".to_string(),
sse_customer_algorithm: None,
sse_customer_key: None,
sse_customer_key_md5: None,
request: SelectObjectContentRequest {
expression: sql.to_string(),
expression_type: ExpressionType::from_static("SQL"),
input_serialization: InputSerialization {
csv: Some(CSVInput::default()),
..Default::default()
},
output_serialization: OutputSerialization {
csv: Some(CSVOutput::default()),
..Default::default()
},
request_progress: None,
scan_range: None,
},
};
let db = make_rustfsms(input.clone(), true).await.unwrap();
let query = Query::new(Context { input }, sql.to_string());
let result = db.execute(&query).await.unwrap();
let results = result.result().chunk_result().await.unwrap().to_vec();
pretty::print_batches(&results).unwrap();
}
}

View File

@@ -113,14 +113,14 @@ impl ContextProvider for MetadataProvider {
}
fn udf_names(&self) -> Vec<String> {
todo!()
self.func_manager.udfs()
}
fn udaf_names(&self) -> Vec<String> {
todo!()
self.func_manager.udafs()
}
fn udwf_names(&self) -> Vec<String> {
todo!()
self.func_manager.udwfs()
}
}