todo:Error

This commit is contained in:
weisd
2024-07-02 16:03:26 +08:00
parent e013d33691
commit ace3fbdfa0
5 changed files with 113 additions and 170 deletions

View File

@@ -21,64 +21,63 @@ impl ChunkedStream {
where
S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
let inner = AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
};
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
y.yield_ok(bytes).await;
}
};
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
// 填充0
if !need_padding {
y.yield_ok(prev_bytes).await;
break;
}
let mut bytes = vec![0u8; chunk_size];
let (left, _) = bytes.split_at_mut(prev_bytes.len());
left.copy_from_slice(&prev_bytes);
y.yield_ok(Bytes::from(bytes)).await;
break;
}
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
y.yield_ok(bytes).await;
}
Ok(())
})
});
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
// 填充0
if !need_padding {
y.yield_ok(prev_bytes).await;
break;
}
let mut bytes = vec![0u8; chunk_size];
let (left, _) = bytes.split_at_mut(prev_bytes.len());
left.copy_from_slice(&prev_bytes);
y.yield_ok(Bytes::from(bytes)).await;
break;
}
}
Ok(())
})
});
Self {
inner,
remaining_length: content_length,

View File

@@ -1,17 +1,18 @@
// use s3s::S3Error;
// use s3s::S3ErrorCode;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::StdError;
use std::panic::Location;
use tracing::error;
pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug)]
pub struct Error {
source: StdError,
}
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
impl Error {
#[must_use]
#[track_caller]
@@ -37,11 +38,11 @@ where
}
}
// impl From<Error> for S3Error {
// fn from(e: Error) -> Self {
// S3Error::with_source(S3ErrorCode::InternalError, e.source)
// }
// }
impl From<Error> for S3Error {
fn from(e: Error) -> Self {
S3Error::with_source(S3ErrorCode::InternalError, e.source)
}
}
#[inline]
#[track_caller]
@@ -70,5 +71,3 @@ macro_rules! try_ {
}
};
}

View File

@@ -11,6 +11,6 @@ mod format;
mod peer;
mod sets;
pub mod store;
mod store_api;
pub mod store_api;
mod store_init;
mod utils;

View File

@@ -129,6 +129,7 @@ impl PutObjReader {
}
}
#[derive(Debug, Default)]
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
pub max_parity: bool,

View File

