From 1b60d054347306222d85395cc186ef1ad3f59fb7 Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 24 Jul 2024 16:45:23 +0800 Subject: [PATCH] test:get_object --- Cargo.lock | 3 ++ ecstore/src/disk.rs | 2 + ecstore/src/erasure.rs | 89 +++++++++++++++++++++++++++++--- ecstore/src/store_api.rs | 18 ++++--- rustfs/Cargo.toml | 6 ++- rustfs/src/storage/ecfs.rs | 103 ++++++++++++++++++++++++++++--------- scripts/run.sh | 2 +- 7 files changed, 183 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38a14fc3..09ba10ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1091,8 +1091,10 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "bytes", "clap", "ecstore", + "futures", "http", "hyper-util", "mime", @@ -1101,6 +1103,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "transform-stream", ] [[package]] diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 3309017a..b7054b2b 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -509,6 +509,8 @@ impl DiskAPI for LocalDisk { } async fn read_file(&self, volume: &str, path: &str) -> Result { let p = self.get_object_path(&volume, &path)?; + + debug!("read_file {:?}", &p); let file = File::options().read(true).open(&p).await?; Ok(FileReader::new(file)) diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index b3a715af..d120bc61 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -9,6 +9,9 @@ use s3s::dto::StreamingBlob; use s3s::StdError; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; +use tokio::io::DuplexStream; +use tracing::debug; +use tracing::warn; // use tracing::debug; use uuid::Uuid; @@ -49,14 +52,16 @@ impl Erasure { { let mut stream = ChunkedStream::new(body, total_size, self.block_size, true); let mut total: usize = 0; - // let mut idx = 0; + let mut idx = 0; while let Some(result) = stream.next().await { match result { Ok(data) => { let blocks = self.encode_data(data.as_ref())?; + warn!("encode shard size: {}/{} from block_size {} ", blocks[0].len(), blocks.len(), data.len()); + let mut errs = Vec::new(); - // idx += 1; + idx += 1; for (i, w) in writers.iter_mut().enumerate() { total += blocks[i].len(); @@ -88,7 +93,7 @@ impl Erasure { } } - // debug!("{} encode_data done {}", self.id, total); + warn!(" encode_data done shard block num {}", idx); Ok(total) @@ -102,7 +107,7 @@ impl Erasure { pub async fn decode( &self, - writer: &StreamingBlob, + writer: &mut DuplexStream, readers: Vec>, offset: usize, length: usize, @@ -117,6 +122,9 @@ impl Erasure { let start_block = offset / self.block_size; let end_block = (offset + length) / self.block_size; + warn!("decode block from {} to {}", start_block, end_block); + + let mut bytes_writed = 0; for block_idx in start_block..=end_block { let mut block_offset = 0; let mut block_length = 0; @@ -138,15 +146,77 @@ impl Erasure { break; } + warn!("decode block_offset {},block_length {} ", block_offset, block_length); + let mut bufs = reader.read().await?; self.decode_data(&mut bufs)?; + + let writed_n = self + .write_data_blocks(writer, bufs, self.data_shards, block_offset, block_length) + .await?; + + bytes_writed += writed_n; } - unimplemented!() + + if bytes_writed != length { + warn!("bytes_writed != length "); + return Err(Error::msg("erasure decode less data")); + } + + Ok(bytes_writed) } - fn write_data_blocks() -> Result<()> { - unimplemented!() + async fn write_data_blocks( + &self, + writer: &mut DuplexStream, + bufs: Vec>>, + data_blocks: usize, + offset: usize, + length: usize, + ) -> Result { + if 
bufs.len() < data_blocks { + return Err(Error::msg("read bufs not match data_blocks")); + } + + let data_len: usize = bufs + .iter() + .take(data_blocks) + .filter(|v| v.is_some()) + .map(|v| v.as_ref().unwrap().len()) + .sum(); + if data_len < length { + return Err(Error::msg(format!("write_data_blocks data_len < length {} < {}", data_len, length))); + } + + let mut offset = offset; + + debug!("write_data_blocks offset {}, length {}", offset, length); + + let mut write = length; + let mut total_writed = 0; + + for opt_buf in bufs.iter().take(data_blocks) { + let buf = opt_buf.as_ref().unwrap(); + + if offset >= buf.len() { + offset -= buf.len(); + continue; + } + + let buf = &buf[offset..]; + + offset = 0; + + // if write < buf.len() {} + + let n = writer.write(buf).await?; + + write -= n; + total_writed += n; + } + + Ok(total_writed) } pub fn encode_data(&self, data: &[u8]) -> Result>> { @@ -239,6 +309,8 @@ impl ShardReader { pub async fn read(&mut self) -> Result>>> { // let mut disks = self.readers; + warn!("shard reader read offset {}, shard_size {}", self.offset, self.shard_size); + let reader_length = self.readers.len(); let mut futures = Vec::with_capacity(reader_length); @@ -271,6 +343,9 @@ impl ShardReader { } } + // debug!("ec decode read ress {:?}", &ress); + debug!("ec decode read errors {:?}", &errors); + if !self.can_decode(&ress) { return Err(Error::msg("shard reader read faild")); } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index e3543105..56ee977b 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,9 +1,11 @@ use anyhow::{Error, Result}; + use http::HeaderMap; use rmp_serde::Serializer; use s3s::dto::StreamingBlob; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; +use tokio::io::{AsyncRead, DuplexStream}; use uuid::Uuid; pub const ERASURE_ALGORITHM: &str = "rs-vandermonde"; @@ -58,11 +60,12 @@ impl FileInfo { Ok(t) } - pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime) { + pub fn add_object_part(&mut self, num: usize, part_size: usize, mod_time: OffsetDateTime, actual_size: usize) { let part = ObjectPartInfo { number: num, size: part_size, mod_time, + actual_size, }; for p in self.parts.iter_mut() { @@ -179,7 +182,7 @@ pub struct ObjectPartInfo { // pub etag: Option, pub number: usize, pub size: usize, - // pub actual_size: usize, + pub actual_size: usize, // 源数据大小 pub mod_time: OffsetDateTime, // pub index: Option>, // pub checksums: Option>, @@ -191,6 +194,7 @@ impl Default for ObjectPartInfo { number: Default::default(), size: Default::default(), mod_time: OffsetDateTime::UNIX_EPOCH, + actual_size: Default::default(), } } } @@ -260,11 +264,11 @@ pub struct GetObjectReader { pub object_info: ObjectInfo, } -impl GetObjectReader { - pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self { - GetObjectReader { stream, object_info } - } -} +// impl GetObjectReader { +// pub fn new(stream: StreamingBlob, object_info: ObjectInfo) -> Self { +// GetObjectReader { stream, object_info } +// } +// } pub struct HTTPRangeSpec { pub is_shuffix_length: bool, diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 2cac5e65..45f83802 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -10,7 +10,7 @@ rust-version.workspace = true [dependencies] async-trait.workspace = true -tracing.workspace = true +tracing.workspace = true anyhow.workspace = true time = { workspace = true, features = ["parsing", "formatting"] } tokio = { workspace = true, features = ["rt-multi-thread", 
"macros", "net", "signal"] } @@ -23,3 +23,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } hyper-util = { version = "0.1.5", features = ["tokio", "server-auto", "server-graceful"] } http.workspace = true mime = "0.3.17" +bytes.workspace = true +transform-stream = "0.3.0" +futures.workspace = true +# pin-utils = "0.1.0" diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 79f5421e..a760827c 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,11 +1,16 @@ +use bytes::Bytes; use ecstore::disk_api::DiskError; use ecstore::store_api::BucketOptions; use ecstore::store_api::CompletePart; +use ecstore::store_api::HTTPRangeSpec; use ecstore::store_api::MakeBucketOptions; use ecstore::store_api::MultipartUploadResult; use ecstore::store_api::ObjectOptions; use ecstore::store_api::PutObjReader; use ecstore::store_api::StorageAPI; +use futures::pin_mut; +use futures::{Stream, StreamExt}; +use http::HeaderMap; use s3s::dto::*; use s3s::s3_error; use s3s::S3Error; @@ -15,11 +20,12 @@ use s3s::S3; use s3s::{S3Request, S3Response}; use std::fmt::Debug; use std::str::FromStr; +use time::OffsetDateTime; +use transform_stream::AsyncTryStream; use anyhow::Result; use ecstore::store::ECStore; use tracing::debug; -use tracing::info; macro_rules! try_ { ($result:expr) => { @@ -43,10 +49,13 @@ impl FS { Ok(Self { store }) } } - #[async_trait::async_trait] impl S3 for FS { - #[tracing::instrument] + #[tracing::instrument( + level = "debug", + skip(self, req), + fields(start_time=?time::OffsetDateTime::now_utc()) + )] async fn create_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; @@ -60,7 +69,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn copy_object(&self, req: S3Request) -> S3Result> { let input = req.input; let (_bucket, _key) = match input.copy_source { @@ -72,14 +81,14 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { let _input = req.input; Ok(S3Response::new(DeleteBucketOutput {})) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_object(&self, req: S3Request) -> S3Result> { let _input = req.input; @@ -87,7 +96,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_objects(&self, req: S3Request) -> S3Result> { let _input = req.input; @@ -95,7 +104,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn get_bucket_location(&self, req: S3Request) -> S3Result> { // mc get 1 let input = req.input; @@ -121,18 +130,44 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument( + level = "debug", + skip(self, req), + fields(start_time=?time::OffsetDateTime::now_utc()) + )] async fn get_object(&self, req: S3Request) -> S3Result> { // mc get 3 - let input = req.input; - println!("get_object: {:?}", &input); + let GetObjectInput { bucket, key, .. 
} = req.input; - let output = GetObjectOutput { ..Default::default() }; + let range = HTTPRangeSpec::nil(); + + let h = HeaderMap::new(); + let opts = &ObjectOptions { + max_parity: false, + mod_time: OffsetDateTime::UNIX_EPOCH, + part_number: 0, + }; + + let reader = try_!( + self.store + .get_object_reader(bucket.as_str(), key.as_str(), range, h, opts) + .await + ); + + // let body = bytes_stream(reader.stream, 100); + + let output = GetObjectOutput { + body: Some(reader.stream), + content_length: Some(reader.object_info.size as i64), + ..Default::default() + }; + + debug!("get_object response {:?}", output); Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn head_bucket(&self, req: S3Request) -> S3Result> { let input = req.input; @@ -148,7 +183,7 @@ impl S3 for FS { Ok(S3Response::new(HeadBucketOutput::default())) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn head_object(&self, req: S3Request) -> S3Result> { // mc get 2 let HeadObjectInput { @@ -184,13 +219,13 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self))] async fn list_buckets(&self, _: S3Request) -> S3Result> { let output = ListBucketsOutput { ..Default::default() }; Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn list_objects(&self, req: S3Request) -> S3Result> { let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?; @@ -205,7 +240,7 @@ impl S3 for FS { })) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn list_objects_v2(&self, req: S3Request) -> S3Result> { let _input = req.input; @@ -213,7 +248,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn put_object(&self, req: S3Request) -> S3Result> { let input = req.input; @@ -247,7 +282,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn create_multipart_upload( &self, req: S3Request, @@ -276,7 +311,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn upload_part(&self, req: S3Request) -> S3Result> { let UploadPartInput { body, @@ -309,7 +344,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn upload_part_copy(&self, req: S3Request) -> S3Result> { let _input = req.input; @@ -318,7 +353,7 @@ impl S3 for FS { Ok(S3Response::new(output)) } - #[tracing::instrument] + #[tracing::instrument(level = "debug", skip(self, req))] async fn list_parts(&self, req: S3Request) -> S3Result> { let ListPartsInput { bucket, key, upload_id, .. 
@@ -333,7 +368,7 @@
         Ok(S3Response::new(output))
     }
 
-    #[tracing::instrument]
+    #[tracing::instrument(level = "debug", skip(self, req))]
     async fn complete_multipart_upload(
         &self,
         req: S3Request<CompleteMultipartUploadInput>,
@@ -372,7 +407,7 @@
         Ok(S3Response::new(output))
     }
 
-    #[tracing::instrument]
+    #[tracing::instrument(level = "debug", skip(self))]
     async fn abort_multipart_upload(
         &self,
         _req: S3Request<AbortMultipartUploadInput>,
@@ -380,3 +415,23 @@
         Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
     }
 }
+
+pub fn bytes_stream<S, E>(stream: S, content_length: usize) -> impl Stream<Item = Result<Bytes, E>> + Send + 'static
+where
+    S: Stream<Item = Result<Bytes, E>> + Send + 'static,
+    E: Send + 'static,
+{
+    AsyncTryStream::<Bytes, E, _>::new(|mut y| async move {
+        pin_mut!(stream);
+        let mut remaining: usize = content_length;
+        while let Some(result) = stream.next().await {
+            let mut bytes = result?;
+            if bytes.len() > remaining {
+                bytes.truncate(remaining);
+            }
+            remaining -= bytes.len();
+            y.yield_ok(bytes).await;
+        }
+        Ok(())
+    })
+}
diff --git a/scripts/run.sh b/scripts/run.sh
index 8e9ee7a6..5cc7c63d 100755
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -9,7 +9,7 @@ if [ -n "$1" ]; then
 fi
 
 if [ -z "$RUST_LOG" ]; then
-    export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
+    export RUST_LOG="rustfs=debug,ecstore=info,s3s=debug"
 fi
 
 cargo run \
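
For reference, a minimal sketch of how the new bytes_stream helper could be exercised (a hypothetical test, not part of this patch; it assumes the helper stays in rustfs/src/storage/ecfs.rs and relies only on the bytes, futures, and tokio "macros" dependencies already added above):

#[cfg(test)]
mod tests {
    use super::bytes_stream;
    use bytes::Bytes;
    use futures::{pin_mut, stream, StreamExt};

    // Three 4-byte chunks, but the caller only wants the first 10 bytes:
    // the helper should truncate the final chunk rather than over-deliver.
    #[tokio::test]
    async fn bytes_stream_caps_body_at_content_length() {
        let chunks = stream::iter(vec![
            Ok::<Bytes, std::io::Error>(Bytes::from_static(b"aaaa")),
            Ok(Bytes::from_static(b"bbbb")),
            Ok(Bytes::from_static(b"cccc")),
        ]);

        let capped = bytes_stream(chunks, 10);
        pin_mut!(capped);

        let mut total = 0;
        while let Some(chunk) = capped.next().await {
            total += chunk.expect("stream item").len();
        }

        // Bytes past content_length are dropped by the truncation branch.
        assert_eq!(total, 10);
    }
}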