From d3cff7d033cb939c44b2e9ef59f80af1090ef4b7 Mon Sep 17 00:00:00 2001 From: yxrxy Date: Sat, 14 Mar 2026 23:06:53 +0800 Subject: [PATCH] feat(webdav): add WebDAV protocol gateway (#2158) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: yxrxy <1532529704@qq.com> Co-authored-by: houseme Co-authored-by: 马登山 Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: 安正超 --- .gitignore | 1 + Cargo.lock | 113 ++ Cargo.toml | 3 + crates/config/src/constants/protocols.rs | 12 + crates/e2e_test/src/protocols/README.md | 24 +- crates/e2e_test/src/protocols/mod.rs | 3 +- crates/e2e_test/src/protocols/test_runner.rs | 17 +- crates/e2e_test/src/protocols/webdav_core.rs | 207 ++++ crates/protocols/Cargo.toml | 7 + crates/protocols/src/common/gateway.rs | 29 + crates/protocols/src/common/session.rs | 1 + crates/protocols/src/constants.rs | 13 + crates/protocols/src/lib.rs | 6 + crates/protocols/src/webdav/README.md | 191 ++++ crates/protocols/src/webdav/config.rs | 103 ++ crates/protocols/src/webdav/driver.rs | 1082 ++++++++++++++++++ crates/protocols/src/webdav/mod.rs | 17 + crates/protocols/src/webdav/server.rs | 334 ++++++ rustfs/Cargo.toml | 3 +- rustfs/src/init.rs | 70 ++ rustfs/src/main.rs | 37 +- 21 files changed, 2265 insertions(+), 8 deletions(-) create mode 100644 crates/e2e_test/src/protocols/webdav_core.rs create mode 100644 crates/protocols/src/webdav/README.md create mode 100644 crates/protocols/src/webdav/config.rs create mode 100644 crates/protocols/src/webdav/driver.rs create mode 100644 crates/protocols/src/webdav/mod.rs create mode 100644 crates/protocols/src/webdav/server.rs diff --git a/.gitignore b/.gitignore index e54161a4..78604601 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ docs # nix stuff result* *.gz +rustfs-webdav.code-workspace diff --git a/Cargo.lock b/Cargo.lock index 476264d2..1fd20905 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2855,6 +2855,38 @@ dependencies = [ "sqlparser", ] +[[package]] +name = "dav-server" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88e9e4e7a3546a5b348518694e9f3ed5cf3fc8856e50141c197f54d79b5714a8" +dependencies = [ + "bytes", + "chrono", + "derive-where", + "dyn-clone", + "futures-channel", + "futures-util", + "headers", + "htmlescape", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "libc", + "log", + "lru", + "mime_guess", + "parking_lot", + "percent-encoding", + "pin-project-lite", + "reflink-copy", + "tokio", + "url", + "uuid", + "xml-rs", + "xmltree", +] + [[package]] name = "debugid" version = "0.8.0" @@ -2926,6 +2958,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive-where" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef941ded77d15ca19b40374869ac6000af1c9f2a4c0f3d4c70926287e6364a8f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "derive_builder" version = "0.12.0" @@ -4020,6 +4063,30 @@ dependencies = [ "serde_core", ] +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64 0.22.1", + "bytes", + "headers-core", + "http 1.4.0", + "httpdate", + "mime", + "sha1 0.10.6", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.4.0", +] + [[package]] name = "heapless" version = "0.8.0" @@ -4100,6 +4167,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "htmlescape" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9025058dae765dee5070ec375f591e2ba14638c63feff74f13805a72e523163" + [[package]] name = "http" version = "0.2.12" @@ -6815,6 +6888,18 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "reflink-copy" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13362233b147e57674c37b802d216b7c5e3dcccbed8967c84f0d8d223868ae27" +dependencies = [ + "cfg-if", + "libc", + "rustix 1.1.4", + "windows", +] + [[package]] name = "regex" version = "1.12.3" @@ -7745,6 +7830,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "dav-server", "futures", "futures-util", "hex", @@ -7752,6 +7838,8 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", + "hyper", + "hyper-util", "libunftp", "md5", "percent-encoding", @@ -7773,6 +7861,7 @@ dependencies = [ "thiserror 2.0.18", "time", "tokio", + "tokio-rustls", "tokio-util", "tower", "tracing", @@ -10612,12 +10701,36 @@ dependencies = [ "rustix 1.1.4", ] +[[package]] +name = "xml" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8aa498d22c9bbaf482329839bc5620c46be275a19a812e9a22a2b07529a642a" + +[[package]] +name = "xml-rs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3a56132a0d6ecbe77352edc10232f788fc4ceefefff4cab784a98e0e16b6b51" +dependencies = [ + "xml", +] + [[package]] name = "xmlparser" version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +[[package]] +name = "xmltree" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbc04313cab124e498ab1724e739720807b6dc405b9ed0edc5860164d2e4ff70" +dependencies = [ + "xml", +] + [[package]] name = "xxhash-rust" version = "0.8.15" diff --git a/Cargo.toml b/Cargo.toml index c6741465..82b2f691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,6 +287,9 @@ unftp-core = "0.1.0" suppaftp = { version = "8.0.2", features = ["tokio", "tokio-rustls-aws-lc-rs"] } rcgen = "0.14.7" +# WebDAV +dav-server = "0.11.0" + # Performance Analysis and Memory Profiling mimalloc = "0.1" # Use tikv-jemallocator as memory allocator and enable performance analysis diff --git a/crates/config/src/constants/protocols.rs b/crates/config/src/constants/protocols.rs index 8649fe69..eb3f2ae7 100644 --- a/crates/config/src/constants/protocols.rs +++ b/crates/config/src/constants/protocols.rs @@ -45,3 +45,15 @@ pub const ENV_FTPS_CERTS_DIR: &str = "RUSTFS_FTPS_CERTS_DIR"; pub const ENV_FTPS_CA_FILE: &str = "RUSTFS_FTPS_CA_FILE"; pub const ENV_FTPS_PASSIVE_PORTS: &str = "RUSTFS_FTPS_PASSIVE_PORTS"; pub const ENV_FTPS_EXTERNAL_IP: &str = "RUSTFS_FTPS_EXTERNAL_IP"; + +/// Default WebDAV server bind address +pub const DEFAULT_WEBDAV_ADDRESS: &str = "0.0.0.0:8080"; + +/// WebDAV environment variable names +pub const ENV_WEBDAV_ENABLE: &str = "RUSTFS_WEBDAV_ENABLE"; +pub const ENV_WEBDAV_ADDRESS: &str = "RUSTFS_WEBDAV_ADDRESS"; +pub const ENV_WEBDAV_TLS_ENABLED: &str = "RUSTFS_WEBDAV_TLS_ENABLED"; +pub const ENV_WEBDAV_CERTS_DIR: &str = "RUSTFS_WEBDAV_CERTS_DIR"; +pub const ENV_WEBDAV_CA_FILE: &str = "RUSTFS_WEBDAV_CA_FILE"; +pub const ENV_WEBDAV_MAX_BODY_SIZE: &str = "RUSTFS_WEBDAV_MAX_BODY_SIZE"; +pub const ENV_WEBDAV_REQUEST_TIMEOUT: &str = "RUSTFS_WEBDAV_REQUEST_TIMEOUT"; diff --git a/crates/e2e_test/src/protocols/README.md b/crates/e2e_test/src/protocols/README.md index 31b1b85e..9506b57b 100644 --- a/crates/e2e_test/src/protocols/README.md +++ b/crates/e2e_test/src/protocols/README.md @@ -1,6 +1,6 @@ # Protocol E2E Tests -FTPS protocol end-to-end tests for RustFS. +FTPS and WebDAV protocol end-to-end tests for RustFS. ## Prerequisites @@ -19,11 +19,21 @@ brew install sshpass openssh ## Running Tests -Run all protocol tests: +Run all protocol tests (FTPS + WebDAV): +```bash +RUSTFS_BUILD_FEATURES=ftps,webdav cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture +``` + +Run FTPS tests only: ```bash RUSTFS_BUILD_FEATURES=ftps cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture ``` +Run WebDAV tests only: +```bash +RUSTFS_BUILD_FEATURES=webdav cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture +``` + ## Test Coverage ### FTPS Tests @@ -38,3 +48,13 @@ RUSTFS_BUILD_FEATURES=ftps cargo test --package e2e_test test_protocol_core_suit - cdup - rmdir delete bucket +### WebDAV Tests +- PROPFIND at root (list buckets) +- MKCOL (create bucket) +- PUT (upload file) +- GET (download file) +- PROPFIND on bucket (list objects) +- DELETE file +- DELETE bucket +- Authentication failure test + diff --git a/crates/e2e_test/src/protocols/mod.rs b/crates/e2e_test/src/protocols/mod.rs index 18589fd6..20daf490 100644 --- a/crates/e2e_test/src/protocols/mod.rs +++ b/crates/e2e_test/src/protocols/mod.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Protocol tests for FTPS +//! Protocol tests for FTPS and WebDAV pub mod ftps_core; pub mod test_env; pub mod test_runner; +pub mod webdav_core; diff --git a/crates/e2e_test/src/protocols/test_runner.rs b/crates/e2e_test/src/protocols/test_runner.rs index 57794aa2..631aa28f 100644 --- a/crates/e2e_test/src/protocols/test_runner.rs +++ b/crates/e2e_test/src/protocols/test_runner.rs @@ -16,6 +16,7 @@ use crate::common::init_logging; use crate::protocols::ftps_core::test_ftps_core_operations; +use crate::protocols::webdav_core::test_webdav_core_operations; use std::time::Instant; use tokio::time::{Duration, sleep}; use tracing::{error, info}; @@ -59,9 +60,14 @@ struct TestDefinition { impl ProtocolTestSuite { /// Create default test suite pub fn new() -> Self { - let tests = vec![TestDefinition { - name: "test_ftps_core_operations".to_string(), - }]; + let tests = vec![ + TestDefinition { + name: "test_ftps_core_operations".to_string(), + }, + TestDefinition { + name: "test_webdav_core_operations".to_string(), + }, + ]; Self { tests } } @@ -83,6 +89,10 @@ impl ProtocolTestSuite { info!("=== Starting FTPS Module Test ==="); "FTPS core operations (put, ls, mkdir, rmdir, delete)" } + "test_webdav_core_operations" => { + info!("=== Starting WebDAV Core Test ==="); + "WebDAV core operations (MKCOL, PUT, GET, DELETE, PROPFIND)" + } _ => "", }; @@ -121,6 +131,7 @@ impl ProtocolTestSuite { async fn run_single_test(&self, test_def: &TestDefinition) -> Result<(), Box> { match test_def.name.as_str() { "test_ftps_core_operations" => test_ftps_core_operations().await.map_err(|e| e.into()), + "test_webdav_core_operations" => test_webdav_core_operations().await.map_err(|e| e.into()), _ => Err(format!("Test {} not implemented", test_def.name).into()), } } diff --git a/crates/e2e_test/src/protocols/webdav_core.rs b/crates/e2e_test/src/protocols/webdav_core.rs new file mode 100644 index 00000000..7b7471b2 --- /dev/null +++ b/crates/e2e_test/src/protocols/webdav_core.rs @@ -0,0 +1,207 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Core WebDAV tests + +use crate::common::rustfs_binary_path; +use crate::protocols::test_env::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, ProtocolTestEnvironment}; +use anyhow::Result; +use base64::Engine; +use reqwest::Client; +use tokio::process::Command; +use tracing::info; + +// Fixed WebDAV port for testing +const WEBDAV_PORT: u16 = 9080; +const WEBDAV_ADDRESS: &str = "127.0.0.1:9080"; + +/// Create HTTP client with basic auth +fn create_client() -> Client { + Client::builder() + .danger_accept_invalid_certs(true) + .build() + .expect("Failed to create HTTP client") +} + +/// Get basic auth header value +fn basic_auth_header() -> String { + let credentials = format!("{}:{}", DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY); + let encoded = base64::engine::general_purpose::STANDARD.encode(credentials); + format!("Basic {}", encoded) +} + +/// Test WebDAV: MKCOL (create bucket), PUT, GET, DELETE, PROPFIND operations +pub async fn test_webdav_core_operations() -> Result<()> { + let env = ProtocolTestEnvironment::new().map_err(|e| anyhow::anyhow!("{}", e))?; + + // Start server manually + info!("Starting WebDAV server on {}", WEBDAV_ADDRESS); + let binary_path = rustfs_binary_path(); + let mut server_process = Command::new(&binary_path) + .env("RUSTFS_WEBDAV_ENABLE", "true") + .env("RUSTFS_WEBDAV_ADDRESS", WEBDAV_ADDRESS) + .env("RUSTFS_WEBDAV_TLS_ENABLED", "false") // No TLS for testing + .arg(&env.temp_dir) + .spawn()?; + + // Ensure server is cleaned up even on failure + let result = async { + // Wait for server to be ready + ProtocolTestEnvironment::wait_for_port_ready(WEBDAV_PORT, 30) + .await + .map_err(|e| anyhow::anyhow!("{}", e))?; + + let client = create_client(); + let auth_header = basic_auth_header(); + let base_url = format!("http://{}", WEBDAV_ADDRESS); + + // Test PROPFIND at root (list buckets) + info!("Testing WebDAV: PROPFIND at root (list buckets)"); + let resp = client + .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), &base_url) + .header("Authorization", &auth_header) + .header("Depth", "1") + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 207, + "PROPFIND at root should succeed, got: {}", + resp.status() + ); + info!("PASS: PROPFIND at root successful"); + + // Test MKCOL (create bucket) + let bucket_name = "webdav-test-bucket"; + info!("Testing WebDAV: MKCOL (create bucket '{}')", bucket_name); + let resp = client + .request(reqwest::Method::from_bytes(b"MKCOL").unwrap(), format!("{}/{}", base_url, bucket_name)) + .header("Authorization", &auth_header) + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 201, + "MKCOL should succeed, got: {}", + resp.status() + ); + info!("PASS: MKCOL bucket '{}' successful", bucket_name); + + // Test PUT (upload file) + let filename = "test-file.txt"; + let file_content = "Hello, WebDAV!"; + info!("Testing WebDAV: PUT (upload file '{}')", filename); + let resp = client + .put(format!("{}/{}/{}", base_url, bucket_name, filename)) + .header("Authorization", &auth_header) + .body(file_content) + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 201, + "PUT should succeed, got: {}", + resp.status() + ); + info!("PASS: PUT file '{}' successful", filename); + + // Test GET (download file) + info!("Testing WebDAV: GET (download file '{}')", filename); + let resp = client + .get(format!("{}/{}/{}", base_url, bucket_name, filename)) + .header("Authorization", &auth_header) + .send() + .await?; + assert!(resp.status().is_success(), "GET should succeed, got: {}", resp.status()); + let downloaded_content = resp.text().await?; + assert_eq!(downloaded_content, file_content, "Downloaded content should match uploaded content"); + info!("PASS: GET file '{}' successful, content matches", filename); + + // Test PROPFIND on bucket (list objects) + info!("Testing WebDAV: PROPFIND on bucket (list objects)"); + let resp = client + .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), format!("{}/{}", base_url, bucket_name)) + .header("Authorization", &auth_header) + .header("Depth", "1") + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 207, + "PROPFIND on bucket should succeed, got: {}", + resp.status() + ); + let body = resp.text().await?; + assert!(body.contains(filename), "File should appear in PROPFIND response"); + info!("PASS: PROPFIND on bucket successful, file '{}' found", filename); + + // Test DELETE file + info!("Testing WebDAV: DELETE file '{}'", filename); + let resp = client + .delete(format!("{}/{}/{}", base_url, bucket_name, filename)) + .header("Authorization", &auth_header) + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 204, + "DELETE file should succeed, got: {}", + resp.status() + ); + info!("PASS: DELETE file '{}' successful", filename); + + // Verify file is deleted + info!("Testing WebDAV: Verify file is deleted"); + let resp = client + .get(format!("{}/{}/{}", base_url, bucket_name, filename)) + .header("Authorization", &auth_header) + .send() + .await?; + assert!( + resp.status().as_u16() == 404, + "GET deleted file should return 404, got: {}", + resp.status() + ); + info!("PASS: Verified file '{}' is deleted", filename); + + // Test DELETE bucket + info!("Testing WebDAV: DELETE bucket '{}'", bucket_name); + let resp = client + .delete(format!("{}/{}", base_url, bucket_name)) + .header("Authorization", &auth_header) + .send() + .await?; + assert!( + resp.status().is_success() || resp.status().as_u16() == 204, + "DELETE bucket should succeed, got: {}", + resp.status() + ); + info!("PASS: DELETE bucket '{}' successful", bucket_name); + + // Test authentication failure + info!("Testing WebDAV: Authentication failure"); + let resp = client + .request(reqwest::Method::from_bytes(b"PROPFIND").unwrap(), &base_url) + .header("Authorization", "Basic aW52YWxpZDppbnZhbGlk") // invalid:invalid + .send() + .await?; + assert_eq!(resp.status().as_u16(), 401, "Invalid auth should return 401, got: {}", resp.status()); + info!("PASS: Authentication failure test successful"); + + info!("WebDAV core tests passed"); + Ok(()) + } + .await; + + // Always cleanup server process + let _ = server_process.kill().await; + let _ = server_process.wait().await; + + result +} diff --git a/crates/protocols/Cargo.toml b/crates/protocols/Cargo.toml index 3b13f16c..17cceca5 100644 --- a/crates/protocols/Cargo.toml +++ b/crates/protocols/Cargo.toml @@ -54,6 +54,7 @@ swift = [ "dep:base64", "dep:async-compression", ] +webdav = ["dep:dav-server", "dep:hyper", "dep:hyper-util", "dep:http-body-util", "dep:tokio-rustls", "dep:base64", "dep:rustls"] [dependencies] # Core RustFS dependencies @@ -112,6 +113,12 @@ astral-tokio-tar = { workspace = true, optional = true } base64 = { workspace = true, optional = true } async-compression = { workspace = true, optional = true, features = ["tokio", "gzip", "bzip2"] } +# WebDAV specific dependencies (optional) +dav-server = { workspace = true, optional = true } +hyper = { workspace = true, optional = true } +hyper-util = { workspace = true, optional = true } +tokio-rustls = { workspace = true, optional = true } + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/crates/protocols/src/common/gateway.rs b/crates/protocols/src/common/gateway.rs index 12d2bff0..a9c4f9d6 100644 --- a/crates/protocols/src/common/gateway.rs +++ b/crates/protocols/src/common/gateway.rs @@ -182,6 +182,35 @@ pub fn is_operation_supported(protocol: super::session::Protocol, action: &S3Act S3Action::GetObjectAcl => false, S3Action::PutObjectAcl => false, }, + super::session::Protocol::WebDav => match action { + // Bucket operations + S3Action::CreateBucket => true, // MKCOL at root level + S3Action::DeleteBucket => true, // DELETE at root level + S3Action::ListBucket => true, // PROPFIND + S3Action::ListBuckets => true, // PROPFIND at root + S3Action::HeadBucket => true, // PROPFIND/HEAD + + // Object operations + S3Action::GetObject => true, // GET + S3Action::PutObject => true, // PUT + S3Action::DeleteObject => true, // DELETE + S3Action::HeadObject => true, // HEAD/PROPFIND + S3Action::CopyObject => false, // COPY (not implemented yet) + + // Multipart operations (not supported in WebDAV) + S3Action::CreateMultipartUpload => false, + S3Action::UploadPart => false, + S3Action::CompleteMultipartUpload => false, + S3Action::AbortMultipartUpload => false, + S3Action::ListMultipartUploads => false, + S3Action::ListParts => false, + + // ACL operations (not supported in WebDAV) + S3Action::GetBucketAcl => false, + S3Action::PutBucketAcl => false, + S3Action::GetObjectAcl => false, + S3Action::PutObjectAcl => false, + }, } } diff --git a/crates/protocols/src/common/session.rs b/crates/protocols/src/common/session.rs index 8aca7e27..670f88d3 100644 --- a/crates/protocols/src/common/session.rs +++ b/crates/protocols/src/common/session.rs @@ -21,6 +21,7 @@ use std::sync::Arc; pub enum Protocol { Ftps, Swift, + WebDav, } /// Protocol principal representing an authenticated user diff --git a/crates/protocols/src/constants.rs b/crates/protocols/src/constants.rs index 10ec093f..8d5e174a 100644 --- a/crates/protocols/src/constants.rs +++ b/crates/protocols/src/constants.rs @@ -46,6 +46,15 @@ pub mod ftps { pub const PASSIVE_PORTS_PART_COUNT: usize = 2; } +/// WebDAV constants +#[cfg(feature = "webdav")] +pub mod webdav { + /// Maximum body size (5GB) + pub const MAX_BODY_SIZE: u64 = 5 * 1024 * 1024 * 1024; + /// Default request timeout in seconds + pub const REQUEST_TIMEOUT_SECS: u64 = 300; +} + /// Default configuration values pub mod defaults { /// Default protocol addresses @@ -55,4 +64,8 @@ pub mod defaults { /// Default FTPS passive port range #[cfg(feature = "ftps")] pub const DEFAULT_FTPS_PASSIVE_PORTS: &str = "40000-50000"; + + /// Default WebDAV server address + #[cfg(feature = "webdav")] + pub const DEFAULT_WEBDAV_ADDRESS: &str = "0.0.0.0:8080"; } diff --git a/crates/protocols/src/lib.rs b/crates/protocols/src/lib.rs index 10eb43a7..18924ff9 100644 --- a/crates/protocols/src/lib.rs +++ b/crates/protocols/src/lib.rs @@ -23,6 +23,9 @@ pub mod ftps; #[cfg(feature = "swift")] pub mod swift; +#[cfg(feature = "webdav")] +pub mod webdav; + pub use common::session::Protocol; pub use common::{AuthorizationError, ProtocolPrincipal, S3Action, SessionContext, authorize_operation}; @@ -31,3 +34,6 @@ pub use ftps::{config::FtpsConfig, server::FtpsServer}; #[cfg(feature = "swift")] pub use swift::handler::SwiftService; + +#[cfg(feature = "webdav")] +pub use webdav::{config::WebDavConfig, server::WebDavServer}; diff --git a/crates/protocols/src/webdav/README.md b/crates/protocols/src/webdav/README.md new file mode 100644 index 00000000..5c9ef951 --- /dev/null +++ b/crates/protocols/src/webdav/README.md @@ -0,0 +1,191 @@ +# WebDAV Protocol Gateway for RustFS + +WebDAV (Web Distributed Authoring and Versioning) protocol implementation for RustFS, providing HTTP-based file access compatible with native OS file managers and WebDAV clients. + +## Features + +- HTTP/HTTPS WebDAV server with Basic authentication +- Full CRUD operations mapping to S3 storage backend +- Directory (bucket/prefix) creation and deletion +- File upload, download, and deletion +- Property queries (PROPFIND) for metadata +- TLS support with multi-certificate SNI +- Integration with RustFS IAM for access control + +### Supported WebDAV Methods + +| Method | Description | S3 Operation | +|--------|-------------|--------------| +| `PROPFIND` | List directory / Get metadata | ListObjects / HeadObject | +| `MKCOL` | Create directory | CreateBucket / PutObject (prefix) | +| `PUT` | Upload file | PutObject | +| `GET` | Download file | GetObject | +| `DELETE` | Delete file/directory | DeleteObject / DeleteBucket | +| `HEAD` | Get file metadata | HeadObject | + +### Not Yet Implemented + +| Method | Description | Status | +|--------|-------------|--------| +| `MOVE` | Move/rename file | Returns 501 Not Implemented | +| `COPY` | Copy file | Returns 501 Not Implemented | + +## Enable Feature + +**WebDAV is opt-in and must be explicitly enabled.** + +Build with WebDAV support: + +```bash +cargo build --features webdav +``` + +Or enable all protocol features: + +```bash +cargo build --features full +``` + +## Configuration + +Configure WebDAV via environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| `RUSTFS_WEBDAV_ENABLE` | Enable WebDAV server | `false` | +| `RUSTFS_WEBDAV_ADDRESS` | Server bind address | `0.0.0.0:8080` | +| `RUSTFS_WEBDAV_TLS_ENABLED` | Enable TLS | `true` | +| `RUSTFS_WEBDAV_CERTS_DIR` | TLS certificate directory | - | +| `RUSTFS_WEBDAV_CA_FILE` | CA file for client verification | - | +| `RUSTFS_WEBDAV_MAX_BODY_SIZE` | Max upload size (bytes) | 5GB | +| `RUSTFS_WEBDAV_REQUEST_TIMEOUT` | Request timeout (seconds) | 300 | + +## Quick Start + +### Start Server + +```bash +RUSTFS_WEBDAV_ENABLE=true \ +RUSTFS_WEBDAV_ADDRESS=0.0.0.0:8080 \ +RUSTFS_WEBDAV_TLS_ENABLED=false \ +RUSTFS_ACCESS_KEY=rustfsadmin \ +RUSTFS_SECRET_KEY=rustfsadmin \ +./target/release/rustfs /path/to/data +``` + +### Test with curl + +```bash +# List root (buckets) +curl -u rustfsadmin:rustfsadmin -X PROPFIND http://127.0.0.1:8080/ -H "Depth: 1" + +# Create bucket +curl -u rustfsadmin:rustfsadmin -X MKCOL http://127.0.0.1:8080/mybucket/ + +# Upload file +curl -u rustfsadmin:rustfsadmin -T file.txt http://127.0.0.1:8080/mybucket/file.txt + +# Download file +curl -u rustfsadmin:rustfsadmin http://127.0.0.1:8080/mybucket/file.txt + +# Create subdirectory +curl -u rustfsadmin:rustfsadmin -X MKCOL http://127.0.0.1:8080/mybucket/subdir/ + +# Delete file +curl -u rustfsadmin:rustfsadmin -X DELETE http://127.0.0.1:8080/mybucket/file.txt + +# Delete bucket +curl -u rustfsadmin:rustfsadmin -X DELETE http://127.0.0.1:8080/mybucket/ +``` + +## Client Configuration + +### Linux (GNOME Files / Nautilus) + +1. Open Files application +2. Press `Ctrl+L` to show address bar +3. Enter: `dav://rustfsadmin:rustfsadmin@127.0.0.1:8080/` + +### macOS Finder + +1. Open Finder +2. Press `Cmd+K` (Connect to Server) +3. Enter: `http://rustfsadmin:rustfsadmin@127.0.0.1:8080/` + +### Windows Explorer + +1. Open This PC +2. Click "Map network drive" +3. Enter: `http://127.0.0.1:8080/` +4. Enter credentials when prompted + +### VSCode (WebDAV Extension) + +Install `jonpfote.webdav` extension and create `.code-workspace`: + +```json +{ + "folders": [ + { + "uri": "webdav://rustfs-local", + "name": "RustFS WebDAV" + } + ], + "settings": { + "jonpfote.webdav-folders": { + "rustfs-local": { + "host": "127.0.0.1:8080", + "ssl": false, + "authtype": "basic", + "username": "rustfsadmin", + "password": "rustfsadmin" + } + } + } +} +``` + +## Architecture + +``` +WebDAV Client (curl, Finder, Explorer, VSCode) + │ + ▼ HTTP/HTTPS +┌───────────────────┐ +│ WebDavServer │ ← Hyper HTTP server + Basic Auth +└─────────┬─────────┘ + │ +┌─────────▼─────────┐ +│ WebDavDriver │ ← DavFileSystem implementation +└─────────┬─────────┘ + │ +┌─────────▼─────────┐ +│ StorageBackend │ ← S3 API operations +└─────────┬─────────┘ + │ +┌─────────▼─────────┐ +│ ECStore │ ← Erasure coded storage +└───────────────────┘ +``` + +### Key Components + +- **config.rs** - WebDAV server configuration +- **server.rs** - Hyper HTTP server with TLS and Basic authentication +- **driver.rs** - DavFileSystem trait implementation mapping to S3 + +### Path Mapping + +WebDAV paths are mapped to S3 buckets and objects: + +``` +WebDAV Path S3 Mapping +/ → List all buckets +/mybucket/ → Bucket: mybucket +/mybucket/file.txt → Bucket: mybucket, Key: file.txt +/mybucket/dir/file.txt → Bucket: mybucket, Key: dir/file.txt +``` + +## License + +Apache License 2.0 diff --git a/crates/protocols/src/webdav/config.rs b/crates/protocols/src/webdav/config.rs new file mode 100644 index 00000000..c71922f5 --- /dev/null +++ b/crates/protocols/src/webdav/config.rs @@ -0,0 +1,103 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::net::SocketAddr; +use thiserror::Error; + +/// WebDAV server initialization error +#[derive(Debug, Error)] +pub enum WebDavInitError { + #[error("failed to bind address: {0}")] + Bind(#[from] std::io::Error), + #[error("server error: {0}")] + Server(String), + #[error("invalid WebDAV configuration: {0}")] + InvalidConfig(String), + #[error("TLS error: {0}")] + Tls(String), +} + +/// WebDAV server configuration +#[derive(Debug, Clone)] +pub struct WebDavConfig { + /// Server bind address + pub bind_addr: SocketAddr, + /// Whether TLS is enabled (default: true) + pub tls_enabled: bool, + /// Certificate directory path (supports multiple certificates) + pub cert_dir: Option, + /// CA certificate file path for client certificate verification + pub ca_file: Option, + /// Maximum request body size in bytes (default: 5GB) + pub max_body_size: u64, + /// Request timeout in seconds (default: 300) + pub request_timeout_secs: u64, +} + +impl WebDavConfig { + /// Default maximum body size (5GB) + pub const DEFAULT_MAX_BODY_SIZE: u64 = 5 * 1024 * 1024 * 1024; + /// Default request timeout (300 seconds) + pub const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 300; + + /// Validates the configuration + pub async fn validate(&self) -> Result<(), WebDavInitError> { + // Validate TLS configuration + if self.tls_enabled && self.cert_dir.is_none() { + return Err(WebDavInitError::InvalidConfig( + "TLS is enabled but certificate directory is missing".to_string(), + )); + } + + if let Some(path) = &self.cert_dir + && !tokio::fs::try_exists(path).await.unwrap_or(false) + { + return Err(WebDavInitError::InvalidConfig(format!("Certificate directory not found: {}", path))); + } + + // Validate CA file exists if specified + if let Some(path) = &self.ca_file + && !tokio::fs::try_exists(path).await.unwrap_or(false) + { + return Err(WebDavInitError::InvalidConfig(format!("CA file not found: {}", path))); + } + + // Validate max body size + if self.max_body_size == 0 { + return Err(WebDavInitError::InvalidConfig("max_body_size cannot be zero".to_string())); + } + + // Validate request timeout + if self.request_timeout_secs == 0 { + return Err(WebDavInitError::InvalidConfig("request_timeout_secs cannot be zero".to_string())); + } + + Ok(()) + } +} + +impl Default for WebDavConfig { + fn default() -> Self { + Self { + // Use direct construction instead of parse().unwrap() to avoid panic + bind_addr: SocketAddr::from(([0, 0, 0, 0], 8080)), + tls_enabled: true, + cert_dir: None, + ca_file: None, + max_body_size: Self::DEFAULT_MAX_BODY_SIZE, + request_timeout_secs: Self::DEFAULT_REQUEST_TIMEOUT_SECS, + } + } +} diff --git a/crates/protocols/src/webdav/driver.rs b/crates/protocols/src/webdav/driver.rs new file mode 100644 index 00000000..01dfd317 --- /dev/null +++ b/crates/protocols/src/webdav/driver.rs @@ -0,0 +1,1082 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::common::client::s3::StorageBackend as S3StorageBackend; +use crate::common::gateway::{S3Action, authorize_operation}; +use crate::common::session::SessionContext; +use bytes::Bytes; +use dav_server::davpath::DavPath; +use dav_server::fs::{ + DavDirEntry, DavFile, DavFileSystem, DavMetaData, FsError, FsFuture, FsResult, FsStream, OpenOptions, ReadDirMeta, +}; +use futures_util::{FutureExt, StreamExt, stream}; +use rustfs_utils::path; +use s3s::dto::*; +use std::fmt::Debug; +use std::io::SeekFrom; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::sync::RwLock; +use tracing::{debug, error}; + +/// Convert s3s ETag enum to string +fn etag_to_string(etag: &ETag) -> String { + match etag { + ETag::Strong(s) => s.clone(), + ETag::Weak(s) => s.clone(), + } +} + +/// WebDAV metadata implementation +#[derive(Debug, Clone)] +pub struct WebDavMetaData { + /// File size in bytes + pub size: u64, + /// Modification time + pub modified: SystemTime, + /// Creation time + pub created: SystemTime, + /// Whether this is a directory + pub is_dir: bool, + /// ETag (optional) + pub etag: Option, + /// Content type (optional) + pub content_type: Option, +} + +impl DavMetaData for WebDavMetaData { + fn len(&self) -> u64 { + self.size + } + + fn modified(&self) -> FsResult { + Ok(self.modified) + } + + fn is_dir(&self) -> bool { + self.is_dir + } + + fn created(&self) -> FsResult { + Ok(self.created) + } + + fn etag(&self) -> Option { + self.etag.clone() + } +} + +/// WebDAV directory entry implementation +#[derive(Debug, Clone)] +pub struct WebDavDirEntry { + /// Entry name + pub name: String, + /// Entry metadata + pub metadata: WebDavMetaData, +} + +impl DavDirEntry for WebDavDirEntry { + fn name(&self) -> Vec { + self.name.as_bytes().to_vec() + } + + fn metadata(&self) -> FsFuture<'_, Box> { + let meta = self.metadata.clone(); + async move { Ok(Box::new(meta) as Box) }.boxed() + } +} + +/// WebDAV file implementation for reading/writing +pub struct WebDavFile +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + /// Storage backend + storage: S, + /// Session context for authorization + session_context: Arc, + /// Bucket name + bucket: String, + /// Object key + key: String, + /// Current position in file (using RwLock for interior mutability in async) + position: Arc>, + /// File size (known after metadata fetch) + size: Option, + /// Write buffer for accumulating data before upload + write_buffer: Arc>>, + /// Whether we're in write mode + is_write: bool, + /// Maximum body size for chunked transfers + max_body_size: u64, +} + +impl WebDavFile +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + /// Default maximum body size (5GB) + pub const DEFAULT_MAX_BODY_SIZE: u64 = 5 * 1024 * 1024 * 1024; + + pub fn new(storage: S, session_context: Arc, bucket: String, key: String, is_write: bool) -> Self { + Self::with_max_body_size(storage, session_context, bucket, key, is_write, Self::DEFAULT_MAX_BODY_SIZE) + } + + pub fn with_max_body_size( + storage: S, + session_context: Arc, + bucket: String, + key: String, + is_write: bool, + max_body_size: u64, + ) -> Self { + Self { + storage, + session_context, + bucket, + key, + position: Arc::new(RwLock::new(0)), + size: None, + write_buffer: Arc::new(RwLock::new(Vec::new())), + is_write, + max_body_size, + } + } +} + +impl Debug for WebDavFile +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebDavFile") + .field("bucket", &self.bucket) + .field("key", &self.key) + .field("position", &"") + .finish() + } +} + +impl DavFile for WebDavFile +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + fn metadata(&mut self) -> FsFuture<'_, Box> { + let storage = self.storage.clone(); + let session_context = self.session_context.clone(); + let bucket = self.bucket.clone(); + let key = self.key.clone(); + + async move { + match storage + .head_object( + &bucket, + &key, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(output) => { + let size = output.content_length.unwrap_or(0) as u64; + let modified = output + .last_modified + .map(|dt| { + let offset_dt: time::OffsetDateTime = dt.into(); + SystemTime::from(offset_dt) + }) + .unwrap_or_else(SystemTime::now); + + Ok(Box::new(WebDavMetaData { + size, + modified, + created: modified, + is_dir: false, + etag: output.e_tag.as_ref().map(etag_to_string), + content_type: output.content_type.map(|c| c.to_string()), + }) as Box) + } + Err(e) => { + error!("Failed to get file metadata for {}/{}: {}", bucket, key, e); + Err(FsError::NotFound) + } + } + } + .boxed() + } + + fn write_buf(&mut self, mut buf: Box) -> FsFuture<'_, ()> { + let write_buffer = self.write_buffer.clone(); + let max_body_size = self.max_body_size; + async move { + let mut buffer = write_buffer.write().await; + // Consume all chunks from the buffer, not just the first one + while buf.has_remaining() { + let chunk = buf.chunk(); + // Check size limit before extending + if buffer.len() as u64 + chunk.len() as u64 > max_body_size { + return Err(FsError::TooLarge); + } + buffer.extend_from_slice(chunk); + buf.advance(chunk.len()); + } + Ok(()) + } + .boxed() + } + + fn write_bytes(&mut self, buf: Bytes) -> FsFuture<'_, ()> { + let write_buffer = self.write_buffer.clone(); + let max_body_size = self.max_body_size; + async move { + let mut buffer = write_buffer.write().await; + // Check size limit before extending + if buffer.len() as u64 + buf.len() as u64 > max_body_size { + return Err(FsError::TooLarge); + } + buffer.extend_from_slice(&buf); + Ok(()) + } + .boxed() + } + + fn read_bytes(&mut self, count: usize) -> FsFuture<'_, Bytes> { + let storage = self.storage.clone(); + let session_context = self.session_context.clone(); + let bucket = self.bucket.clone(); + let key = self.key.clone(); + let position = self.position.clone(); + + async move { + let start_pos = *position.read().await; + match storage + .get_object_range( + &bucket, + &key, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + start_pos, + count as u64, + ) + .await + { + Ok(output) => { + if let Some(body) = output.body { + let mut data = Vec::new(); + let mut stream = body; + while let Some(chunk_result) = stream.next().await { + match chunk_result { + Ok(bytes) => data.extend_from_slice(&bytes), + Err(e) => { + error!("Error reading stream: {}", e); + return Err(FsError::GeneralFailure); + } + } + } + // Update position after successful read + let bytes_read = data.len() as u64; + *position.write().await = start_pos + bytes_read; + Ok(Bytes::from(data)) + } else { + Ok(Bytes::new()) + } + } + Err(e) => { + error!("Failed to read bytes from {}/{}: {}", bucket, key, e); + Err(FsError::GeneralFailure) + } + } + } + .boxed() + } + + fn seek(&mut self, pos: SeekFrom) -> FsFuture<'_, u64> { + let position = self.position.clone(); + let size = self.size; + + async move { + let current_pos = *position.read().await; + let new_pos = match pos { + SeekFrom::Start(offset) => offset, + SeekFrom::End(offset) => { + let file_size = size.unwrap_or(0); + if offset < 0 { + file_size.saturating_sub((-offset) as u64) + } else { + file_size + offset as u64 + } + } + SeekFrom::Current(offset) => { + if offset < 0 { + current_pos.saturating_sub((-offset) as u64) + } else { + current_pos + offset as u64 + } + } + }; + // Persist the new position + *position.write().await = new_pos; + Ok(new_pos) + } + .boxed() + } + + fn flush(&mut self) -> FsFuture<'_, ()> { + let storage = self.storage.clone(); + let session_context = self.session_context.clone(); + let bucket = self.bucket.clone(); + let key = self.key.clone(); + let write_buffer = self.write_buffer.clone(); + let is_write = self.is_write; + + async move { + if !is_write { + return Ok(()); + } + + // Use write lock and std::mem::take to avoid cloning the buffer + let mut buffer = write_buffer.write().await; + let file_size = buffer.len(); + let data_bytes = Bytes::from(std::mem::take(&mut *buffer)); + drop(buffer); + + let stream = stream::once(async move { Ok::(data_bytes) }); + let streaming_blob = StreamingBlob::wrap(stream); + + let put_input = PutObjectInput::builder() + .bucket(bucket.clone()) + .key(key.clone()) + .content_length(Some(file_size as i64)) + .body(Some(streaming_blob)) + .build() + .map_err(|_| FsError::GeneralFailure)?; + + match storage + .put_object( + put_input, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => { + debug!("Successfully flushed {} bytes to {}/{}", file_size, bucket, key); + // Buffer already cleared by std::mem::take above + Ok(()) + } + Err(e) => { + error!("Failed to flush to {}/{}: {}", bucket, key, e); + Err(FsError::GeneralFailure) + } + } + } + .boxed() + } +} + +/// WebDAV filesystem driver implementation +pub struct WebDavDriver +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + /// Storage backend for S3 operations + storage: S, + /// Session context for authorization + session_context: Arc, +} + +impl Debug for WebDavDriver +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebDavDriver").field("storage", &"StorageBackend").finish() + } +} + +impl Clone for WebDavDriver +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + storage: self.storage.clone(), + session_context: self.session_context.clone(), + } + } +} + +impl WebDavDriver +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + /// Create a new WebDAV driver with the given storage backend and session context + pub fn new(storage: S, session_context: Arc) -> Self { + Self { + storage, + session_context, + } + } + + /// Parse WebDAV path to bucket and object key + fn parse_path(&self, path: &DavPath) -> Result<(String, Option), FsError> { + let path_str = path.as_url_string(); + let cleaned_path = path::clean(&path_str); + let (bucket, object) = path::path_to_bucket_object(&cleaned_path); + + if bucket.is_empty() { + return Ok((String::new(), None)); + } + + let key = if object.is_empty() { None } else { Some(object) }; + Ok((bucket, key)) + } + + /// Check if path is root + fn is_root(&self, path: &DavPath) -> bool { + let path_str = path.as_url_string(); + path_str == "/" || path_str.is_empty() + } + + /// List all buckets (for root path) + async fn list_buckets(&self) -> FsResult> { + match authorize_operation(&self.session_context, &S3Action::ListBuckets, "", None).await { + Ok(_) => {} + Err(_e) => { + return Err(FsError::Forbidden); + } + } + + match self + .storage + .list_buckets( + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(output) => { + let mut entries = Vec::new(); + if let Some(buckets) = output.buckets { + for bucket in buckets { + if let Some(ref bucket_name) = bucket.name { + let modified = bucket + .creation_date + .map(|dt| { + let offset_dt: time::OffsetDateTime = dt.into(); + SystemTime::from(offset_dt) + }) + .unwrap_or_else(SystemTime::now); + + entries.push(WebDavDirEntry { + name: bucket_name.clone(), + metadata: WebDavMetaData { + size: 0, + modified, + created: modified, + is_dir: true, + etag: None, + content_type: None, + }, + }); + } + } + } + Ok(entries) + } + Err(e) => { + error!("Failed to list buckets: {}", e); + Err(FsError::GeneralFailure) + } + } + } + + /// List objects in a bucket + async fn list_objects(&self, bucket: &str, prefix: Option<&str>) -> FsResult> { + // Authorize the operation + authorize_operation(&self.session_context, &S3Action::ListBucket, bucket, prefix) + .await + .map_err(|_| FsError::Forbidden)?; + + let prefix_with_slash = prefix.map(|p| if p.ends_with('/') { p.to_string() } else { format!("{}/", p) }); + + let list_input = ListObjectsV2Input::builder() + .bucket(bucket.to_string()) + .prefix(prefix_with_slash.clone()) + .delimiter(Some("/".to_string())) + .build() + .map_err(|_| FsError::GeneralFailure)?; + + match self + .storage + .list_objects_v2( + list_input, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(output) => { + let mut entries = Vec::new(); + + // Add files (objects) + if let Some(objects) = output.contents { + for obj in objects { + if let Some(key) = obj.key { + // Filter: only show files directly in current directory + let should_show = if prefix.is_none() { + !key.contains('/') + } else { + key.starts_with(&prefix_with_slash.clone().unwrap_or_default()) + }; + + if !should_show { + continue; + } + + let filename = std::path::PathBuf::from(key.as_str()) + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| key.clone()); + + let size = obj.size.unwrap_or(0) as u64; + let modified = obj + .last_modified + .map(|dt| { + let offset_dt: time::OffsetDateTime = dt.into(); + SystemTime::from(offset_dt) + }) + .unwrap_or_else(SystemTime::now); + + entries.push(WebDavDirEntry { + name: filename, + metadata: WebDavMetaData { + size, + modified, + created: modified, + is_dir: false, + etag: obj.e_tag.as_ref().map(etag_to_string), + content_type: None, + }, + }); + } + } + } + + // Add directories (common prefixes) + if let Some(common_prefixes) = output.common_prefixes { + for prefix in common_prefixes { + if let Some(prefix_str) = prefix.prefix { + let dir_name = std::path::PathBuf::from(prefix_str.as_str().trim_end_matches('/')) + .file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| prefix_str.clone()); + + entries.push(WebDavDirEntry { + name: dir_name, + metadata: WebDavMetaData { + size: 0, + modified: SystemTime::now(), + created: SystemTime::now(), + is_dir: true, + etag: None, + content_type: None, + }, + }); + } + } + } + + Ok(entries) + } + Err(e) => { + error!("Failed to list objects in {}: {}", bucket, e); + Err(FsError::GeneralFailure) + } + } + } + + /// Recursively delete all objects in a bucket, then delete the bucket itself + async fn delete_bucket_recursively(&self, bucket: &str) -> FsResult<()> { + // First, delete all objects in the bucket (with pagination) + let mut continuation_token = None; + loop { + let mut list_input = ListObjectsV2Input::builder().bucket(bucket.to_string()); + + if let Some(token) = continuation_token { + list_input = list_input.continuation_token(token); + } + + let list_input = list_input.build().map_err(|_| FsError::GeneralFailure)?; + + if let Ok(output) = self + .storage + .list_objects_v2( + list_input, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + // Delete all objects in this page + if let Some(objects) = output.contents { + for obj in objects { + if let Some(obj_key) = obj.key { + let _ = self + .storage + .delete_object( + bucket, + &obj_key, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await; + } + } + } + + // Check if there are more objects + if !output.is_truncated.unwrap_or(false) { + break; + } + continuation_token = Some(output.next_continuation_token); + } else { + break; + } + } + + // Then delete the bucket + match self + .storage + .delete_bucket( + bucket, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => Ok(()), + Err(e) if e.to_string().contains("NoSuchBucket") => Ok(()), + Err(e) => { + error!("Failed to delete bucket '{}': {}", bucket, e); + Err(FsError::GeneralFailure) + } + } + } +} + +impl DavFileSystem for WebDavDriver +where + S: S3StorageBackend + Debug + Clone + Send + Sync + 'static, +{ + fn open<'a>(&'a self, path: &'a DavPath, options: OpenOptions) -> FsFuture<'a, Box> { + let storage = self.storage.clone(); + let session_context = self.session_context.clone(); + + async move { + let (bucket, key) = self.parse_path(path)?; + + if bucket.is_empty() { + return Err(FsError::Forbidden); + } + + let key = key.ok_or(FsError::Forbidden)?; // Cannot open a bucket as a file + + // Check authorization based on operation type + if options.write || options.create || options.create_new || options.append { + authorize_operation(&session_context, &S3Action::PutObject, &bucket, Some(&key)) + .await + .map_err(|_| FsError::Forbidden)?; + } else { + authorize_operation(&session_context, &S3Action::GetObject, &bucket, Some(&key)) + .await + .map_err(|_| FsError::Forbidden)?; + } + + let is_write = options.write || options.create || options.create_new || options.append; + let file = WebDavFile::new(storage, session_context, bucket, key, is_write); + + Ok(Box::new(file) as Box) + } + .boxed() + } + + fn read_dir<'a>(&'a self, path: &'a DavPath, _meta: ReadDirMeta) -> FsFuture<'a, FsStream>> { + async move { + let entries = if self.is_root(path) { + self.list_buckets().await? + } else { + let (bucket, prefix) = self.parse_path(path)?; + if bucket.is_empty() { + self.list_buckets().await? + } else { + self.list_objects(&bucket, prefix.as_deref()).await? + } + }; + + let stream = stream::iter(entries.into_iter().map(|e| Ok(Box::new(e) as Box))); + Ok(Box::pin(stream) as FsStream>) + } + .boxed() + } + + fn metadata<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, Box> { + async move { + if self.is_root(path) { + return Ok(Box::new(WebDavMetaData { + size: 0, + modified: SystemTime::now(), + created: SystemTime::now(), + is_dir: true, + etag: None, + content_type: None, + }) as Box); + } + + let (bucket, key) = self.parse_path(path)?; + + if bucket.is_empty() { + return Ok(Box::new(WebDavMetaData { + size: 0, + modified: SystemTime::now(), + created: SystemTime::now(), + is_dir: true, + etag: None, + content_type: None, + }) as Box); + } + + if let Some(key) = key { + // Get object metadata + match self + .storage + .head_object( + &bucket, + &key, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(output) => { + let size = output.content_length.unwrap_or(0) as u64; + let modified = output + .last_modified + .map(|dt| { + let offset_dt: time::OffsetDateTime = dt.into(); + SystemTime::from(offset_dt) + }) + .unwrap_or_else(SystemTime::now); + + Ok(Box::new(WebDavMetaData { + size, + modified, + created: modified, + is_dir: false, + etag: output.e_tag.as_ref().map(etag_to_string), + content_type: output.content_type.map(|c| c.to_string()), + }) as Box) + } + Err(e) => { + // Check if it might be a "directory" (prefix) + let prefix = format!("{}/", key); + let list_input = ListObjectsV2Input::builder() + .bucket(bucket.clone()) + .prefix(Some(prefix)) + .max_keys(Some(1)) + .build() + .map_err(|_| FsError::GeneralFailure)?; + + match self + .storage + .list_objects_v2( + list_input, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(output) => { + if output.contents.map(|c| !c.is_empty()).unwrap_or(false) + || output.common_prefixes.map(|c| !c.is_empty()).unwrap_or(false) + { + // It's a directory + Ok(Box::new(WebDavMetaData { + size: 0, + modified: SystemTime::now(), + created: SystemTime::now(), + is_dir: true, + etag: None, + content_type: None, + }) as Box) + } else { + debug!("Object not found: {}/{}: {}", bucket, key, e); + Err(FsError::NotFound) + } + } + Err(_) => { + debug!("Object not found: {}/{}: {}", bucket, key, e); + Err(FsError::NotFound) + } + } + } + } + } else { + // Get bucket metadata + match self + .storage + .head_bucket( + &bucket, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => Ok(Box::new(WebDavMetaData { + size: 0, + modified: SystemTime::now(), + created: SystemTime::now(), + is_dir: true, + etag: None, + content_type: None, + }) as Box), + Err(e) => { + debug!("Bucket not found: {}: {}", bucket, e); + Err(FsError::NotFound) + } + } + } + } + .boxed() + } + + fn create_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> { + async move { + let (bucket, key) = self.parse_path(path)?; + + if bucket.is_empty() { + return Err(FsError::Forbidden); + } + + if let Some(key_str) = key { + // Creating a "directory" in S3 by creating a zero-byte object with trailing slash + let dir_key = if key_str.ends_with('/') { + key_str.to_string() + } else { + format!("{}/", key_str) + }; + + authorize_operation(&self.session_context, &S3Action::PutObject, &bucket, Some(&dir_key)) + .await + .map_err(|_| FsError::Forbidden)?; + + // Create empty streaming blob for directory marker + let stream = futures_util::stream::once(async { Ok::(Bytes::new()) }); + let streaming_blob = s3s::dto::StreamingBlob::wrap(stream); + + let put_input = s3s::dto::PutObjectInput::builder() + .bucket(bucket.clone()) + .key(dir_key.clone()) + .content_length(Some(0)) + .content_type(Some("application/x-directory".to_string())) + .body(Some(streaming_blob)) + .build() + .map_err(|_| FsError::GeneralFailure)?; + + match self + .storage + .put_object( + put_input, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => { + debug!("Successfully created directory '{}' in bucket '{}'", dir_key, bucket); + return Ok(()); + } + Err(e) => { + error!("Failed to create directory '{}' in bucket '{}': {}", dir_key, bucket, e); + return Err(FsError::GeneralFailure); + } + } + } + + // Create bucket + authorize_operation(&self.session_context, &S3Action::CreateBucket, &bucket, None) + .await + .map_err(|_| FsError::Forbidden)?; + + match self + .storage + .create_bucket( + &bucket, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => { + debug!("Successfully created bucket '{}'", bucket); + Ok(()) + } + Err(e) => { + error!("Failed to create bucket '{}': {}", bucket, e); + Err(FsError::GeneralFailure) + } + } + } + .boxed() + } + + fn remove_dir<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> { + async move { + let (bucket, key) = self.parse_path(path)?; + + if bucket.is_empty() { + return Err(FsError::Forbidden); + } + + if let Some(prefix) = key { + // Delete all objects with this prefix (subdirectory) + let prefix_with_slash = if prefix.ends_with('/') { + prefix.to_string() + } else { + format!("{}/", prefix) + }; + + authorize_operation(&self.session_context, &S3Action::DeleteObject, &bucket, Some(&prefix_with_slash)) + .await + .map_err(|_| FsError::Forbidden)?; + + // List and delete all objects with this prefix + let mut continuation_token = None; + loop { + let mut list_input = ListObjectsV2Input::builder() + .bucket(bucket.clone()) + .prefix(Some(prefix_with_slash.clone())); + + if let Some(token) = continuation_token { + list_input = list_input.continuation_token(token); + } + + let list_input = list_input.build().map_err(|_| FsError::GeneralFailure)?; + + if let Ok(output) = self + .storage + .list_objects_v2( + list_input, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + if let Some(objects) = output.contents { + for obj in objects { + if let Some(obj_key) = obj.key { + let _ = self + .storage + .delete_object( + &bucket, + &obj_key, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await; + } + } + } + + if !output.is_truncated.unwrap_or(false) { + break; + } + continuation_token = Some(output.next_continuation_token); + } else { + break; + } + } + + // Also delete the directory marker itself + let _ = self + .storage + .delete_object( + &bucket, + &prefix_with_slash, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await; + + return Ok(()); + } + + // Delete bucket + authorize_operation(&self.session_context, &S3Action::DeleteBucket, &bucket, None) + .await + .map_err(|_| FsError::Forbidden)?; + + self.delete_bucket_recursively(&bucket).await + } + .boxed() + } + + fn remove_file<'a>(&'a self, path: &'a DavPath) -> FsFuture<'a, ()> { + async move { + let (bucket, key) = self.parse_path(path)?; + + if bucket.is_empty() { + return Err(FsError::Forbidden); + } + + let key = key.ok_or(FsError::Forbidden)?; + + // Authorize delete object + authorize_operation(&self.session_context, &S3Action::DeleteObject, &bucket, Some(&key)) + .await + .map_err(|_| FsError::Forbidden)?; + + match self + .storage + .delete_object( + &bucket, + &key, + &self.session_context.principal.user_identity.credentials.access_key, + &self.session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => { + debug!("Successfully deleted object '{}/{}'", bucket, key); + Ok(()) + } + Err(e) => { + error!("Failed to delete object '{}/{}': {}", bucket, key, e); + Err(FsError::GeneralFailure) + } + } + } + .boxed() + } + + fn rename<'a>(&'a self, _from: &'a DavPath, _to: &'a DavPath) -> FsFuture<'a, ()> { + // S3 doesn't support native rename, would need copy + delete + async move { Err(FsError::NotImplemented) }.boxed() + } + + fn copy<'a>(&'a self, _from: &'a DavPath, _to: &'a DavPath) -> FsFuture<'a, ()> { + // Could implement using S3 CopyObject, but not required for basic WebDAV + async move { Err(FsError::NotImplemented) }.boxed() + } +} diff --git a/crates/protocols/src/webdav/mod.rs b/crates/protocols/src/webdav/mod.rs new file mode 100644 index 00000000..fb55c747 --- /dev/null +++ b/crates/protocols/src/webdav/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod driver; +pub mod server; diff --git a/crates/protocols/src/webdav/server.rs b/crates/protocols/src/webdav/server.rs new file mode 100644 index 00000000..aaee1bc3 --- /dev/null +++ b/crates/protocols/src/webdav/server.rs @@ -0,0 +1,334 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::config::{WebDavConfig, WebDavInitError}; +use super::driver::WebDavDriver; +use crate::common::client::s3::StorageBackend; +use crate::common::session::{Protocol, ProtocolPrincipal, SessionContext}; +use bytes::Bytes; +use dav_server::DavHandler; +use dav_server::fakels::FakeLs; +use http_body_util::{BodyExt, Full}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; +use rustls::ServerConfig; +use std::convert::Infallible; +use std::net::IpAddr; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::sync::broadcast; +use tokio_rustls::TlsAcceptor; +use tracing::{debug, error, info, warn}; + +/// WebDAV server implementation +pub struct WebDavServer +where + S: StorageBackend + Clone + Send + Sync + 'static + std::fmt::Debug, +{ + /// Server configuration + config: WebDavConfig, + /// S3 storage backend + storage: S, +} + +impl WebDavServer +where + S: StorageBackend + Clone + Send + Sync + 'static + std::fmt::Debug, +{ + /// Create a new WebDAV server + pub async fn new(config: WebDavConfig, storage: S) -> Result { + config.validate().await?; + Ok(Self { config, storage }) + } + + /// Start the WebDAV server + pub async fn start(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<(), WebDavInitError> { + info!("Initializing WebDAV server on {}", self.config.bind_addr); + + let listener = TcpListener::bind(self.config.bind_addr).await?; + info!("WebDAV server listening on {}", self.config.bind_addr); + + // Setup TLS if enabled + let tls_acceptor = if self.config.tls_enabled { + if let Some(cert_dir) = &self.config.cert_dir { + debug!("Enabling WebDAV TLS with certificates from: {}", cert_dir); + + let cert_key_pairs = rustfs_utils::load_all_certs_from_directory(cert_dir) + .map_err(|e| WebDavInitError::Tls(format!("Failed to load certificates: {}", e)))?; + + if cert_key_pairs.is_empty() { + return Err(WebDavInitError::InvalidConfig("No valid certificates found".into())); + } + + let resolver = rustfs_utils::create_multi_cert_resolver(cert_key_pairs) + .map_err(|e| WebDavInitError::Tls(format!("Failed to create certificate resolver: {}", e)))?; + + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let server_config = ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(resolver)); + + Some(TlsAcceptor::from(Arc::new(server_config))) + } else { + None + } + } else { + None + }; + + let storage = self.storage.clone(); + + loop { + tokio::select! { + accept_result = listener.accept() => { + match accept_result { + Ok((stream, addr)) => { + let storage = storage.clone(); + let tls_acceptor = tls_acceptor.clone(); + + let max_body_size = self.config.max_body_size; + tokio::spawn(async move { + let source_ip: IpAddr = addr.ip(); + + if let Some(acceptor) = tls_acceptor { + match acceptor.accept(stream).await { + Ok(tls_stream) => { + let io = TokioIo::new(tls_stream); + if let Err(e) = Self::handle_connection_impl(io, storage, source_ip, max_body_size).await { + debug!("Connection error: {}", e); + } + } + Err(e) => { + debug!("TLS handshake failed: {}", e); + } + } + } else { + let io = TokioIo::new(stream); + if let Err(e) = Self::handle_connection_impl(io, storage, source_ip, max_body_size).await { + debug!("Connection error: {}", e); + } + } + }); + } + Err(e) => { + error!("Failed to accept connection: {}", e); + } + } + } + _ = shutdown_rx.recv() => { + info!("WebDAV server received shutdown signal"); + break; + } + } + } + + info!("WebDAV server stopped"); + Ok(()) + } + + /// Handle a single connection with hyper-util TokioIo wrapper + async fn handle_connection_impl( + io: TokioIo, + storage: S, + source_ip: IpAddr, + max_body_size: u64, + ) -> Result<(), Box> + where + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, + { + let service = service_fn(move |req: Request| { + let storage = storage.clone(); + async move { Self::handle_request(req, storage, source_ip, max_body_size).await } + }); + + http1::Builder::new().serve_connection(io, service).await?; + + Ok(()) + } + + /// Handle a single WebDAV request + async fn handle_request( + req: Request, + storage: S, + source_ip: IpAddr, + max_body_size: u64, + ) -> Result>, Infallible> { + // Check Content-Length against max_body_size before reading body + if let Some(content_length) = req.headers().get("content-length") + && let Ok(length_str) = content_length.to_str() + && let Ok(length) = length_str.parse::() + && length > max_body_size + { + warn!("Request body too large: {} > {}", length, max_body_size); + return Ok(error_response( + StatusCode::PAYLOAD_TOO_LARGE, + &format!("Request body too large. Maximum size is {} bytes", max_body_size), + )); + } + + // Extract authorization header + let auth_header = req.headers().get("authorization").and_then(|h| h.to_str().ok()); + + // Parse Basic auth credentials + let (access_key, secret_key) = match auth_header { + Some(auth) if auth.starts_with("Basic ") => { + let encoded = &auth[6..]; + match base64_decode(encoded) { + Ok(decoded) => { + let decoded_str = String::from_utf8_lossy(&decoded); + if let Some((user, pass)) = decoded_str.split_once(':') { + (user.to_string(), pass.to_string()) + } else { + return Ok(unauthorized_response()); + } + } + Err(_) => return Ok(unauthorized_response()), + } + } + _ => return Ok(unauthorized_response()), + }; + + // Authenticate user + let session_context = match Self::authenticate(&access_key, &secret_key, source_ip).await { + Ok(ctx) => ctx, + Err(_) => return Ok(unauthorized_response()), + }; + + // Create WebDAV driver with session context + let driver = WebDavDriver::new(storage, Arc::new(session_context)); + + // Build DAV handler with boxed filesystem + let dav_handler = DavHandler::builder() + .filesystem(Box::new(driver)) + .locksystem(FakeLs::new()) + .build_handler(); + + // Convert request body + let (parts, body) = req.into_parts(); + let body_bytes = match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(e) => { + error!("Failed to read request body: {}", e); + return Ok(error_response(StatusCode::BAD_REQUEST, "Failed to read request body")); + } + }; + + // Create request for dav-server using Bytes + let dav_req = Request::from_parts(parts, dav_server::body::Body::from(body_bytes)); + + // Handle the request + let dav_resp = dav_handler.handle(dav_req).await; + + // Convert response + let (parts, body) = dav_resp.into_parts(); + let body_bytes = match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(e) => { + error!("Failed to read response body: {}", e); + return Ok(error_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error")); + } + }; + + Ok(Response::from_parts(parts, Full::new(body_bytes))) + } + + /// Authenticate user against IAM system + async fn authenticate(access_key: &str, secret_key: &str, source_ip: IpAddr) -> Result { + use rustfs_credentials::Credentials as S3Credentials; + use rustfs_iam::get; + + // Access IAM system + let iam_sys = get().map_err(|e| { + error!("IAM system unavailable during WebDAV auth: {}", e); + WebDavInitError::Server("Internal authentication service unavailable".to_string()) + })?; + + let s3_creds = S3Credentials { + access_key: access_key.to_string(), + secret_key: secret_key.to_string(), + session_token: String::new(), + expiration: None, + status: String::new(), + parent_user: String::new(), + groups: None, + claims: None, + name: None, + description: None, + }; + + let (user_identity, is_valid) = iam_sys.check_key(&s3_creds.access_key).await.map_err(|e| { + error!("IAM check_key failed for {}: {}", access_key, e); + WebDavInitError::Server("Authentication verification failed".to_string()) + })?; + + if !is_valid { + warn!("WebDAV login failed: Invalid access key '{}'", access_key); + return Err(WebDavInitError::Server("Invalid credentials".to_string())); + } + + let identity = user_identity.ok_or_else(|| { + error!("User identity missing despite valid key for {}", access_key); + WebDavInitError::Server("User not found".to_string()) + })?; + + if !identity.credentials.secret_key.eq(&s3_creds.secret_key) { + warn!("WebDAV login failed: Invalid secret key for '{}'", access_key); + return Err(WebDavInitError::Server("Invalid credentials".to_string())); + } + + info!("WebDAV user '{}' authenticated successfully", access_key); + + Ok(SessionContext::new( + ProtocolPrincipal::new(Arc::new(identity)), + Protocol::WebDav, + source_ip, + )) + } + + /// Get server configuration + pub fn config(&self) -> &WebDavConfig { + &self.config + } + + /// Get storage backend + pub fn storage(&self) -> &S { + &self.storage + } +} + +/// Create unauthorized response with WWW-Authenticate header +fn unauthorized_response() -> Response> { + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .header("WWW-Authenticate", "Basic realm=\"RustFS WebDAV\"") + .body(Full::new(Bytes::from("Unauthorized"))) + .unwrap_or_else(|_| Response::new(Full::new(Bytes::from("Unauthorized")))) +} + +/// Create error response +fn error_response(status: StatusCode, message: &str) -> Response> { + Response::builder() + .status(status) + .body(Full::new(Bytes::from(message.to_string()))) + .unwrap_or_else(|_| Response::new(Full::new(Bytes::from("Internal Server Error")))) +} + +/// Decode base64 string +fn base64_decode(encoded: &str) -> Result, ()> { + use base64::Engine; + base64::engine::general_purpose::STANDARD.decode(encoded).map_err(|_| ()) +} diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 452025ab..a3c44576 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -35,7 +35,8 @@ default = ["metrics"] metrics = [] ftps = ["rustfs-protocols/ftps"] swift = ["rustfs-protocols/swift"] -full = ["metrics", "ftps", "swift"] +webdav = ["rustfs-protocols/webdav"] +full = ["metrics", "ftps", "swift", "webdav"] [lints] workspace = true diff --git a/rustfs/src/init.rs b/rustfs/src/init.rs index add0814d..7d203ae8 100644 --- a/rustfs/src/init.rs +++ b/rustfs/src/init.rs @@ -508,3 +508,73 @@ pub async fn init_ftps_system() -> Result Result>, Box> +{ + { + use crate::protocols::ProtocolStorageClient; + use rustfs_config::{ + DEFAULT_WEBDAV_ADDRESS, ENV_WEBDAV_ADDRESS, ENV_WEBDAV_CA_FILE, ENV_WEBDAV_CERTS_DIR, ENV_WEBDAV_ENABLE, + ENV_WEBDAV_MAX_BODY_SIZE, ENV_WEBDAV_REQUEST_TIMEOUT, ENV_WEBDAV_TLS_ENABLED, + }; + use rustfs_protocols::{WebDavConfig, WebDavServer}; + + // Check if WebDAV is enabled + let webdav_enable = rustfs_utils::get_env_bool(ENV_WEBDAV_ENABLE, false); + if !webdav_enable { + debug!("WebDAV system is disabled"); + return Ok(None); + } + + // Parse WebDAV address + let webdav_address_str = rustfs_utils::get_env_str(ENV_WEBDAV_ADDRESS, DEFAULT_WEBDAV_ADDRESS); + let addr = rustfs_utils::net::parse_and_resolve_address(&webdav_address_str) + .map_err(|e| format!("Invalid WebDAV address '{webdav_address_str}': {e}"))?; + + // Get WebDAV configuration from environment variables + let tls_enabled = rustfs_utils::get_env_bool(ENV_WEBDAV_TLS_ENABLED, true); + let cert_dir = rustfs_utils::get_env_opt_str(ENV_WEBDAV_CERTS_DIR); + let ca_file = rustfs_utils::get_env_opt_str(ENV_WEBDAV_CA_FILE); + let max_body_size = rustfs_utils::get_env_u64(ENV_WEBDAV_MAX_BODY_SIZE, WebDavConfig::DEFAULT_MAX_BODY_SIZE); + let request_timeout_secs = + rustfs_utils::get_env_u64(ENV_WEBDAV_REQUEST_TIMEOUT, WebDavConfig::DEFAULT_REQUEST_TIMEOUT_SECS); + + // Create WebDAV configuration + let config = WebDavConfig { + bind_addr: addr, + tls_enabled, + cert_dir, + ca_file, + max_body_size, + request_timeout_secs, + }; + + // Create WebDAV server with protocol storage client + let fs = crate::storage::ecfs::FS::new(); + let storage_client = ProtocolStorageClient::new(fs); + let server: WebDavServer = WebDavServer::new(config, storage_client).await?; + + // Log server configuration + info!("WebDAV server configured on {}", server.config().bind_addr); + + // Start WebDAV server in background task with proper shutdown support + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); + + tokio::spawn(async move { + if let Err(e) = server.start(shutdown_rx).await { + error!("WebDAV server error: {}", e); + } + info!("WebDAV server shutdown completed"); + }); + + info!("WebDAV system initialized successfully"); + Ok(Some(shutdown_tx)) + } +} diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index f4e1921a..0f57f29e 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -21,7 +21,7 @@ mod error; mod init; mod license; mod profiling; -#[cfg(feature = "ftps")] +#[cfg(any(feature = "ftps", feature = "webdav"))] mod protocols; mod server; mod storage; @@ -37,6 +37,9 @@ use crate::init::{ #[cfg(feature = "ftps")] use crate::init::{init_ftp_system, init_ftps_system}; +#[cfg(feature = "webdav")] +use crate::init::init_webdav_system; + use crate::server::{ SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_cert, init_event_notifier, shutdown_event_notifier, start_audit_system, start_http_server, stop_audit_system, wait_for_shutdown, @@ -348,6 +351,26 @@ async fn run(config: config::Config) -> Result<()> { #[cfg(not(feature = "ftps"))] let ftps_shutdown_tx: Option> = None; + // Initialize WebDAV system if enabled + #[cfg(feature = "webdav")] + let webdav_shutdown_tx = match init_webdav_system().await { + Ok(Some(tx)) => { + info!("WebDAV system initialized successfully"); + Some(tx) + } + Ok(None) => { + info!("WebDAV system disabled"); + None + } + Err(e) => { + error!("Failed to initialize WebDAV system: {}", e); + return Err(Error::other(e)); + } + }; + + #[cfg(not(feature = "webdav"))] + let webdav_shutdown_tx: Option> = None; + // Initialize buffer profiling system init_buffer_profile_system(&config); @@ -471,6 +494,7 @@ async fn run(config: config::Config) -> Result<()> { console_shutdown_tx, ftp_shutdown_tx, ftps_shutdown_tx, + webdav_shutdown_tx, ctx.clone(), ) .await; @@ -483,6 +507,7 @@ async fn run(config: config::Config) -> Result<()> { console_shutdown_tx, ftp_shutdown_tx, ftps_shutdown_tx, + webdav_shutdown_tx, ctx.clone(), ) .await; @@ -500,6 +525,7 @@ async fn handle_shutdown( console_shutdown_tx: Option>, ftp_shutdown_tx: Option>, ftps_shutdown_tx: Option>, + webdav_shutdown_tx: Option>, ctx: CancellationToken, ) { ctx.cancel(); @@ -552,6 +578,15 @@ async fn handle_shutdown( let _ = ftps_shutdown_tx.send(()); } + // Shutdown WebDAV server + if let Some(webdav_shutdown_tx) = webdav_shutdown_tx { + info!( + target: "rustfs::main::handle_shutdown", + "Shutting down WebDAV server..." + ); + let _ = webdav_shutdown_tx.send(()); + } + // Stop the notification system info!( target: "rustfs::main::handle_shutdown",