@@ -1,5 +1,6 @@
use std::fmt::Debug;
use ecstore::store_api::StorageAPI;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Result;
@@ -24,10 +25,7 @@ impl FS {
#[async_trait::async_trait]
impl S3 for FS {
#[tracing::instrument]
async fn create_bucket(
&self,
req: S3Request<CreateBucketInput>,
) -> S3Result<S3Response<CreateBucketOutput>> {
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let input = req.input;
let output = CreateBucketOutput::default(); // TODO: handle other fields
@@ -35,41 +33,26 @@ impl S3 for FS {
}
#[tracing::instrument]
async fn copy_object(
&self,
req: S3Request<CopyObjectInput>,
) -> S3Result<S3Response<CopyObjectOutput>> {
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
let input = req.input;
let (bucket, key) = match input.copy_source {
CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
CopySource::Bucket {
ref bucket,
ref key,
..
} => (bucket, key),
CopySource::Bucket { ref bucket, ref key, .. } => (bucket, key),
};
let output = CopyObjectOutput {
..Default::default()
};
let output = CopyObjectOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn delete_bucket(
&self,
req: S3Request<DeleteBucketInput>,
) -> S3Result<S3Response<DeleteBucketOutput>> {
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let input = req.input;
Ok(S3Response::new(DeleteBucketOutput {}))
}
#[tracing::instrument]
async fn delete_object(
&self,
req: S3Request<DeleteObjectInput>,
) -> S3Result<S3Response<DeleteObjectOutput>> {
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let input = req.input;
let output = DeleteObjectOutput::default(); // TODO: handle other fields
@@ -77,23 +60,15 @@ impl S3 for FS {
}
#[tracing::instrument]
async fn delete_objects(
&self,
req: S3Request<DeleteObjectsInput>,
) -> S3Result<S3Response<DeleteObjectsOutput>> {
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let input = req.input;
let output = DeleteObjectsOutput {
..Default::default()
};
let output = DeleteObjectsOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn get_bucket_location(
&self,
req: S3Request<GetBucketLocationInput>,
) -> S3Result<S3Response<GetBucketLocationOutput>> {
async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
let input = req.input;
let output = GetBucketLocationOutput::default();
@@ -101,57 +76,36 @@ impl S3 for FS {
}
#[tracing::instrument]
async fn get_object(
&self,
req: S3Request<GetObjectInput>,
) -> S3Result<S3Response<GetObjectOutput>> {
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
let input = req.input;
let output = GetObjectOutput {
..Default::default()
};
let output = GetObjectOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn head_bucket(
&self,
req: S3Request<HeadBucketInput>,
) -> S3Result<S3Response<HeadBucketOutput>> {
async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
let input = req.input;
Ok(S3Response::new(HeadBucketOutput::default()))
}
#[tracing::instrument]
async fn head_object(
&self,
req: S3Request<HeadObjectInput>,
) -> S3Result<S3Response<HeadObjectOutput>> {
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
let input = req.input;
let output = HeadObjectOutput {
..Default::default()
};
let output = HeadObjectOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn list_buckets(
&self,
_: S3Request<ListBucketsInput>,
) -> S3Result<S3Response<ListBucketsOutput>> {
let output = ListBucketsOutput {
..Default::default()
};
async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
let output = ListBucketsOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn list_objects(
&self,
req: S3Request<ListObjectsInput>,
) -> S3Result<S3Response<ListObjectsOutput>> {
async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
Ok(v2_resp.map_output(|v2| ListObjectsOutput {
@@ -166,28 +120,38 @@ impl S3 for FS {
}
#[tracing::instrument]
async fn list_objects_v2(
&self,
req: S3Request<ListObjectsV2Input>,
) -> S3Result<S3Response<ListObjectsV2Output>> {
async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
let input = req.input;
let output = ListObjectsV2Output {
..Default::default()
};
let output = ListObjectsV2Output { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn put_object(
&self,
req: S3Request<PutObjectInput>,
) -> S3Result<S3Response<PutObjectOutput>> {
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let input = req.input;
let output = PutObjectOutput {
..Default::default()
};
if let Some(ref storage_class) = input.storage_class {
let is_valid = ["STANDARD", "REDUCED_REDUNDANCY"].contains(&storage_class.as_str());
if !is_valid {
return Err(s3_error!(InvalidStorageClass));
}
}
let PutObjectInput {
body,
bucket,
key,
metadata,
content_length,
..
} = input;
let Some(body) = body else { return Err(s3_error!(IncompleteBody)) };
// self.store.put_object(bucket, object, data, opts);
let output = PutObjectOutput { ..Default::default() };
Ok(S3Response::new(output))
}
@@ -198,18 +162,13 @@ impl S3 for FS {
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
let input = req.input;
let output = CreateMultipartUploadOutput {
..Default::default()
};
let output = CreateMultipartUploadOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn upload_part(
&self,
req: S3Request<UploadPartInput>,
) -> S3Result<S3Response<UploadPartOutput>> {
async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
let UploadPartInput {
body,
upload_id,
@@ -217,36 +176,23 @@ impl S3 for FS {
..
} = req.input;
let output = UploadPartOutput {
..Default::default()
};
let output = UploadPartOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn upload_part_copy(
&self,
req: S3Request<UploadPartCopyInput>,
) -> S3Result<S3Response<UploadPartCopyOutput>> {
async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
let input = req.input;
let output = UploadPartCopyOutput {
..Default::default()
};
let output = UploadPartCopyOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
async fn list_parts(
&self,
req: S3Request<ListPartsInput>,
) -> S3Result<S3Response<ListPartsOutput>> {
async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
let ListPartsInput {
bucket,
key,
upload_id,
..
bucket, key, upload_id, ..
} = req.input;
let output = ListPartsOutput {
@@ -284,8 +230,6 @@ impl S3 for FS {
&self,
req: S3Request<AbortMultipartUploadInput>,
) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
Ok(S3Response::new(AbortMultipartUploadOutput {
..Default::default()
}))
Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
}
}