feat: auto-extract support

This commit is contained in:
weisd
2025-05-08 15:44:28 +08:00
parent 29ddf4dbc8
commit 76fdefeca4
8 changed files with 369 additions and 35 deletions

60
Cargo.lock generated
View File

@@ -3270,6 +3270,18 @@ dependencies = [
"rustc_version",
]
[[package]]
name = "filetime"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586"
dependencies = [
"cfg-if",
"libc",
"libredox",
"windows-sys 0.59.0",
]
[[package]]
name = "fixedbitset"
version = "0.5.7"
@@ -4779,6 +4791,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d"
dependencies = [
"bitflags 2.9.0",
"libc",
"redox_syscall 0.5.11",
]
[[package]]
@@ -6894,6 +6907,15 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.11"
@@ -7289,6 +7311,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic 0.13.1",
"tonic-build",
@@ -7297,6 +7320,7 @@ dependencies = [
"tracing",
"transform-stream",
"uuid",
"zip",
]
[[package]]
@@ -8693,6 +8717,21 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tar"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75"
dependencies = [
"filetime",
"futures-core",
"libc",
"redox_syscall 0.3.5",
"tokio",
"tokio-stream",
"xattr",
]
[[package]]
name = "tokio-util"
version = "0.7.15"
@@ -10213,6 +10252,16 @@ dependencies = [
"pkg-config",
]
[[package]]
name = "xattr"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d65cbf2f12c15564212d48f4e3dfb87923d25d611f2aed18f4cb23f0413d89e"
dependencies = [
"libc",
"rustix 1.0.5",
]
[[package]]
name = "xdg-home"
version = "1.3.0"
@@ -10476,6 +10525,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "zip"
version = "0.0.1"
dependencies = [
"async-compression",
"tokio",
"tokio-stream",
"tokio-tar",
"xz2",
]
[[package]]
name = "zstd"
version = "0.13.3"

View File

