test:get_object

weisd
2024-07-24 16:45:23 +08:00
parent 22c85612c7
commit 1b60d05434
7 changed files with 183 additions and 40 deletions

Cargo.lock generated

@@ -1091,8 +1091,10 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"clap",
"ecstore",
"futures",
"http",
"hyper-util",
"mime",
@@ -1101,6 +1103,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-subscriber",
"transform-stream",
]
[[package]]


@@ -509,6 +509,8 @@ impl DiskAPI for LocalDisk {
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
let p = self.get_object_path(&volume, &path)?;
debug!("read_file {:?}", &p);
let file = File::options().read(true).open(&p).await?;
Ok(FileReader::new(file))
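For context, a minimal standalone sketch of a FileReader-style wrapper that simply delegates AsyncRead to the wrapped tokio::fs::File. The real FileReader in this repo may carry more state, so the delegation below is an assumption; only the FileReader::new(File) shape is taken from the hunk above.

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::File;
use tokio::io::{AsyncRead, ReadBuf};

// Hypothetical stand-in for the crate's FileReader: it only forwards
// poll_read to the wrapped tokio::fs::File.
pub struct FileReader {
    inner: File,
}

impl FileReader {
    pub fn new(inner: File) -> Self {
        Self { inner }
    }
}

impl AsyncRead for FileReader {
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        // tokio::fs::File is Unpin, so re-pinning the field is fine.
        Pin::new(&mut self.inner).poll_read(cx, buf)
    }
}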


@@ -9,6 +9,9 @@ use s3s::dto::StreamingBlob;
use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::DuplexStream;
use tracing::debug;
use tracing::warn;
// use tracing::debug;
use uuid::Uuid;
@@ -49,14 +52,16 @@ impl Erasure {
{
let mut stream = ChunkedStream::new(body, total_size, self.block_size, true);
let mut total: usize = 0;
// let mut idx = 0;
let mut idx = 0;
while let Some(result) = stream.next().await {
match result {
Ok(data) => {
let blocks = self.encode_data(data.as_ref())?;
warn!("encode shard size: {}/{} from block_size {} ", blocks[0].len(), blocks.len(), data.len());
let mut errs = Vec::new();
// idx += 1;
idx += 1;
for (i, w) in writers.iter_mut().enumerate() {
total += blocks[i].len();
@@ -88,7 +93,7 @@ impl Erasure {
}
}
// debug!("{} encode_data done {}", self.id, total);
warn!(" encode_data done shard block num {}", idx);
Ok(total)
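The loop above splits the request body into block_size chunks and fans each chunk's shards out to the per-disk writers. A rough sketch of the shard-size arithmetic behind the "encode shard size" log line follows; the zero-padded split and the stubbed parity shards are assumptions, since the real encode_data presumably runs a Reed-Solomon code rather than this placeholder.

// Hypothetical illustration: split one block of data into `data_shards`
// equal-length shards (zero-padded) plus `parity_shards` placeholders.
fn split_into_shards(data: &[u8], data_shards: usize, parity_shards: usize) -> Vec<Vec<u8>> {
    // Every shard must have the same length, so round up and zero-pad the tail.
    let shard_size = (data.len() + data_shards - 1) / data_shards;
    let mut shards = Vec::with_capacity(data_shards + parity_shards);
    for i in 0..data_shards {
        let start = i * shard_size;
        let end = usize::min(start + shard_size, data.len());
        let mut shard = if start < data.len() { data[start..end].to_vec() } else { Vec::new() };
        shard.resize(shard_size, 0);
        shards.push(shard);
    }
    // Parity shards would be computed by the erasure code; zeroed here.
    shards.extend(std::iter::repeat(vec![0u8; shard_size]).take(parity_shards));
    shards
}

fn main() {
    let block = vec![1u8; 1_000_000];
    let shards = split_into_shards(&block, 4, 2);
    // Mirrors the "encode shard size: {}/{} from block_size {}" log above.
    println!("encode shard size: {}/{} from block_size {}", shards[0].len(), shards.len(), block.len());
}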
@@ -102,7 +107,7 @@ impl Erasure {
pub async fn decode(
&self,
writer: &StreamingBlob,
writer: &mut DuplexStream,
readers: Vec<Option<FileReader>>,
offset: usize,
length: usize,
@@ -117,6 +122,9 @@ impl Erasure {
let start_block = offset / self.block_size;
let end_block = (offset + length) / self.block_size;
warn!("decode block from {} to {}", start_block, end_block);
let mut bytes_writed = 0;
for block_idx in start_block..=end_block {
let mut block_offset = 0;
let mut block_length = 0;
@@ -138,15 +146,77 @@ impl Erasure {
break;
}
warn!("decode block_offset {},block_length {} ", block_offset, block_length);
let mut bufs = reader.read().await?;
self.decode_data(&mut bufs)?;
let writed_n = self
.write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length)
.await?;
bytes_writed += writed_n;
}
unimplemented!()
if bytes_writed != length {
warn!("bytes_writed != length ");
return Err(Error::msg("erasure decode less data"));
}
Ok(bytes_writed)
}
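decode walks the blocks from start_block through end_block and derives a (block_offset, block_length) pair for each one; that per-block arithmetic is not visible in the hunk above, so the sketch below is a plausible reconstruction rather than the repo's exact code.

// Hypothetical reconstruction of the per-block offset/length math used when
// serving an (offset, length) range out of fixed-size erasure blocks.
fn block_ranges(offset: usize, length: usize, block_size: usize) -> Vec<(usize, usize, usize)> {
    let start_block = offset / block_size;
    let end_block = (offset + length) / block_size;
    let mut remaining = length;
    let mut out = Vec::new();
    for block_idx in start_block..=end_block {
        if remaining == 0 {
            break;
        }
        // Only the first block starts at a non-zero offset inside the block.
        let block_offset = if block_idx == start_block { offset % block_size } else { 0 };
        let block_length = usize::min(block_size - block_offset, remaining);
        remaining -= block_length;
        out.push((block_idx, block_offset, block_length));
    }
    out
}

fn main() {
    // Read 300 bytes starting at byte 900 with 1 KiB blocks:
    // 124 bytes from block 0 (offset 900) and 176 bytes from block 1 (offset 0).
    assert_eq!(block_ranges(900, 300, 1024), vec![(0, 900, 124), (1, 0, 176)]);
}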
fn write_data_blocks() -> Result<()> {
unimplemented!()
async fn write_data_blocks(
&self,
writer: &mut DuplexStream,
bufs: Vec<Option<Vec<u8>>>,
data_blocks: usize,
offset: usize,
length: usize,
) -> Result<usize> {
if bufs.len() < data_blocks {
return Err(Error::msg("read bufs not match data_blocks"));
}
let data_len: usize = bufs
.iter()
.take(data_blocks)
.filter(|v| v.is_some())
.map(|v| v.as_ref().unwrap().len())
.sum();
if data_len < length {
return Err(Error::msg(format!("write_data_blocks data_len < length {} < {}", data_len, length)));
}
let mut offset = offset;
debug!("write_data_blocks offset {}, length {}", offset, length);
let mut write = length;
let mut total_writed = 0;
for opt_buf in bufs.iter().take(data_blocks) {
let buf = opt_buf.as_ref().unwrap();
if offset >= buf.len() {
offset -= buf.len();
continue;
}
let buf = &buf[offset..];
offset = 0;
// if write < buf.len() {}
let n = writer.write(buf).await?;
write -= n;
total_writed += n;
}
Ok(total_writed)
}
pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>> {
@@ -239,6 +309,8 @@ impl ShardReader {
pub async fn read(&mut self) -> Result<Vec<Option<Vec<u8>>>> {
// let mut disks = self.readers;
warn!("shard reader read offset {}, shard_size {}", self.offset, self.shard_size);
let reader_length = self.readers.len();
let mut futures = Vec::with_capacity(reader_length);
@@ -271,6 +343,9 @@ impl ShardReader {
}
}
// debug!("ec decode read ress {:?}", &ress);
debug!("ec decode read errors {:?}", &errors);
if !self.can_decode(&ress) {
return Err(Error::msg("shard reader read faild"));
}


@@ -1,9 +1,11 @@
use anyhow::{Error, Result};
use http::HeaderMap;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::io::{AsyncRead, DuplexStream};
use uuid::Uuid;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -58,11 +60,12 @@ impl FileInfo {
Ok(t)
}
pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime) {
pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime, actual_size: usize) {
let part = ObjectPartInfo {
number: num,
size: part_size,
mod_time,
actual_size,
};
for p in self.parts.iter_mut() {
@@ -179,7 +182,7 @@ pub struct ObjectPartInfo {
// pub etag: Option<String>,
pub number: usize,
pub size: usize,
// pub actual_size: usize,
pub actual_size: usize, // size of the original source data
pub mod_time: OffsetDateTime,
// pub index: Option<Vec<u8>>,
// pub checksums: Option<std::collections::HashMap<String, String>>,
@@ -191,6 +194,7 @@ impl Default for ObjectPartInfo {
number: Default::default(),
size: Default::default(),
mod_time: OffsetDateTime::UNIX_EPOCH,
actual_size: Default::default(),
}
}
}
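add_object_part now records the original (pre-encoding) size next to the stored part size. Below is a standalone sketch of the update-or-append behaviour hinted at by the loop over self.parts; since the loop body is cut off in this hunk, the "replace an existing part number, otherwise push" semantics is an assumption.

use time::OffsetDateTime;

#[derive(Debug, Clone)]
struct ObjectPartInfo {
    number: usize,
    size: usize,        // bytes stored for this part after encoding
    actual_size: usize, // size of the original source data for this part
    mod_time: OffsetDateTime,
}

#[derive(Debug, Default)]
struct FileInfo {
    parts: Vec<ObjectPartInfo>,
}

impl FileInfo {
    fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime, actual_size: usize) {
        let part = ObjectPartInfo { number: num, size: part_size, actual_size, mod_time };
        // Replace an existing entry with the same part number, otherwise append.
        if let Some(existing) = self.parts.iter_mut().find(|p| p.number == num) {
            *existing = part;
        } else {
            self.parts.push(part);
        }
    }
}

fn main() {
    let mut fi = FileInfo::default();
    fi.add_object_part(1, 6 * 1024 * 1024, OffsetDateTime::UNIX_EPOCH, 5 * 1024 * 1024);
    assert_eq!(fi.parts[0].actual_size, 5 * 1024 * 1024);
}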
@@ -260,11 +264,11 @@ pub struct GetObjectReader {
pub object_info: ObjectInfo,
}
impl GetObjectReader {
pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
GetObjectReader { stream, object_info }
}
}
// impl GetObjectReader {
// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self {
// GetObjectReader { stream, object_info }
// }
// }
pub struct HTTPRangeSpec {
pub is_shuffix_length: bool,


@@ -10,7 +10,7 @@ rust-version.workspace = true
[dependencies]
async-trait.workspace = true
tracing.workspace = true
tracing.workspace = true
anyhow.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] }
@@ -23,3 +23,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
hyper-util = { version = "0.1.5", features = ["tokio", "server-auto", "server-graceful"] }
http.workspace = true
mime = "0.3.17"
bytes.workspace = true
transform-stream = "0.3.0"
futures.workspace = true
# pin-utils = "0.1.0"


@@ -1,11 +1,16 @@
use bytes::Bytes;
use ecstore::disk_api::DiskError;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
use ecstore::store_api::HTTPRangeSpec;
use ecstore::store_api::MakeBucketOptions;
use ecstore::store_api::MultipartUploadResult;
use ecstore::store_api::ObjectOptions;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Error;
@@ -15,11 +20,12 @@ use s3s::S3;
use s3s::{S3Request, S3Response};
use std::fmt::Debug;
use std::str::FromStr;
use time::OffsetDateTime;
use transform_stream::AsyncTryStream;
use anyhow::Result;
use ecstore::store::ECStore;
use tracing::debug;
use tracing::info;
macro_rules! try_ {
($result:expr) => {
@@ -43,10 +49,13 @@ impl FS {
Ok(Self { store })
}
}
#[async_trait::async_trait]
impl S3 for FS {
#[tracing::instrument]
#[tracing::instrument(
level = "debug",
skip(self, req),
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let input = req.input;
@@ -60,7 +69,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn copy_object(&self, req: S3Request<CopyObjectInput>) -> S3Result<S3Response<CopyObjectOutput>> {
let input = req.input;
let (_bucket, _key) = match input.copy_source {
@@ -72,14 +81,14 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
let _input = req.input;
Ok(S3Response::new(DeleteBucketOutput {}))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_object(&self, req: S3Request<DeleteObjectInput>) -> S3Result<S3Response<DeleteObjectOutput>> {
let _input = req.input;
@@ -87,7 +96,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_objects(&self, req: S3Request<DeleteObjectsInput>) -> S3Result<S3Response<DeleteObjectsOutput>> {
let _input = req.input;
@@ -95,7 +104,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn get_bucket_location(&self, req: S3Request<GetBucketLocationInput>) -> S3Result<S3Response<GetBucketLocationOutput>> {
// mc get 1
let input = req.input;
@@ -121,18 +130,44 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(
level = "debug",
skip(self, req),
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn get_object(&self, req: S3Request<GetObjectInput>) -> S3Result<S3Response<GetObjectOutput>> {
// mc get 3
let input = req.input;
println!("get_object: {:?}", &input);
let GetObjectInput { bucket, key, .. } = req.input;
let output = GetObjectOutput { ..Default::default() };
let range = HTTPRangeSpec::nil();
let h = HeaderMap::new();
let opts = &ObjectOptions {
max_parity: false,
mod_time: OffsetDateTime::UNIX_EPOCH,
part_number: 0,
};
let reader = try_!(
self.store
.get_object_reader(bucket.as_str(), key.as_str(), range, h, opts)
.await
);
// let body = bytes_stream(reader.stream, 100);
let output = GetObjectOutput {
body: Some(reader.stream),
content_length: Some(reader.object_info.size as i64),
..Default::default()
};
debug!("get_object response {:?}", output);
Ok(S3Response::new(output))
}
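get_object still ignores the request's range field and always passes HTTPRangeSpec::nil(). When range support lands, the Range header has to be resolved into the (offset, length) pair that Erasure::decode works with; the helper below is purely hypothetical (its name and return type are not from this repo) and only illustrates the arithmetic for single ranges.

// Hypothetical helper: parse "bytes=start-end" / "bytes=start-" / "bytes=-suffix"
// into an absolute (offset, length) pair for an object of `object_size` bytes.
fn parse_range(header: &str, object_size: usize) -> Option<(usize, usize)> {
    let spec = header.strip_prefix("bytes=")?;
    let (start, end) = spec.split_once('-')?;
    if start.is_empty() {
        // Suffix form: the last N bytes of the object.
        let n: usize = end.parse().ok()?;
        let n = n.min(object_size);
        return Some((object_size - n, n));
    }
    let offset: usize = start.parse().ok()?;
    if offset >= object_size {
        return None;
    }
    let last = if end.is_empty() {
        object_size - 1
    } else {
        end.parse::<usize>().ok()?.min(object_size - 1)
    };
    if last < offset {
        return None;
    }
    Some((offset, last - offset + 1))
}

fn main() {
    assert_eq!(parse_range("bytes=0-99", 1000), Some((0, 100)));
    assert_eq!(parse_range("bytes=900-", 1000), Some((900, 100)));
    assert_eq!(parse_range("bytes=-100", 1000), Some((900, 100)));
}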
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn head_bucket(&self, req: S3Request<HeadBucketInput>) -> S3Result<S3Response<HeadBucketOutput>> {
let input = req.input;
@@ -148,7 +183,7 @@ impl S3 for FS {
Ok(S3Response::new(HeadBucketOutput::default()))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn head_object(&self, req: S3Request<HeadObjectInput>) -> S3Result<S3Response<HeadObjectOutput>> {
// mc get 2
let HeadObjectInput {
@@ -184,13 +219,13 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self))]
async fn list_buckets(&self, _: S3Request<ListBucketsInput>) -> S3Result<S3Response<ListBucketsOutput>> {
let output = ListBucketsOutput { ..Default::default() };
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_objects(&self, req: S3Request<ListObjectsInput>) -> S3Result<S3Response<ListObjectsOutput>> {
let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;
@@ -205,7 +240,7 @@ impl S3 for FS {
}))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_objects_v2(&self, req: S3Request<ListObjectsV2Input>) -> S3Result<S3Response<ListObjectsV2Output>> {
let _input = req.input;
@@ -213,7 +248,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn put_object(&self, req: S3Request<PutObjectInput>) -> S3Result<S3Response<PutObjectOutput>> {
let input = req.input;
@@ -247,7 +282,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn create_multipart_upload(
&self,
req: S3Request<CreateMultipartUploadInput>,
@@ -276,7 +311,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn upload_part(&self, req: S3Request<UploadPartInput>) -> S3Result<S3Response<UploadPartOutput>> {
let UploadPartInput {
body,
@@ -309,7 +344,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn upload_part_copy(&self, req: S3Request<UploadPartCopyInput>) -> S3Result<S3Response<UploadPartCopyOutput>> {
let _input = req.input;
@@ -318,7 +353,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn list_parts(&self, req: S3Request<ListPartsInput>) -> S3Result<S3Response<ListPartsOutput>> {
let ListPartsInput {
bucket, key, upload_id, ..
@@ -333,7 +368,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self, req))]
async fn complete_multipart_upload(
&self,
req: S3Request<CompleteMultipartUploadInput>,
@@ -372,7 +407,7 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
#[tracing::instrument]
#[tracing::instrument(level = "debug", skip(self))]
async fn abort_multipart_upload(
&self,
_req: S3Request<AbortMultipartUploadInput>,
@@ -380,3 +415,23 @@ impl S3 for FS {
Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
}
}
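The instrument changes across this file all follow one pattern: drop the span to level = "debug", skip self and req so the span does not Debug-format the store handle or an entire request body, and on the hot paths record an explicit start_time field. A minimal sketch of that pattern outside this codebase (Client, put, key, and payload are illustrative names only):

use tracing::{debug, instrument};

struct Client; // stands in for FS: holds handles that are noisy to print

impl Client {
    // Debug-level span; `self` and `payload` are skipped so they are not
    // captured as span fields, only `key` and the explicit field are recorded.
    #[instrument(level = "debug", skip(self, payload), fields(start_time = ?std::time::Instant::now()))]
    async fn put(&self, key: &str, payload: Vec<u8>) {
        debug!(len = payload.len(), "storing object");
    }
}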
pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes, E>> + Send + 'static
where
S: Stream<Item = Result<Bytes, E>> + Send + 'static,
E: Send + 'static,
{
AsyncTryStream::<Bytes, E, _>::new(|mut y| async move {
pin_mut!(stream);
let mut remaining: usize = content_length;
while let Some(result) = stream.next().await {
let mut bytes = result?;
if bytes.len() > remaining {
bytes.truncate(remaining);
}
remaining -= bytes.len();
y.yield_ok(bytes).await;
}
Ok(())
})
}
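bytes_stream is only referenced from the commented-out line in get_object so far. Here is a usage sketch, assuming the function exactly as defined above: it caps a chunked byte stream at content_length bytes by truncating the chunk that crosses the limit. One observation: once the budget is exhausted it keeps yielding empty chunks instead of ending the stream, which may be worth revisiting.

#[tokio::main]
async fn main() {
    use bytes::Bytes;
    use futures::{pin_mut, stream, StreamExt};

    let chunks = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world!")),
    ]);

    // Cap the stream at 8 bytes: "hello " passes through, "world!" is truncated to "wo".
    let capped = bytes_stream(chunks, 8);
    pin_mut!(capped);

    let mut out = Vec::new();
    while let Some(chunk) = capped.next().await {
        out.extend_from_slice(&chunk.unwrap());
    }
    assert_eq!(out, b"hello wo".to_vec());
}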


@@ -9,7 +9,7 @@ if [ -n "$1" ]; then
fi
if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug"
fi
cargo run \