todo:Error

This commit is contained in:
weisd
2024-07-02 17:01:54 +08:00
parent ace3fbdfa0
commit 5eb6d52daf
6 changed files with 50 additions and 45 deletions

View File

@@ -1,7 +1,7 @@
use anyhow::Error;
use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
use s3s::StdError;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -11,7 +11,7 @@ pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + '
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<Bytes, Error, SyncBoxFuture<'static, Result<(), Error>>>,
inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
remaining_length: usize,
}
@@ -19,9 +19,9 @@ pub struct ChunkedStream {
impl ChunkedStream {
pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
where
S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
{
let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| {
let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
@@ -88,9 +88,9 @@ impl ChunkedStream {
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), Error>>
) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
where
S: Stream<Item = Result<Bytes, Error>> + Send + 'static,
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
{
let mut bytes_buffer = Vec::new();
@@ -184,7 +184,7 @@ impl ChunkedStream {
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, StdError>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
@@ -198,7 +198,7 @@ impl ChunkedStream {
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, Error>;
type Item = Result<Bytes, StdError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)

View File

@@ -1,10 +1,11 @@
use tokio::io::AsyncWriteExt;
use anyhow::anyhow;
use anyhow::{Error, Result};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use crate::chunk_stream::ChunkedStream;
@@ -32,7 +33,7 @@ impl Erasure {
write_quorum: usize,
) -> Result<()>
where
S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
let mut stream = ChunkedStream::new(body, data_size, block_size, true);
@@ -52,7 +53,7 @@ impl Erasure {
// TODO: reduceWriteQuorumErrs
}
Err(e) => return Err(e),
Err(e) => return Err(anyhow!(e)),
}
}

View File

@@ -139,13 +139,7 @@ impl StorageAPI for Sets {
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
// erasure.encode(
// data.stream,
// &mut writers,
// fi.erasure.block_size,
// data.content_length,
// write_quorum,
// );
erasure.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum);
unimplemented!()
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use anyhow::{Error, Result};
use s3s::Body;
use s3s::{dto::StreamingBlob, Body};
use uuid::Uuid;
use crate::{
@@ -32,8 +32,7 @@ impl ECStore {
let mut deployment_id = None;
let (endpoint_pools, _) =
EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let mut pools = Vec::with_capacity(endpoint_pools.len());
let mut disk_map = HashMap::with_capacity(endpoint_pools.len());
@@ -121,37 +120,26 @@ impl StorageAPI for ECStore {
// TODO: wrap hash reader
let content_len = data.len() as u64;
let content_len = data.len();
let reader = PutObjReader::new(Body::from(data), content_len);
let body = Body::from(data);
self.put_object(
RUSTFS_META_BUCKET,
&file_path,
&reader,
&ObjectOptions { max_parity: true },
)
.await?;
let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
self.put_object(RUSTFS_META_BUCKET, &file_path, &reader, &ObjectOptions { max_parity: true })
.await?;
// TODO: toObjectErr
Ok(())
}
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()> {
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
self.pools[0]
.put_object(bucket, object.as_str(), data, opts)
.await?;
self.pools[0].put_object(bucket, object.as_str(), data, opts).await?;
return Ok(());
}

View File

@@ -3,7 +3,7 @@ use std::{default, sync::Arc};
use anyhow::Result;
use bytes::Bytes;
use futures::Stream;
use s3s::Body;
use s3s::{dto::StreamingBlob, Body};
use time::OffsetDateTime;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -119,12 +119,12 @@ pub struct MakeBucketOptions {
}
pub struct PutObjReader {
pub stream: Body,
pub content_length: u64,
pub stream: StreamingBlob,
pub content_length: usize,
}
impl PutObjReader {
pub fn new(stream: Body, content_length: u64) -> Self {
pub fn new(stream: StreamingBlob, content_length: usize) -> Self {
PutObjReader { stream, content_length }
}
}

View File

@@ -1,14 +1,30 @@
use std::fmt::Debug;
use ecstore::store_api::ObjectOptions;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use anyhow::Result;
use ecstore::store::ECStore;
use tracing::error;
macro_rules! try_ {
($result:expr) => {
match $result {
Ok(val) => val,
Err(err) => {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", err)));
}
}
};
}
#[derive(Debug)]
pub struct FS {
@@ -149,6 +165,12 @@ impl S3 for FS {
let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
let Some(content_length) = content_length else { return Err(s3_error!(IncompleteBody)) };
let reader = PutObjReader::new(body.into(), content_length as usize);
try_!(self.store.put_object(&bucket, &key, &reader, &ObjectOptions::default()).await);
// self.store.put_object(bucket, object, data, opts);
let output = PutObjectOutput { ..Default::default() };