fix: fix FTPS/SFTP download issues and optimize S3Client caching (#1353)

Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: loverustfs <hello@rustfs.com>
This commit is contained in:
yxrxy
2026-01-04 17:28:18 +08:00
committed by GitHub
parent ffbcd3852f
commit 38c2d74d36
4 changed files with 133 additions and 125 deletions

154
Cargo.lock generated
View File

@@ -532,7 +532,7 @@ checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"synstructure 0.13.2",
]
@@ -544,7 +544,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -612,7 +612,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -634,7 +634,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -645,7 +645,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -671,7 +671,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -1375,7 +1375,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -1712,7 +1712,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -1804,7 +1804,7 @@ checksum = "5c25c2a02ba19f2d4fd9f54d5f239f97c867deb7397763a9771edab63c44a4fa"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2174,7 +2174,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2242,7 +2242,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2256,7 +2256,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2269,7 +2269,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.11.1",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2291,7 +2291,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2302,7 +2302,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [
"darling_core 0.21.3",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2313,7 +2313,7 @@ checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d"
dependencies = [
"darling_core 0.23.0",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -2811,7 +2811,7 @@ checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848"
dependencies = [
"datafusion-doc",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3007,7 +3007,7 @@ checksum = "780eb241654bf097afb00fc5f054a09b687dad862e485fdcf8399bb056565370"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3074,7 +3074,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3116,7 +3116,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3136,7 +3136,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core 0.20.2",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3158,7 +3158,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.112",
"syn 2.0.111",
"unicode-xid",
]
@@ -3230,7 +3230,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3436,7 +3436,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3448,7 +3448,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3469,7 +3469,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3508,7 +3508,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3840,7 +3840,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -3949,7 +3949,7 @@ dependencies = [
"proc-macro-error2",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -4323,7 +4323,7 @@ dependencies = [
"proc-macro-error2",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -5070,7 +5070,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -5645,7 +5645,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6483,7 +6483,7 @@ dependencies = [
"phf_shared 0.11.3",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6521,7 +6521,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6781,7 +6781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6821,7 +6821,7 @@ dependencies = [
"proc-macro-error-attr2",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6885,7 +6885,7 @@ dependencies = [
"pulldown-cmark",
"pulldown-cmark-to-cmark",
"regex",
"syn 2.0.112",
"syn 2.0.111",
"tempfile",
]
@@ -6899,7 +6899,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -6912,7 +6912,7 @@ dependencies = [
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -7252,7 +7252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b"
dependencies = [
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -7313,7 +7313,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -7480,7 +7480,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -7567,7 +7567,7 @@ dependencies = [
"regex",
"relative-path",
"rustc_version",
"syn 2.0.112",
"syn 2.0.111",
"unicode-ident",
]
@@ -7716,7 +7716,7 @@ dependencies = [
"quote",
"rust-embed-utils",
"shellexpand",
"syn 2.0.112",
"syn 2.0.111",
"walkdir",
]
@@ -8614,7 +8614,7 @@ checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984"
[[package]]
name = "s3s"
version = "0.13.0-alpha"
source = "git+https://github.com/s3s-project/s3s.git?branch=main#336ed86c77e99fd94fc7581a3a39ef246a63281b"
source = "git+https://github.com/s3s-project/s3s.git?branch=main#9e41304ed549b89cfb03ede98e9c0d2ac7522051"
dependencies = [
"arrayvec",
"async-trait",
@@ -8731,7 +8731,7 @@ dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -8867,7 +8867,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -8878,7 +8878,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -8963,7 +8963,7 @@ dependencies = [
"darling 0.21.3",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -8998,7 +8998,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9253,7 +9253,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9339,7 +9339,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9503,7 +9503,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9645,9 +9645,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.112"
version = "2.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21f182278bf2d2bcb3c88b1b08a37df029d71ce3d3ae26168e3c653b213b99d4"
checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
dependencies = [
"proc-macro2",
"quote",
@@ -9692,7 +9692,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9791,7 +9791,7 @@ dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9802,7 +9802,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"test-case-core",
]
@@ -9832,7 +9832,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9843,7 +9843,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -9992,7 +9992,7 @@ checksum = "2d2e76690929402faae40aebdda620a2c0e25dd6d3b9afe48867dfd95991f4bd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10020,7 +10020,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10183,7 +10183,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10208,7 +10208,7 @@ dependencies = [
"prost-build",
"prost-types",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"tempfile",
"tonic-build",
]
@@ -10301,7 +10301,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10559,7 +10559,7 @@ checksum = "39d11901c36b3650df7acb0f9ebe624f35b5ac4e1922ecd3c57f444648429594"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10715,7 +10715,7 @@ dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"wasm-bindgen-shared",
]
@@ -10921,7 +10921,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -10932,7 +10932,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -11221,7 +11221,7 @@ dependencies = [
"darling 0.20.11",
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -11330,7 +11330,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"synstructure 0.13.2",
]
@@ -11351,7 +11351,7 @@ checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -11371,7 +11371,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
"synstructure 0.13.2",
]
@@ -11392,7 +11392,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -11425,7 +11425,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.112",
"syn 2.0.111",
]
[[package]]
@@ -11464,9 +11464,9 @@ checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3"
[[package]]
name = "zmij"
version = "1.0.6"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aac060176f7020d62c3bcc1cdbcec619d54f48b07ad1963a3f80ce7a0c17755f"
checksum = "e9747e91771f56fd7893e1164abd78febd14a670ceec257caad15e051de35f06"
[[package]]
name = "zopfli"

View File

@@ -111,6 +111,16 @@ pub async fn test_ftps_core_operations() -> Result<()> {
ftp_stream.put_file(filename, &mut Cursor::new(content.as_bytes()))?;
info!("PASS: put file '{}' ({} bytes) successful", filename, content.len());
info!("Testing FTPS: download file");
let downloaded_content = ftp_stream.retr(filename, |stream| {
let mut buffer = Vec::new();
stream.read_to_end(&mut buffer).map_err(suppaftp::FtpError::ConnectionError)?;
Ok(buffer)
})?;
let downloaded_str = String::from_utf8(downloaded_content)?;
assert_eq!(downloaded_str, content, "Downloaded content should match uploaded content");
info!("PASS: download file '{}' successful, content matches", filename);
info!("Testing FTPS: ls list objects in bucket");
let list = ftp_stream.list(None)?;
assert!(list.iter().any(|line| line.contains(filename)), "File should appear in list");

View File

@@ -34,16 +34,20 @@ use s3s::dto::{GetObjectInput, PutObjectInput};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tracing::{debug, error, info, trace};
/// FTPS storage driver implementation
#[derive(Debug)]
pub struct FtpsDriver {}
#[derive(Debug, Clone)]
pub struct FtpsDriver {
fs: crate::storage::ecfs::FS,
}
impl FtpsDriver {
/// Create a new FTPS driver
pub fn new() -> Self {
Self {}
let fs = crate::storage::ecfs::FS {};
Self { fs }
}
/// Validate FTP feature support
@@ -68,9 +72,7 @@ impl FtpsDriver {
/// Create ProtocolS3Client for the given user
fn create_s3_client_for_user(&self, user: &super::server::FtpsUser) -> Result<ProtocolS3Client> {
let session_context = &user.session_context;
let fs = crate::storage::ecfs::FS {};
let s3_client = ProtocolS3Client::new(fs, session_context.access_key().to_string());
let s3_client = ProtocolS3Client::new(self.fs.clone(), session_context.access_key().to_string());
Ok(s3_client)
}
@@ -455,31 +457,28 @@ impl StorageBackend<super::server::FtpsUser> for FtpsDriver {
let mut builder = GetObjectInput::builder();
builder.set_bucket(bucket);
builder.set_key(object_key);
let mut input = builder
if start_pos > 0
&& let Ok(range) = s3s::dto::Range::parse(&format!("bytes={}-", start_pos))
{
builder.set_range(Some(range));
}
let input = builder
.build()
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Failed to build GetObjectInput"))?;
if start_pos > 0 {
input.range = Some(
s3s::dto::Range::parse(&format!("bytes={}-", start_pos))
.map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Invalid range format"))?,
);
}
match s3_client.get_object(input).await {
Ok(output) => {
if let Some(body) = output.body {
// Map the s3s/Box<dyn StdError> error to std::io::Error
let stream = body.map_err(std::io::Error::other);
// Wrap the stream in StreamReader to make it a tokio::io::AsyncRead
let reader = tokio_util::io::StreamReader::new(stream);
let reader = StreamReader::new(stream);
Ok(Box::new(reader))
} else {
Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Empty object body"))
}
}
Err(e) => {
error!("Failed to get object: {}", e);
let protocol_error = map_s3_error_to_ftps(&e);
Err(Error::new(ErrorKind::PermanentFileNotAvailable, protocol_error))
}

View File

@@ -21,6 +21,7 @@ use futures::TryStreamExt;
use russh_sftp::protocol::{Attrs, Data, File, FileAttributes, Handle, Name, OpenFlags, Status, StatusCode, Version};
use russh_sftp::server::Handler;
use rustfs_utils::path;
use s3s::S3ErrorCode;
use s3s::dto::{DeleteBucketInput, DeleteObjectInput, GetObjectInput, ListObjectsV2Input, PutObjectInput, StreamingBlob};
use std::collections::HashMap;
use std::future::Future;
@@ -30,6 +31,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use tokio::fs::{File as TokioFile, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::RwLock;
use tokio_util::io::StreamReader;
use tracing::{debug, error, trace};
use uuid::Uuid;
@@ -74,24 +76,25 @@ pub struct SftpHandler {
next_handle_id: Arc<AtomicU32>,
temp_dir: PathBuf,
current_dir: Arc<RwLock<String>>,
fs: crate::storage::ecfs::FS,
}
impl SftpHandler {
pub fn new(session_context: SessionContext) -> Self {
let fs = crate::storage::ecfs::FS {};
Self {
session_context,
handles: Arc::new(RwLock::new(HashMap::new())),
next_handle_id: Arc::new(AtomicU32::new(INITIAL_HANDLE_ID)),
temp_dir: std::env::temp_dir(),
current_dir: Arc::new(RwLock::new(ROOT_PATH.to_string())),
fs,
}
}
fn create_s3_client(&self) -> Result<ProtocolS3Client, StatusCode> {
// Create FS instance (empty struct that accesses global ECStore)
let fs = crate::storage::ecfs::FS {};
let client = ProtocolS3Client::new(fs, self.session_context.access_key().to_string());
Ok(client)
fn create_s3_client(&self) -> ProtocolS3Client {
ProtocolS3Client::new(self.fs.clone(), self.session_context.access_key().to_string())
}
fn parse_path(&self, path_str: &str) -> Result<(String, Option<String>), StatusCode> {
@@ -188,7 +191,7 @@ impl SftpHandler {
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = self.create_s3_client()?;
let s3_client = self.create_s3_client();
match action {
S3Action::HeadBucket => {
@@ -365,13 +368,7 @@ impl Handler for SftpHandler {
return Err(StatusCode::Failure);
}
let s3_client = match this.create_s3_client() {
Ok(c) => c,
Err(e) => {
let _ = tokio::fs::remove_file(&temp_file_path).await;
return Err(e);
}
};
let s3_client = this.create_s3_client();
let stream = tokio_util::io::ReaderStream::new(file);
let body = StreamingBlob::wrap(stream);
@@ -427,31 +424,35 @@ impl Handler for SftpHandler {
}
};
let s3_client = this.create_s3_client();
let range_end = offset + (len as u64) - 1;
let mut builder = GetObjectInput::builder();
builder.set_bucket(bucket);
builder.set_key(key);
if let Ok(range) = s3s::dto::Range::parse(&format!("bytes={}-{}", offset, range_end)) {
if offset > 0
&& let Ok(range) = s3s::dto::Range::parse(&format!("bytes={}-{}", offset, range_end))
{
builder.set_range(Some(range));
}
let s3_client = this.create_s3_client()?;
let input = builder.build().map_err(|_| StatusCode::Failure)?;
match s3_client.get_object(input).await {
Ok(output) => {
let mut data = Vec::with_capacity(len as usize);
let mut data = Vec::new();
if let Some(body) = output.body {
let stream = body.map_err(std::io::Error::other);
let mut reader = tokio_util::io::StreamReader::new(stream);
let _ = reader.read_to_end(&mut data).await;
let mut reader = StreamReader::new(stream);
reader.read_to_end(&mut data).await.map_err(|_| StatusCode::Failure)?;
}
Ok(Data { id, data })
}
Err(e) => {
debug!("S3 Read failed: {}", e);
Ok(Data { id, data: Vec::new() })
}
Err(e) => match e.code() {
S3ErrorCode::InvalidRange => Err(StatusCode::Eof),
_ => Err(map_s3_error_to_sftp_status(&e)),
},
}
}
}
@@ -538,9 +539,7 @@ impl Handler for SftpHandler {
.map_err(|_| StatusCode::PermissionDenied)?;
// List all buckets
let s3_client = this.create_s3_client().inspect_err(|&e| {
error!("SFTP Opendir - failed to create S3 client: {}", e);
})?;
let s3_client = this.create_s3_client();
let input = s3s::dto::ListBucketsInput::builder()
.build()
@@ -604,7 +603,7 @@ impl Handler for SftpHandler {
}
builder.set_delimiter(Some("/".to_string()));
let s3_client = this.create_s3_client()?;
let s3_client = this.create_s3_client();
let input = builder.build().map_err(|_| StatusCode::Failure)?;
let mut files = Vec::new();
@@ -721,7 +720,7 @@ impl Handler for SftpHandler {
..Default::default()
};
let s3_client = this.create_s3_client()?;
let s3_client = this.create_s3_client();
s3_client.delete_object(input).await.map_err(|e| {
error!("SFTP REMOVE - failed to delete object: {}", e);
StatusCode::Failure
@@ -742,7 +741,7 @@ impl Handler for SftpHandler {
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
let s3_client = this.create_s3_client();
// Check if bucket is empty
let list_input = ListObjectsV2Input {
@@ -818,7 +817,7 @@ impl Handler for SftpHandler {
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
let s3_client = this.create_s3_client();
let empty_stream = futures::stream::empty::<Result<bytes::Bytes, std::io::Error>>();
let body = StreamingBlob::wrap(empty_stream);
let input = PutObjectInput {
@@ -854,7 +853,7 @@ impl Handler for SftpHandler {
.await
.map_err(|_| StatusCode::PermissionDenied)?;
let s3_client = this.create_s3_client()?;
let s3_client = this.create_s3_client();
let input = s3s::dto::CreateBucketInput {
bucket,
..Default::default()