@@ -1,23 +1,24 @@
[workspace]
members = [
"appauth", # Application authentication and authorization
"cli/rustfs-gui", # Graphical user interface client
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"crates/config", # Configuration management
"appauth", # Application authentication and authorization
"cli/rustfs-gui", # Graphical user interface client
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"crates/config", # Configuration management
"crates/event-notifier", # Event notification system
"crates/obs", # Observability utilities
"crates/utils", # Utility functions and helpers
"crypto", # Cryptography and security features
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"iam", # Identity and Access Management
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"crates/obs", # Observability utilities
"crates/utils", # Utility functions and helpers
"crypto", # Cryptography and security features
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"iam", # Identity and Access Management
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"crates/zip",             # Streaming decompression and archive extraction
]
resolver = "2"
@@ -47,11 +48,13 @@ policy = { path = "./policy", version = "0.0.1" }
protos = { path = "./common/protos", version = "0.0.1" }
query = { path = "./s3select/query", version = "0.0.1" }
rustfs = { path = "./rustfs", version = "0.0.1" }
zip = { path = "./crates/zip", version = "0.0.1" }
rustfs-config = { path = "./crates/config", version = "0.0.1" }
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" }
rustfs-utils = { path = "crates/utils", version = "0.0.1" }
workers = { path = "./common/workers", version = "0.0.1" }
tokio-tar = "0.3.1"
atoi = "2.0.0"
async-recursion = "1.1.1"
async-trait = "0.1.88"
@@ -113,7 +116,9 @@ opentelemetry-appender-tracing = { version = "0.29.1", features = [
opentelemetry_sdk = { version = "0.29.0" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
opentelemetry-semantic-conventions = { version = "0.29.0", features = [
"semconv_experimental",
] }
parking_lot = "0.12.3"
pin-project-lite = "0.2.16"
# pin-utils = "0.1.0"
@@ -206,8 +211,8 @@ inherits = "dev"
opt-level = 3
lto = "fat"
codegen-units = 1
panic = "abort" # Optional, remove the panic expansion code
strip = true # strip symbol information to reduce binary size
panic = "abort" # Optional, remove the panic expansion code
strip = true # strip symbol information to reduce binary size
[profile.production]
inherits = "release"

28
crates/zip/Cargo.toml Normal file
View File

@@ -0,0 +1,28 @@
[package]
name = "zip"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
async-compression = { version = "0.4.0", features = [
"tokio",
"bzip2",
"gzip",
"zlib",
"zstd",
"xz",
] }
# async_zip = { version = "0.0.17", features = ["tokio"] }
# rc-zip-tokio = "4.2.6"
tokio = { version = "1.45.0", features = ["full"] }
tokio-stream = "0.1.17"
tokio-tar = { workspace = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
[lints]
workspace = true

124
crates/zip/src/lib.rs Normal file
View File

@@ -0,0 +1,124 @@
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZlibDecoder, ZstdDecoder};
use tokio::io::{self, AsyncRead, BufReader};
use tokio_stream::StreamExt;
use tokio_tar::Archive;
/// Compression format of an uploaded archive, detected from the file
/// extension (see `from_extension`). Used to pick an async decoder for
/// streaming tar extraction.
#[derive(Debug, PartialEq)]
pub enum CompressionFormat {
    Gzip,  //.gz
    Bzip2, //.bz2
    // Lz4, //.lz4
    Zip, // .zip — recognized but no streaming decoder is implemented yet
    Xz,   //.xz
    Zlib, //.z
    Zstd, //.zst
    /// Extension did not match any known format.
    Unknown,
}
impl CompressionFormat {
    /// Maps a file extension (without the leading dot) to a format.
    ///
    /// Matching is ASCII case-insensitive, so `"GZ"` and `"gz"` both
    /// yield [`CompressionFormat::Gzip`]. Anything unrecognized maps to
    /// [`CompressionFormat::Unknown`].
    pub fn from_extension(ext: &str) -> Self {
        match ext.to_ascii_lowercase().as_str() {
            "gz" => CompressionFormat::Gzip,
            "bz2" => CompressionFormat::Bzip2,
            // "lz4" => CompressionFormat::Lz4,
            "zip" => CompressionFormat::Zip,
            "xz" => CompressionFormat::Xz,
            "zlib" => CompressionFormat::Zlib,
            "zst" => CompressionFormat::Zstd,
            _ => CompressionFormat::Unknown,
        }
    }

    /// Wraps `input` in a buffered async decoder for this format.
    ///
    /// # Errors
    ///
    /// Returns `io::ErrorKind::InvalidInput` for formats without a
    /// streaming decoder here (`Zip`, `Unknown`).
    pub fn get_decoder<R>(&self, input: R) -> io::Result<Box<dyn AsyncRead + Send + Unpin>>
    where
        R: AsyncRead + Send + Unpin + 'static,
    {
        // All decoders require a buffered reader.
        let reader = BufReader::new(input);
        let decoder: Box<dyn AsyncRead + Send + Unpin + 'static> = match self {
            CompressionFormat::Gzip => Box::new(GzipDecoder::new(reader)),
            CompressionFormat::Bzip2 => Box::new(BzDecoder::new(reader)),
            // CompressionFormat::Lz4 => Box::new(Lz4Decoder::new(reader)),
            CompressionFormat::Zlib => Box::new(ZlibDecoder::new(reader)),
            CompressionFormat::Xz => Box::new(XzDecoder::new(reader)),
            CompressionFormat::Zstd => Box::new(ZstdDecoder::new(reader)),
            _ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported file format")),
        };
        Ok(decoder)
    }
}
/// Streams a compressed tar archive, invoking `callback` for every entry.
///
/// The input is decoded according to `format` and read as a tar stream;
/// each entry is handed to `callback` in archive order. Iteration stops
/// at the first read error or callback error.
///
/// # Errors
///
/// Returns an error if the format has no streaming decoder, the archive
/// header or an entry cannot be read, or the callback fails.
pub async fn decompress<R, F>(input: R, format: CompressionFormat, mut callback: F) -> io::Result<()>
where
    R: AsyncRead + Send + Unpin + 'static,
    F: AsyncFnMut(tokio_tar::Entry<Archive<Box<dyn AsyncRead + Send + Unpin + 'static>>>) -> std::io::Result<()> + Send + 'static,
{
    let decoder = format.get_decoder(input)?;

    let mut ar = Archive::new(decoder);
    // Propagate header-read failures instead of panicking (the original
    // used `.unwrap()` here, which would abort the task on a bad archive).
    let mut entries = ar.entries()?;

    while let Some(entry) = entries.next().await {
        callback(entry?).await?;
    }

    Ok(())
}
// #[tokio::test]
// async fn test_decompress() -> io::Result<()> {
// use std::path::Path;
// use tokio::fs::File;
// let input_path = "/Users/weisd/Downloads/wsd.tar.gz"; // replace with the path to your compressed file
// let f = File::open(input_path).await?;
// let Some(ext) = Path::new(input_path).extension().and_then(|s| s.to_str()) else {
// return Err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported file format"));
// };
// match decompress(
// f,
// CompressionFormat::from_extension(ext),
// |entry: tokio_tar::Entry<Archive<Box<dyn AsyncRead + Send + Unpin>>>| async move {
// let path = entry.path().unwrap();
// println!("Extracted: {}", path.display());
// Ok(())
// },
// )
// .await
// {
// Ok(_) => println!("Decompression succeeded!"),
// Err(e) => println!("Decompression failed: {}", e),
// }
// Ok(())
// }

View File

@@ -9,7 +9,6 @@ use crate::{
};
use common::error::{Error, Result};
use tokio::fs;
use tracing::info;
use super::error::{os_err_to_file_err, os_is_exist, DiskError};
@@ -137,7 +136,7 @@ pub async fn reliable_rename(
) -> io::Result<()> {
if let Some(parent) = dst_file_path.as_ref().parent() {
if !file_exists(parent).await {
info!("reliable_rename reliable_mkdir_all parent: {:?}", parent);
// info!("reliable_rename reliable_mkdir_all parent: {:?}", parent);
reliable_mkdir_all(parent, base_dir.as_ref()).await?;
}
}

View File

@@ -421,7 +421,7 @@ impl SetDisks {
let file_path = file_path.clone();
async move {
if let Some(disk) = disk {
match disk
(disk
.delete(
bucket,
&file_path,
@@ -430,11 +430,8 @@ impl SetDisks {
..Default::default()
},
)
.await
{
Ok(_) => None,
Err(e) => Some(e),
}
.await)
.err()
} else {
Some(Error::new(DiskError::DiskNotFound))
}

View File

@@ -15,6 +15,8 @@ path = "src/main.rs"
workspace = true
[dependencies]
zip = { workspace = true }
tokio-tar = { workspace = true }
madmin = { workspace = true }
api = { workspace = true }
appauth = { workspace = true }
@@ -77,7 +79,12 @@ tokio-stream.workspace = true
tonic = { workspace = true }
tower.workspace = true
transform-stream.workspace = true
tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors"] }
tower-http = { workspace = true, features = [
"trace",
"compression-deflate",
"compression-gzip",
"cors",
] }
uuid = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]

View File

@@ -4,6 +4,10 @@ use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::storage::access::ReqInfo;
use crate::storage::error::to_s3_error;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime, get_opts};
use api::query::Context;
use api::query::Query;
use api::server::dbms::DatabaseManagerSystem;
@@ -61,10 +65,12 @@ use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
@@ -73,11 +79,7 @@ use tracing::info;
use tracing::warn;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
use crate::storage::error::to_s3_error;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime, get_opts};
use zip::CompressionFormat;
macro_rules! try_ {
($result:expr) => {
@@ -107,6 +109,108 @@ impl FS {
// let store: ECStore = ECStore::new(address, endpoint_pools).await?;
Self {}
}
async fn put_object_extract(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let PutObjectInput { body, bucket, key, .. } = req.input;
let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))));
// let etag_stream = EtagReader::new(body);
let Some(ext) = Path::new(&key).extension().and_then(|s| s.to_str()) else {
return Err(s3_error!(InvalidArgument, "key extension not found"));
};
let ext = ext.to_owned();
// TODO: spport zip
let decoder = CompressionFormat::from_extension(&ext).get_decoder(body).map_err(|e| {
error!("get_decoder err {:?}", e);
s3_error!(InvalidArgument, "get_decoder err")
})?;
let mut ar = Archive::new(decoder);
let mut entries = ar.entries().map_err(|e| {
error!("get entries err {:?}", e);
s3_error!(InvalidArgument, "get entries err")
})?;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let prefix = req
.headers
.get("X-Amz-Meta-RustFs-Snowball-Prefix")
.map(|v| v.to_str().unwrap_or_default())
.unwrap_or_default();
while let Some(entry) = entries.next().await {
let f = match entry {
Ok(f) => f,
Err(e) => {
println!("Error reading entry: {}", e);
return Err(s3_error!(InvalidArgument, "Error reading entry {:?}", e));
}
};
if f.header().entry_type().is_dir() {
continue;
}
if let Ok(fpath) = f.path() {
let mut fpath = fpath.to_string_lossy().to_string();
if !prefix.is_empty() {
fpath = format!("{}/{}", prefix, fpath);
}
let size = f.header().size().unwrap_or_default() as usize;
println!("Extracted: {}, size {}", fpath, size);
let mut reader = PutObjReader::new(Box::new(f), size);
let _obj_info = store
.put_object(&bucket, &fpath, &mut reader, &ObjectOptions::default())
.await
.map_err(to_s3_error)?;
// let e_tag = obj_info.etag;
// // store.put_object(bucket, object, data, opts);
// let output = PutObjectOutput {
// e_tag,
// ..Default::default()
// };
}
}
// match decompress(
// body,
// CompressionFormat::from_extension(&ext),
// |entry: tokio_tar::Entry<tokio_tar::Archive<Box<dyn AsyncRead + Send + Unpin + 'static>>>| async move {
// let path = entry.path().unwrap();
// println!("Extracted: {}", path.display());
// Ok(())
// },
// )
// .await
// {
// Ok(_) => println!("解压成功!"),
// Err(e) => println!("解压失败: {}", e),
// }
// TODO: etag
let output = PutObjectOutput {
// e_tag: Some(etag_stream.etag().await),
..Default::default()
};
Ok(S3Response::new(output))
}
}
#[async_trait::async_trait]
impl S3 for FS {
@@ -409,6 +513,8 @@ impl S3 for FS {
..
} = req.input;
// TODO: getObjectInArchiveFileHandler object = xxx.zip/xxx/xxx.xxx
// let range = HTTPRangeSpec::nil();
let h = HeaderMap::new();
@@ -804,8 +910,16 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument(level = "debug", skip(self, req))]
// #[tracing::instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
if req
.headers
.get("X-Amz-Meta-Snowball-Auto-Extract")
.is_some_and(|v| v.to_str().unwrap_or_default() == "true")
{
return self.put_object_extract(req).await;
}
let input = req.input;
if let Some(ref storage_class) = input.storage_class {