From 76fdefeca432bad92e3f81b9fa8c018d34f166bf Mon Sep 17 00:00:00 2001 From: weisd Date: Thu, 8 May 2025 15:44:28 +0800 Subject: [PATCH] feat: auto-extract support --- Cargo.lock | 60 ++++++++++++++++++ Cargo.toml | 45 +++++++------ crates/zip/Cargo.toml | 28 +++++++++ crates/zip/src/lib.rs | 124 ++++++++++++++++++++++++++++++++++++ ecstore/src/disk/os.rs | 3 +- ecstore/src/set_disk.rs | 9 +-- rustfs/Cargo.toml | 9 ++- rustfs/src/storage/ecfs.rs | 126 +++++++++++++++++++++++++++++++++++-- 8 files changed, 369 insertions(+), 35 deletions(-) create mode 100644 crates/zip/Cargo.toml create mode 100644 crates/zip/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f3407172..cce1d3bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3270,6 +3270,18 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "fixedbitset" version = "0.5.7" @@ -4779,6 +4791,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ "bitflags 2.9.0", "libc", + "redox_syscall 0.5.11", ] [[package]] @@ -6894,6 +6907,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.11" @@ -7289,6 +7311,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.2", "tokio-stream", + "tokio-tar", "tokio-util", "tonic 0.13.1", "tonic-build", @@ -7297,6 +7320,7 @@ dependencies = [ "tracing", "transform-stream", "uuid", + "zip", ] [[package]] @@ -8693,6 +8717,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -10213,6 +10252,16 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "xattr" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d65cbf2f12c15564212d48f4e3dfb87923d25d611f2aed18f4cb23f0413d89e" +dependencies = [ + "libc", + "rustix 1.0.5", +] + [[package]] name = "xdg-home" version = "1.3.0" @@ -10476,6 +10525,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "zip" +version = "0.0.1" +dependencies = [ + "async-compression", + "tokio", + "tokio-stream", + "tokio-tar", + "xz2", +] + [[package]] name = "zstd" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index 645533ae..1d349f80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,23 +1,24 @@ [workspace] members = [ - "appauth", # Application authentication and authorization - "cli/rustfs-gui", # Graphical user interface client - "common/common", # Shared utilities and data structures - "common/lock", # Distributed locking implementation - "common/protos", # Protocol buffer definitions - "common/workers", # Worker thread pools and task scheduling - "crates/config", # Configuration management + "appauth", # Application authentication and authorization + "cli/rustfs-gui", # Graphical user interface client + "common/common", # Shared utilities and data structures + "common/lock", # Distributed locking implementation + "common/protos", # Protocol buffer definitions + "common/workers", # Worker thread pools and task scheduling + "crates/config", # Configuration management "crates/event-notifier", # Event notification system - "crates/obs", # Observability utilities - "crates/utils", # Utility functions and helpers - "crypto", # Cryptography and security features - "ecstore", # Erasure coding storage implementation - "e2e_test", # End-to-end test suite - "iam", # Identity and Access Management - "madmin", # Management dashboard and admin API interface - "rustfs", # Core file system implementation - "s3select/api", # S3 Select API interface - "s3select/query", # S3 Select query engine + "crates/obs", # Observability utilities + "crates/utils", # Utility functions and helpers + "crypto", # Cryptography and security features + "ecstore", # Erasure coding storage implementation + "e2e_test", # End-to-end test suite + "iam", # Identity and Access Management + "madmin", # Management dashboard and admin API interface + "rustfs", # Core file system implementation + "s3select/api", # S3 Select API interface + "s3select/query", # S3 Select query engine + "crates/zip", ] resolver = "2" @@ -47,11 +48,13 @@ policy = { path = "./policy", version = "0.0.1" } protos = { path = "./common/protos", version = "0.0.1" } query = { path = "./s3select/query", version = "0.0.1" } rustfs = { path = "./rustfs", version = "0.0.1" } +zip = { path = "./crates/zip", version = "0.0.1" } rustfs-config = { path = "./crates/config", version = "0.0.1" } rustfs-obs = { path = "crates/obs", version = "0.0.1" } rustfs-event-notifier = { path = "crates/event-notifier", version = "0.0.1" } rustfs-utils = { path = "crates/utils", version = "0.0.1" } workers = { path = "./common/workers", version = "0.0.1" } +tokio-tar = "0.3.1" atoi = "2.0.0" async-recursion = "1.1.1" async-trait = "0.1.88" @@ -113,7 +116,9 @@ opentelemetry-appender-tracing = { version = "0.29.1", features = [ opentelemetry_sdk = { version = "0.29.0" } opentelemetry-stdout = { version = "0.29.0" } opentelemetry-otlp = { version = "0.29.0" } -opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } +opentelemetry-semantic-conventions = { version = "0.29.0", features = [ + "semconv_experimental", +] } parking_lot = "0.12.3" pin-project-lite = "0.2.16" # pin-utils = "0.1.0" @@ -206,8 +211,8 @@ inherits = "dev" opt-level = 3 lto = "fat" codegen-units = 1 -panic = "abort" # Optional, remove the panic expansion code -strip = true # strip symbol information to reduce binary size +panic = "abort" # Optional, remove the panic expansion code +strip = true # strip symbol information to reduce binary size [profile.production] inherits = "release" diff --git a/crates/zip/Cargo.toml b/crates/zip/Cargo.toml new file mode 100644 index 00000000..a73f083b --- /dev/null +++ b/crates/zip/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "zip" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + + +[dependencies] +async-compression = { version = "0.4.0", features = [ + "tokio", + "bzip2", + "gzip", + "zlib", + "zstd", + "xz", +] } +# async_zip = { version = "0.0.17", features = ["tokio"] } +# rc-zip-tokio = "4.2.6" +tokio = { version = "1.45.0", features = ["full"] } +tokio-stream = "0.1.17" +tokio-tar = { workspace = true } +xz2 = { version = "0.1", optional = true, features = ["static"] } + + +[lints] +workspace = true diff --git a/crates/zip/src/lib.rs b/crates/zip/src/lib.rs new file mode 100644 index 00000000..0da854c5 --- /dev/null +++ b/crates/zip/src/lib.rs @@ -0,0 +1,124 @@ +use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZlibDecoder, ZstdDecoder}; +use tokio::io::{self, AsyncRead, BufReader}; +use tokio_stream::StreamExt; +use tokio_tar::Archive; + +#[derive(Debug, PartialEq)] +pub enum CompressionFormat { + Gzip, //.gz + Bzip2, //.bz2 + // Lz4, //.lz4 + Zip, + Xz, //.xz + Zlib, //.z + Zstd, //.zst + Unknown, +} + +impl CompressionFormat { + pub fn from_extension(ext: &str) -> Self { + match ext { + "gz" => CompressionFormat::Gzip, + "bz2" => CompressionFormat::Bzip2, + // "lz4" => CompressionFormat::Lz4, + "zip" => CompressionFormat::Zip, + "xz" => CompressionFormat::Xz, + "zlib" => CompressionFormat::Zlib, + "zst" => CompressionFormat::Zstd, + _ => CompressionFormat::Unknown, + } + } + + pub fn get_decoder(&self, input: R) -> io::Result> + where + R: AsyncRead + Send + Unpin + 'static, + { + let reader = BufReader::new(input); + + let decoder: Box = match self { + CompressionFormat::Gzip => Box::new(GzipDecoder::new(reader)), + CompressionFormat::Bzip2 => Box::new(BzDecoder::new(reader)), + // CompressionFormat::Lz4 => Box::new(Lz4Decoder::new(reader)), + CompressionFormat::Zlib => Box::new(ZlibDecoder::new(reader)), + CompressionFormat::Xz => Box::new(XzDecoder::new(reader)), + CompressionFormat::Zstd => Box::new(ZstdDecoder::new(reader)), + _ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported file format")), + }; + + Ok(decoder) + } +} + +pub async fn decompress(input: R, format: CompressionFormat, mut callback: F) -> io::Result<()> +where + R: AsyncRead + Send + Unpin + 'static, + F: AsyncFnMut(tokio_tar::Entry>>) -> std::io::Result<()> + Send + 'static, +{ + // 打开输入文件 + // println!("format {:?}", format); + + let decoder = format.get_decoder(input)?; + + // let reader: BufReader = BufReader::new(input); + + // // 根据文件扩展名选择解压器 + // let decoder: Box = match format { + // CompressionFormat::Gzip => Box::new(GzipDecoder::new(reader)), + // CompressionFormat::Bzip2 => Box::new(BzDecoder::new(reader)), + // // CompressionFormat::Lz4 => Box::new(Lz4Decoder::new(reader)), + // CompressionFormat::Zlib => Box::new(ZlibDecoder::new(reader)), + // CompressionFormat::Xz => Box::new(XzDecoder::new(reader)), + // CompressionFormat::Zstd => Box::new(ZstdDecoder::new(reader)), + // // CompressionFormat::Zip => Box::new(DeflateDecoder::new(reader)), + // _ => { + // return Err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported file format")); + // } + // }; + + let mut ar = Archive::new(decoder); + let mut entries = ar.entries().unwrap(); + while let Some(entry) = entries.next().await { + let f = match entry { + Ok(f) => f, + Err(e) => { + println!("Error reading entry: {}", e); + return Err(e); + } + }; + // println!("{}", f.path().unwrap().display()); + callback(f).await?; + } + + Ok(()) +} + +// #[tokio::test] +// async fn test_decompress() -> io::Result<()> { +// use std::path::Path; +// use tokio::fs::File; + +// let input_path = "/Users/weisd/Downloads/wsd.tar.gz"; // 替换为你的压缩文件路径 + +// let f = File::open(input_path).await?; + +// let Some(ext) = Path::new(input_path).extension().and_then(|s| s.to_str()) else { +// return Err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported file format")); +// }; + +// match decompress( +// f, +// CompressionFormat::from_extension(ext), +// |entry: tokio_tar::Entry>>| async move { +// let path = entry.path().unwrap(); +// println!("Extracted: {}", path.display()); +// Ok(()) +// }, +// ) +// .await +// { +// Ok(_) => println!("解压成功!"), +// Err(e) => println!("解压失败: {}", e), +// } + +// Ok(()) +// } diff --git a/ecstore/src/disk/os.rs b/ecstore/src/disk/os.rs index ae88611a..579bd538 100644 --- a/ecstore/src/disk/os.rs +++ b/ecstore/src/disk/os.rs @@ -9,7 +9,6 @@ use crate::{ }; use common::error::{Error, Result}; use tokio::fs; -use tracing::info; use super::error::{os_err_to_file_err, os_is_exist, DiskError}; @@ -137,7 +136,7 @@ pub async fn reliable_rename( ) -> io::Result<()> { if let Some(parent) = dst_file_path.as_ref().parent() { if !file_exists(parent).await { - info!("reliable_rename reliable_mkdir_all parent: {:?}", parent); + // info!("reliable_rename reliable_mkdir_all parent: {:?}", parent); reliable_mkdir_all(parent, base_dir.as_ref()).await?; } } diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index abf7422a..ca2220b3 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -421,7 +421,7 @@ impl SetDisks { let file_path = file_path.clone(); async move { if let Some(disk) = disk { - match disk + (disk .delete( bucket, &file_path, @@ -430,11 +430,8 @@ impl SetDisks { ..Default::default() }, ) - .await - { - Ok(_) => None, - Err(e) => Some(e), - } + .await) + .err() } else { Some(Error::new(DiskError::DiskNotFound)) } diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 7f7ec690..3023a9ca 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -15,6 +15,8 @@ path = "src/main.rs" workspace = true [dependencies] +zip = { workspace = true } +tokio-tar = { workspace = true } madmin = { workspace = true } api = { workspace = true } appauth = { workspace = true } @@ -77,7 +79,12 @@ tokio-stream.workspace = true tonic = { workspace = true } tower.workspace = true transform-stream.workspace = true -tower-http = { workspace = true, features = ["trace", "compression-deflate", "compression-gzip", "cors"] } +tower-http = { workspace = true, features = [ + "trace", + "compression-deflate", + "compression-gzip", + "cors", +] } uuid = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index fd7b4c0c..418ecf07 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -4,6 +4,10 @@ use super::options::extract_metadata; use super::options::put_opts; use crate::auth::get_condition_values; use crate::storage::access::ReqInfo; +use crate::storage::error::to_s3_error; +use crate::storage::options::copy_dst_opts; +use crate::storage::options::copy_src_opts; +use crate::storage::options::{extract_metadata_from_mime, get_opts}; use api::query::Context; use api::query::Query; use api::server::dbms::DatabaseManagerSystem; @@ -61,10 +65,12 @@ use s3s::S3Result; use s3s::S3; use s3s::{S3Request, S3Response}; use std::fmt::Debug; +use std::path::Path; use std::str::FromStr; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tokio_tar::Archive; use tokio_util::io::ReaderStream; use tokio_util::io::StreamReader; use tracing::debug; @@ -73,11 +79,7 @@ use tracing::info; use tracing::warn; use transform_stream::AsyncTryStream; use uuid::Uuid; - -use crate::storage::error::to_s3_error; -use crate::storage::options::copy_dst_opts; -use crate::storage::options::copy_src_opts; -use crate::storage::options::{extract_metadata_from_mime, get_opts}; +use zip::CompressionFormat; macro_rules! try_ { ($result:expr) => { @@ -107,6 +109,108 @@ impl FS { // let store: ECStore = ECStore::new(address, endpoint_pools).await?; Self {} } + + async fn put_object_extract(&self, req: S3Request) -> S3Result> { + let PutObjectInput { body, bucket, key, .. } = req.input; + + let Some(body) = body else { return Err(s3_error!(IncompleteBody)) }; + + let body = StreamReader::new(body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())))); + + // let etag_stream = EtagReader::new(body); + + let Some(ext) = Path::new(&key).extension().and_then(|s| s.to_str()) else { + return Err(s3_error!(InvalidArgument, "key extension not found")); + }; + + let ext = ext.to_owned(); + + // TODO: spport zip + let decoder = CompressionFormat::from_extension(&ext).get_decoder(body).map_err(|e| { + error!("get_decoder err {:?}", e); + s3_error!(InvalidArgument, "get_decoder err") + })?; + + let mut ar = Archive::new(decoder); + let mut entries = ar.entries().map_err(|e| { + error!("get entries err {:?}", e); + s3_error!(InvalidArgument, "get entries err") + })?; + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let prefix = req + .headers + .get("X-Amz-Meta-RustFs-Snowball-Prefix") + .map(|v| v.to_str().unwrap_or_default()) + .unwrap_or_default(); + + while let Some(entry) = entries.next().await { + let f = match entry { + Ok(f) => f, + Err(e) => { + println!("Error reading entry: {}", e); + return Err(s3_error!(InvalidArgument, "Error reading entry {:?}", e)); + } + }; + + if f.header().entry_type().is_dir() { + continue; + } + + if let Ok(fpath) = f.path() { + let mut fpath = fpath.to_string_lossy().to_string(); + + if !prefix.is_empty() { + fpath = format!("{}/{}", prefix, fpath); + } + + let size = f.header().size().unwrap_or_default() as usize; + + println!("Extracted: {}, size {}", fpath, size); + + let mut reader = PutObjReader::new(Box::new(f), size); + + let _obj_info = store + .put_object(&bucket, &fpath, &mut reader, &ObjectOptions::default()) + .await + .map_err(to_s3_error)?; + + // let e_tag = obj_info.etag; + + // // store.put_object(bucket, object, data, opts); + + // let output = PutObjectOutput { + // e_tag, + // ..Default::default() + // }; + } + } + + // match decompress( + // body, + // CompressionFormat::from_extension(&ext), + // |entry: tokio_tar::Entry>>| async move { + // let path = entry.path().unwrap(); + // println!("Extracted: {}", path.display()); + // Ok(()) + // }, + // ) + // .await + // { + // Ok(_) => println!("解压成功!"), + // Err(e) => println!("解压失败: {}", e), + // } + + // TODO: etag + let output = PutObjectOutput { + // e_tag: Some(etag_stream.etag().await), + ..Default::default() + }; + Ok(S3Response::new(output)) + } } #[async_trait::async_trait] impl S3 for FS { @@ -409,6 +513,8 @@ impl S3 for FS { .. } = req.input; + // TODO: getObjectInArchiveFileHandler object = xxx.zip/xxx/xxx.xxx + // let range = HTTPRangeSpec::nil(); let h = HeaderMap::new(); @@ -804,8 +910,16 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument(level = "debug", skip(self, req))] + // #[tracing::instrument(level = "debug", skip(self, req))] async fn put_object(&self, req: S3Request) -> S3Result> { + if req + .headers + .get("X-Amz-Meta-Snowball-Auto-Extract") + .is_some_and(|v| v.to_str().unwrap_or_default() == "true") + { + return self.put_object_extract(req).await; + } + let input = req.input; if let Some(ref storage_class) = input.storage_class {