From 5eb6d52dafb318b0bbffb8b46a8d71202f9476d1 Mon Sep 17 00:00:00 2001
From: weisd
Date: Tue, 2 Jul 2024 17:01:54 +0800
Subject: [PATCH] todo:Error

---
 ecstore/src/chunk_stream.rs | 16 ++++++++--------
 ecstore/src/erasure.rs      |  9 +++++----
 ecstore/src/sets.rs         |  8 +-------
 ecstore/src/store.rs        | 32 ++++++++++----------------------
 ecstore/src/store_api.rs    |  8 ++++----
 rustfs/src/storage/ecfs.rs  | 22 ++++++++++++++++++++++
 6 files changed, 50 insertions(+), 45 deletions(-)

diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs
index e5a1324e..95bd3ba1 100644
--- a/ecstore/src/chunk_stream.rs
+++ b/ecstore/src/chunk_stream.rs
@@ -1,7 +1,7 @@
-use anyhow::Error;
 use bytes::Bytes;
 use futures::pin_mut;
 use futures::stream::{Stream, StreamExt};
+use s3s::StdError;
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -11,7 +11,7 @@ pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
 pub struct ChunkedStream {
     /// inner
-    inner: AsyncTryStream<Bytes, Error, SyncBoxFuture<'static, Result<(), Error>>>,
+    inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,

     remaining_length: usize,
 }

@@ -19,9 +19,9 @@ impl ChunkedStream {
     pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
     where
-        S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
+        S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
     {
-        let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| {
+        let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
             #[allow(clippy::shadow_same)] // necessary for `pin_mut!`
             Box::pin(async move {
                 pin_mut!(body);

@@ -88,9 +88,9 @@ impl ChunkedStream {
         mut body: Pin<&mut S>,
         prev_bytes: Bytes,
         data_size: usize,
-    ) -> Option<Result<(Vec<Bytes>, Bytes), Error>>
+    ) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
     where
-        S: Stream<Item = Result<Bytes, Error>> + Send + 'static,
+        S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
     {
         let mut bytes_buffer = Vec::new();

@@ -184,7 +184,7 @@ impl ChunkedStream {
         Some(Ok((bytes_buffer, remaining_bytes)))
     }

-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, StdError>>> {
         let ans = Pin::new(&mut self.inner).poll_next(cx);
         if let Poll::Ready(Some(Ok(ref bytes))) = ans {
             self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
@@ -198,7 +198,7 @@ impl ChunkedStream {
 }

 impl Stream for ChunkedStream {
-    type Item = Result<Bytes, Error>;
+    type Item = Result<Bytes, StdError>;

     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         self.poll(cx)
diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index 1e667b9c..4fc73823 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -1,10 +1,11 @@
-use tokio::io::AsyncWriteExt;
-
+use anyhow::anyhow;
 use anyhow::{Error, Result};
 use bytes::Bytes;
 use futures::{Stream, StreamExt};
 use reed_solomon_erasure::galois_8::ReedSolomon;
+use s3s::StdError;
 use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;

 use crate::chunk_stream::ChunkedStream;
@@ -32,7 +33,7 @@ impl Erasure {
         write_quorum: usize,
     ) -> Result<()>
     where
-        S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
+        S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
         W: AsyncWrite + Unpin,
     {
         let mut stream = ChunkedStream::new(body, data_size, block_size, true);
@@ -52,7 +53,7 @@ impl Erasure {

                     // TODO: reduceWriteQuorumErrs
                 }
-                Err(e) => return Err(e),
+                Err(e) => return Err(anyhow!(e)),
             }
         }

diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index b05e21e1..8929be8d 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -139,13 +139,7 @@ impl StorageAPI for Sets {
         let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);

-        // erasure.encode(
-        //     data.stream,
-        //     &mut writers,
-        //     fi.erasure.block_size,
-        //     data.content_length,
-        //     write_quorum,
-        //     );
+        erasure.encode(data.stream, &mut writers, fi.erasure.block_size, data.content_length, write_quorum);

         unimplemented!()
     }
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index 0f8d80c2..5e0baa6c 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;

 use anyhow::{Error, Result};
-use s3s::Body;
+use s3s::{dto::StreamingBlob, Body};
 use uuid::Uuid;

 use crate::{
@@ -32,8 +32,7 @@ impl ECStore {

         let mut deployment_id = None;

-        let (endpoint_pools, _) =
-            EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
+        let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;

         let mut pools = Vec::with_capacity(endpoint_pools.len());
         let mut disk_map = HashMap::with_capacity(endpoint_pools.len());
@@ -121,37 +120,26 @@ impl StorageAPI for ECStore {

         // TODO: wrap hash reader

-        let content_len = data.len() as u64;
+        let content_len = data.len();

-        let reader = PutObjReader::new(Body::from(data), content_len);
+        let body = Body::from(data);

-        self.put_object(
-            RUSTFS_META_BUCKET,
-            &file_path,
-            &reader,
-            &ObjectOptions { max_parity: true },
-        )
-        .await?;
+        let reader = PutObjReader::new(StreamingBlob::from(body), content_len);
+
+        self.put_object(RUSTFS_META_BUCKET, &file_path, &reader, &ObjectOptions { max_parity: true })
+            .await?;

         // TODO: toObjectErr

         Ok(())
     }

-    async fn put_object(
-        &self,
-        bucket: &str,
-        object: &str,
-        data: &PutObjReader,
-        opts: &ObjectOptions,
-    ) -> Result<()> {
+    async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> {
         // checkPutObjectArgs

         let object = utils::path::encode_dir_object(object);

         if self.single_pool() {
-            self.pools[0]
-                .put_object(bucket, object.as_str(), data, opts)
-                .await?;
+            self.pools[0].put_object(bucket, object.as_str(), data, opts).await?;
             return Ok(());
         }
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index 370f7583..bba172b2 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -3,7 +3,7 @@ use std::{default, sync::Arc};
 use anyhow::Result;
 use bytes::Bytes;
 use futures::Stream;
-use s3s::Body;
+use s3s::{dto::StreamingBlob, Body};
 use time::OffsetDateTime;

 pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -119,12 +119,12 @@ pub struct MakeBucketOptions {
 }

 pub struct PutObjReader {
-    pub stream: Body,
-    pub content_length: u64,
+    pub stream: StreamingBlob,
+    pub content_length: usize,
 }

 impl PutObjReader {
-    pub fn new(stream: Body, content_length: u64) -> Self {
+    pub fn new(stream: StreamingBlob, content_length: usize) -> Self {
         PutObjReader { stream, content_length }
     }
 }
diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs
index a997cc2d..0dcc1821 100644
--- a/rustfs/src/storage/ecfs.rs
+++ b/rustfs/src/storage/ecfs.rs
@@ -1,14 +1,30 @@
 use std::fmt::Debug;

+use ecstore::store_api::ObjectOptions;
+use ecstore::store_api::PutObjReader;
 use ecstore::store_api::StorageAPI;
 use s3s::dto::*;
 use s3s::s3_error;
+use s3s::S3Error;
+use s3s::S3ErrorCode;
 use s3s::S3Result;
 use s3s::S3;
 use s3s::{S3Request, S3Response};

 use anyhow::Result;
 use ecstore::store::ECStore;
+use tracing::error;
+
+macro_rules! try_ {
+    ($result:expr) => {
+        match $result {
+            Ok(val) => val,
+            Err(err) => {
+                return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", err)));
+            }
+        }
+    };
+}

 #[derive(Debug)]
 pub struct FS {
@@ -149,6 +165,12 @@ impl S3 for FS {

         let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };

+        let Some(content_length) = content_length else { return Err(s3_error!(IncompleteBody)) };
+
+        let reader = PutObjReader::new(body.into(), content_length as usize);
+
+        try_!(self.store.put_object(&bucket, &key, &reader, &ObjectOptions::default()).await);
+
         // self.store.put_object(bucket, object, data, opts);
         let output = PutObjectOutput { ..Default::default() };
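
Note: the re-enabled erasure.encode(...) call in sets.rs ultimately rests on reed_solomon_erasure::galois_8::ReedSolomon, which the patch already imports in erasure.rs. The following is only a minimal, hypothetical sketch of that encoding step, not the rustfs Erasure implementation: the helper name encode_block, the ceiling-based shard sizing, and the zero padding are assumptions made for illustration.

    // Hypothetical sketch, not the rustfs Erasure implementation: split one block
    // into `data` equal-sized shards (zero-padded at the tail), then let
    // reed-solomon-erasure fill in `parity` extra shards.
    use reed_solomon_erasure::galois_8::ReedSolomon;

    fn encode_block(block: &[u8], data: usize, parity: usize) -> Result<Vec<Vec<u8>>, reed_solomon_erasure::Error> {
        // Ceiling division so every byte of the block lands in some data shard.
        let shard_len = (block.len() + data - 1) / data;

        let mut shards: Vec<Vec<u8>> = (0..data)
            .map(|i| {
                let start = i * shard_len;
                let end = usize::min(start + shard_len, block.len());
                let mut shard = block.get(start..end).unwrap_or(&[]).to_vec();
                shard.resize(shard_len, 0); // zero-pad the final shard
                shard
            })
            .collect();

        // Parity shards start out as zeroes; encode() overwrites them in place.
        shards.extend(std::iter::repeat(vec![0u8; shard_len]).take(parity));

        let rs = ReedSolomon::new(data, parity)?;
        rs.encode(&mut shards)?;
        Ok(shards)
    }

    fn main() -> Result<(), reed_solomon_erasure::Error> {
        let shards = encode_block(b"hello erasure coding", 4, 2)?;
        assert_eq!(shards.len(), 6); // 4 data + 2 parity
        Ok(())
    }

With data = 4 and parity = 2, any 4 of the 6 shards are enough to reconstruct the block, which is the property the write_quorum argument in the patched call is presumably meant to enforce across the per-disk writers.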