Feat/ftps&sftp (#1308)

[feat] ftp / sftp
This commit is contained in:
yxrxy
2025-12-31 09:01:15 +08:00
committed by GitHub
parent 3c14947878
commit b8aa8214e2
38 changed files with 6162 additions and 57 deletions

0
.docker/observability/prometheus-data/.gitignore vendored Normal file → Executable file
View File

1478
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -266,6 +266,17 @@ opentelemetry_sdk = { version = "0.31.0" }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry-stdout = { version = "0.31.0" }
# FTP and SFTP
libunftp = "0.21.0"
russh = "0.55.0"
russh-sftp = "2.1.1"
russh-keys = "0.49.2"
ssh-key = "0.7.0-rc.4"
ssh2 = "0.9"
suppaftp = { version = "7.0.7", features = ["async-std", "rustls", "native-tls"] }
rcgen = "0.14.6"
native-tls = "0.2.14"
# Performance Analysis and Memory Profiling
mimalloc = "0.1"
# Use tikv-jemallocator as memory allocator and enable performance analysis

View File

@@ -26,6 +26,7 @@ workspace = true
[dependencies]
rustfs-ecstore.workspace = true
rustfs-common.workspace = true
rustfs-iam.workspace = true
flatbuffers.workspace = true
futures.workspace = true
rustfs-lock.workspace = true
@@ -51,3 +52,10 @@ base64 = { workspace = true }
rand = { workspace = true }
chrono = { workspace = true }
md5 = { workspace = true }
ssh2.workspace = true
suppaftp.workspace = true
rcgen.workspace = true
anyhow.workspace = true
native-tls.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true

View File

@@ -34,8 +34,8 @@ use tracing::{error, info, warn};
use uuid::Uuid;
// Common constants for all E2E tests
pub const DEFAULT_ACCESS_KEY: &str = "minioadmin";
pub const DEFAULT_SECRET_KEY: &str = "minioadmin";
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
pub const TEST_BUCKET: &str = "e2e-test-bucket";
pub fn workspace_root() -> PathBuf {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -165,7 +165,7 @@ impl RustFSTestEnvironment {
}
/// Find an available port for the test
async fn find_available_port() -> Result<u16, Box<dyn std::error::Error + Send + Sync>> {
pub async fn find_available_port() -> Result<u16, Box<dyn std::error::Error + Send + Sync>> {
use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0")?;
let port = listener.local_addr()?.port();

View File

@@ -40,3 +40,6 @@ mod content_encoding_test;
// Policy variables tests
#[cfg(test)]
mod policy;
#[cfg(test)]
mod protocols;

View File

@@ -0,0 +1,44 @@
# Protocol E2E Tests
FTPS and SFTP protocol end-to-end tests for RustFS.
## Prerequisites
### Required Tools
```bash
# Ubuntu/Debian
sudo apt-get install sshpass ssh-keygen
# RHEL/CentOS
sudo yum install sshpass openssh-clients
# macOS
brew install sshpass openssh
```
## Running Tests
Run all protocol tests:
```bash
cargo test --package e2e_test test_protocol_core_suite -- --test-threads=1 --nocapture
```
Run only FTPS tests:
```bash
cargo test --package e2e_test test_ftps_core_operations -- --test-threads=1 --nocapture
```
## Test Coverage
### FTPS Tests
- mkdir bucket
- cd to bucket
- put file
- ls list objects
- cd . (stay in current directory)
- cd / (return to root)
- cd nonexistent bucket (should fail)
- delete object
- cdup
- rmdir delete bucket

View File

@@ -0,0 +1,211 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Core FTPS tests
use crate::common::rustfs_binary_path;
use crate::protocols::test_env::{DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY, ProtocolTestEnvironment};
use anyhow::Result;
use native_tls::TlsConnector;
use rcgen::generate_simple_self_signed;
use std::io::Cursor;
use std::path::PathBuf;
use suppaftp::NativeTlsConnector;
use suppaftp::NativeTlsFtpStream;
use tokio::process::Command;
use tracing::info;
// Fixed FTPS port for testing
const FTPS_PORT: u16 = 9021;
const FTPS_ADDRESS: &str = "127.0.0.1:9021";
/// Test FTPS: put, ls, mkdir, rmdir, delete operations
pub async fn test_ftps_core_operations() -> Result<()> {
let env = ProtocolTestEnvironment::new().map_err(|e| anyhow::anyhow!("{}", e))?;
// Generate and write certificate
let cert = generate_simple_self_signed(vec!["localhost".to_string(), "127.0.0.1".to_string()])?;
let cert_path = PathBuf::from(&env.temp_dir).join("ftps.crt");
let key_path = PathBuf::from(&env.temp_dir).join("ftps.key");
let cert_pem = cert.cert.pem();
let key_pem = cert.signing_key.serialize_pem();
tokio::fs::write(&cert_path, &cert_pem).await?;
tokio::fs::write(&key_path, &key_pem).await?;
// Start server manually
info!("Starting FTPS server on {}", FTPS_ADDRESS);
let binary_path = rustfs_binary_path();
let mut server_process = Command::new(&binary_path)
.args([
"--ftps-enable",
"--ftps-address",
FTPS_ADDRESS,
"--ftps-certs-file",
cert_path.to_str().unwrap(),
"--ftps-key-file",
key_path.to_str().unwrap(),
&env.temp_dir,
])
.spawn()?;
// Ensure server is cleaned up even on failure
let result = async {
// Wait for server to be ready
ProtocolTestEnvironment::wait_for_port_ready(FTPS_PORT, 30)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
// Create native TLS connector that accepts the certificate
let tls_connector = TlsConnector::builder().danger_accept_invalid_certs(true).build()?;
// Wrap in suppaftp's NativeTlsConnector
let tls_connector = NativeTlsConnector::from(tls_connector);
// Connect to FTPS server
let ftp_stream = NativeTlsFtpStream::connect(FTPS_ADDRESS).map_err(|e| anyhow::anyhow!("Failed to connect: {}", e))?;
// Upgrade to secure connection
let mut ftp_stream = ftp_stream
.into_secure(tls_connector, "127.0.0.1")
.map_err(|e| anyhow::anyhow!("Failed to upgrade to TLS: {}", e))?;
ftp_stream.login(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY)?;
info!("Testing FTPS: mkdir bucket");
let bucket_name = "testbucket";
ftp_stream.mkdir(bucket_name)?;
info!("PASS: mkdir bucket '{}' successful", bucket_name);
info!("Testing FTPS: cd to bucket");
ftp_stream.cwd(bucket_name)?;
info!("PASS: cd to bucket '{}' successful", bucket_name);
info!("Testing FTPS: put file");
let filename = "test.txt";
let content = "Hello, FTPS!";
ftp_stream.put_file(filename, &mut Cursor::new(content.as_bytes()))?;
info!("PASS: put file '{}' ({} bytes) successful", filename, content.len());
info!("Testing FTPS: ls list objects in bucket");
let list = ftp_stream.list(None)?;
assert!(list.iter().any(|line| line.contains(filename)), "File should appear in list");
info!("PASS: ls command successful, file '{}' found in bucket", filename);
info!("Testing FTPS: ls . (list current directory)");
let list_dot = ftp_stream.list(Some(".")).unwrap_or_else(|_| ftp_stream.list(None).unwrap());
assert!(list_dot.iter().any(|line| line.contains(filename)), "File should appear in ls .");
info!("PASS: ls . successful, file '{}' found", filename);
info!("Testing FTPS: ls / (list root directory)");
let list_root = ftp_stream.list(Some("/")).unwrap();
assert!(list_root.iter().any(|line| line.contains(bucket_name)), "Bucket should appear in ls /");
assert!(!list_root.iter().any(|line| line.contains(filename)), "File should not appear in ls /");
info!(
"PASS: ls / successful, bucket '{}' found, file '{}' not found in root",
bucket_name, filename
);
info!("Testing FTPS: ls /. (list root directory with /.)");
let list_root_dot = ftp_stream
.list(Some("/."))
.unwrap_or_else(|_| ftp_stream.list(Some("/")).unwrap());
assert!(
list_root_dot.iter().any(|line| line.contains(bucket_name)),
"Bucket should appear in ls /."
);
info!("PASS: ls /. successful, bucket '{}' found", bucket_name);
info!("Testing FTPS: ls /bucket (list bucket by absolute path)");
let list_bucket = ftp_stream.list(Some(&format!("/{}", bucket_name))).unwrap();
assert!(list_bucket.iter().any(|line| line.contains(filename)), "File should appear in ls /bucket");
info!("PASS: ls /{} successful, file '{}' found", bucket_name, filename);
info!("Testing FTPS: cd . (stay in current directory)");
ftp_stream.cwd(".")?;
info!("PASS: cd . successful (stays in current directory)");
info!("Testing FTPS: ls after cd . (should still see file)");
let list_after_dot = ftp_stream.list(None)?;
assert!(
list_after_dot.iter().any(|line| line.contains(filename)),
"File should still appear in list after cd ."
);
info!("PASS: ls after cd . successful, file '{}' still found in bucket", filename);
info!("Testing FTPS: cd / (go to root directory)");
ftp_stream.cwd("/")?;
info!("PASS: cd / successful (back to root directory)");
info!("Testing FTPS: ls after cd / (should see bucket only)");
let root_list_after = ftp_stream.list(None)?;
assert!(
!root_list_after.iter().any(|line| line.contains(filename)),
"File should not appear in root ls"
);
assert!(
root_list_after.iter().any(|line| line.contains(bucket_name)),
"Bucket should appear in root ls"
);
info!("PASS: ls after cd / successful, file not in root, bucket '{}' found in root", bucket_name);
info!("Testing FTPS: cd back to bucket");
ftp_stream.cwd(bucket_name)?;
info!("PASS: cd back to bucket '{}' successful", bucket_name);
info!("Testing FTPS: delete object");
ftp_stream.rm(filename)?;
info!("PASS: delete object '{}' successful", filename);
info!("Testing FTPS: ls verify object deleted");
let list_after = ftp_stream.list(None)?;
assert!(!list_after.iter().any(|line| line.contains(filename)), "File should be deleted");
info!("PASS: ls after delete successful, file '{}' is not found", filename);
info!("Testing FTPS: cd up to root directory");
ftp_stream.cdup()?;
info!("PASS: cd up to root directory successful");
info!("Testing FTPS: cd to nonexistent bucket (should fail)");
let nonexistent_bucket = "nonexistent-bucket";
let cd_result = ftp_stream.cwd(nonexistent_bucket);
assert!(cd_result.is_err(), "cd to nonexistent bucket should fail");
info!("PASS: cd to nonexistent bucket '{}' failed as expected", nonexistent_bucket);
info!("Testing FTPS: ls verify bucket exists in root");
let root_list = ftp_stream.list(None)?;
assert!(root_list.iter().any(|line| line.contains(bucket_name)), "Bucket should exist in root");
info!("PASS: ls root successful, bucket '{}' found in root", bucket_name);
info!("Testing FTPS: rmdir delete bucket");
ftp_stream.rmdir(bucket_name)?;
info!("PASS: rmdir bucket '{}' successful", bucket_name);
info!("Testing FTPS: ls verify bucket deleted");
let root_list_after = ftp_stream.list(None)?;
assert!(!root_list_after.iter().any(|line| line.contains(bucket_name)), "Bucket should be deleted");
info!("PASS: ls root after delete successful, bucket '{}' is not found", bucket_name);
ftp_stream.quit()?;
info!("FTPS core tests passed");
Ok(())
}
.await;
// Always cleanup server process
let _ = server_process.kill().await;
let _ = server_process.wait().await;
result
}

View File

@@ -0,0 +1,19 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol tests for FTPS and SFTP
pub mod ftps_core;
pub mod test_env;
pub mod test_runner;

View File

@@ -0,0 +1,72 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol test environment for FTPS and SFTP
use std::net::TcpStream;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
/// Default credentials
pub const DEFAULT_ACCESS_KEY: &str = "rustfsadmin";
pub const DEFAULT_SECRET_KEY: &str = "rustfsadmin";
/// Custom test environment that doesn't automatically stop servers
pub struct ProtocolTestEnvironment {
pub temp_dir: String,
}
impl ProtocolTestEnvironment {
/// Create a new test environment
/// This environment won't stop any server when dropped
pub fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let temp_dir = format!("/tmp/rustfs_protocol_test_{}", uuid::Uuid::new_v4());
std::fs::create_dir_all(&temp_dir)?;
Ok(Self { temp_dir })
}
/// Wait for server to be ready
pub async fn wait_for_port_ready(port: u16, max_attempts: u32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let address = format!("127.0.0.1:{}", port);
info!("Waiting for server to be ready on {}", address);
for i in 0..max_attempts {
if TcpStream::connect(&address).is_ok() {
info!("Server is ready after {} s", i + 1);
return Ok(());
}
if i == max_attempts - 1 {
return Err(format!("Server did not become ready within {} s", max_attempts).into());
}
sleep(Duration::from_secs(1)).await;
}
Ok(())
}
}
// Implement Drop trait that doesn't stop servers
impl Drop for ProtocolTestEnvironment {
fn drop(&mut self) {
// Clean up temp directory only, don't stop any server
if let Err(e) = std::fs::remove_dir_all(&self.temp_dir) {
warn!("Failed to clean up temp directory {}: {}", self.temp_dir, e);
}
}
}

View File

@@ -0,0 +1,171 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol test runner
use crate::common::init_logging;
use crate::protocols::ftps_core::test_ftps_core_operations;
use std::time::Instant;
use tokio::time::{Duration, sleep};
use tracing::{error, info};
/// Test result
#[derive(Debug, Clone)]
pub struct TestResult {
pub test_name: String,
pub success: bool,
pub error_message: Option<String>,
}
impl TestResult {
pub fn success(test_name: String) -> Self {
Self {
test_name,
success: true,
error_message: None,
}
}
pub fn failure(test_name: String, error: String) -> Self {
Self {
test_name,
success: false,
error_message: Some(error),
}
}
}
/// Protocol test suite
pub struct ProtocolTestSuite {
tests: Vec<TestDefinition>,
}
#[derive(Debug, Clone)]
struct TestDefinition {
name: String,
}
impl ProtocolTestSuite {
/// Create default test suite
pub fn new() -> Self {
let tests = vec![
TestDefinition {
name: "test_ftps_core_operations".to_string(),
},
// TestDefinition { name: "test_sftp_core_operations".to_string() },
];
Self { tests }
}
/// Run test suite
pub async fn run_test_suite(&self) -> Vec<TestResult> {
init_logging();
info!("Starting Protocol test suite");
let start_time = Instant::now();
let mut results = Vec::new();
info!("Scheduled {} tests", self.tests.len());
// Run tests
for (i, test_def) in self.tests.iter().enumerate() {
let test_description = match test_def.name.as_str() {
"test_ftps_core_operations" => {
info!("=== Starting FTPS Module Test ===");
"FTPS core operations (put, ls, mkdir, rmdir, delete)"
}
"test_sftp_core_operations" => {
info!("=== Starting SFTP Module Test ===");
"SFTP core operations (put, ls, mkdir, rmdir, delete)"
}
_ => "",
};
info!("Test {}/{} - {}", i + 1, self.tests.len(), test_description);
info!("Running: {}", test_def.name);
let test_start = Instant::now();
let result = self.run_single_test(test_def).await;
let test_duration = test_start.elapsed();
match result {
Ok(_) => {
info!("Test passed: {} ({:.2}s)", test_def.name, test_duration.as_secs_f64());
results.push(TestResult::success(test_def.name.clone()));
}
Err(e) => {
error!("Test failed: {} ({:.2}s): {}", test_def.name, test_duration.as_secs_f64(), e);
results.push(TestResult::failure(test_def.name.clone(), e.to_string()));
}
}
// Delay between tests to avoid resource conflicts
if i < self.tests.len() - 1 {
sleep(Duration::from_secs(2)).await;
}
}
// Print summary
self.print_summary(&results, start_time.elapsed());
results
}
/// Run a single test
async fn run_single_test(&self, test_def: &TestDefinition) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match test_def.name.as_str() {
"test_ftps_core_operations" => test_ftps_core_operations().await.map_err(|e| e.into()),
// "test_sftp_core_operations" => test_sftp_core_operations().await.map_err(|e| e.into()),
_ => Err(format!("Test {} not implemented", test_def.name).into()),
}
}
/// Print test summary
fn print_summary(&self, results: &[TestResult], total_duration: Duration) {
info!("=== Test Suite Summary ===");
info!("Total duration: {:.2}s", total_duration.as_secs_f64());
info!("Total tests: {}", results.len());
let passed = results.iter().filter(|r| r.success).count();
let failed = results.len() - passed;
let success_rate = (passed as f64 / results.len() as f64) * 100.0;
info!("Passed: {} | Failed: {}", passed, failed);
info!("Success rate: {:.1}%", success_rate);
if failed > 0 {
error!("Failed tests:");
for result in results.iter().filter(|r| !r.success) {
error!(" - {}: {}", result.test_name, result.error_message.as_ref().unwrap());
}
}
}
}
/// Test suite
#[tokio::test]
async fn test_protocol_core_suite() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let suite = ProtocolTestSuite::new();
let results = suite.run_test_suite().await;
let failed = results.iter().filter(|r| !r.success).count();
if failed > 0 {
return Err(format!("Protocol tests failed: {failed} failures").into());
}
info!("All protocol tests passed");
Ok(())
}

View File

@@ -1258,6 +1258,28 @@ where
self.update_user_with_claims(access_key, u)
}
/// Add SSH public key for a user (for SFTP authentication)
pub async fn add_user_ssh_public_key(&self, access_key: &str, public_key: &str) -> Result<()> {
if access_key.is_empty() || public_key.is_empty() {
return Err(Error::InvalidArgument);
}
let users = self.cache.users.load();
let u = match users.get(access_key) {
Some(u) => u,
None => return Err(Error::NoSuchUser(access_key.to_string())),
};
let mut user_identity = u.clone();
user_identity.add_ssh_public_key(public_key);
self.api
.save_user_identity(access_key, UserType::Reg, user_identity.clone(), None)
.await?;
self.update_user_with_claims(access_key, user_identity)
}
pub async fn set_user_status(&self, access_key: &str, status: AccountStatus) -> Result<OffsetDateTime> {
if access_key.is_empty() {
return Err(Error::InvalidArgument);

View File

@@ -637,6 +637,19 @@ impl<T: Store> IamSys<T> {
self.store.update_user_secret_key(access_key, secret_key).await
}
/// Add SSH public key for a user (for SFTP authentication)
pub async fn add_user_ssh_public_key(&self, access_key: &str, public_key: &str) -> Result<()> {
if !is_access_key_valid(access_key) {
return Err(IamError::InvalidAccessKeyLength);
}
if public_key.is_empty() {
return Err(IamError::InvalidArgument);
}
self.store.add_user_ssh_public_key(access_key, public_key).await
}
pub async fn check_key(&self, access_key: &str) -> Result<(Option<UserIdentity>, bool)> {
if let Some(sys_cred) = get_global_action_cred() {
if sys_cred.access_key == access_key {

View File

@@ -18,6 +18,8 @@ pub use credentials::*;
use rustfs_credentials::Credentials;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use time::OffsetDateTime;
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
@@ -42,6 +44,25 @@ impl UserIdentity {
update_at: Some(OffsetDateTime::now_utc()),
}
}
/// Add an SSH public key to user identity for SFTP authentication
pub fn add_ssh_public_key(&mut self, public_key: &str) {
self.credentials
.claims
.get_or_insert_with(HashMap::new)
.insert("ssh_public_keys".to_string(), json!([public_key]));
}
/// Get all SSH public keys for user identity
pub fn get_ssh_public_keys(&self) -> Vec<String> {
self.credentials
.claims
.as_ref()
.and_then(|claims| claims.get("ssh_public_keys"))
.and_then(|keys| keys.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str()).map(String::from).collect())
.unwrap_or_default()
}
}
impl From<Credentials> for UserIdentity {

View File

@@ -115,6 +115,8 @@ md5.workspace = true
mime_guess = { workspace = true }
moka = { workspace = true }
pin-project-lite.workspace = true
# rand = "0.8" is pinned due to dependency conflicts with workspace version
rand = "0.8"
rust-embed = { workspace = true, features = ["interpolate-folder-path"] }
s3s.workspace = true
shadow-rs = { workspace = true, features = ["build", "metadata"] }
@@ -129,6 +131,12 @@ zip = { workspace = true }
# Observability and Metrics
metrics = { workspace = true }
# FTP and SFTP Libraries
libunftp = { workspace = true }
russh = { workspace = true }
russh-sftp = { workspace = true }
ssh-key = { workspace = true }
[target.'cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))'.dependencies]
sysctl = { workspace = true }

View File

@@ -91,10 +91,22 @@ pub enum AuthType {
StreamingUnsignedTrailer,
}
#[derive(Debug)]
pub struct IAMAuth {
simple_auth: SimpleAuth,
}
impl Clone for IAMAuth {
fn clone(&self) -> Self {
// Since SimpleAuth doesn't implement Clone, we create a new one
// This is a simplified implementation - in a real scenario, you might need
// to store the credentials separately to properly clone
Self {
simple_auth: SimpleAuth::new(),
}
}
}
impl IAMAuth {
pub fn new(ak: impl Into<String>, sk: impl Into<SecretKey>) -> Self {
let simple_auth = SimpleAuth::from_single(ak, sk);

View File

@@ -135,6 +135,47 @@ pub struct Opt {
/// Options: GeneralPurpose, AiTraining, DataAnalytics, WebWorkload, IndustrialIoT, SecureStorage
#[arg(long, default_value_t = String::from("GeneralPurpose"), env = "RUSTFS_BUFFER_PROFILE")]
pub buffer_profile: String,
/// Enable FTPS server
#[arg(long, default_value_t = false, env = "RUSTFS_FTPS_ENABLE")]
pub ftps_enable: bool,
/// FTPS server bind address
#[arg(long, default_value_t = String::from("0.0.0.0:21"), env = "RUSTFS_FTPS_ADDRESS")]
pub ftps_address: String,
/// FTPS server certificate file path
#[arg(long, env = "RUSTFS_FTPS_CERTS_FILE")]
pub ftps_certs_file: Option<String>,
/// FTPS server private key file path
#[arg(long, env = "RUSTFS_FTPS_KEY_FILE")]
pub ftps_key_file: Option<String>,
/// FTPS server passive ports range (e.g., "40000-50000")
#[arg(long, env = "RUSTFS_FTPS_PASSIVE_PORTS")]
pub ftps_passive_ports: Option<String>,
/// FTPS server external IP address for passive mode (auto-detected if not specified)
#[arg(long, env = "RUSTFS_FTPS_EXTERNAL_IP")]
pub ftps_external_ip: Option<String>,
/// Enable SFTP server
#[arg(long, default_value_t = false, env = "RUSTFS_SFTP_ENABLE")]
pub sftp_enable: bool,
/// SFTP server bind address
#[arg(long, default_value_t = String::from("0.0.0.0:22"), env = "RUSTFS_SFTP_ADDRESS")]
pub sftp_address: String,
/// SFTP server host key file path
#[arg(long, env = "RUSTFS_SFTP_HOST_KEY")]
pub sftp_host_key: Option<String>,
/// Path to authorized SSH public keys file for SFTP authentication
/// Each line should contain an OpenSSH public key: ssh-rsa AAAA... comment
#[arg(long, env = "RUSTFS_SFTP_AUTHORIZED_KEYS")]
pub sftp_authorized_keys: Option<String>,
}
// lazy_static::lazy_static! {

View File

@@ -309,3 +309,122 @@ pub(crate) fn init_buffer_profile_system(opt: &config::Opt) {
info!("Buffer profiling system initialized successfully");
}
}
/// Initialize the FTPS system
///
/// This function initializes the FTPS server if enabled in the configuration.
/// It sets up the FTPS server with the appropriate configuration and starts
/// the server in a background task.
///
/// MINIO CONSTRAINT: FTPS server MUST follow the same lifecycle management
/// as other services and MUST integrate with the global shutdown system.
#[instrument(skip_all)]
pub async fn init_ftp_system(
opt: &crate::config::Opt,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use crate::protocols::ftps::server::{FtpsConfig, FtpsServer};
use std::net::SocketAddr;
// Check if FTPS is enabled
if !opt.ftps_enable {
debug!("FTPS system is disabled");
return Ok(());
}
// Parse FTPS address
let addr: SocketAddr = opt
.ftps_address
.parse()
.map_err(|e| format!("Invalid FTPS address '{}': {}", opt.ftps_address, e))?;
// Create FTPS configuration
let config = FtpsConfig {
bind_addr: addr,
passive_ports: opt.ftps_passive_ports.clone(),
external_ip: opt.ftps_external_ip.clone(),
ftps_required: true,
cert_file: opt.ftps_certs_file.clone(),
key_file: opt.ftps_key_file.clone(),
};
// Create FTPS server
let server = FtpsServer::new(config).await?;
// Log server configuration
info!(
"FTPS server configured on {} with passive ports {:?}",
server.config().bind_addr,
server.config().passive_ports
);
// Start FTPS server in background task
let shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
if let Err(e) = server.start(shutdown_rx).await {
error!("FTPS server error: {}", e);
}
});
info!("FTPS system initialized successfully");
Ok(())
}
/// Initialize the SFTP system
///
/// This function initializes the SFTP server if enabled in the configuration.
/// It sets up the SFTP server with the appropriate configuration and starts
/// the server in a background task.
///
/// MINIO CONSTRAINT: SFTP server MUST follow the same lifecycle management
/// as other services and MUST integrate with the global shutdown system.
#[instrument(skip_all)]
pub async fn init_sftp_system(
opt: &config::Opt,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use crate::protocols::sftp::server::{SftpConfig, SftpServer};
use std::net::SocketAddr;
// Check if SFTP is enabled
if !opt.sftp_enable {
debug!("SFTP system is disabled");
return Ok(());
}
// Parse SFTP address
let addr: SocketAddr = opt
.sftp_address
.parse()
.map_err(|e| format!("Invalid SFTP address '{}': {}", opt.sftp_address, e))?;
// Create SFTP configuration
let config = SftpConfig {
bind_addr: addr,
require_key_auth: false, // TODO: Add key auth configuration
cert_file: None, // CA certificates for client certificate authentication
key_file: opt.sftp_host_key.clone(), // SFTP server host key
authorized_keys_file: opt.sftp_authorized_keys.clone(), // Pre-loaded authorized SSH public keys
};
// Create SFTP server
let server = SftpServer::new(config)?;
// Log server configuration
info!(
"SFTP server configured on {} with key auth requirement: {}",
server.config().bind_addr,
server.config().require_key_auth
);
// Start SFTP server in background task
let shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
if let Err(e) = server.start(shutdown_rx).await {
error!("SFTP server error: {}", e);
}
});
info!("SFTP system initialized successfully");
Ok(())
}

