diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs index 9afbdd38..e5a1324e 100644 --- a/ecstore/src/chunk_stream.rs +++ b/ecstore/src/chunk_stream.rs @@ -21,64 +21,63 @@ impl ChunkedStream { where S: Stream> + Send + Sync + 'static, { - let inner = - AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| { - #[allow(clippy::shadow_same)] // necessary for `pin_mut!` - Box::pin(async move { - pin_mut!(body); - // 上一次没用完的数据 - let mut prev_bytes = Bytes::new(); - let mut readed_size = 0; + let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| { + #[allow(clippy::shadow_same)] // necessary for `pin_mut!` + Box::pin(async move { + pin_mut!(body); + // 上一次没用完的数据 + let mut prev_bytes = Bytes::new(); + let mut readed_size = 0; - loop { - let data: Vec = { - // 读固定大小的数据 - match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await { - None => break, - Some(Err(e)) => return Err(e), - Some(Ok((data, remaining_bytes))) => { - prev_bytes = remaining_bytes; - data - } + loop { + let data: Vec = { + // 读固定大小的数据 + match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await { + None => break, + Some(Err(e)) => return Err(e), + Some(Ok((data, remaining_bytes))) => { + prev_bytes = remaining_bytes; + data } - }; - - for bytes in data { - readed_size += bytes.len(); - // println!( - // "readed_size {}, content_length {}", - // readed_size, content_length, - // ); - y.yield_ok(bytes).await; } + }; - if readed_size + prev_bytes.len() >= content_length { - // println!( - // "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}", - // readed_size, - // prev_bytes.len(), - // content_length, - // ); - - // 填充0? - if !need_padding { - y.yield_ok(prev_bytes).await; - break; - } - - let mut bytes = vec![0u8; chunk_size]; - let (left, _) = bytes.split_at_mut(prev_bytes.len()); - left.copy_from_slice(&prev_bytes); - - y.yield_ok(Bytes::from(bytes)).await; - - break; - } + for bytes in data { + readed_size += bytes.len(); + // println!( + // "readed_size {}, content_length {}", + // readed_size, content_length, + // ); + y.yield_ok(bytes).await; } - Ok(()) - }) - }); + if readed_size + prev_bytes.len() >= content_length { + // println!( + // "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}", + // readed_size, + // prev_bytes.len(), + // content_length, + // ); + + // 填充0? + if !need_padding { + y.yield_ok(prev_bytes).await; + break; + } + + let mut bytes = vec![0u8; chunk_size]; + let (left, _) = bytes.split_at_mut(prev_bytes.len()); + left.copy_from_slice(&prev_bytes); + + y.yield_ok(Bytes::from(bytes)).await; + + break; + } + } + + Ok(()) + }) + }); Self { inner, remaining_length: content_length, diff --git a/ecstore/src/error.rs b/ecstore/src/error.rs index 13f95f09..57bc687c 100644 --- a/ecstore/src/error.rs +++ b/ecstore/src/error.rs @@ -1,17 +1,18 @@ -// use s3s::S3Error; -// use s3s::S3ErrorCode; +use s3s::S3Error; +use s3s::S3ErrorCode; +use s3s::StdError; use std::panic::Location; use tracing::error; -pub type StdError = Box; - #[derive(Debug)] pub struct Error { source: StdError, } +pub type Result = std::result::Result; + impl Error { #[must_use] #[track_caller] @@ -37,11 +38,11 @@ where } } -// impl From for S3Error { -// fn from(e: Error) -> Self { -// S3Error::with_source(S3ErrorCode::InternalError, e.source) -// } -// } +impl From for S3Error { + fn from(e: Error) -> Self { + S3Error::with_source(S3ErrorCode::InternalError, e.source) + } +} #[inline] #[track_caller] @@ -70,5 +71,3 @@ macro_rules! try_ { } }; } - - diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 33009d3e..257a702d 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -11,6 +11,6 @@ mod format; mod peer; mod sets; pub mod store; -mod store_api; +pub mod store_api; mod store_init; mod utils; diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index c3a2d845..370f7583 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -129,6 +129,7 @@ impl PutObjReader { } } +#[derive(Debug, Default)] pub struct ObjectOptions { // Use the maximum parity (N/2), used when saving server configuration files pub max_parity: bool, diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index e6f8b789..a997cc2d 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use ecstore::store_api::StorageAPI; use s3s::dto::*; use s3s::s3_error; use s3s::S3Result; @@ -24,10 +25,7 @@ impl FS { #[async_trait::async_trait] impl S3 for FS { #[tracing::instrument] - async fn create_bucket( - &self, - req: S3Request, - ) -> S3Result> { + async fn create_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; let output = CreateBucketOutput::default(); // TODO: handle other fields @@ -35,41 +33,26 @@ impl S3 for FS { } #[tracing::instrument] - async fn copy_object( - &self, - req: S3Request, - ) -> S3Result> { + async fn copy_object(&self, req: S3Request) -> S3Result> { let input = req.input; let (bucket, key) = match input.copy_source { CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)), - CopySource::Bucket { - ref bucket, - ref key, - .. - } => (bucket, key), + CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key), }; - let output = CopyObjectOutput { - ..Default::default() - }; + let output = CopyObjectOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn delete_bucket( - &self, - req: S3Request, - ) -> S3Result> { + async fn delete_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; Ok(S3Response::new(DeleteBucketOutput {})) } #[tracing::instrument] - async fn delete_object( - &self, - req: S3Request, - ) -> S3Result> { + async fn delete_object(&self, req: S3Request) -> S3Result> { let input = req.input; let output = DeleteObjectOutput::default(); // TODO: handle other fields @@ -77,23 +60,15 @@ impl S3 for FS { } #[tracing::instrument] - async fn delete_objects( - &self, - req: S3Request, - ) -> S3Result> { + async fn delete_objects(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = DeleteObjectsOutput { - ..Default::default() - }; + let output = DeleteObjectsOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn get_bucket_location( - &self, - req: S3Request, - ) -> S3Result> { + async fn get_bucket_location(&self, req: S3Request) -> S3Result> { let input = req.input; let output = GetBucketLocationOutput::default(); @@ -101,57 +76,36 @@ impl S3 for FS { } #[tracing::instrument] - async fn get_object( - &self, - req: S3Request, - ) -> S3Result> { + async fn get_object(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = GetObjectOutput { - ..Default::default() - }; + let output = GetObjectOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn head_bucket( - &self, - req: S3Request, - ) -> S3Result> { + async fn head_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; Ok(S3Response::new(HeadBucketOutput::default())) } #[tracing::instrument] - async fn head_object( - &self, - req: S3Request, - ) -> S3Result> { + async fn head_object(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = HeadObjectOutput { - ..Default::default() - }; + let output = HeadObjectOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn list_buckets( - &self, - _: S3Request, - ) -> S3Result> { - let output = ListBucketsOutput { - ..Default::default() - }; + async fn list_buckets(&self, _: S3Request) -> S3Result> { + let output = ListBucketsOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn list_objects( - &self, - req: S3Request, - ) -> S3Result> { + async fn list_objects(&self, req: S3Request) -> S3Result> { let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?; Ok(v2_resp.map_output(|v2| ListObjectsOutput { @@ -166,28 +120,38 @@ impl S3 for FS { } #[tracing::instrument] - async fn list_objects_v2( - &self, - req: S3Request, - ) -> S3Result> { + async fn list_objects_v2(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = ListObjectsV2Output { - ..Default::default() - }; + let output = ListObjectsV2Output { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn put_object( - &self, - req: S3Request, - ) -> S3Result> { + async fn put_object(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = PutObjectOutput { - ..Default::default() - }; + if let Some(ref storage_class) = input.storage_class { + let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str()); + if !is_valid { + return Err(s3_error!(InvalidStorageClass)); + } + } + + let PutObjectInput { + body, + bucket, + key, + metadata, + content_length, + .. + } = input; + + let Some(body) = body else { return Err(s3_error!(IncompleteBody)) }; + + // self.store.put_object(bucket, object, data, opts); + + let output = PutObjectOutput { ..Default::default() }; Ok(S3Response::new(output)) } @@ -198,18 +162,13 @@ impl S3 for FS { ) -> S3Result> { let input = req.input; - let output = CreateMultipartUploadOutput { - ..Default::default() - }; + let output = CreateMultipartUploadOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn upload_part( - &self, - req: S3Request, - ) -> S3Result> { + async fn upload_part(&self, req: S3Request) -> S3Result> { let UploadPartInput { body, upload_id, @@ -217,36 +176,23 @@ impl S3 for FS { .. } = req.input; - let output = UploadPartOutput { - ..Default::default() - }; + let output = UploadPartOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn upload_part_copy( - &self, - req: S3Request, - ) -> S3Result> { + async fn upload_part_copy(&self, req: S3Request) -> S3Result> { let input = req.input; - let output = UploadPartCopyOutput { - ..Default::default() - }; + let output = UploadPartCopyOutput { ..Default::default() }; Ok(S3Response::new(output)) } #[tracing::instrument] - async fn list_parts( - &self, - req: S3Request, - ) -> S3Result> { + async fn list_parts(&self, req: S3Request) -> S3Result> { let ListPartsInput { - bucket, - key, - upload_id, - .. + bucket, key, upload_id, .. } = req.input; let output = ListPartsOutput { @@ -284,8 +230,6 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { - Ok(S3Response::new(AbortMultipartUploadOutput { - ..Default::default() - })) + Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() })) } }