diff --git a/Cargo.lock b/Cargo.lock index 06a9e298..dd42ff16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1156,6 +1156,7 @@ dependencies = [ "clap", "ecstore", "futures", + "futures-util", "http", "hyper-util", "mime", diff --git a/Cargo.toml b/Cargo.toml index 8a7aae0f..9420e9b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,4 @@ time = { version = "0.3.36", features = [ ] } async-trait = "0.1.80" tokio = { version = "1.38.0", features = ["fs"] } +futures-util = "0.3.30" diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 98028183..5e5414f0 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -22,7 +22,7 @@ tracing-error.workspace = true http.workspace = true bytes.workspace = true futures.workspace = true - +futures-util.workspace = true ecstore = { path = "../ecstore" } s3s = "0.10.0" diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 07be9c44..f6a02de3 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -10,6 +10,7 @@ use ecstore::store_api::PutObjReader; use ecstore::store_api::StorageAPI; use futures::pin_mut; use futures::{Stream, StreamExt}; +use futures_util::TryStreamExt; use http::HeaderMap; use s3s::dto::*; use s3s::s3_error; @@ -19,7 +20,9 @@ use s3s::S3Result; use s3s::S3; use s3s::{S3Request, S3Response}; use std::fmt::Debug; +use std::pin::Pin; use std::str::FromStr; +use std::task::Poll; use transform_stream::AsyncTryStream; use ecstore::error::Result; @@ -450,3 +453,15 @@ where Ok(()) }) } + +// Consumes this body object to return a bytes stream. +// pub fn into_bytes_stream(mut body: StreamingBlob) -> impl Stream> + Send + 'static { +// futures_util::stream::poll_fn(move |ctx| loop { +// match Pin::new(&mut body).poll_next(ctx) { +// Poll::Ready(Some(Ok(data))) => return Poll::Ready(Some(Ok(data))), +// Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(std::io::Error::new(std::io::ErrorKind::Other, err)))), +// Poll::Ready(None) => return Poll::Ready(None), +// Poll::Pending => return Poll::Pending, +// } +// }) +// }