View File

@@ -19,6 +19,7 @@ mod error;
mod init;
mod license;
mod profiling;
mod protocols;
mod server;
mod storage;
mod update;
@@ -26,7 +27,8 @@ mod version;
// Ensure the correct path for parse_license is imported
use crate::init::{
add_bucket_notification_configuration, init_buffer_profile_system, init_kms_system, init_update_check, print_server_info,
add_bucket_notification_configuration, init_buffer_profile_system, init_ftp_system, init_kms_system, init_sftp_system,
init_update_check, print_server_info,
};
use crate::server::{
SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_cert, init_event_notifier, shutdown_event_notifier,
@@ -266,6 +268,19 @@ async fn run(opt: config::Opt) -> Result<()> {
// Initialize KMS system if enabled
init_kms_system(&opt).await?;
// Create a shutdown channel for FTP/SFTP services
let (ftp_sftp_shutdown_tx, _) = tokio::sync::broadcast::channel(1);
// Initialize FTP system if enabled
init_ftp_system(&opt, ftp_sftp_shutdown_tx.clone())
.await
.map_err(Error::other)?;
// Initialize SFTP system if enabled
init_sftp_system(&opt, ftp_sftp_shutdown_tx.clone())
.await
.map_err(Error::other)?;
// Initialize buffer profiling system
init_buffer_profile_system(&opt);
@@ -364,11 +379,11 @@ async fn run(opt: config::Opt) -> Result<()> {
match wait_for_shutdown().await {
#[cfg(unix)]
ShutdownSignal::CtrlC | ShutdownSignal::Sigint | ShutdownSignal::Sigterm => {
handle_shutdown(&state_manager, s3_shutdown_tx, console_shutdown_tx, ctx.clone()).await;
handle_shutdown(&state_manager, s3_shutdown_tx, console_shutdown_tx, ftp_sftp_shutdown_tx, ctx.clone()).await;
}
#[cfg(not(unix))]
ShutdownSignal::CtrlC => {
handle_shutdown(&state_manager, s3_shutdown_tx, console_shutdown_tx, ctx.clone()).await;
handle_shutdown(&state_manager, s3_shutdown_tx, console_shutdown_tx, ftp_sftp_shutdown_tx, ctx.clone()).await;
}
}
@@ -381,6 +396,7 @@ async fn handle_shutdown(
state_manager: &ServiceStateManager,
s3_shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
console_shutdown_tx: Option<tokio::sync::broadcast::Sender<()>>,
ftp_sftp_shutdown_tx: tokio::sync::broadcast::Sender<()>,
ctx: CancellationToken,
) {
ctx.cancel();
@@ -447,6 +463,9 @@ async fn handle_shutdown(
// Wait for the worker thread to complete the cleaning work
tokio::time::sleep(SHUTDOWN_TIMEOUT).await;
// Send shutdown signal to FTP/SFTP services
let _ = ftp_sftp_shutdown_tx.send(());
// the last updated status is stopped
state_manager.update(ServiceState::Stopped);
info!(

View File

@@ -0,0 +1,159 @@
# RustFS Protocols
RustFS provides multiple protocol interfaces for accessing object storage, including:
- **FTPS** - File Transfer Protocol over TLS for traditional file transfers
- **SFTP** - SSH File Transfer Protocol for secure file transfers with key-based authentication
## Quick Start
### Enable Protocols on Startup
```bash
# Start RustFS with all protocols enabled
rustfs \
--address 0.0.0.0:9000 \
--access-key rustfsadmin \
--secret-key rustfsadmin \
--ftps-enable \
--ftps-address 0.0.0.0:21 \
--ftps-certs-file /path/to/cert.pem \
--ftps-key-file /path/to/key.pem \
--ftps-passive-ports "40000-41000" \
--sftp-enable \
--sftp-address 0.0.0.0:22 \
--sftp-host-key /path/to/host_key \
--sftp-authorized-keys /path/to/authorized_keys \
/data
```
## Protocol Details
### FTPS
- **Port**: 21
- **Protocol**: FTP over TLS (FTPS)
- **Authentication**: Access Key / Secret Key (same as S3)
- **Features**:
- File upload/download
- Directory listing
- File deletion
- Passive mode data connections
- **Limitations**:
- Cannot create/delete buckets (use S3 API)
- No file rename/copy operations
- No multipart upload
### SFTP
- **Port**: 22
- **Protocol**: SSH File Transfer Protocol
- **Authentication**:
- Password (Access Key / Secret Key)
- SSH Public Key (recommended)
- SSH Certificate (optional)
- **Features**:
- File upload/download
- Directory listing and manipulation
- File deletion
- Bucket creation/deletion via mkdir/rmdir
- **Limitations**:
- No file rename/copy operations
- No multipart upload
- No symlinks or file attributes modification
- **Documentation: [SFTP README](./sftp/README.md)**
## Architecture
```
┌─────────────────────────────────────────┐
│ RustFS Client │
│ (FTPS Client, SFTP Client) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Protocol Gateway Layer │
│ - Action Mapping │
│ - Authorization (IAM Policy) │
│ - Operation Support Check │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Internal S3 Client │
│ (ProtocolS3Client) │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ Storage Layer (ECStore) │
│ - Object Storage │
│ - Erasure Coding │
│ - Metadata Management │
└─────────────────────────────────────────┘
```
## Security
### Encryption
- **FTPS**: TLS 1.2/1.3 for all control and data connections
- **SFTP**: SSH2 protocol with modern cipher suites (Ed25519, RSA, ECDSA)
### Authentication
All protocols share the same IAM-based authentication system:
- **Access Key**: Username identifier
- **Secret Key**: Password/API key
- **SSH Public Keys**: For SFTP key-based authentication
- **IAM Policies**: Fine-grained access control
### Authorization
Unified authorization based on IAM policies:
- Supports `s3:*` action namespace
- Condition-based policies (IP, time, etc.)
- Bucket-level and object-level permissions
## Troubleshooting
### FTPS Connection Issues
```bash
# Check TLS certificate
openssl s_client -connect localhost:21 -starttls ftp
# Test with lftp
lftp -u username,password -e "set ssl:verify-certificate no; ls; bye" ftps://localhost
```
### SFTP Connection Issues
```bash
# Verbose SFTP connection
sftp -vvv -o StrictHostKeyChecking=no -o LogLevel=DEBUG3 user@localhost
# Check SSH host key
ssh-keygen -l -f /path/to/host_key
```
## Configuration Reference
### FTPS Configuration
| Option | Environment Variable | Description | Default |
|--------|---------------------|-------------|---------|
| `--ftps-enable` | `RUSTFS_FTPS_ENABLE` | Enable FTPS server | `false` |
| `--ftps-address` | `RUSTFS_FTPS_ADDRESS` | FTPS bind address | `0.0.0.0:21` |
| `--ftps-certs-file` | `RUSTFS_FTPS_CERTS_FILE` | TLS certificate file | - |
| `--ftps-key-file` | `RUSTFS_FTPS_KEY_FILE` | TLS private key file | - |
| `--ftps-passive-ports` | `RUSTFS_FTPS_PASSIVE_PORTS` | Passive port range | - |
| `--ftps-external-ip` | `RUSTFS_FTPS_EXTERNAL_IP` | External IP for NAT | - |
### SFTP Configuration
| Option | Environment Variable | Description | Default |
|--------|---------------------|-------------|---------|
| `--sftp-enable` | `RUSTFS_SFTP_ENABLE` | Enable SFTP server | `false` |
| `--sftp-address` | `RUSTFS_SFTP_ADDRESS` | SFTP bind address | `0.0.0.0:22` |
| `--sftp-host-key` | `RUSTFS_SFTP_HOST_KEY` | SSH host key file | - |
| `--sftp-authorized-keys` | `RUSTFS_SFTP_AUTHORIZED_KEYS` | Authorized keys file | - |
## See Also
- [FTPS README](./ftps/README.md) - Detailed FTPS usage
- [SFTP README](./sftp/README.md) - Detailed SFTP usage
- [RustFS Documentation](https://rustfs.com/docs/)
- [IAM Policy Reference](https://rustfs.com/docs/iam-policies)

View File

@@ -0,0 +1,15 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod s3;

View File

@@ -0,0 +1,281 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::storage::ecfs::FS;
use http::{HeaderMap, Method};
use rustfs_credentials;
use s3s::dto::*;
use s3s::{S3, S3Request, S3Result};
use tokio_stream::Stream;
use tracing::trace;
/// S3 client for internal protocol use
pub struct ProtocolS3Client {
/// FS instance for internal operations
fs: FS,
/// Access key for the client
access_key: String,
}
impl ProtocolS3Client {
/// Create a new protocol S3 client
pub fn new(fs: FS, access_key: String) -> Self {
Self { fs, access_key }
}
/// Get object - maps to S3 GetObject
pub async fn get_object(&self, input: GetObjectInput) -> S3Result<GetObjectOutput> {
trace!(
"Protocol S3 client GetObject request: bucket={}, key={:?}, access_key={}",
input.bucket, input.key, self.access_key
);
// Go through standard S3 API path
let uri: http::Uri = format!("/{}{}", input.bucket, input.key.as_str()).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::GET,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.get_object(req).await?;
Ok(resp.output)
}
/// Put object - maps to S3 PutObject
pub async fn put_object(&self, input: PutObjectInput) -> S3Result<PutObjectOutput> {
trace!(
"Protocol S3 client PutObject request: bucket={}, key={:?}, access_key={}",
input.bucket, input.key, self.access_key
);
let uri: http::Uri = format!("/{}{}", input.bucket, input.key.as_str()).parse().unwrap_or_default();
// Set required headers for put operation
let mut headers = HeaderMap::default();
if let Some(ref body) = input.body {
let (lower, upper) = body.size_hint();
if let Some(len) = upper {
headers.insert("content-length", len.to_string().parse().unwrap());
} else if lower > 0 {
headers.insert("content-length", lower.to_string().parse().unwrap());
}
}
let req = S3Request {
input,
method: Method::PUT,
uri,
headers,
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.put_object(req).await?;
Ok(resp.output)
}
/// Delete object - maps to S3 DeleteObject
pub async fn delete_object(&self, input: DeleteObjectInput) -> S3Result<DeleteObjectOutput> {
trace!(
"Protocol S3 client DeleteObject request: bucket={}, key={:?}, access_key={}",
input.bucket, input.key, self.access_key
);
let uri: http::Uri = format!("/{}{}", input.bucket, input.key.as_str()).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::DELETE,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.delete_object(req).await?;
Ok(resp.output)
}
/// Head object - maps to S3 HeadObject
pub async fn head_object(&self, input: HeadObjectInput) -> S3Result<HeadObjectOutput> {
trace!(
"Protocol S3 client HeadObject request: bucket={}, key={:?}, access_key={}",
input.bucket, input.key, self.access_key
);
let uri: http::Uri = format!("/{}{}", input.bucket, input.key.as_str()).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::HEAD,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.head_object(req).await?;
Ok(resp.output)
}
/// Head bucket - maps to S3 HeadBucket
pub async fn head_bucket(&self, input: HeadBucketInput) -> S3Result<HeadBucketOutput> {
trace!(
"Protocol S3 client HeadBucket request: bucket={}, access_key={}",
input.bucket, self.access_key
);
let uri: http::Uri = format!("/{}", input.bucket).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::HEAD,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.head_bucket(req).await?;
Ok(resp.output)
}
/// List objects v2 - maps to S3 ListObjectsV2
pub async fn list_objects_v2(&self, input: ListObjectsV2Input) -> S3Result<ListObjectsV2Output> {
trace!(
"Protocol S3 client ListObjectsV2 request: bucket={}, access_key={}",
input.bucket, self.access_key
);
let uri: http::Uri = format!("/{}?list-type=2", input.bucket).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::GET,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.list_objects_v2(req).await?;
Ok(resp.output)
}
/// List buckets - maps to S3 ListBuckets
/// Note: This requires credentials and ReqInfo because list_buckets performs credential validation
pub async fn list_buckets(&self, input: ListBucketsInput, secret_key: &str) -> S3Result<ListBucketsOutput> {
trace!("Protocol S3 client ListBuckets request: access_key={}", self.access_key);
// Create proper credentials with the real secret key from authentication
let credentials = Some(s3s::auth::Credentials {
access_key: self.access_key.clone(),
secret_key: secret_key.to_string().into(),
});
// Check if user is the owner (admin)
let is_owner = if let Some(global_cred) = rustfs_credentials::get_global_action_cred() {
self.access_key == global_cred.access_key
} else {
false
};
// Create ReqInfo for authorization (required by list_buckets)
let mut extensions = http::Extensions::default();
extensions.insert(crate::storage::access::ReqInfo {
cred: Some(rustfs_credentials::Credentials {
access_key: self.access_key.clone(),
secret_key: secret_key.to_string(),
session_token: String::new(),
expiration: None,
status: String::new(),
parent_user: String::new(),
groups: None,
claims: None,
name: None,
description: None,
}),
is_owner,
bucket: None,
object: None,
version_id: None,
region: None,
});
let req = S3Request {
input,
method: Method::GET,
uri: http::Uri::from_static("/"),
headers: HeaderMap::default(),
extensions,
credentials,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.list_buckets(req).await?;
Ok(resp.output)
}
/// Create bucket - maps to S3 CreateBucket
pub async fn create_bucket(&self, input: CreateBucketInput) -> S3Result<CreateBucketOutput> {
trace!(
"Protocol S3 client CreateBucket request: bucket={:?}, access_key={}",
input.bucket, self.access_key
);
let bucket_str = input.bucket.as_str();
let uri: http::Uri = format!("/{}", bucket_str).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::PUT,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.create_bucket(req).await?;
Ok(resp.output)
}
/// Delete bucket - maps to S3 DeleteBucket
pub async fn delete_bucket(&self, input: DeleteBucketInput) -> S3Result<DeleteBucketOutput> {
trace!(
"Protocol S3 client DeleteBucket request: bucket={}, access_key={}",
input.bucket, self.access_key
);
let uri: http::Uri = format!("/{}", input.bucket).parse().unwrap_or_default();
let req = S3Request {
input,
method: Method::DELETE,
uri,
headers: HeaderMap::default(),
extensions: http::Extensions::default(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.fs.delete_bucket(req).await?;
Ok(resp.output)
}
}

View File

@@ -0,0 +1,910 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! FTPS driver implementation
//!
//! This module provides the FTPS driver that integrates with libunftp
//! and translates FTP operations to S3 actions through the gateway.
use crate::protocols::client::s3::ProtocolS3Client;
use crate::protocols::gateway::action::S3Action;
use crate::protocols::gateway::adapter::is_operation_supported;
use crate::protocols::gateway::authorize::authorize_operation;
use crate::protocols::gateway::error::map_s3_error_to_ftps;
use crate::protocols::gateway::restrictions::{get_s3_equivalent_operation, is_ftp_feature_supported};
use crate::protocols::session::context::SessionContext;
use async_trait::async_trait;
use futures::stream;
use futures_util::TryStreamExt;
use libunftp::storage::{Error, ErrorKind, Fileinfo, Metadata, Result, StorageBackend};
use rustfs_utils::path;
use s3s::dto::StreamingBlob;
use s3s::dto::{GetObjectInput, PutObjectInput};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use tokio::io::AsyncRead;
use tracing::{debug, error, info, trace};
/// FTPS storage driver implementation
#[derive(Debug)]
pub struct FtpsDriver {}
impl FtpsDriver {
/// Create a new FTPS driver
pub fn new() -> Self {
Self {}
}
/// Validate FTP feature support
fn validate_feature_support(&self, feature: &str) -> Result<()> {
if !is_ftp_feature_supported(feature) {
let error_msg = if let Some(s3_equivalent) = get_s3_equivalent_operation(feature) {
format!("Unsupported FTP feature: {}. S3 equivalent: {}", feature, s3_equivalent)
} else {
format!("Unsupported FTP feature: {}", feature)
};
error!("{}", error_msg);
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, error_msg));
}
Ok(())
}
/// Get SessionContext from User
fn get_session_context_from_user(&self, user: &super::server::FtpsUser) -> Result<SessionContext> {
Ok(user.session_context.clone())
}
/// Create ProtocolS3Client for the given user
fn create_s3_client_for_user(&self, user: &super::server::FtpsUser) -> Result<ProtocolS3Client> {
let session_context = &user.session_context;
let fs = crate::storage::ecfs::FS {};
let s3_client = ProtocolS3Client::new(fs, session_context.access_key().to_string());
Ok(s3_client)
}
/// List all buckets (for root path)
async fn list_buckets(
&self,
user: &super::server::FtpsUser,
session_context: &SessionContext,
) -> Result<Vec<Fileinfo<PathBuf, FtpsMetadata>>> {
let s3_client = self.create_s3_client_for_user(user)?;
let action = S3Action::ListBuckets;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
error!("FTPS LIST - ListBuckets operation not supported for FTPS protocol");
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
match authorize_operation(session_context, &action, "", None).await {
Ok(_) => debug!("FTPS LIST - ListBuckets authorization successful"),
Err(e) => {
error!("FTPS LIST - ListBuckets authorization failed: {}", e);
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"));
}
}
let mut list_result = Vec::new();
// List all buckets
let input = s3s::dto::ListBucketsInput::builder()
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build ListBucketsInput"))?;
// Get the real secret key from the authenticated user
let secret_key = &session_context.principal.user_identity.credentials.secret_key;
debug!(
"FTPS LIST - calling S3 list_buckets with access_key: {}",
session_context.principal.access_key()
);
match s3_client.list_buckets(input, secret_key).await {
Ok(output) => {
debug!(
"FTPS LIST - S3 list_buckets succeeded, buckets count: {:?}",
output.buckets.as_ref().map(|b| b.len()).unwrap_or(0)
);
if let Some(buckets) = output.buckets {
for bucket in buckets {
if let Some(ref bucket_name) = bucket.name {
debug!("FTPS LIST - found bucket: '{}'", bucket_name);
let metadata = FtpsMetadata {
size: 0,
is_directory: true,
modification_time: bucket
.creation_date
.map(|t| {
let offset_datetime: time::OffsetDateTime = t.into();
offset_datetime.unix_timestamp() as u64
})
.unwrap_or(0),
};
list_result.push(Fileinfo {
path: PathBuf::from(bucket_name),
metadata,
});
}
}
}
Ok(list_result)
}
Err(e) => {
error!("FTPS LIST - Failed to list buckets: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Create bucket
async fn create_bucket(&self, user: &super::server::FtpsUser, session_context: &SessionContext, bucket: &str) -> Result<()> {
let s3_client = self.create_s3_client_for_user(user)?;
let action = S3Action::CreateBucket;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
error!("FTPS CREATE_BUCKET - operation not supported for FTPS protocol");
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
match authorize_operation(session_context, &action, bucket, None).await {
Ok(_) => debug!("FTPS CREATE_BUCKET - authorization successful"),
Err(e) => {
error!("FTPS CREATE_BUCKET - authorization failed: {}", e);
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"));
}
}
// Create bucket
let mut input_builder = s3s::dto::CreateBucketInput::builder();
input_builder.set_bucket(bucket.to_string());
let input = input_builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build CreateBucketInput"))?;
match s3_client.create_bucket(input).await {
Ok(_) => {
debug!("FTPS CREATE_BUCKET - successfully created bucket: '{}'", bucket);
Ok(())
}
Err(e) => {
error!("FTPS CREATE_BUCKET - failed to create bucket: '{}', error: {}", bucket, e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Get bucket and key from path
fn parse_path(&self, path_str: &str) -> Result<(String, Option<String>)> {
debug!("FTPS parse_path - input: '{}'", path_str);
let (bucket, object) = path::path_to_bucket_object(path_str);
let key = if object.is_empty() { None } else { Some(object) };
debug!("FTPS parse_path - bucket: '{}', key: {:?}", bucket, key);
Ok((bucket, key))
}
}
#[async_trait]
impl StorageBackend<super::server::FtpsUser> for FtpsDriver {
type Metadata = FtpsMetadata;
/// Get file metadata
async fn metadata<P: AsRef<Path> + Send + Debug>(&self, user: &super::server::FtpsUser, path: P) -> Result<Self::Metadata> {
trace!("FTPS metadata request for path: {:?}", path);
let s3_client = self.create_s3_client_for_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
if let Some(object_key) = key {
// Object metadata request
let action = S3Action::HeadObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
let session_context = self.get_session_context_from_user(user)?;
// Log the operation for audit purposes
debug!(
"FTPS operation authorized: user={}, action={}, bucket={}, object={}, source_ip={}",
session_context.access_key(),
action.as_str(),
bucket,
object_key,
session_context.source_ip
);
authorize_operation(&session_context, &action, &bucket, Some(&object_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
let mut builder = s3s::dto::HeadObjectInput::builder();
builder.set_bucket(bucket.clone());
builder.set_key(object_key.clone());
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build HeadObjectInput"))?;
match s3_client.head_object(input).await {
Ok(output) => {
let metadata = FtpsMetadata {
size: output.content_length.unwrap_or(0) as u64,
is_directory: false,
modification_time: output
.last_modified
.map(|t| {
let offset_datetime: time::OffsetDateTime = t.into();
offset_datetime.unix_timestamp() as u64
})
.unwrap_or(0),
};
Ok(metadata)
}
Err(e) => {
error!("Failed to get object metadata: {}", e);
Err(map_s3_error_to_ftps(&e))
}
}
} else {
// Bucket metadata request
let action = S3Action::HeadBucket;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
let session_context = self.get_session_context_from_user(user)?;
authorize_operation(&session_context, &action, &bucket, None)
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
let mut builder = s3s::dto::HeadBucketInput::builder();
builder.set_bucket(bucket.clone());
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build HeadBucketInput"))?;
match s3_client.head_bucket(input).await {
Ok(_) => {
let metadata = FtpsMetadata {
size: 0,
is_directory: true,
modification_time: 0,
};
Ok(metadata)
}
Err(e) => {
error!("Failed to get bucket metadata: {}", e);
Err(map_s3_error_to_ftps(&e))
}
}
}
}
/// Get directory listing
async fn list<P: AsRef<Path> + Send + Debug>(
&self,
user: &super::server::FtpsUser,
path: P,
) -> Result<Vec<Fileinfo<PathBuf, Self::Metadata>>> {
info!("FTPS LIST request - user: {}, raw path: {:?}", user.username, path);
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
info!("FTPS LIST - parsing path: '{}'", path_str);
// Check if this is root path listing
if path_str == "/" || path_str == "/." {
debug!("FTPS LIST - root path listing (including /.), using ListBuckets");
return self.list_buckets(user, &session_context).await;
}
// Handle paths ending with /., e.g., /testbucket/.
// Remove trailing /. to get the actual path
let cleaned_path = if let Some(stripped) = path_str.strip_suffix("/.") {
info!("FTPS LIST - path ends with /., removing trailing /.");
stripped
} else {
&path_str
};
let (bucket, prefix) = self.parse_path(cleaned_path)?;
debug!("FTPS LIST - parsed bucket: '{}', prefix: {:?}", bucket, prefix);
// Validate feature support
self.validate_feature_support("LIST command")?;
let action = S3Action::ListBucket;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
debug!("FTPS LIST - authorizing operation for bucket: '{}', prefix: {:?}", bucket, prefix);
match authorize_operation(&session_context, &action, &bucket, prefix.as_deref()).await {
Ok(_) => debug!("FTPS LIST - authorization successful"),
Err(e) => {
error!("FTPS LIST - authorization failed: {}", e);
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"));
}
}
let mut list_result = Vec::new();
// List objects with prefix
let mut builder = s3s::dto::ListObjectsV2Input::builder();
builder.set_bucket(bucket.clone());
builder.set_prefix(prefix.clone());
builder.set_delimiter(Option::from("/".to_string()));
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build ListObjectsV2Input"))?;
match s3_client.list_objects_v2(input).await {
Ok(output) => {
// Add directories (common prefixes)
if let Some(common_prefixes) = output.common_prefixes {
for prefix_info in common_prefixes {
if let Some(key) = prefix_info.prefix {
let dir_name = key.trim_end_matches('/').to_string();
let metadata = FtpsMetadata {
size: 0,
is_directory: true,
modification_time: 0,
};
list_result.push(Fileinfo {
path: PathBuf::from(dir_name),
metadata,
});
}
}
}
// Add files (objects)
if let Some(contents) = output.contents {
for object in contents {
if let Some(key) = object.key {
let file_name = key;
let metadata = FtpsMetadata {
size: object.size.unwrap_or(0) as u64,
is_directory: false,
modification_time: object
.last_modified
.map(|t| {
let offset_datetime: time::OffsetDateTime = t.into();
offset_datetime.unix_timestamp() as u64
})
.unwrap_or(0),
};
list_result.push(Fileinfo {
path: PathBuf::from(file_name),
metadata,
});
}
}
}
Ok(list_result)
}
Err(e) => {
error!("Failed to list objects: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Get file
async fn get<P: AsRef<Path> + Send + Debug>(
&self,
user: &super::server::FtpsUser,
path: P,
start_pos: u64,
) -> Result<Box<dyn AsyncRead + Send + Sync + Unpin>> {
trace!("FTPS get request for path: {:?} at position: {}", path, start_pos);
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
if key.is_none() {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Cannot read bucket as file"));
}
let object_key = key.unwrap();
let action = S3Action::GetObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
authorize_operation(&session_context, &action, &bucket, Some(&object_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
let mut builder = GetObjectInput::builder();
builder.set_bucket(bucket);
builder.set_key(object_key);
let mut input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build GetObjectInput"))?;
if start_pos > 0 {
input.range = Some(
s3s::dto::Range::parse(&format!("bytes={}-", start_pos))
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Invalid range format"))?,
);
}
match s3_client.get_object(input).await {
Ok(output) => {
if let Some(body) = output.body {
// Map the s3s/Box<dyn StdError> error to std::io::Error
let stream = body.map_err(std::io::Error::other);
// Wrap the stream in StreamReader to make it a tokio::io::AsyncRead
let reader = tokio_util::io::StreamReader::new(stream);
Ok(Box::new(reader))
} else {
Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Empty object body"))
}
}
Err(e) => {
error!("Failed to get object: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Put file
async fn put<P: AsRef<Path> + Send + Debug, R: AsyncRead + Send + Sync + Unpin + 'static>(
&self,
user: &super::server::FtpsUser,
input: R,
path: P,
start_pos: u64,
) -> Result<u64> {
trace!("FTPS put request for path: {:?} at position: {}", path, start_pos);
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
if key.is_none() {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Cannot write to bucket directly"));
}
let object_key = key.unwrap();
// Check for append operation (not supported)
if start_pos > 0 {
self.validate_feature_support("APPE command (file append)")?;
}
let action = S3Action::PutObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
authorize_operation(&session_context, &action, &bucket, Some(&object_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
// Convert AsyncRead to bytes
let bytes_vec = {
let mut buffer = Vec::new();
let mut reader = input;
tokio::io::copy(&mut reader, &mut buffer)
.await
.map_err(|e| Error::new(ErrorKind::TransientFileNotAvailable, e.to_string()))?;
buffer
};
let file_size = bytes_vec.len();
let mut put_builder = PutObjectInput::builder();
put_builder.set_bucket(bucket.clone());
put_builder.set_key(object_key.clone());
put_builder.set_content_length(Some(file_size as i64));
// Create StreamingBlob with known size
let data_bytes = bytes::Bytes::from(bytes_vec);
let stream = stream::once(async move { Ok::<bytes::Bytes, std::io::Error>(data_bytes) });
let streaming_blob = StreamingBlob::wrap(stream);
put_builder.set_body(Some(streaming_blob));
let put_input = put_builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build PutObjectInput"))?;
match s3_client.put_object(put_input).await {
Ok(output) => {
debug!("Successfully put object: {:?}", output);
// Return the size of the uploaded object
Ok(file_size as u64)
}
Err(e) => {
error!("FTPS put - S3 error details: {:?}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Delete file
async fn del<P: AsRef<Path> + Send + Debug>(&self, user: &super::server::FtpsUser, path: P) -> Result<()> {
trace!("FTPS delete request for path: {:?}", path);
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
if key.is_none() {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Cannot delete bucket"));
}
let object_key = key.unwrap();
let action = S3Action::DeleteObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
authorize_operation(&session_context, &action, &bucket, Some(&object_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
let mut builder = s3s::dto::DeleteObjectInput::builder();
builder.set_bucket(bucket);
builder.set_key(object_key);
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build DeleteObjectInput"))?;
match s3_client.delete_object(input).await {
Ok(_) => {
debug!("Successfully deleted object");
Ok(())
}
Err(e) => {
error!("Failed to delete object: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
/// Create directory
async fn mkd<P: AsRef<Path> + Send + Debug>(&self, user: &super::server::FtpsUser, path: P) -> Result<()> {
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
let dir_key = if let Some(k) = key {
// Creating directory inside bucket
path::retain_slash(&k)
} else {
// Creating bucket - use CreateBucket action instead of PutObject
debug!("FTPS MKDIR - Creating bucket: '{}'", bucket);
return self.create_bucket(user, &session_context, &bucket).await;
};
let action = S3Action::PutObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
authorize_operation(&session_context, &action, &bucket, Some(&dir_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
// Create directory marker object
let mut input_builder = PutObjectInput::builder();
input_builder.set_bucket(bucket);
input_builder.set_key(dir_key);
input_builder.set_body(Some(StreamingBlob::from(s3s::Body::from(Vec::new()))));
let input = input_builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build PutObjectInput"))?;
match s3_client.put_object(input).await {
Ok(_) => {
debug!("Successfully created directory marker");
Ok(())
}
Err(e) => {
error!("Failed to create directory marker: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
async fn rename<P: AsRef<Path> + Send + Debug>(&self, _user: &super::server::FtpsUser, _from: P, _to: P) -> Result<()> {
// Rename/copy operations are not supported in FTPS
Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Rename operation not supported"))
}
/// Remove directory
async fn rmd<P: AsRef<Path> + Send + Debug>(&self, user: &super::server::FtpsUser, path: P) -> Result<()> {
debug!("FTPS RMD request for path: {:?}", path);
let s3_client = self.create_s3_client_for_user(user)?;
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
let (bucket, key) = self.parse_path(&path_str)?;
if let Some(key) = key {
// Remove directory inside bucket
let dir_key = path::retain_slash(&key);
let action = S3Action::DeleteObject;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation
authorize_operation(&session_context, &action, &bucket, Some(&dir_key))
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
// Save references for debug output after build
let bucket_for_log = bucket.clone();
let dir_key_for_log = dir_key.clone();
let mut builder = s3s::dto::DeleteObjectInput::builder();
builder = builder.bucket(bucket);
builder = builder.key(dir_key);
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build DeleteObjectInput"))?;
match s3_client.delete_object(input).await {
Ok(_) => {
debug!(
"FTPS RMD - successfully removed directory marker: '{}' in bucket '{}'",
dir_key_for_log, bucket_for_log
);
Ok(())
}
Err(e) => {
error!("FTPS RMD - failed to remove directory marker: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
} else {
// Delete bucket - check if bucket is empty first
debug!("FTPS RMD - attempting to delete bucket: '{}'", bucket);
let action = S3Action::DeleteBucket;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
authorize_operation(&session_context, &action, &bucket, None)
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
// Check if bucket is empty
let list_input = s3s::dto::ListObjectsV2Input {
bucket: bucket.clone(),
max_keys: Some(1),
..Default::default()
};
match s3_client.list_objects_v2(list_input).await {
Ok(output) => {
if let Some(objects) = output.contents {
if !objects.is_empty() {
debug!("FTPS RMD - bucket '{}' is not empty, cannot delete", bucket);
return Err(Error::new(
ErrorKind::PermanentFileNotAvailable,
format!("Bucket '{}' is not empty", bucket),
));
}
}
}
Err(e) => {
debug!("FTPS RMD - failed to list objects: {}", e);
}
}
// Bucket is empty, delete it
let delete_bucket_input = s3s::dto::DeleteBucketInput {
bucket: bucket.clone(),
..Default::default()
};
match s3_client.delete_bucket(delete_bucket_input).await {
Ok(_) => {
debug!("FTPS RMD - successfully deleted bucket: '{}'", bucket);
Ok(())
}
Err(e) => {
error!("FTPS RMD - failed to delete bucket '{}': {}", bucket, e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}
}
}
}
/// Change working directory
async fn cwd<P: AsRef<Path> + Send + Debug>(&self, user: &super::server::FtpsUser, path: P) -> Result<()> {
debug!("FTPS cwd request for path: {:?}", path);
let session_context = self.get_session_context_from_user(user)?;
let path_str = path.as_ref().to_string_lossy();
info!("FTPS cwd - received path: '{}'", path_str);
// Handle special cases
if path_str == "/" || path_str == "/." {
// cd to root directory - always allowed
debug!("FTPS cwd - changing to root directory");
return Ok(());
}
if path_str == "." {
// cd . - stay in current directory
debug!("FTPS cwd - staying in current directory");
return Ok(());
}
if path_str == ".." {
// cd .. from root directory should fail
error!("FTPS cwd - cannot go above root directory");
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Cannot go above root directory"));
}
// Parse the path
let (bucket, key) = self.parse_path(&path_str)?;
debug!("FTPS cwd - parsed bucket: '{}', key: {:?}", bucket, key);
// S3 does not support hierarchical directories - you can only cd to bucket root
// Exception: key being "." means stay in current place (handled earlier), but path::clean may have converted it
if key.is_some() && key.as_ref().map(|k| k != ".").unwrap_or(true) {
error!(
"FTPS cwd - S3 does not support multi-level directories, cannot cd to path with key: {:?}",
key
);
return Err(Error::new(
ErrorKind::PermanentFileNotAvailable,
"S3 does not support multi-level directories. Use absolute path to switch buckets.",
));
}
// Validate feature support
self.validate_feature_support("CWD command")?;
// Verify that the bucket exists by trying to list it
let s3_client = self.create_s3_client_for_user(user)?;
let action = S3Action::HeadBucket;
if !is_operation_supported(crate::protocols::session::context::Protocol::Ftps, &action) {
return Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Operation not supported"));
}
// Authorize the operation first
authorize_operation(&session_context, &action, &bucket, None)
.await
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?;
// Check if bucket actually exists
let mut builder = s3s::dto::HeadBucketInput::builder();
builder.set_bucket(bucket.clone());
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build HeadBucketInput"))?;
match s3_client.head_bucket(input).await {
Ok(_) => {
debug!("FTPS cwd - bucket '{}' exists and is accessible", bucket);
Ok(())
}
Err(e) => {
error!("FTPS cwd - bucket '{}' does not exist or access denied: {}", bucket, e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, format!("Bucket '{}' not found", bucket)))
}
}
}
}
/// FTPS metadata implementation
#[derive(Debug, Clone)]
pub struct FtpsMetadata {
/// File size in bytes
size: u64,
/// Whether this is a directory
is_directory: bool,
/// Last modification time (Unix timestamp)
modification_time: u64,
}
impl Metadata for FtpsMetadata {
/// Get file size
fn len(&self) -> u64 {
self.size
}
/// Check if file is empty
fn is_empty(&self) -> bool {
self.size == 0
}
/// Check if this is a directory
fn is_dir(&self) -> bool {
self.is_directory
}
/// Check if this is a file
fn is_file(&self) -> bool {
!self.is_directory
}
/// Check if file is a symbolic link (stub implementation)
///
/// S3 doesn't support symbolic links
fn is_symlink(&self) -> bool {
false
}
/// Get last modification time
fn modified(&self) -> Result<std::time::SystemTime> {
Ok(std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(self.modification_time))
}
/// Get file permissions (stub implementation)
fn gid(&self) -> u32 {
0
}
/// Get file permissions (stub implementation)
fn uid(&self) -> u32 {
0
}
/// Get file permissions (stub implementation)
fn links(&self) -> u64 {
1
}
}

View File

@@ -0,0 +1,18 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! FTPS protocol implementation
pub mod driver;
pub mod server;

View File

@@ -0,0 +1,329 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::protocols::ftps::driver::FtpsDriver;
use crate::protocols::session::context::{Protocol as SessionProtocol, SessionContext};
use crate::protocols::session::principal::ProtocolPrincipal;
use libunftp::{
ServerError,
auth::{AuthenticationError, UserDetail},
options::FtpsRequired,
};
use std::fmt::{Debug, Display, Formatter};
use std::net::{IpAddr, SocketAddr};
use std::path::Path;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
const ROOT_PATH: &str = "/";
const DEFAULT_SOURCE_IP: &str = "0.0.0.0";
const PORT_RANGE_SEPARATOR: &str = "-";
const PASSIVE_PORTS_PART_COUNT: usize = 2;
/// FTPS user implementation
#[derive(Debug, Clone)]
pub struct FtpsUser {
/// Username for the FTP session
pub username: String,
/// User's display name
pub name: Option<String>,
/// Session context for this user
pub session_context: SessionContext,
}
impl UserDetail for FtpsUser {
fn home(&self) -> Option<&Path> {
Some(Path::new(ROOT_PATH))
}
}
impl Display for FtpsUser {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self.name {
Some(display_name) => write!(f, "FtpsUser({} - {})", self.username, display_name),
None => write!(f, "FtpsUser({})", self.username),
}
}
}
/// FTPS server initialization error
#[derive(Debug, Error)]
pub enum FtpsInitError {
#[error("failed to bind address {0}")]
Bind(#[from] std::io::Error),
#[error("server error: {0}")]
Server(#[from] ServerError),
#[error("invalid FTPS configuration: {0}")]
InvalidConfig(String),
}
/// FTPS server configuration
#[derive(Debug, Clone)]
pub struct FtpsConfig {
/// Server bind address
pub bind_addr: SocketAddr,
/// Passive port range (e.g., "40000-50000")
pub passive_ports: Option<String>,
/// External IP address for passive mode
pub external_ip: Option<String>,
/// Whether FTPS is required
pub ftps_required: bool,
/// Certificate file path
pub cert_file: Option<String>,
/// Private key file path
pub key_file: Option<String>,
}
impl FtpsConfig {
/// Validates the configuration
pub async fn validate(&self) -> Result<(), FtpsInitError> {
if self.ftps_required && (self.cert_file.is_none() || self.key_file.is_none()) {
return Err(FtpsInitError::InvalidConfig(
"FTPS is required but certificate or key file is missing".to_string(),
));
}
if let Some(path) = &self.cert_file {
if !tokio::fs::try_exists(path).await.unwrap_or(false) {
return Err(FtpsInitError::InvalidConfig(format!("Certificate file not found: {}", path)));
}
}
if let Some(path) = &self.key_file {
if !tokio::fs::try_exists(path).await.unwrap_or(false) {
return Err(FtpsInitError::InvalidConfig(format!("Key file not found: {}", path)));
}
}
// Validate passive ports format
if self.passive_ports.is_some() {
self.parse_passive_ports()?;
}
Ok(())
}
/// Parse passive ports range from string format "start-end"
fn parse_passive_ports(&self) -> Result<std::ops::RangeInclusive<u16>, FtpsInitError> {
match &self.passive_ports {
Some(ports) => {
let parts: Vec<&str> = ports.split(PORT_RANGE_SEPARATOR).collect();
if parts.len() != PASSIVE_PORTS_PART_COUNT {
return Err(FtpsInitError::InvalidConfig(format!(
"Invalid passive ports format: {}, expected 'start-end'",
ports
)));
}
let start = parts[0]
.parse::<u16>()
.map_err(|e| FtpsInitError::InvalidConfig(format!("Invalid start port: {}", e)))?;
let end = parts[1]
.parse::<u16>()
.map_err(|e| FtpsInitError::InvalidConfig(format!("Invalid end port: {}", e)))?;
if start > end {
return Err(FtpsInitError::InvalidConfig("Start port cannot be greater than end port".to_string()));
}
Ok(start..=end)
}
None => Err(FtpsInitError::InvalidConfig("No passive ports configured".to_string())),
}
}
}
/// FTPS server implementation
pub struct FtpsServer {
/// Server configuration
config: FtpsConfig,
}
impl FtpsServer {
/// Create a new FTPS server
pub async fn new(config: FtpsConfig) -> Result<Self, FtpsInitError> {
config.validate().await?;
Ok(Self { config })
}
/// Start the FTPS server
///
/// This method binds the listener first to ensure the port is available,
/// then spawns the server loop in a background task.
pub async fn start(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<(), FtpsInitError> {
info!("Initializing FTPS server on {}", self.config.bind_addr);
let mut server_builder =
libunftp::ServerBuilder::with_authenticator(Box::new(FtpsDriver::new), Arc::new(FtpsAuthenticator::new()));
// Configure passive ports for data connections
if let Some(passive_ports) = &self.config.passive_ports {
let range = self.config.parse_passive_ports()?;
info!("Configuring FTPS passive ports range: {:?} ({})", range, passive_ports);
server_builder = server_builder.passive_ports(range);
} else {
warn!("No passive ports configured, using system-assigned ports");
}
// Configure external IP address for passive mode
if let Some(ref external_ip) = self.config.external_ip {
info!("Configuring FTPS external IP for passive mode: {}", external_ip);
server_builder = server_builder.passive_host(external_ip.as_str());
}
// Configure FTPS / TLS
if let Some(cert) = &self.config.cert_file {
if let Some(key) = &self.config.key_file {
debug!("Enabling FTPS with cert: {} and key: {}", cert, key);
server_builder = server_builder.ftps(cert, key);
if self.config.ftps_required {
info!("FTPS is explicitly required for all connections");
server_builder = server_builder.ftps_required(FtpsRequired::All, FtpsRequired::All);
}
}
} else if self.config.ftps_required {
return Err(FtpsInitError::InvalidConfig("FTPS required but certificates not provided".into()));
}
// Build the server instance
let server = server_builder.build().map_err(FtpsInitError::Server)?;
// libunftp's listen() binds to the address and runs the loop
let bind_addr = self.config.bind_addr.to_string();
let server_handle = tokio::spawn(async move {
if let Err(e) = server.listen(bind_addr).await {
error!("FTPS server runtime error: {}", e);
return Err(FtpsInitError::Server(e));
}
Ok(())
});
// Wait for shutdown signal or server failure
tokio::select! {
result = server_handle => {
match result {
Ok(Ok(())) => {
info!("FTPS server stopped normally");
Ok(())
}
Ok(Err(e)) => {
error!("FTPS server internal error: {}", e);
Err(e)
}
Err(e) => {
error!("FTPS server panic or task cancellation: {}", e);
Err(FtpsInitError::Bind(std::io::Error::other(e.to_string())))
}
}
}
_ = shutdown_rx.recv() => {
info!("FTPS server received shutdown signal");
// libunftp listen() is not easily cancellable gracefully without dropping the future.
// The select! dropping server_handle will close the listener.
Ok(())
}
}
}
/// Get server configuration
pub fn config(&self) -> &FtpsConfig {
&self.config
}
}
/// FTPS authenticator implementation
#[derive(Debug, Default)]
pub struct FtpsAuthenticator;
impl FtpsAuthenticator {
/// Create a new FTPS authenticator
pub fn new() -> Self {
Self
}
}
#[async_trait::async_trait]
impl libunftp::auth::Authenticator<FtpsUser> for FtpsAuthenticator {
/// Authenticate FTP user against RustFS IAM system
async fn authenticate(&self, username: &str, creds: &libunftp::auth::Credentials) -> Result<FtpsUser, AuthenticationError> {
use rustfs_credentials::Credentials as S3Credentials;
use rustfs_iam::get;
debug!("FTPS authentication attempt for user: {}", username);
// Access IAM system
let iam_sys = get().map_err(|e| {
error!("IAM system unavailable during FTPS auth: {}", e);
AuthenticationError::ImplPropagated("Internal authentication service unavailable".to_string(), Some(Box::new(e)))
})?;
// Map FTP credentials to S3 Credentials structure
// Note: FTP PASSWORD is treated as S3 SECRET KEY
let s3_creds = S3Credentials {
access_key: username.to_string(),
secret_key: creds.password.clone().unwrap_or_default(),
// Fields below are not used for authentication verification, but for struct compliance
session_token: String::new(),
expiration: None,
status: String::new(),
parent_user: String::new(),
groups: None,
claims: None,
name: None,
description: None,
};
let (user_identity, is_valid) = iam_sys.check_key(&s3_creds.access_key).await.map_err(|e| {
error!("IAM check_key failed for {}: {}", username, e);
AuthenticationError::ImplPropagated("Authentication verification failed".to_string(), Some(Box::new(e)))
})?;
if !is_valid {
warn!("FTPS login failed: Invalid access key '{}'", username);
return Err(AuthenticationError::BadUser);
}
let identity = user_identity.ok_or_else(|| {
error!("User identity missing despite valid key for {}", username);
AuthenticationError::BadUser
})?;
// Constant time comparison is preferred if available, but for now simple eq
if !identity.credentials.secret_key.eq(&s3_creds.secret_key) {
warn!("FTPS login failed: Invalid secret key for '{}'", username);
return Err(AuthenticationError::BadPassword);
}
// Policy conditions relying on `aws:SourceIp` will currently not work correctly for FTP.
// TODO: Investigate wrapping the authenticator or using Proxy Protocol metadata if available in future libunftp versions.
let source_ip: IpAddr = DEFAULT_SOURCE_IP.parse().unwrap();
let session_context =
SessionContext::new(ProtocolPrincipal::new(Arc::new(identity.clone())), SessionProtocol::Ftps, source_ip);
let ftps_user = FtpsUser {
username: username.to_string(),
name: identity.credentials.name.clone(),
session_context,
};
info!("FTPS user '{}' authenticated successfully", username);
Ok(ftps_user)
}
}

View File

@@ -0,0 +1,110 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_policy::policy::action::S3Action as PolicyS3Action;
/// S3 actions that can be performed through the gateway
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum S3Action {
// Bucket operations
CreateBucket,
DeleteBucket,
ListBucket,
ListBuckets,
HeadBucket,
// Object operations
GetObject,
PutObject,
DeleteObject,
HeadObject,
// Multipart operations
CreateMultipartUpload,
UploadPart,
CompleteMultipartUpload,
AbortMultipartUpload,
ListMultipartUploads,
ListParts,
// ACL operations
GetBucketAcl,
PutBucketAcl,
GetObjectAcl,
PutObjectAcl,
// Other operations
CopyObject,
}
impl From<S3Action> for PolicyS3Action {
fn from(action: S3Action) -> Self {
match action {
S3Action::CreateBucket => PolicyS3Action::CreateBucketAction,
S3Action::DeleteBucket => PolicyS3Action::DeleteBucketAction,
S3Action::ListBucket => PolicyS3Action::ListBucketAction,
S3Action::ListBuckets => PolicyS3Action::ListAllMyBucketsAction,
S3Action::HeadBucket => PolicyS3Action::HeadBucketAction,
S3Action::GetObject => PolicyS3Action::GetObjectAction,
S3Action::PutObject => PolicyS3Action::PutObjectAction,
S3Action::DeleteObject => PolicyS3Action::DeleteObjectAction,
S3Action::HeadObject => PolicyS3Action::GetObjectAction,
S3Action::CreateMultipartUpload => PolicyS3Action::PutObjectAction,
S3Action::UploadPart => PolicyS3Action::PutObjectAction,
S3Action::CompleteMultipartUpload => PolicyS3Action::PutObjectAction,
S3Action::AbortMultipartUpload => PolicyS3Action::AbortMultipartUploadAction,
S3Action::ListMultipartUploads => PolicyS3Action::ListBucketMultipartUploadsAction,
S3Action::ListParts => PolicyS3Action::ListMultipartUploadPartsAction,
S3Action::GetBucketAcl => PolicyS3Action::GetBucketPolicyAction,
S3Action::PutBucketAcl => PolicyS3Action::PutBucketPolicyAction,
S3Action::GetObjectAcl => PolicyS3Action::GetObjectAction,
S3Action::PutObjectAcl => PolicyS3Action::PutObjectAction,
S3Action::CopyObject => PolicyS3Action::PutObjectAction,
}
}
}
impl From<S3Action> for rustfs_policy::policy::action::Action {
fn from(action: S3Action) -> Self {
rustfs_policy::policy::action::Action::S3Action(action.into())
}
}
impl S3Action {
/// Get the string representation of the action
pub fn as_str(&self) -> &'static str {
match self {
S3Action::CreateBucket => "s3:CreateBucket",
S3Action::DeleteBucket => "s3:DeleteBucket",
S3Action::ListBucket => "s3:ListBucket",
S3Action::ListBuckets => "s3:ListAllMyBuckets",
S3Action::HeadBucket => "s3:ListBucket",
S3Action::GetObject => "s3:GetObject",
S3Action::PutObject => "s3:PutObject",
S3Action::DeleteObject => "s3:DeleteObject",
S3Action::HeadObject => "s3:GetObject",
S3Action::CreateMultipartUpload => "s3:PutObject",
S3Action::UploadPart => "s3:PutObject",
S3Action::CompleteMultipartUpload => "s3:PutObject",
S3Action::AbortMultipartUpload => "s3:AbortMultipartUpload",
S3Action::ListMultipartUploads => "s3:ListBucketMultipartUploads",
S3Action::ListParts => "s3:ListMultipartUploadParts",
S3Action::GetBucketAcl => "s3:GetBucketAcl",
S3Action::PutBucketAcl => "s3:PutBucketAcl",
S3Action::GetObjectAcl => "s3:GetObjectAcl",
S3Action::PutObjectAcl => "s3:PutObjectAcl",
S3Action::CopyObject => "s3:PutObject",
}
}
}

View File

@@ -0,0 +1,85 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Protocol to S3 action adapter
use super::action::S3Action;
use crate::protocols::session::context::Protocol;
pub fn is_operation_supported(protocol: Protocol, action: &S3Action) -> bool {
match protocol {
Protocol::Ftps => match action {
// Bucket operations: FTPS cannot create buckets via protocol commands
S3Action::CreateBucket => false,
S3Action::DeleteBucket => false,
// Object operations: All file operations supported
S3Action::GetObject => true, // RETR command
S3Action::PutObject => true, // STOR and APPE commands both map to PutObject
S3Action::DeleteObject => true, // DELE command
S3Action::HeadObject => true, // SIZE command
// Multipart operations: FTPS has no native multipart upload support
S3Action::CreateMultipartUpload => false,
S3Action::UploadPart => false,
S3Action::CompleteMultipartUpload => false,
S3Action::AbortMultipartUpload => false,
S3Action::ListMultipartUploads => false,
S3Action::ListParts => false,
// ACL operations: FTPS has no native ACL support
S3Action::GetBucketAcl => false,
S3Action::PutBucketAcl => false,
S3Action::GetObjectAcl => false,
S3Action::PutObjectAcl => false,
// Other operations
S3Action::CopyObject => false, // No native copy support in FTPS
S3Action::ListBucket => true, // LIST command
S3Action::ListBuckets => true, // LIST at root level
S3Action::HeadBucket => true, // Can check if directory exists
},
Protocol::Sftp => match action {
// Bucket operations: SFTP can create/delete buckets via mkdir/rmdir
S3Action::CreateBucket => true,
S3Action::DeleteBucket => true,
// Object operations: All file operations supported
S3Action::GetObject => true, // RealPath + Open + Read
S3Action::PutObject => true, // Open + Write
S3Action::DeleteObject => true, // Remove
S3Action::HeadObject => true, // Stat/Fstat
// Multipart operations: SFTP has no native multipart upload support
S3Action::CreateMultipartUpload => false,
S3Action::UploadPart => false,
S3Action::CompleteMultipartUpload => false,
S3Action::AbortMultipartUpload => false,
S3Action::ListMultipartUploads => false,
S3Action::ListParts => false,
// ACL operations: SFTP has no native ACL support
S3Action::GetBucketAcl => false,
S3Action::PutBucketAcl => false,
S3Action::GetObjectAcl => false,
S3Action::PutObjectAcl => false,
// Other operations
S3Action::CopyObject => false, // No remote copy, only local rename
S3Action::ListBucket => true, // Readdir
S3Action::ListBuckets => true, // Readdir at root
S3Action::HeadBucket => true, // Stat on directory
},
}
}

View File

@@ -0,0 +1,97 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::action::S3Action;
use super::adapter::is_operation_supported;
use crate::protocols::session::context::SessionContext;
use rustfs_credentials;
use rustfs_iam::get;
use rustfs_policy::policy::Args;
use std::collections::HashMap;
use tracing::{debug, error};
/// Check if a principal is allowed to perform an S3 action
pub async fn is_authorized(session_context: &SessionContext, action: &S3Action, bucket: &str, object: Option<&str>) -> bool {
let iam_sys = match get() {
Ok(sys) => sys,
Err(e) => {
error!("IAM system unavailable: {}", e);
return false;
}
};
// Create policy arguments
let mut claims = HashMap::new();
claims.insert(
"principal".to_string(),
serde_json::Value::String(session_context.principal.access_key().to_string()),
);
let policy_action: rustfs_policy::policy::action::Action = action.clone().into();
// Check if user is the owner (admin)
let is_owner = if let Some(global_cred) = rustfs_credentials::get_global_action_cred() {
session_context.principal.access_key() == global_cred.access_key
} else {
false
};
let args = Args {
account: session_context.principal.access_key(),
groups: &session_context.principal.user_identity.credentials.groups,
action: policy_action,
bucket,
conditions: &HashMap::new(),
is_owner,
object: object.unwrap_or(""),
claims: &claims,
deny_only: false,
};
debug!(
"FTPS AUTH - Checking authorization: account={}, action={:?}, bucket='{}', object={:?}",
args.account, args.action, args.bucket, args.object
);
let allowed = iam_sys.is_allowed(&args).await;
debug!("FTPS AUTH - Authorization result: {}", allowed);
allowed
}
/// Unified authorization entry point for all protocols
pub async fn authorize_operation(
session_context: &SessionContext,
action: &S3Action,
bucket: &str,
object: Option<&str>,
) -> Result<(), AuthorizationError> {
// First check if the operation is supported
if !is_operation_supported(session_context.protocol.clone(), action) {
return Err(AuthorizationError::AccessDenied);
}
// Then check IAM authorization
if is_authorized(session_context, action, bucket, object).await {
Ok(())
} else {
Err(AuthorizationError::AccessDenied)
}
}
/// Authorization errors
#[derive(Debug, thiserror::Error)]
pub enum AuthorizationError {
#[error("Access denied")]
AccessDenied,
}

View File

@@ -0,0 +1,76 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// FTP error code constants
pub mod ftp_errors {
pub const FILE_NOT_FOUND: &str = "550 File not found";
pub const DIRECTORY_NOT_FOUND: &str = "550 Directory not found";
pub const PERMISSION_DENIED: &str = "550 Permission denied";
pub const DIRECTORY_NOT_EMPTY: &str = "550 Directory not empty";
pub const DIRECTORY_ALREADY_EXISTS: &str = "550 Directory already exists";
pub const INVALID_DIRECTORY_NAME: &str = "553 Invalid directory name";
pub const INVALID_FILE_NAME: &str = "553 Invalid file name";
pub const INVALID_REQUEST: &str = "501 Invalid request";
pub const INTERNAL_SERVER_ERROR: &str = "421 Internal server error";
}
// FTP error messages mapping
pub fn map_s3_error_to_ftp_string(s3_error: &s3s::S3Error) -> String {
match s3_error.code() {
s3s::S3ErrorCode::NoSuchKey => ftp_errors::FILE_NOT_FOUND.to_string(),
s3s::S3ErrorCode::NoSuchBucket => ftp_errors::DIRECTORY_NOT_FOUND.to_string(),
s3s::S3ErrorCode::AccessDenied => ftp_errors::PERMISSION_DENIED.to_string(),
s3s::S3ErrorCode::BucketNotEmpty => ftp_errors::DIRECTORY_NOT_EMPTY.to_string(),
s3s::S3ErrorCode::BucketAlreadyExists => ftp_errors::DIRECTORY_ALREADY_EXISTS.to_string(),
s3s::S3ErrorCode::InvalidBucketName => ftp_errors::INVALID_DIRECTORY_NAME.to_string(),
s3s::S3ErrorCode::InvalidObjectState => ftp_errors::INVALID_FILE_NAME.to_string(),
s3s::S3ErrorCode::InvalidRequest => ftp_errors::INVALID_REQUEST.to_string(),
s3s::S3ErrorCode::InternalError => ftp_errors::INTERNAL_SERVER_ERROR.to_string(),
_ => ftp_errors::INTERNAL_SERVER_ERROR.to_string(),
}
}
/// Map S3Error to FTPS libunftp Error
pub fn map_s3_error_to_ftps(s3_error: &s3s::S3Error) -> libunftp::storage::Error {
use libunftp::storage::{Error, ErrorKind};
match s3_error.code() {
s3s::S3ErrorCode::NoSuchKey | s3s::S3ErrorCode::NoSuchBucket => {
Error::new(ErrorKind::PermanentFileNotAvailable, map_s3_error_to_ftp_string(s3_error))
}
s3s::S3ErrorCode::AccessDenied => Error::new(ErrorKind::PermissionDenied, map_s3_error_to_ftp_string(s3_error)),
s3s::S3ErrorCode::InvalidRequest | s3s::S3ErrorCode::InvalidBucketName | s3s::S3ErrorCode::InvalidObjectState => {
Error::new(ErrorKind::PermanentFileNotAvailable, map_s3_error_to_ftp_string(s3_error))
}
_ => Error::new(ErrorKind::PermanentFileNotAvailable, map_s3_error_to_ftp_string(s3_error)),
}
}
/// Map S3Error directly to SFTP StatusCode
pub fn map_s3_error_to_sftp_status(s3_error: &s3s::S3Error) -> russh_sftp::protocol::StatusCode {
use russh_sftp::protocol::StatusCode;
match s3_error.code() {
s3s::S3ErrorCode::NoSuchKey => StatusCode::NoSuchFile, // SSH_FX_NO_SUCH_FILE (2)
s3s::S3ErrorCode::NoSuchBucket => StatusCode::NoSuchFile, // SSH_FX_NO_SUCH_FILE (2)
s3s::S3ErrorCode::AccessDenied => StatusCode::PermissionDenied, // SSH_FX_PERMISSION_DENIED (3)
s3s::S3ErrorCode::BucketNotEmpty => StatusCode::Failure, // SSH_FX_DIR_NOT_EMPTY (21)
s3s::S3ErrorCode::BucketAlreadyExists => StatusCode::Failure, // SSH_FX_FILE_ALREADY_EXISTS (17)
s3s::S3ErrorCode::InvalidBucketName => StatusCode::Failure, // SSH_FX_INVALID_FILENAME (22)
s3s::S3ErrorCode::InvalidObjectState => StatusCode::Failure, // SSH_FX_INVALID_FILENAME (22)
s3s::S3ErrorCode::InvalidRequest => StatusCode::OpUnsupported, // SSH_FX_OP_UNSUPPORTED (5)
s3s::S3ErrorCode::InternalError => StatusCode::Failure, // SSH_FX_FAILURE (4)
_ => StatusCode::Failure, // SSH_FX_FAILURE as default
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Gateway module for protocol implementations
pub mod action;
pub mod adapter;
pub mod authorize;
pub mod error;
pub mod restrictions;

View File

@@ -0,0 +1,56 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Unsupported FTP features list
pub const UNSUPPORTED_FTP_FEATURES: &[&str] = &[
// Atomic rename operations (must be implemented via CopyObject+DeleteObject)
"Atomic RNFR/RNTO rename",
// File append operations (S3 does not support native append)
"APPE command (file append)",
// POSIX permission operations (S3 uses ACLs and Policies)
"chmod command",
"chown command",
// Symbolic links (S3 object storage does not support)
"SYMLINK creation",
// Hard links (S3 object storage does not support)
"HARD LINK creation",
// File locking (S3 does not support filesystem-level locking)
"File locking mechanism",
// Direct directory rename (must be implemented via object copy)
"Directory atomic rename",
];
/// Check if an FTP feature is supported
pub fn is_ftp_feature_supported(feature: &str) -> bool {
!UNSUPPORTED_FTP_FEATURES.contains(&feature)
}
/// Get S3 equivalent operation for unsupported features
pub fn get_s3_equivalent_operation(unsupported_feature: &str) -> Option<&'static str> {
match unsupported_feature {
"Atomic RNFR/RNTO rename" | "SSH_FXP_RENAME atomic rename" | "Directory atomic rename" => {
Some("Use CopyObject + DeleteObject to implement rename")
}
"APPE command (file append)" | "SSH_FXP_OPEN append mode" => Some("Use PutObject to overwrite the entire object"),
"chmod command"
| "chown command"
| "SSH_FXP_SETSTAT permission modification"
| "SSH_FXP_FSETSTAT permission modification" => Some("Use S3 ACLs or Bucket Policies to manage permissions"),
"SYMLINK creation" | "SSH_FXP_SYMLINK creation" => Some("S3 object storage does not support symbolic links"),
"File locking mechanism" | "SSH_FXP_BLOCK file locking" => {
Some("Use S3 object versioning or conditional writes for concurrency control")
}
_ => None,
}
}

View File

@@ -0,0 +1,19 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod client;
pub mod ftps;
pub mod gateway;
pub mod session;
pub mod sftp;

View File

@@ -0,0 +1,54 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Session context for protocol implementations
use crate::protocols::session::principal::ProtocolPrincipal;
use std::net::IpAddr;
/// Protocol types
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Protocol {
Ftps,
Sftp,
}
/// Session context for protocol operations
#[derive(Debug, Clone)]
pub struct SessionContext {
/// The protocol principal (authenticated user)
pub principal: ProtocolPrincipal,
/// The protocol type
pub protocol: Protocol,
/// The source IP address
pub source_ip: IpAddr,
}
impl SessionContext {
/// Create a new session context
pub fn new(principal: ProtocolPrincipal, protocol: Protocol, source_ip: IpAddr) -> Self {
Self {
principal,
protocol,
source_ip,
}
}
/// Get the access key for this session
pub fn access_key(&self) -> &str {
self.principal.access_key()
}
}

View File

@@ -0,0 +1,18 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Session management for protocol implementations
pub mod context;
pub mod principal;

View File

@@ -0,0 +1,35 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_policy::auth::UserIdentity;
use std::sync::Arc;
/// Protocol principal representing an authenticated user
#[derive(Debug, Clone)]
pub struct ProtocolPrincipal {
/// User identity from IAM system
pub user_identity: Arc<UserIdentity>,
}
impl ProtocolPrincipal {
/// Create a new protocol principal
pub fn new(user_identity: Arc<UserIdentity>) -> Self {
Self { user_identity }
}
/// Get the access key for this principal
pub fn access_key(&self) -> &str {
&self.user_identity.credentials.access_key
}
}

View File

@@ -0,0 +1,929 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::protocols::client::s3::ProtocolS3Client;
use crate::protocols::gateway::action::S3Action;
use crate::protocols::gateway::authorize::authorize_operation;
use crate::protocols::gateway::error::map_s3_error_to_sftp_status;
use crate::protocols::session::context::SessionContext;
use futures::TryStreamExt;
use russh_sftp::protocol::{Attrs, Data, File, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version};
use russh_sftp::server::Handler;
use rustfs_utils::path;
use s3s::dto::{DeleteBucketInput, DeleteObjectInput, GetObjectInput, ListObjectsV2Input, PutObjectInput, StreamingBlob};
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::fs::{File as TokioFile, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::RwLock;
use tracing::{debug, error, trace};
use uuid::Uuid;
const INITIAL_HANDLE_ID: u32 = 1;
const ROOT_PATH: &str = "/";
const CURRENT_DIR: &str = ".";
const PARENT_DIR: &str = "..";
const HANDLE_ID_PREFIX: &str = "handle_";
const PATH_SEPARATOR: &str = "/";
const PERMISSION_DENIED_PATH: &str = "..";
const DIR_MODE: u32 = 0o040000;
const FILE_MODE: u32 = 0o100000;
const DIR_PERMISSIONS: u32 = 0o755;
const FILE_PERMISSIONS: u32 = 0o644;
/// State associated with an open file handle
#[derive(Debug)]
enum HandleState {
Read {
path: String,
bucket: String,
key: String,
},
Write {
path: String,
bucket: String,
key: String,
temp_file_path: PathBuf,
file_handle: Option<TokioFile>,
},
Dir {
path: String,
files: Vec<File>,
offset: usize,
},
}
#[derive(Clone)]
pub struct SftpHandler {
session_context: SessionContext,
handles: Arc<RwLock<HashMap<String, HandleState>>>,
next_handle_id: Arc<AtomicU32>,
temp_dir: PathBuf,
current_dir: Arc<RwLock<String>>,
}
impl SftpHandler {
pub fn new(session_context: SessionContext) -> Self {
Self {
session_context,
handles: Arc::new(RwLock::new(HashMap::new())),
next_handle_id: Arc::new(AtomicU32::new(INITIAL_HANDLE_ID)),
temp_dir: std::env::temp_dir(),
current_dir: Arc::new(RwLock::new(ROOT_PATH.to_string())),
}
}
fn create_s3_client(&self) -> Result<ProtocolS3Client, StatusCode> {
// Create FS instance (empty struct that accesses global ECStore)
let fs = crate::storage::ecfs::FS {};
let client = ProtocolS3Client::new(fs, self.session_context.access_key().to_string());
Ok(client)
}
fn parse_path(&self, path_str: &str) -> Result<(String, Option<String>), StatusCode> {
if path_str.contains(PERMISSION_DENIED_PATH) {
return Err(StatusCode::PermissionDenied);
}
// Clean the path to normalize
let cleaned_path = path::clean(path_str);
let (bucket, object) = path::path_to_bucket_object(&cleaned_path);
let key = if object.is_empty() { None } else { Some(object) };
debug!(
"SFTP parse_path - input: '{}', cleaned: '{}', bucket: '{}', key: {:?}",
path_str, cleaned_path, bucket, key
);
Ok((bucket, key))
}
fn generate_handle_id(&self) -> String {
let id = self.next_handle_id.fetch_add(1, Ordering::Relaxed);
format!("{}{}", HANDLE_ID_PREFIX, id)
}
/// Convert relative path to absolute path based on current directory
async fn resolve_path(&self, path_str: &str) -> String {
let current = self.current_dir.read().await;
if path_str.starts_with(PATH_SEPARATOR) {
// Absolute path
return path::clean(path_str).to_string();
}
// Relative path
if path_str == CURRENT_DIR {
current.clone()
} else if path_str == PARENT_DIR {
if *current == ROOT_PATH {
ROOT_PATH.to_string()
} else {
let parent = std::path::Path::new(&*current)
.parent()
.map(|p| p.to_str().unwrap())
.unwrap_or(ROOT_PATH);
path::clean(parent).to_string()
}
} else {
// Join current directory with path
let joined = if *current == ROOT_PATH {
format!("{}{}", PATH_SEPARATOR, path_str.trim_start_matches(PATH_SEPARATOR))
} else {
format!(
"{}{}{}",
current.trim_end_matches(PATH_SEPARATOR),
PATH_SEPARATOR,
path_str.trim_start_matches(PATH_SEPARATOR)
)
};
path::clean(&joined).to_string()
}
}
async fn cleanup_state(&self, state: HandleState) {
if let HandleState::Write { temp_file_path, .. } = state {
let _ = tokio::fs::remove_file(temp_file_path).await;
}
}
async fn do_stat(&self, path: String) -> Result<FileAttributes, StatusCode> {
debug!("SFTP do_stat - input path: '{}'", path);
let (bucket, key_opt) = self.parse_path(&path)?;
if bucket.is_empty() {
let mut attrs = FileAttributes::default();
attrs.set_dir(true);
attrs.size = Some(0);
let current_mode = attrs.permissions.unwrap_or(0);
attrs.permissions = Some(current_mode | DIR_MODE | DIR_PERMISSIONS);
return Ok(attrs);
}
let action = if key_opt.is_none() {
S3Action::HeadBucket
} else {
S3Action::HeadObject
};
debug!("SFTP do_stat - parsed bucket: '{}', key: {:?}, action: {:?}", bucket, key_opt, action);
authorize_operation(&self.session_context, &action, &bucket, key_opt.as_deref())
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = self.create_s3_client()?;
match action {
S3Action::HeadBucket => {
let input = s3s::dto::HeadBucketInput {
bucket,
..Default::default()
};
match s3_client.head_bucket(input).await {
Ok(_) => {
let mut attrs = FileAttributes::default();
attrs.set_dir(true);
attrs.size = Some(0);
attrs.permissions = Some(DIR_PERMISSIONS | DIR_MODE);
attrs.mtime = Some(0);
Ok(attrs)
}
Err(_) => Err(StatusCode::NoSuchFile),
}
}
S3Action::HeadObject => {
let key = key_opt.expect("key_opt should be Some for HeadObject action");
let input = s3s::dto::HeadObjectInput {
bucket,
key,
..Default::default()
};
match s3_client.head_object(input).await {
Ok(out) => {
let mut attrs = FileAttributes::default();
attrs.set_dir(false);
attrs.size = Some(out.content_length.unwrap_or(0) as u64);
if let Some(lm) = out.last_modified {
let dt = time::OffsetDateTime::from(lm);
attrs.mtime = Some(dt.unix_timestamp() as u32);
}
attrs.permissions = Some(FILE_PERMISSIONS | FILE_MODE);
Ok(attrs)
}
Err(_) => Err(StatusCode::NoSuchFile),
}
}
_ => {
error!("SFTP do_stat - Unexpected action type");
Err(StatusCode::Failure)
}
}
}
}
impl Handler for SftpHandler {
type Error = StatusCode;
fn unimplemented(&self) -> Self::Error {
StatusCode::OpUnsupported
}
async fn init(&mut self, version: u32, _extensions: HashMap<String, String>) -> Result<Version, Self::Error> {
trace!("SFTP Init version: {}", version);
Ok(Version::new())
}
fn open(
&mut self,
id: u32,
filename: String,
pflags: OpenFlags,
_attrs: FileAttributes,
) -> impl Future<Output = Result<Handle, Self::Error>> + Send {
let this = self.clone();
async move {
debug!("SFTP Open: {} (flags: {:?})", filename, pflags);
// Resolve relative path to absolute path
let resolved_filename = this.resolve_path(&filename).await;
let (bucket, key_opt) = this.parse_path(&resolved_filename)?;
if bucket.is_empty() {
return Err(StatusCode::PermissionDenied); // Cannot open root directory as file
}
let key = key_opt.ok_or(StatusCode::PermissionDenied)?; // Cannot open bucket as file
let handle_id = this.generate_handle_id();
let state;
if pflags.contains(OpenFlags::WRITE) || pflags.contains(OpenFlags::CREATE) || pflags.contains(OpenFlags::TRUNCATE) {
let action = S3Action::PutObject;
authorize_operation(&this.session_context, &action, &bucket, Some(&key))
.await
.map_err(|_| StatusCode::PermissionDenied)?;
if pflags.contains(OpenFlags::APPEND) {
return Err(StatusCode::OpUnsupported);
}
let temp_filename = format!("rustfs-sftp-{}.tmp", Uuid::new_v4());
let temp_path = this.temp_dir.join(temp_filename);
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&temp_path)
.await
.map_err(|e| {
error!("Failed to create temp file: {}", e);
StatusCode::Failure
})?;
state = HandleState::Write {
path: filename.clone(),
bucket,
key,
temp_file_path: temp_path,
file_handle: Some(file),
};
} else {
let action = S3Action::GetObject;
authorize_operation(&this.session_context, &action, &bucket, Some(&key))
.await
.map_err(|_| StatusCode::PermissionDenied)?;
state = HandleState::Read {
path: filename.clone(),
bucket,
key,
};
}
this.handles.write().await.insert(handle_id.clone(), state);
Ok(Handle { id, handle: handle_id })
}
}
fn close(&mut self, id: u32, handle: String) -> impl Future<Output = Result<Status, Self::Error>> + Send {
let this = self.clone();
async move {
let state = this.handles.write().await.remove(&handle);
match state {
Some(HandleState::Write {
bucket,
key,
temp_file_path,
mut file_handle,
..
}) => {
let mut file = file_handle.take().ok_or(StatusCode::Failure)?;
if let Err(e) = file.flush().await {
error!("Flush to disk failed: {}", e);
let _ = tokio::fs::remove_file(&temp_file_path).await;
return Err(StatusCode::Failure);
}
let metadata = file.metadata().await.map_err(|e| {
error!("Failed to get metadata: {}", e);
StatusCode::Failure
})?;
let file_size = metadata.len();
if let Err(e) = file.seek(std::io::SeekFrom::Start(0)).await {
error!("Seek temp file failed: {}", e);
let _ = tokio::fs::remove_file(&temp_file_path).await;
return Err(StatusCode::Failure);
}
let s3_client = match this.create_s3_client() {
Ok(c) => c,
Err(e) => {
let _ = tokio::fs::remove_file(&temp_file_path).await;
return Err(e);
}
};
let stream = tokio_util::io::ReaderStream::new(file);
let body = StreamingBlob::wrap(stream);
let input = PutObjectInput::builder()
.bucket(bucket.clone())
.key(key.clone())
.body(Option::from(body))
.content_length(Option::from(file_size as i64)) // 告诉 S3 文件多大
.build()
.unwrap();
let result = match s3_client.put_object(input).await {
Ok(_) => Status {
id,
status_code: StatusCode::Ok,
error_message: "Success".into(),
language_tag: "en".into(),
},
Err(e) => {
error!("S3 PutObject failed: {}", e);
let status_code = map_s3_error_to_sftp_status(&e);
return Err(status_code);
}
};
let _ = tokio::fs::remove_file(&temp_file_path).await;
Ok(result)
}
Some(state) => {
this.cleanup_state(state).await;
Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Success".into(),
language_tag: "en".into(),
})
}
None => Err(StatusCode::NoSuchFile),
}
}
}
fn read(&mut self, id: u32, handle: String, offset: u64, len: u32) -> impl Future<Output = Result<Data, Self::Error>> + Send {
let this = self.clone();
async move {
let (bucket, key) = {
let guard = this.handles.read().await;
match guard.get(&handle) {
Some(HandleState::Read { bucket, key, .. }) => (bucket.clone(), key.clone()),
Some(_) => return Err(StatusCode::OpUnsupported),
None => return Err(StatusCode::NoSuchFile),
}
};
let range_end = offset + (len as u64) - 1;
let mut builder = GetObjectInput::builder();
builder.set_bucket(bucket);
builder.set_key(key);
if let Ok(range) = s3s::dto::Range::parse(&format!("bytes={}-{}", offset, range_end)) {
builder.set_range(Some(range));
}
let s3_client = this.create_s3_client()?;
let input = builder.build().map_err(|_| StatusCode::Failure)?;
match s3_client.get_object(input).await {
Ok(output) => {
let mut data = Vec::with_capacity(len as usize);
if let Some(body) = output.body {
let stream = body.map_err(std::io::Error::other);
let mut reader = tokio_util::io::StreamReader::new(stream);
let _ = reader.read_to_end(&mut data).await;
}
Ok(Data { id, data })
}
Err(e) => {
debug!("S3 Read failed: {}", e);
Ok(Data { id, data: Vec::new() })
}
}
}
}
fn write(
&mut self,
id: u32,
handle: String,
offset: u64,
data: Vec<u8>,
) -> impl Future<Output = Result<Status, Self::Error>> + Send {
let this = self.clone();
async move {
let mut guard = this.handles.write().await;
if let Some(HandleState::Write { file_handle, .. }) = guard.get_mut(&handle) {
if let Some(file) = file_handle {
if let Err(e) = file.seek(std::io::SeekFrom::Start(offset)).await {
error!("File seek failed: {}", e);
return Err(StatusCode::Failure);
}
if let Err(e) = file.write_all(&data).await {
error!("File write failed: {}", e);
return Err(StatusCode::Failure);
}
Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Success".into(),
language_tag: "en".into(),
})
} else {
Err(StatusCode::Failure)
}
} else {
Err(StatusCode::NoSuchFile)
}
}
}
fn lstat(&mut self, id: u32, path: String) -> impl Future<Output = Result<Attrs, Self::Error>> + Send {
let this = self.clone();
async move {
let resolved = this.resolve_path(&path).await;
let attrs = this.do_stat(resolved).await?;
Ok(Attrs { id, attrs })
}
}
fn fstat(&mut self, id: u32, handle: String) -> impl Future<Output = Result<Attrs, Self::Error>> + Send {
let this = self.clone();
async move {
let path = {
let guard = this.handles.read().await;
match guard.get(&handle) {
Some(HandleState::Read { path, .. }) => path.clone(),
Some(HandleState::Write { path, .. }) => path.clone(),
Some(HandleState::Dir { path, .. }) => path.clone(),
None => return Err(StatusCode::NoSuchFile),
}
};
let attrs = this.do_stat(path).await?;
Ok(Attrs { id, attrs })
}
}
fn opendir(&mut self, id: u32, path: String) -> impl Future<Output = Result<Handle, Self::Error>> + Send {
let this = self.clone();
async move {
debug!("SFTP Opendir START: path='{}'", path);
// Resolve relative path to absolute path
let resolved_path = this.resolve_path(&path).await;
debug!("SFTP Opendir - resolved path: '{}'", resolved_path);
// Handle root directory case - list all buckets
if resolved_path == "/" || resolved_path == "/." {
debug!("SFTP Opendir - listing root directory (all buckets)");
let action = S3Action::ListBuckets;
authorize_operation(&this.session_context, &action, "", None)
.await
.map_err(|_| StatusCode::PermissionDenied)?;
// List all buckets
let s3_client = this.create_s3_client().inspect_err(|&e| {
error!("SFTP Opendir - failed to create S3 client: {}", e);
})?;
let input = s3s::dto::ListBucketsInput::builder()
.build()
.map_err(|_| StatusCode::Failure)?;
let secret_key = &this.session_context.principal.user_identity.credentials.secret_key;
let output = s3_client.list_buckets(input, secret_key).await.map_err(|e| {
error!("SFTP Opendir - failed to list buckets: {}", e);
StatusCode::Failure
})?;
let mut files = Vec::new();
if let Some(buckets) = output.buckets {
for bucket in buckets {
if let Some(bucket_name) = bucket.name {
let mut attrs = FileAttributes::default();
attrs.set_dir(true);
attrs.permissions = Some(0o755);
files.push(File {
filename: bucket_name.clone(),
longname: format!("drwxr-xr-x 2 0 0 0 Dec 28 18:54 {}", bucket_name),
attrs,
});
}
}
}
let handle_id = this.generate_handle_id();
let mut guard = this.handles.write().await;
guard.insert(
handle_id.clone(),
HandleState::Dir {
path: "/".to_string(),
files,
offset: 0,
},
);
return Ok(Handle { id, handle: handle_id });
}
// Handle bucket directory listing
let (bucket, key_prefix) = this.parse_path(&resolved_path)?;
debug!("SFTP Opendir - bucket: '{}', key_prefix: {:?}", bucket, key_prefix);
let action = S3Action::ListBucket;
authorize_operation(&this.session_context, &action, &bucket, key_prefix.as_deref())
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let mut builder = s3s::dto::ListObjectsV2Input::builder();
builder.set_bucket(bucket.clone());
let prefix = if let Some(ref p) = key_prefix {
path::retain_slash(p)
} else {
String::new()
};
if !prefix.is_empty() {
builder.set_prefix(Some(prefix));
}
builder.set_delimiter(Some("/".to_string()));
let s3_client = this.create_s3_client()?;
let input = builder.build().map_err(|_| StatusCode::Failure)?;
let mut files = Vec::new();
match s3_client.list_objects_v2(input).await {
Ok(output) => {
if let Some(prefixes) = output.common_prefixes {
for p in prefixes {
if let Some(prefix_str) = p.prefix {
let name = prefix_str
.trim_end_matches('/')
.split('/')
.next_back()
.unwrap_or("")
.to_string();
if !name.is_empty() {
let mut attrs = FileAttributes::default();
attrs.set_dir(true);
attrs.permissions = Some(0o755);
files.push(File {
filename: name.clone(),
longname: format!("drwxr-xr-x 1 rustfs rustfs 0 Jan 1 1970 {}", name),
attrs,
});
}
}
}
}
if let Some(contents) = output.contents {
for obj in contents {
if let Some(key) = obj.key {
if key.ends_with('/') {
continue;
}
let name = key.split('/').next_back().unwrap_or("").to_string();
let size = obj.size.unwrap_or(0) as u64;
let mut attrs = FileAttributes {
size: Some(size),
permissions: Some(0o644),
..Default::default()
};
if let Some(lm) = obj.last_modified {
let dt = time::OffsetDateTime::from(lm);
attrs.mtime = Some(dt.unix_timestamp() as u32);
}
files.push(File {
filename: name.clone(),
longname: format!("-rw-r--r-- 1 rustfs rustfs {} Jan 1 1970 {}", size, name),
attrs,
});
}
}
}
}
Err(e) => {
error!("S3 List failed: {}", e);
return Err(StatusCode::Failure);
}
}
let handle_id = this.generate_handle_id();
this.handles
.write()
.await
.insert(handle_id.clone(), HandleState::Dir { path, files, offset: 0 });
Ok(Handle { id, handle: handle_id })
}
}
fn readdir(&mut self, id: u32, handle: String) -> impl Future<Output = Result<Name, Self::Error>> + Send {
let this = self.clone();
async move {
let mut guard = this.handles.write().await;
if let Some(HandleState::Dir { files, offset, .. }) = guard.get_mut(&handle) {
debug!("SFTP Readdir - handle: {}, offset: {}, total files: {}", handle, offset, files.len());
for (i, f) in files.iter().enumerate() {
debug!("SFTP Readdir - file[{}]: filename='{}', longname='{}'", i, f.filename, f.longname);
}
if *offset >= files.len() {
debug!("SFTP Readdir - offset {} >= files length {}, returning empty", offset, files.len());
return Ok(Name { id, files: Vec::new() });
}
let chunk = files[*offset..].to_vec();
debug!("SFTP Readdir - returning {} files (offset {})", chunk.len(), offset);
*offset = files.len();
Ok(Name { id, files: chunk })
} else {
debug!("SFTP Readdir - handle '{}' not found or not a directory handle", handle);
Err(StatusCode::NoSuchFile)
}
}
}
fn remove(&mut self, id: u32, filename: String) -> impl Future<Output = Result<Status, Self::Error>> + Send {
let this = self.clone();
async move {
// Resolve relative path to absolute path
let resolved_filename = this.resolve_path(&filename).await;
let (bucket, key_opt) = this.parse_path(&resolved_filename)?;
if let Some(key) = key_opt {
// Delete object
let action = S3Action::DeleteObject;
authorize_operation(&this.session_context, &action, &bucket, Some(&key))
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let input = DeleteObjectInput {
bucket,
key,
..Default::default()
};
let s3_client = this.create_s3_client()?;
s3_client.delete_object(input).await.map_err(|e| {
error!("SFTP REMOVE - failed to delete object: {}", e);
StatusCode::Failure
})?;
Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Success".into(),
language_tag: "en".into(),
})
} else {
// Delete bucket - check if bucket is empty first
debug!("SFTP REMOVE - attempting to delete bucket: '{}'", bucket);
let action = S3Action::DeleteBucket;
authorize_operation(&this.session_context, &action, &bucket, None)
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
// Check if bucket is empty
let list_input = ListObjectsV2Input {
bucket: bucket.clone(),
max_keys: Some(1),
..Default::default()
};
match s3_client.list_objects_v2(list_input).await {
Ok(output) => {
if let Some(objects) = output.contents {
if !objects.is_empty() {
debug!("SFTP REMOVE - bucket '{}' is not empty, cannot delete", bucket);
return Ok(Status {
id,
status_code: StatusCode::Failure,
error_message: format!("Bucket '{}' is not empty", bucket),
language_tag: "en".into(),
});
}
}
}
Err(e) => {
debug!("SFTP REMOVE - failed to list objects: {}", e);
}
}
// Bucket is empty, delete it
let delete_bucket_input = DeleteBucketInput {
bucket: bucket.clone(),
..Default::default()
};
match s3_client.delete_bucket(delete_bucket_input).await {
Ok(_) => {
debug!("SFTP REMOVE - successfully deleted bucket: '{}'", bucket);
Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Success".into(),
language_tag: "en".into(),
})
}
Err(e) => {
error!("SFTP REMOVE - failed to delete bucket '{}': {}", bucket, e);
Ok(Status {
id,
status_code: StatusCode::Failure,
error_message: format!("Failed to delete bucket: {}", e),
language_tag: "en".into(),
})
}
}
}
}
}
fn mkdir(
&mut self,
id: u32,
path: String,
_attrs: FileAttributes,
) -> impl Future<Output = Result<Status, Self::Error>> + Send {
let this = self.clone();
async move {
let (bucket, key_opt) = this.parse_path(&path)?;
if let Some(key) = key_opt {
// Create directory inside bucket
let dir_key = path::retain_slash(&key);
let action = S3Action::PutObject;
authorize_operation(&this.session_context, &action, &bucket, Some(&dir_key))
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
let empty_stream = futures::stream::empty::<Result<bytes::Bytes, std::io::Error>>();
let body = StreamingBlob::wrap(empty_stream);
let input = PutObjectInput {
bucket,
key: dir_key,
body: Some(body),
..Default::default()
};
match s3_client.put_object(input).await {
Ok(_) => Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Directory created".into(),
language_tag: "en".into(),
}),
Err(e) => {
error!("SFTP Failed to create directory: {}", e);
Ok(Status {
id,
status_code: StatusCode::Failure,
error_message: format!("Failed to create directory: {}", e),
language_tag: "en".into(),
})
}
}
} else {
// Create bucket
debug!("SFTP mkdir - Creating bucket: '{}'", bucket);
let action = S3Action::CreateBucket;
authorize_operation(&this.session_context, &action, &bucket, None)
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
let input = s3s::dto::CreateBucketInput {
bucket,
..Default::default()
};
match s3_client.create_bucket(input).await {
Ok(_) => Ok(Status {
id,
status_code: StatusCode::Ok,
error_message: "Bucket created".into(),
language_tag: "en".into(),
}),
Err(e) => {
error!("SFTP Failed to create bucket: {}", e);
Ok(Status {
id,
status_code: StatusCode::Failure,
error_message: format!("Failed to create bucket: {}", e),
language_tag: "en".into(),
})
}
}
}
}
}
fn rmdir(&mut self, id: u32, path: String) -> impl Future<Output = Result<Status, Self::Error>> + Send {
self.remove(id, path)
}
fn realpath(&mut self, id: u32, path: String) -> impl Future<Output = Result<Name, Self::Error>> + Send {
let this = self.clone();
async move {
let resolved = this.resolve_path(&path).await;
debug!("SFTP Realpath - input: '{}', resolved: '{}'", path, resolved);
// Check if this path is a directory and get proper attributes
let attrs = this.do_stat(resolved.clone()).await.unwrap_or_else(|_| {
let mut default_attrs = FileAttributes::default();
// Assume it's a directory if stat fails (for root path)
default_attrs.set_dir(true);
default_attrs
});
Ok(Name {
id,
files: vec![File {
filename: resolved.clone(),
longname: format!(
"{:?} {:>4} {:>6} {:>6} {:>8} {} {}",
if attrs.is_dir() { "drwxr-xr-x" } else { "-rw-r--r--" },
1,
"rustfs",
"rustfs",
attrs.size.unwrap_or(0),
"Jan 1 1970",
resolved.split('/').next_back().unwrap_or(&resolved)
),
attrs,
}],
})
}
}
fn stat(&mut self, id: u32, path: String) -> impl Future<Output = Result<Attrs, Self::Error>> + Send {
let this = self.clone();
async move {
let attrs = this.do_stat(path).await?;
Ok(Attrs { id, attrs })
}
}
}

View File

@@ -0,0 +1,17 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! SFTP protocol implementation
pub mod handler;
pub mod server;

View File

@@ -0,0 +1,706 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::protocols::session::context::{Protocol as SessionProtocol, SessionContext};
use crate::protocols::session::principal::ProtocolPrincipal;
use crate::protocols::sftp::handler::SftpHandler;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
use russh::ChannelId;
use russh::keys::{Algorithm, HashAlg, PrivateKey, PublicKey, PublicKeyBase64};
use russh::server::{Auth, Handler, Server as RusshServer, Session};
use ssh_key::Certificate;
use ssh_key::certificate::CertType;
use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
const DEFAULT_ADDR: &str = "0.0.0.0:0";
const AUTH_SUFFIX_SVC: &str = "=svc";
const AUTH_SUFFIX_LDAP: &str = "=ldap";
const SSH_KEY_TYPE_RSA: &str = "ssh-rsa";
const SSH_KEY_TYPE_ED25519: &str = "ssh-ed25519";
const SSH_KEY_TYPE_ECDSA: &str = "ecdsa-";
const SFTP_SUBSYSTEM: &str = "sftp";
const CRITICAL_OPTION_SOURCE_ADDRESS: &str = "source-address";
const AUTH_FAILURE_DELAY_MS: u64 = 300;
const SFTP_BUFFER_SIZE: usize = 65536;
const SFTP_READ_BUF_SIZE: usize = 32 * 1024;
type ServerError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Clone)]
pub struct SftpConfig {
pub bind_addr: SocketAddr,
pub require_key_auth: bool,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub authorized_keys_file: Option<String>,
}
#[derive(Clone)]
pub struct SftpServer {
config: SftpConfig,
key_pair: Arc<PrivateKey>,
trusted_certificates: Arc<Vec<ssh_key::PublicKey>>,
authorized_keys: Arc<Vec<String>>,
}
impl SftpServer {
pub fn new(config: SftpConfig) -> Result<Self, ServerError> {
let key_pair = if let Some(key_file) = &config.key_file {
let path = Path::new(key_file);
russh::keys::load_secret_key(path, None)?
} else {
warn!("No host key provided, generating random key (not recommended for production).");
let mut rng = rand::rngs::OsRng;
PrivateKey::random(&mut rng, Algorithm::Ed25519)?
};
let trusted_certificates = if let Some(cert_file) = &config.cert_file {
info!("Loading trusted CA certificates from: {}", cert_file);
load_trusted_certificates(cert_file)?
} else {
if config.require_key_auth {
warn!("Key auth required but no CA certs provided.");
}
Vec::new()
};
let authorized_keys = if let Some(auth_keys_file) = &config.authorized_keys_file {
info!("Loading authorized SSH public keys from: {}", auth_keys_file);
load_authorized_keys(auth_keys_file).unwrap_or_else(|e| {
error!("Failed to load authorized keys from {}: {}", auth_keys_file, e);
Vec::new()
})
} else {
info!("No authorized keys file provided, will use IAM for key validation.");
Vec::new()
};
info!("Loaded {} authorized SSH public key(s)", authorized_keys.len());
Ok(Self {
config,
key_pair: Arc::new(key_pair),
trusted_certificates: Arc::new(trusted_certificates),
authorized_keys: Arc::new(authorized_keys),
})
}
pub async fn start(&self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>) -> Result<(), ServerError> {
info!("Starting SFTP server on {}", self.config.bind_addr);
let config = Arc::new(self.make_ssh_config());
let socket = tokio::net::TcpListener::bind(&self.config.bind_addr).await?;
let server_stub = self.clone();
loop {
tokio::select! {
accept_res = socket.accept() => {
match accept_res {
Ok((stream, addr)) => {
let config = config.clone();
let server_instance = server_stub.clone();
tokio::spawn(async move {
let handler = SftpConnectionHandler::new(addr, server_instance.trusted_certificates.clone(), server_instance.authorized_keys.clone());
if let Err(e) = russh::server::run_stream(config, stream, handler).await {
debug!("SFTP session closed from {}: {}", addr, e);
}
});
}
Err(e) => error!("Failed to accept SFTP connection: {}", e),
}
}
_ = shutdown_rx.recv() => {
info!("SFTP server shutting down");
break;
}
}
}
Ok(())
}
fn make_ssh_config(&self) -> russh::server::Config {
let mut config = russh::server::Config::default();
config.keys.push(self.key_pair.as_ref().clone());
config.preferred.key = Cow::Borrowed(&[
Algorithm::Ed25519,
Algorithm::Rsa { hash: None },
Algorithm::Rsa {
hash: Some(HashAlg::Sha256),
},
Algorithm::Rsa {
hash: Some(HashAlg::Sha512),
},
]);
config
}
pub fn config(&self) -> &SftpConfig {
&self.config
}
}
impl RusshServer for SftpServer {
type Handler = SftpConnectionHandler;
fn new_client(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
let addr = peer_addr.unwrap_or_else(|| DEFAULT_ADDR.parse().unwrap());
SftpConnectionHandler::new(addr, self.trusted_certificates.clone(), self.authorized_keys.clone())
}
}
struct ConnectionState {
client_ip: SocketAddr,
identity: Option<rustfs_policy::auth::UserIdentity>,
trusted_certificates: Arc<Vec<ssh_key::PublicKey>>,
authorized_keys: Arc<Vec<String>>,
sftp_channels: HashMap<ChannelId, mpsc::UnboundedSender<Vec<u8>>>,
}
#[derive(Clone)]
pub struct SftpConnectionHandler {
state: Arc<Mutex<ConnectionState>>,
}
impl SftpConnectionHandler {
fn new(client_ip: SocketAddr, trusted_certificates: Arc<Vec<ssh_key::PublicKey>>, authorized_keys: Arc<Vec<String>>) -> Self {
Self {
state: Arc::new(Mutex::new(ConnectionState {
client_ip,
identity: None,
trusted_certificates,
authorized_keys,
sftp_channels: HashMap::new(),
})),
}
}
}
impl Handler for SftpConnectionHandler {
type Error = ServerError;
fn auth_password(&mut self, user: &str, password: &str) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
let raw_user = user.to_string();
let password = password.to_string();
let state = self.state.clone();
async move {
use rustfs_credentials::Credentials as S3Credentials;
use rustfs_iam::get;
let (username, suffix) = parse_auth_username(&raw_user);
if let Some(s) = suffix {
debug!("Detected auth suffix '{}' for user '{}'", s, username);
}
let iam_sys = get().map_err(|e| format!("IAM system unavailable: {}", e))?;
let s3_creds = S3Credentials {
access_key: username.to_string(),
secret_key: password.clone(),
session_token: String::new(),
expiration: None,
status: String::new(),
parent_user: String::new(),
groups: None,
claims: None,
name: None,
description: None,
};
let (user_identity, is_valid) = iam_sys
.check_key(&s3_creds.access_key)
.await
.map_err(|e| format!("IAM check failed: {}", e))?;
if !is_valid {
warn!("Invalid AccessKey: {}", username);
tokio::time::sleep(std::time::Duration::from_millis(AUTH_FAILURE_DELAY_MS)).await;
return Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
});
}
if let Some(identity) = user_identity {
if identity.credentials.secret_key != s3_creds.secret_key {
warn!("Invalid SecretKey for user: {}", username);
tokio::time::sleep(std::time::Duration::from_millis(AUTH_FAILURE_DELAY_MS)).await;
return Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
});
}
{
let mut guard = state.lock().unwrap();
guard.identity = Some(identity);
}
debug!("User {} authenticated successfully via password", username);
Ok(Auth::Accept)
} else {
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
}
}
fn auth_publickey(&mut self, user: &str, key: &PublicKey) -> impl Future<Output = Result<Auth, Self::Error>> + Send {
let raw_user = user.to_string();
let key = key.clone();
let state = self.state.clone();
async move {
debug!("SFTP public key auth request for user: {}", raw_user);
let trusted_cas = {
let guard = state.lock().unwrap();
guard.trusted_certificates.clone()
};
if !trusted_cas.is_empty() {
match validate_ssh_certificate(&key, &trusted_cas, &raw_user) {
Ok(true) => {
let (username, _) = parse_auth_username(&raw_user);
use rustfs_iam::get;
let iam_sys = get().map_err(|e| format!("IAM system unavailable: {}", e))?;
let (user_identity, is_valid) = iam_sys
.check_key(username)
.await
.map_err(|e| format!("IAM lookup error: {}", e))?;
if is_valid && user_identity.is_some() {
{
let mut guard = state.lock().unwrap();
guard.identity = user_identity;
}
info!("User {} authenticated via SSH certificate", username);
Ok(Auth::Accept)
} else {
warn!("Valid certificate presented, but user '{}' does not exist in IAM", username);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
}
Ok(false) => Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
}),
Err(e) => {
error!("SSH certificate validation error: {}", e);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
}
} else {
let (username, _) = parse_auth_username(&raw_user);
use russh::keys::PublicKeyBase64;
let client_key_bytes = key.public_key_bytes();
let client_key_openssh = BASE64.encode(&client_key_bytes);
let authorized_keys_clone = {
let guard = state.lock().unwrap();
guard.authorized_keys.clone()
};
if !authorized_keys_clone.is_empty() {
debug!("Checking against {} pre-loaded authorized key(s)", authorized_keys_clone.len());
for authorized_key in authorized_keys_clone.iter() {
if authorized_key.contains(&client_key_openssh)
|| authorized_key == &client_key_openssh
|| compare_keys(authorized_key, &client_key_openssh)
{
use rustfs_iam::get;
if let Ok(iam_sys) = get() {
match iam_sys.check_key(username).await {
Ok((user_identity, is_valid)) => {
if is_valid && user_identity.is_some() {
let mut guard = state.lock().unwrap();
guard.identity = user_identity;
info!("User {} authenticated via pre-loaded authorized key (IAM verified)", username);
return Ok(Auth::Accept);
}
}
Err(e) => {
error!("IAM lookup error: {}", e);
}
}
}
warn!(
"Key matched pre-loaded authorized keys, but IAM verification failed for user '{}'",
username
);
}
}
}
use rustfs_iam::get;
match get() {
Ok(iam_sys) => match iam_sys.check_key(username).await {
Ok((user_identity, is_valid)) => {
if is_valid {
if let Some(identity) = user_identity {
let authorized_keys = identity.get_ssh_public_keys();
if authorized_keys.is_empty() {
warn!("User '{}' found in IAM but has no SSH public keys registered", username);
return Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
});
}
let key_valid = authorized_keys.iter().any(|authorized_key| {
authorized_key.contains(&client_key_openssh)
|| authorized_key == &client_key_openssh
|| compare_keys(authorized_key, &client_key_openssh)
});
if key_valid {
{
let mut guard = state.lock().unwrap();
guard.identity = Some(identity);
}
info!("User {} authenticated via public key from IAM", username);
Ok(Auth::Accept)
} else {
warn!("Public key auth failed: client key not in IAM for user '{}'", username);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
} else {
warn!("Public key auth failed: user '{}' not found in IAM", username);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
} else {
warn!("Public key auth failed: user '{}' not valid in IAM", username);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
}
Err(e) => {
error!("IAM lookup error: {}", e);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
},
Err(e) => {
error!("IAM system unavailable: {}", e);
Ok(Auth::Reject {
proceed_with_methods: None,
partial_success: false,
})
}
}
}
}
}
async fn channel_open_session(
&mut self,
_channel: russh::Channel<russh::server::Msg>,
_session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(true)
}
fn data(
&mut self,
channel_id: ChannelId,
data: &[u8],
_session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
let state = self.state.clone();
let data = data.to_vec();
async move {
let sender = {
let guard = state.lock().unwrap();
guard.sftp_channels.get(&channel_id).cloned()
};
if let Some(tx) = sender {
let _ = tx.send(data);
}
Ok(())
}
}
fn subsystem_request(
&mut self,
channel_id: ChannelId,
name: &str,
session: &mut Session,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
let name = name.to_string();
let state = self.state.clone();
let session_handle = session.handle();
async move {
if name == SFTP_SUBSYSTEM {
let (identity, client_ip) = {
let guard = state.lock().unwrap();
if let Some(id) = &guard.identity {
(id.clone(), guard.client_ip)
} else {
error!("SFTP subsystem requested but user not authenticated");
return Ok(());
}
};
debug!("Initializing SFTP subsystem for user: {}", identity.credentials.access_key);
let context =
SessionContext::new(ProtocolPrincipal::new(Arc::new(identity)), SessionProtocol::Sftp, client_ip.ip());
let (client_pipe, server_pipe) = tokio::io::duplex(SFTP_BUFFER_SIZE);
let (mut client_read, mut client_write) = tokio::io::split(client_pipe);
let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();
{
let mut guard = state.lock().unwrap();
guard.sftp_channels.insert(channel_id, tx);
}
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
while let Some(data) = rx.recv().await {
if let Err(e) = client_write.write_all(&data).await {
debug!("SFTP input pipe closed: {}", e);
break;
}
}
});
let sftp_handler = SftpHandler::new(context);
tokio::spawn(async move {
russh_sftp::server::run(server_pipe, sftp_handler).await;
debug!("SFTP handler finished");
});
let session_handle = session_handle.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
let mut buf = vec![0u8; SFTP_READ_BUF_SIZE];
loop {
match client_read.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let data: Vec<u8> = buf[..n].to_vec();
if session_handle.data(channel_id, data.into()).await.is_err() {
break;
}
}
Err(e) => {
error!("Error reading from SFTP output: {}", e);
break;
}
}
}
let _ = session_handle.close(channel_id).await;
});
}
Ok(())
}
}
}
fn load_trusted_certificates(ca_cert_path: &str) -> Result<Vec<ssh_key::PublicKey>, ServerError> {
let path = Path::new(ca_cert_path);
if !path.exists() {
return Err(format!("CA certificate file not found: {}", ca_cert_path).into());
}
let contents = std::fs::read_to_string(path)?;
let mut keys = Vec::new();
for line in contents.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
match ssh_key::PublicKey::from_openssh(line) {
Ok(key) => keys.push(key),
Err(e) => warn!("Skipping invalid CA key line in {}: {}", ca_cert_path, e),
}
}
info!("Loaded {} trusted CA certificates from {}", keys.len(), ca_cert_path);
Ok(keys)
}
fn load_authorized_keys(auth_keys_path: &str) -> Result<Vec<String>, ServerError> {
let path = Path::new(auth_keys_path);
if !path.exists() {
return Err(format!("Authorized keys file not found: {}", auth_keys_path).into());
}
let contents = std::fs::read_to_string(path)?;
let mut keys = Vec::new();
for line in contents.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if line.starts_with(SSH_KEY_TYPE_RSA) || line.starts_with(SSH_KEY_TYPE_ED25519) || line.starts_with(SSH_KEY_TYPE_ECDSA) {
keys.push(line.to_string());
} else {
warn!(
"Skipping invalid authorized key line in {}: doesn't start with valid key type",
auth_keys_path
);
}
}
info!("Loaded {} authorized SSH public keys from {}", keys.len(), auth_keys_path);
Ok(keys)
}
fn parse_auth_username(username: &str) -> (&str, Option<&str>) {
if let Some(idx) = username.rfind('=') {
let suffix = &username[idx..];
if suffix == AUTH_SUFFIX_SVC || suffix == AUTH_SUFFIX_LDAP {
return (&username[..idx], Some(suffix));
}
}
(username, None)
}
fn validate_ssh_certificate(
russh_key: &PublicKey,
trusted_cas: &[ssh_key::PublicKey],
raw_username: &str,
) -> Result<bool, ServerError> {
let (username, _suffix) = parse_auth_username(raw_username);
let key_bytes = russh_key.public_key_bytes();
let cert = match Certificate::from_bytes(&key_bytes) {
Ok(c) => c,
Err(_) => {
debug!("Provided key is not a certificate. Skipping cert validation.");
return Ok(false);
}
};
debug!("Verifying SSH Certificate: KeyID='{}', Serial={}", cert.comment(), cert.serial());
let mut signature_valid = false;
let signature_key = cert.signature_key();
for ca in trusted_cas {
if ca.key_data() == signature_key {
signature_valid = true;
debug!("Certificate signed by trusted CA: {}", ca.fingerprint(Default::default()));
break;
}
}
if !signature_valid {
warn!("Certificate signer not found in trusted CAs");
return Ok(false);
}
let now = SystemTime::now();
let valid_after = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(cert.valid_after());
let valid_before = SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(cert.valid_before());
if now < valid_after {
warn!("Certificate is not yet valid (valid after {:?})", valid_after);
return Ok(false);
}
if now > valid_before {
warn!("Certificate has expired (valid until {:?})", valid_before);
return Ok(false);
}
if !cert.valid_principals().contains(&username.to_string()) {
warn!(
"Certificate does not authorize user '{}'. Principals: {:?}",
username,
cert.valid_principals()
);
return Ok(false);
}
match cert.cert_type() {
CertType::User => {}
_ => {
warn!("Certificate is not a User certificate");
return Ok(false);
}
}
for (name, _value) in cert.critical_options().iter() {
if name.as_str() == CRITICAL_OPTION_SOURCE_ADDRESS {
} else {
warn!("Rejecting certificate due to unsupported critical option: {}", name);
return Ok(false);
}
}
info!("SSH Certificate validation successful for user '{}'", username);
Ok(true)
}
fn compare_keys(stored_key: &str, client_key_base64: &str) -> bool {
let stored_key_parts: Vec<&str> = stored_key.split_whitespace().collect();
if stored_key_parts.is_empty() {
return false;
}
let stored_key_data = stored_key_parts.get(1).unwrap_or(&stored_key);
if *stored_key_data == client_key_base64 {
return true;
}
if let Ok(stored_bytes) = BASE64.decode(stored_key_data) {
if let Ok(client_bytes) = BASE64.decode(client_key_base64) {
return stored_bytes == client_bytes;
}
}
false
}