rebase to main

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2025-03-31 06:32:05 +00:00
parent 9d9bc150f6
commit b950f61b70
8 changed files with 3202 additions and 903 deletions

View File

@@ -16,5 +16,7 @@ object_store = "0.11.2"
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
transform-stream.workspace = true
url.workspace = true

View File

@@ -1,13 +1,14 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::new_object_layer_fn;
use ecstore::store::ECStore;
use ecstore::store_api::ObjectIO;
use ecstore::store_api::ObjectOptions;
use ecstore::StorageAPI;
use futures::stream;
use futures::StreamExt;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use futures_core::stream::BoxStream;
use http::HeaderMap;
use object_store::path::Path;
@@ -28,7 +29,9 @@ use s3s::s3_error;
use s3s::S3Result;
use std::ops::Range;
use std::sync::Arc;
use tokio_util::io::ReaderStream;
use tracing::info;
use transform_stream::AsyncTryStream;
#[derive(Debug)]
pub struct EcObjectStore {
@@ -76,16 +79,16 @@ impl ObjectStore for EcObjectStore {
source: "can not get object info".into(),
})?;
let stream = stream::unfold(reader.stream, |mut blob| async move {
match blob.next().await {
Some(Ok(chunk)) => {
let bytes = chunk;
Some((Ok(bytes), blob))
}
_ => None,
}
})
.boxed();
// let stream = stream::unfold(reader.stream, |mut blob| async move {
// match blob.next().await {
// Some(Ok(chunk)) => {
// let bytes = chunk;
// Some((Ok(bytes), blob))
// }
// _ => None,
// }
// })
// .boxed();
let meta = ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
@@ -96,7 +99,9 @@ impl ObjectStore for EcObjectStore {
let attributes = Attributes::default();
Ok(GetResult {
payload: object_store::GetResultPayload::Stream(stream),
payload: object_store::GetResultPayload::Stream(
bytes_stream(ReaderStream::with_capacity(reader.stream, READ_BUFFER_SIZE), reader.object_info.size).boxed(),
),
meta,
range: 0..reader.object_info.size,
attributes,
@@ -148,3 +153,25 @@ impl ObjectStore for EcObjectStore {
unimplemented!()
}
}
pub fn bytes_stream<S>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes>> + Send + 'static
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
AsyncTryStream::<Bytes, o_Error, _>::new(|mut y| async move {
pin_mut!(stream);
let mut remaining: usize = content_length;
while let Some(result) = stream.next().await {
let mut bytes = result.map_err(|e| o_Error::Generic {
store: "",
source: Box::new(e),
})?;
if bytes.len() > remaining {
bytes.truncate(remaining);
}
remaining -= bytes.len();
y.yield_ok(bytes).await;
}
Ok(())
})
}