diff --git a/Cargo.lock b/Cargo.lock
index d1a8696c..a4baa981 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -170,6 +170,12 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "byteorder"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
+
 [[package]]
 name = "bytes"
 version = "1.6.0"
@@ -287,6 +293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
 dependencies = [
  "powerfmt",
+ "serde",
 ]
 
 [[package]]
@@ -313,11 +320,13 @@ dependencies = [
  "path-absolutize",
  "reed-solomon-erasure",
  "regex",
+ "rmp-serde",
  "serde",
  "serde_json",
  "thiserror",
  "time",
  "tokio",
+ "tokio-util",
  "tracing",
  "tracing-error",
  "transform-stream",
@@ -855,6 +864,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "paste"
+version = "1.0.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+
 [[package]]
 name = "path-absolutize"
 version = "3.1.1"
@@ -1027,6 +1042,28 @@ version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
 
+[[package]]
+name = "rmp"
+version = "0.8.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
+dependencies = [
+ "byteorder",
+ "num-traits",
+ "paste",
+]
+
+[[package]]
+name = "rmp-serde"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
+dependencies = [
+ "byteorder",
+ "rmp",
+ "serde",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.24"
diff --git a/Cargo.toml b/Cargo.toml
index 3e5de4b0..1d81177a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,7 @@ futures = "0.3.30"
 bytes = "1.6.0"
 http = "1.1.0"
 thiserror = "1.0.61"
-time = "0.3.36"
+time = { version = "0.3.36", features = ["std", "parsing", "formatting", "macros", "serde"] }
 async-trait = "0.1.80"
 tokio = { version = "1.38.0", features = ["fs"] }
 anyhow = "1.0.86"
diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml
index 0eb5a1ab..783c373c 100644
--- a/ecstore/Cargo.toml
+++ b/ecstore/Cargo.toml
@@ -28,6 +28,8 @@ tracing-error = "0.2.0"
 serde_json.workspace = true
 path-absolutize = "3.1.1"
 time.workspace = true
+rmp-serde = "1.3.0"
+tokio-util = "0.7.11"
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs
new file mode 100644
index 00000000..1a9a1313
--- /dev/null
+++ b/ecstore/src/bucket_meta.rs
@@ -0,0 +1,62 @@
+use rmp_serde::Serializer;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+
+use anyhow::Result;
+
+use crate::disk::BUCKET_META_PREFIX;
+
+pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
+pub const BUCKET_METADATA_FORMAT: u16 = 1;
+pub const BUCKET_METADATA_VERSION: u16 = 1;
+
+#[derive(Debug, PartialEq, Deserialize, Serialize)]
+pub struct BucketMetadata {
+    format: u16,
+    version: u16,
+    pub name: String,
+    pub created: OffsetDateTime,
+}
+
+impl Default for BucketMetadata {
+    fn default() -> Self {
+        Self {
+            format: Default::default(),
+            version: Default::default(),
+            name: Default::default(),
+            created: OffsetDateTime::now_utc(),
+        }
+    }
+}
+
+impl BucketMetadata {
+    pub fn new(name: &str) -> Self {
+        BucketMetadata {
+            format: BUCKET_METADATA_FORMAT,
+            version: BUCKET_METADATA_VERSION,
+            name: name.to_string(),
+            ..Default::default()
+        }
+    }
+
+    pub fn save_file_path(&self) -> String {
+        format!(
+            "{}/{}/{}",
+            BUCKET_META_PREFIX,
+            self.name.as_str(),
+            BUCKET_METADATA_FILE
+        )
+        // PathBuf::new()
+        //     .join(BUCKET_META_PREFIX)
+        //     .join(self.name.as_str())
+        //     .join(BUCKET_METADATA_FILE)
+    }
+
+    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
+        let mut buf = Vec::new();
+
+        self.serialize(&mut Serializer::new(&mut buf))?;
+
+        Ok(buf)
+    }
+}
diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs
new file mode 100644
index 00000000..e89cfadd
--- /dev/null
+++ b/ecstore/src/chunk_stream.rs
@@ -0,0 +1,252 @@
+use bytes::Bytes;
+use futures::pin_mut;
+use futures::stream::{Stream, StreamExt};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use transform_stream::AsyncTryStream;
+
+use crate::error::StdError;
+
+pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
+
+pub struct ChunkedStream {
+    /// inner
+    inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
+
+    remaining_length: usize,
+}
+
+impl ChunkedStream {
+    pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
+    where
+        S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
+    {
+        let inner =
+            AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
+                #[allow(clippy::shadow_same)] // necessary for `pin_mut!`
+                Box::pin(async move {
+                    pin_mut!(body);
+                    // data left over from the previous read
+                    let mut prev_bytes = Bytes::new();
+                    let mut readed_size = 0;
+
+                    loop {
+                        let data: Vec<Bytes> = {
+                            // read a batch of fixed-size chunks
+                            match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
+                                None => break,
+                                Some(Err(e)) => return Err(e),
+                                Some(Ok((data, remaining_bytes))) => {
+                                    prev_bytes = remaining_bytes;
+                                    data
+                                }
+                            }
+                        };
+
+                        for bytes in data {
+                            readed_size += bytes.len();
+                            y.yield_ok(bytes).await;
+                        }
+
+                        if readed_size + prev_bytes.len() >= content_length {
+                            // Everything up to content_length has been read;
+                            // only a partial tail chunk may remain.
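+                            // If padding is disabled, the short tail is yielded
+                            // as-is; otherwise it is zero-filled to chunk_size so
+                            // that every yielded chunk has the same length.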
+                            if !need_padding {
+                                y.yield_ok(prev_bytes).await;
+                                break;
+                            }
+
+                            let mut bytes = vec![0u8; chunk_size];
+                            let (left, _) = bytes.split_at_mut(prev_bytes.len());
+                            left.copy_from_slice(&prev_bytes);
+
+                            y.yield_ok(Bytes::from(bytes)).await;
+
+                            break;
+                        }
+                    }
+
+                    Ok(())
+                })
+            });
+        Self {
+            inner,
+            remaining_length: content_length,
+        }
+    }
+    /// read data and return the remaining bytes
+    async fn read_data<S>(
+        mut body: Pin<&mut S>,
+        prev_bytes: Bytes,
+        data_size: usize,
+    ) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
+    where
+        S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
+    {
+        let mut bytes_buffer = Vec::new();
+
+        // returns Some on the first non-empty chunk, so it runs at most once
+        let mut push_data_bytes = |mut bytes: Bytes| {
+            if bytes.is_empty() {
+                return None;
+            }
+
+            if data_size == 0 {
+                return Some(bytes);
+            }
+
+            // first, top up the leftover from the previous call
+            if !prev_bytes.is_empty() {
+                let need_size = data_size.wrapping_sub(prev_bytes.len());
+                if bytes.len() >= need_size {
+                    let data = bytes.split_to(need_size);
+                    let mut combined = Vec::new();
+                    combined.extend_from_slice(&prev_bytes);
+                    combined.extend_from_slice(&data);
+
+                    bytes_buffer.push(Bytes::from(combined));
+                } else {
+                    let mut combined = Vec::new();
+                    combined.extend_from_slice(&prev_bytes);
+                    combined.extend_from_slice(&bytes);
+
+                    return Some(Bytes::from(combined));
+                }
+            }
+
+            // more data than one chunk: carve chunk-sized pieces out of `bytes`
+            if data_size <= bytes.len() {
+                let n = bytes.len() / data_size;
+
+                for _ in 0..n {
+                    let data = bytes.split_to(data_size);
+                    bytes_buffer.push(data);
+                }
+
+                Some(bytes)
+            } else {
+                // not enough for a full chunk yet
+                Some(bytes)
+            }
+        };
+
+        // whatever is left over after slicing
+        let remaining_bytes = 'outer: {
+            loop {
+                match body.next().await? {
+                    Err(e) => return Some(Err(e)),
+                    Ok(bytes) => {
+                        if let Some(remaining_bytes) = push_data_bytes(bytes) {
+                            break 'outer remaining_bytes;
+                        }
+                    }
+                }
+            }
+        };
+
+        Some(Ok((bytes_buffer, remaining_bytes)))
+    }
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<Bytes, StdError>>> {
+        let ans = Pin::new(&mut self.inner).poll_next(cx);
+        if let Poll::Ready(Some(Ok(ref bytes))) = ans {
+            self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
+        }
+        ans
+    }
+
+    // pub fn exact_remaining_length(&self) -> usize {
+    //     self.remaining_length
+    // }
+}
+
+impl Stream for ChunkedStream {
+    type Item = Result<Bytes, StdError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.poll(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_chunked_stream() {
+        let chunk_size = 4;
+
+        let data1 = vec![1u8; 7777];
+        let data2 = vec![1u8; 7777];
+
+        let content_length = data1.len() + data2.len();
+
+        let chunk1 = Bytes::from(data1);
+        let chunk2 = Bytes::from(data2);
+
+        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
+
+        let stream = futures::stream::iter(chunk_results);
+
+        let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
+
+        loop {
+            let ans1 = chunked_stream.next().await;
+            if ans1.is_none() {
+                break;
+            }
+
+            let bytes = ans1.unwrap().unwrap();
+            assert!(bytes.len() == chunk_size)
+        }
+    }
+}
diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs
index bed35e51..c2d725c6 100644
--- a/ecstore/src/disk.rs
+++ b/ecstore/src/disk.rs
@@ -1,6 +1,7 @@
 use std::{
     fs::Metadata,
     path::{Path, PathBuf},
+    sync::{Arc, RwLock},
 };
 
 use anyhow::{Error, Result};
@@ -25,7 +26,7 @@ pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
 pub const BUCKET_META_PREFIX: &str = "buckets";
 pub const FORMAT_CONFIG_FILE: &str = "format.json";
 
-pub type DiskStore = Box<dyn DiskAPI>;
+pub type DiskStore = Arc<Box<dyn DiskAPI>>;
 
 pub struct DiskOption {
     pub cleanup: bool,
@@ -34,7 +35,8 @@ pub struct DiskOption {
 
 pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
     if ep.is_local {
-        Ok(LocalDisk::new(ep, opt.cleanup).await?)
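+        // DiskStore is now Arc<Box<dyn DiskAPI>>, so one LocalDisk instance can
+        // be shared by the disk map, the erasure sets, and the S3 peer layer.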
+        let s = LocalDisk::new(ep, opt.cleanup).await?;
+        Ok(Arc::new(Box::new(s)))
     } else {
         unimplemented!()
         // Ok(Disk::Remote(RemoteDisk::new(ep, opt.health_check)?))
@@ -51,24 +53,24 @@ pub async fn init_disks(
         futures.push(new_disk(ep, opt));
     }
 
-    let mut storages = Vec::with_capacity(eps.len());
+    let mut res = Vec::with_capacity(eps.len());
     let mut errors = Vec::with_capacity(eps.len());
 
     let results = join_all(futures).await;
     for result in results {
         match result {
             Ok(s) => {
-                storages.push(Some(s));
+                res.push(Some(s));
                 errors.push(None);
             }
             Err(e) => {
-                storages.push(None);
+                res.push(None);
                 errors.push(Some(e));
             }
         }
     }
 
-    (storages, errors)
+    (res, errors)
 }
 
 // pub async fn load_format(&self, heal: bool) -> Result {
@@ -87,7 +89,7 @@ pub struct LocalDisk {
 }
 
 impl LocalDisk {
-    pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Box<Self>> {
+    pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
         let root = fs::canonicalize(ep.url.path()).await?;
 
         if cleanup {
@@ -131,7 +133,7 @@ impl LocalDisk {
 
         disk.make_meta_volumes().await?;
 
-        Ok(Box::new(disk))
+        Ok(disk)
     }
 
     async fn make_meta_volumes(&self) -> Result<()> {
@@ -241,6 +243,10 @@ fn skip_access_checks(p: impl AsRef<Path>) -> bool {
 
 #[async_trait::async_trait]
 impl DiskAPI for LocalDisk {
+    fn is_local(&self) -> bool {
+        true
+    }
+
     #[must_use]
     async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
         let p = self.get_object_path(&volume, &path)?;
diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs
index 5cc92faa..b993b980 100644
--- a/ecstore/src/disk_api.rs
+++ b/ecstore/src/disk_api.rs
@@ -5,6 +5,8 @@ use bytes::Bytes;
 
 #[async_trait::async_trait]
 pub trait DiskAPI: Debug + Send + Sync + 'static {
+    fn is_local(&self) -> bool;
+
     async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
     async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
     async fn rename_file(
diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs
index 828e6ed3..169b88bb 100644
--- a/ecstore/src/lib.rs
+++ b/ecstore/src/lib.rs
@@ -1,3 +1,5 @@
+mod bucket_meta;
+mod chunk_stream;
 mod disk;
 mod disk_api;
 mod disks_layout;
@@ -6,6 +8,7 @@ mod endpoint;
 mod erasure;
 pub mod error;
 mod format;
+mod peer;
 mod sets;
 pub mod store;
 mod store_api;
diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs
new file mode 100644
index 00000000..ddfbe12e
--- /dev/null
+++ b/ecstore/src/peer.rs
@@ -0,0 +1,178 @@
+use anyhow::Result;
+use async_trait::async_trait;
+use futures::future::join_all;
+use std::{fmt::Debug, sync::Arc};
+
+use crate::{
+    disk::{DiskError, DiskStore},
+    endpoint::{EndpointServerPools, Node},
+    store_api::MakeBucketOptions,
+};
+
+type Client = Arc<Box<dyn PeerS3Client>>;
+
+#[async_trait]
+pub trait PeerS3Client: Debug + Sync + Send + 'static {
+    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
+    fn get_pools(&self) -> Vec<i32>;
+}
+
+#[derive(Debug)]
+pub struct S3PeerSys {
+    pub clients: Vec<Client>,
+    pub pools_count: usize,
+}
+
+impl S3PeerSys {
+    pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
+        Self {
+            clients: Self::new_clients(eps, local_disks),
+            pools_count: eps.len(),
+        }
+    }
+
+    fn new_clients(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Vec<Client> {
+        let nodes = eps.get_nodes();
+        let v: Vec<Client> = nodes
+            .iter()
+            .map(|e| {
+                if e.is_local {
+                    let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(
+                        local_disks.clone(),
+                        e.clone(),
+                        e.pools.clone(),
+                    ));
+                    Arc::new(cli)
+                } else {
+                    let cli: Box<dyn PeerS3Client> =
+                        Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
+                    Arc::new(cli)
+                }
+            })
+            .collect();
+
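+        // One client per node: a LocalPeerS3Client that works on this
+        // process's disks directly, or a RemotePeerS3Client stub for peers.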
+        v
+    }
+}
+
+#[async_trait]
+impl PeerS3Client for S3PeerSys {
+    fn get_pools(&self) -> Vec<i32> {
+        unimplemented!()
+    }
+    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
+        let mut futures = Vec::with_capacity(self.clients.len());
+        for cli in self.clients.iter() {
+            futures.push(cli.make_bucket(bucket, opts));
+        }
+
+        let mut errors = Vec::with_capacity(self.clients.len());
+
+        let results = join_all(futures).await;
+        for result in results {
+            match result {
+                Ok(_) => {
+                    errors.push(None);
+                }
+                Err(e) => {
+                    errors.push(Some(e));
+                }
+            }
+        }
+
+        for i in 0..self.pools_count {
+            let mut per_pool_errs = Vec::with_capacity(self.clients.len());
+            for (j, cli) in self.clients.iter().enumerate() {
+                let pools = cli.get_pools();
+                let idx = i as i32;
+                if pools.contains(&idx) {
+                    per_pool_errs.push(errors[j].as_ref());
+                }
+
+                // TODO: reduceWriteQuorumErrs
+            }
+        }
+
+        // TODO:
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct LocalPeerS3Client {
+    pub local_disks: Vec<DiskStore>,
+    pub node: Node,
+    pub pools: Vec<i32>,
+}
+
+impl LocalPeerS3Client {
+    fn new(local_disks: Vec<DiskStore>, node: Node, pools: Vec<i32>) -> Self {
+        Self {
+            local_disks,
+            node,
+            pools,
+        }
+    }
+}
+
+#[async_trait]
+impl PeerS3Client for LocalPeerS3Client {
+    fn get_pools(&self) -> Vec<i32> {
+        self.pools.clone()
+    }
+    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
+        let mut futures = Vec::with_capacity(self.local_disks.len());
+        for disk in self.local_disks.iter() {
+            futures.push(async move {
+                match disk.make_volume(bucket).await {
+                    Ok(_) => Ok(()),
+                    Err(e) => {
+                        if opts.force_create && DiskError::is_err(&e, &DiskError::VolumeExists) {
+                            return Ok(());
+                        }
+
+                        Err(e)
+                    }
+                }
+            });
+        }
+
+        let results = join_all(futures).await;
+
+        let mut errs = Vec::new();
+
+        for res in results {
+            match res {
+                Ok(_) => errs.push(None),
+                Err(e) => errs.push(Some(e)),
+            }
+        }
+
+        // TODO: reduceWriteQuorumErrs
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct RemotePeerS3Client {
+    pub node: Node,
+    pub pools: Vec<i32>,
+}
+
+impl RemotePeerS3Client {
+    fn new(node: Node, pools: Vec<i32>) -> Self {
+        Self { node, pools }
+    }
+}
+
+#[async_trait]
+impl PeerS3Client for RemotePeerS3Client {
+    fn get_pools(&self) -> Vec<i32> {
+        unimplemented!()
+    }
+    async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
+        unimplemented!()
+    }
+}
diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs
index aedb44cc..2970978d 100644
--- a/ecstore/src/sets.rs
+++ b/ecstore/src/sets.rs
@@ -1,13 +1,19 @@
 use anyhow::Result;
+use futures::StreamExt;
 use uuid::Uuid;
 
-use crate::{endpoint::PoolEndpoints, format::FormatV3};
+use crate::{
+    disk::DiskStore,
+    endpoint::PoolEndpoints,
+    format::FormatV3,
+    store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
+};
 
 #[derive(Debug)]
 pub struct Sets {
     pub id: Uuid,
     // pub sets: Vec<SetDisks>,
-    pub disk_indexs: Vec<Vec<usize>>, // [set_count_idx][set_drive_count_idx] = disk_idx
+    pub disk_indexs: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk
     pub pool_idx: usize,
     pub endpoints: PoolEndpoints,
     pub format: FormatV3,
@@ -18,6 +24,7 @@ pub struct Sets {
 
 impl Sets {
     pub fn new(
+        disks: Vec<Option<DiskStore>>,
         endpoints: &PoolEndpoints,
         fm: &FormatV3,
         pool_idx: usize,
@@ -29,13 +36,18 @@ impl Sets {
         let mut disk_indexs = Vec::with_capacity(set_count);
 
         for i in 0..set_count {
-            let mut set_indexs = Vec::with_capacity(set_drive_count);
+            let mut set_drive = Vec::with_capacity(set_drive_count);
             for j in 0..set_drive_count {
                 let idx = i * set_drive_count + j;
-                set_indexs.push(idx);
+                if disks[idx].is_none() {
+                    set_drive.push(None);
+                } else {
+                    let disk = disks[idx].clone();
+                    set_drive.push(disk);
+                }
             }
-            disk_indexs.push(set_indexs);
+            disk_indexs.push(set_drive);
         }
 
         let sets = Self {
@@ -52,7 +64,7 @@ impl Sets {
         Ok(sets)
     }
 
-    pub fn get_disks(&self, set_idx: usize) -> Vec<usize> {
+    pub fn get_disks(&self, set_idx: usize) -> Vec<Option<DiskStore>> {
         self.disk_indexs[set_idx].clone()
     }
 }
@@ -66,3 +78,21 @@ impl Sets {
 // pub set_drive_count: usize,
 // pub default_parity_count: usize,
 // }
+
+#[async_trait::async_trait]
+impl StorageAPI for Sets {
+    async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
+        unimplemented!()
+    }
+
+    async fn put_object(
+        &self,
+        bucket: &str,
+        object: &str,
+        data: &PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<()> {
+        // data.stream.next();
+        unimplemented!()
+    }
+}
diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs
index af7d868d..b5e843c8 100644
--- a/ecstore/src/store.rs
+++ b/ecstore/src/store.rs
@@ -1,15 +1,21 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, io::Cursor};
 
 use anyhow::{Error, Result};
+
+use tokio_util::io::ReaderStream;
 use uuid::Uuid;
 
 use crate::{
-    disk::{self, DiskError, DiskOption, DiskStore},
+    bucket_meta::BucketMetadata,
+    disk::{self, DiskError, DiskOption, DiskStore, RUSTFS_META_BUCKET},
     disks_layout::DisksLayout,
     endpoint::EndpointServerPools,
+    peer::{PeerS3Client, S3PeerSys},
     sets::Sets,
-    store_api::{MakeBucketOptions, StorageAPI},
+    store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
     store_init,
+    stream::into_dyn,
+    utils,
 };
 
 #[derive(Debug)]
@@ -18,7 +24,8 @@ pub struct ECStore {
     // pub disks: Vec<DiskStore>,
     pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
     pub pools: Vec<Sets>,
-    pub peer: Vec<DiskStore>,
+    pub peer_sys: S3PeerSys,
+    pub local_disks: Vec<DiskStore>,
 }
 
 impl ECStore {
@@ -35,6 +42,8 @@ impl ECStore {
 
         let first_is_local = endpoint_pools.first_is_local();
 
+        let mut local_disks = Vec::new();
+
         for (i, pool_eps) in endpoint_pools.iter().enumerate() {
             // TODO: read from config parseStorageClass
             let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
@@ -71,28 +80,83 @@ impl ECStore {
                 deployment_id = Some(Uuid::new_v4());
             }
 
-            disk_map.insert(i, disks);
+            for disk in disks.iter() {
+                if disk.is_some() && disk.as_ref().unwrap().is_local() {
+                    local_disks.push(disk.as_ref().unwrap().clone());
+                }
+            }
 
-            let sets = Sets::new(pool_eps, &fm, i, partiy_count)?;
+            let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count)?;
 
             pools.push(sets);
+
+            disk_map.insert(i, disks);
         }
 
+        let peer_sys = S3PeerSys::new(&endpoint_pools, local_disks.clone());
+
         Ok(ECStore {
             id: deployment_id.unwrap(),
             disk_map,
             pools,
-            peer: Vec::new(),
+            local_disks,
+            peer_sys,
         })
     }
+
+    fn single_pool(&self) -> bool {
+        self.pools.len() == 1
+    }
 }
 
+#[async_trait::async_trait]
 impl StorageAPI for ECStore {
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
         // TODO: check valid bucket name
-        unimplemented!()
+
+        // TODO: delete created bucket when error
+        self.peer_sys.make_bucket(bucket, opts).await?;
+
+        let meta = BucketMetadata::new(bucket);
+        let data = meta.marshal_msg()?;
+        let file_path = meta.save_file_path();
+
+        let stream = ReaderStream::new(Cursor::new(data));
+
+        // TODO: wrap hash reader
+
+        // let reader = PutObjReader::new(stream);
+
+        // self.put_object(
+        //     RUSTFS_META_BUCKET,
+        //     &file_path,
+        //     &reader,
+        //     &ObjectOptions { max_parity: true },
+        // )
+        // .await?;
+
+        // TODO: toObjectErr
+
+        Ok(())
     }
 
-    async fn put_object(&self, bucket: &str, objcet: &str) -> Result<()> {
+    async fn put_object(
+        &self,
+        bucket: &str,
+        object: &str,
+        data: &PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<()> {
+        // checkPutObjectArgs
+
+        let object = utils::path::encode_dir_object(object);
+
+        if self.single_pool() {
+            self.pools[0]
+                .put_object(bucket, object.as_str(), data, opts)
+                .await?;
+            return Ok(());
+        }
+
         unimplemented!()
     }
 }
diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs
index c74a68a0..7d6b4bcb 100644
--- a/ecstore/src/store_api.rs
+++ b/ecstore/src/store_api.rs
@@ -1,9 +1,39 @@
+use std::sync::Arc;
+
 use anyhow::Result;
+use bytes::Bytes;
+use futures::Stream;
 
-pub struct MakeBucketOptions {}
+use crate::stream::DynByteStream;
 
+pub struct MakeBucketOptions {
+    pub force_create: bool,
+}
+
+pub struct PutObjReader {
+    // pub stream: DynByteStream,
+}
+
+// impl PutObjReader {
+//     pub fn new(stream: S) -> Self {
+//         PutObjReader { stream }
+//     }
+// }
+
+pub struct ObjectOptions {
+    // Use the maximum parity (N/2), used when saving server configuration files
+    pub max_parity: bool,
+}
+
+#[async_trait::async_trait]
 pub trait StorageAPI {
     async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
-    async fn put_object(&self, bucket: &str, objcet: &str) -> Result<()>;
+    async fn put_object(
+        &self,
+        bucket: &str,
+        object: &str,
+        data: &PutObjReader,
+        opts: &ObjectOptions,
+    ) -> Result<()>;
 }
diff --git a/ecstore/src/stream.rs b/ecstore/src/stream.rs
index e89cfadd..cd643c4d 100644
--- a/ecstore/src/stream.rs
+++ b/ecstore/src/stream.rs
@@ -1,252 +1,174 @@
-use bytes::Bytes;
-use futures::pin_mut;
-use futures::stream::{Stream, StreamExt};
-use std::future::Future;
+use anyhow::Error;
+use tokio::io::AsyncRead;
+use tokio_util::io::ReaderStream;
+
+use std::collections::VecDeque;
+use std::fmt;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use transform_stream::AsyncTryStream;
 
-use crate::error::StdError;
+use bytes::Bytes;
+use futures::{pin_mut, Stream, StreamExt};
 
-pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
-
-pub struct ChunkedStream {
-    /// inner
-    inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
-
-    remaining_length: usize,
+pub trait ByteStream: Stream {
+    fn remaining_length(&self) -> RemainingLength {
+        RemainingLength::unknown()
+    }
 }
 
-impl ChunkedStream {
-    pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
-    where
-        S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
-    {
-        let inner =
-            AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
-                #[allow(clippy::shadow_same)] // necessary for `pin_mut!`
-                Box::pin(async move {
-                    pin_mut!(body);
-                    // data left over from the previous read
-                    let mut prev_bytes = Bytes::new();
-                    let mut readed_size = 0;
+pub type DynByteStream =
+    Pin<Box<dyn ByteStream<Item = Result<Bytes, Error>> + Send + Sync + 'static>>;
 
-                    loop {
-                        let data: Vec<Bytes> = {
-                            // read a batch of fixed-size chunks
-                            match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
-                                None => break,
-                                Some(Err(e)) => return Err(e),
-                                Some(Ok((data, remaining_bytes))) => {
-                                    prev_bytes = remaining_bytes;
-                                    data
-                                }
-                            }
-                        };
+pub struct RemainingLength {
+    lower: usize,
+    upper: Option<usize>,
+}
 
-                        for bytes in data {
-                            readed_size += bytes.len();
-                            y.yield_ok(bytes).await;
-                        }
+impl RemainingLength {
+    /// Creates a new `RemainingLength` with the given lower and upper bounds.
+    ///
+    /// # Panics
+    /// This function asserts that `lower <= upper`.
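+    ///
+    /// The bounds carry the same meaning as `Stream::size_hint`, but count
+    /// bytes rather than stream items.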
+    #[must_use]
+    pub fn new(lower: usize, upper: Option<usize>) -> Self {
+        if let Some(upper) = upper {
+            assert!(lower <= upper);
+        }
+        Self { lower, upper }
+    }
 
-                        if readed_size + prev_bytes.len() >= content_length {
-                            // zero-pad the tail?
-                            if !need_padding {
-                                y.yield_ok(prev_bytes).await;
-                                break;
-                            }
-
-                            let mut bytes = vec![0u8; chunk_size];
-                            let (left, _) = bytes.split_at_mut(prev_bytes.len());
-                            left.copy_from_slice(&prev_bytes);
-
-                            y.yield_ok(Bytes::from(bytes)).await;
-
-                            break;
-                        }
-                    }
-
-                    Ok(())
-                })
-            });
+    #[must_use]
+    pub fn unknown() -> Self {
         Self {
-            inner,
-            remaining_length: content_length,
+            lower: 0,
+            upper: None,
         }
     }
-    /// read data and return the remaining bytes
-    async fn read_data<S>(
-        mut body: Pin<&mut S>,
-        prev_bytes: Bytes,
-        data_size: usize,
-    ) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
-    where
-        S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
-    {
-        let mut bytes_buffer = Vec::new();
 
-        // returns Some on the first non-empty chunk, so it runs at most once
-        let mut push_data_bytes = |mut bytes: Bytes| {
-            if bytes.is_empty() {
-                return None;
-            }
-
-            if data_size == 0 {
-                return Some(bytes);
-            }
-
-            // first, top up the leftover from the previous call
-            if !prev_bytes.is_empty() {
-                let need_size = data_size.wrapping_sub(prev_bytes.len());
-                if bytes.len() >= need_size {
-                    let data = bytes.split_to(need_size);
-                    let mut combined = Vec::new();
-                    combined.extend_from_slice(&prev_bytes);
-                    combined.extend_from_slice(&data);
-
-                    bytes_buffer.push(Bytes::from(combined));
-                } else {
-                    let mut combined = Vec::new();
-                    combined.extend_from_slice(&prev_bytes);
-                    combined.extend_from_slice(&bytes);
-
-                    return Some(Bytes::from(combined));
-                }
-            }
-
-            // more data than one chunk: carve chunk-sized pieces out of `bytes`
-            if data_size <= bytes.len() {
-                let n = bytes.len() / data_size;
-
-                for _ in 0..n {
-                    let data = bytes.split_to(data_size);
-                    bytes_buffer.push(data);
-                }
-
-                Some(bytes)
-            } else {
-                // not enough for a full chunk yet
-                Some(bytes)
-            }
-        };
+    #[must_use]
+    pub fn new_exact(n: usize) -> Self {
+        Self {
+            lower: n,
+            upper: Some(n),
+        }
+    }
 
-        // whatever is left over after slicing
-        let remaining_bytes = 'outer: {
-            loop {
-                match body.next().await? {
-                    Err(e) => return Some(Err(e)),
-                    Ok(bytes) => {
-                        if let Some(remaining_bytes) = push_data_bytes(bytes) {
-                            break 'outer remaining_bytes;
-                        }
-                    }
-                }
-            }
-        };
+    #[must_use]
+    pub fn exact(&self) -> Option<usize> {
+        self.upper.filter(|&upper| upper == self.lower)
+    }
+}
 
-        Some(Ok((bytes_buffer, remaining_bytes)))
-    }
-
-    fn poll(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Bytes, StdError>>> {
-        let ans = Pin::new(&mut self.inner).poll_next(cx);
-        if let Poll::Ready(Some(Ok(ref bytes))) = ans {
-            self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
-        }
-        ans
-    }
-
-    // pub fn exact_remaining_length(&self) -> usize {
-    //     self.remaining_length
-    // }
-}
+impl fmt::Debug for RemainingLength {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if let Some(exact) = self.exact() {
+            return write!(f, "{exact}");
+        }
+        match self.upper {
+            Some(upper) => write!(f, "({}..={})", self.lower, upper),
+            None => write!(f, "({}..)", self.lower),
+        }
+    }
+}
 
-impl Stream for ChunkedStream {
-    type Item = Result<Bytes, StdError>;
+pub(crate) fn into_dyn<S, E>(s: S) -> DynByteStream
+where
+    S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
+    E: std::error::Error + Send + Sync + 'static,
+{
+    Box::pin(Wrapper(s))
+}
 
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.poll(cx)
-    }
+struct Wrapper<S>(S);
 
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (0, None)
-    }
-}
+impl<S, E> Stream for Wrapper<S>
+where
+    S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
+    E: std::error::Error + Send + Sync + 'static,
+{
+    type Item = Result<Bytes, Error>;
 
-#[cfg(test)]
-mod test {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = Pin::new(&mut self.0);
+        this.poll_next(cx).map_err(|e| Error::new(e))
+    }
 
-    use super::*;
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
 
-    #[tokio::test]
-    async fn test_chunked_stream() {
-        let chunk_size = 4;
+impl<S, E> ByteStream for Wrapper<S>
+where
+    S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
+    E: std::error::Error + Send + Sync + 'static,
+{
+    fn remaining_length(&self) -> RemainingLength {
+        self.0.remaining_length()
+    }
+}
 
-        let data1 = vec![1u8; 7777];
-        let data2 = vec![1u8; 7777];
-
-        let content_length = data1.len() + data2.len();
+// FIXME: unbounded memory allocation
+pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
+where
+    S: ByteStream<Item = Result<Bytes, E>>,
+{
+    let mut vec = Vec::new();
+    pin_mut!(stream);
+    while let Some(result) = stream.next().await {
+        vec.push(result?);
+    }
+    Ok(vec)
+}
 
-        let chunk1 = Bytes::from(data1);
-        let chunk2 = Bytes::from(data2);
-
-        let chunk_results: Vec<Result<Bytes, StdError>> = vec![Ok(chunk1), Ok(chunk2)];
+pub(crate) struct VecByteStream {
+    queue: VecDeque<Bytes>,
+    remaining_bytes: usize,
+}
 
-        let stream = futures::stream::iter(chunk_results);
-
-        let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
+impl VecByteStream {
+    pub fn new(v: Vec<Bytes>) -> Self {
+        let total = v
+            .iter()
+            .map(Bytes::len)
+            .try_fold(0, usize::checked_add)
+            .expect("length overflow");
 
-        loop {
-            let ans1 = chunked_stream.next().await;
-            if ans1.is_none() {
-                break;
-            }
-
-            let bytes = ans1.unwrap().unwrap();
-            assert!(bytes.len() == chunk_size)
-        }
-    }
-}
+        Self {
+            queue: v.into(),
+            remaining_bytes: total,
+        }
+    }
+
+    pub fn exact_remaining_length(&self) -> usize {
+        self.remaining_bytes
+    }
+}
+
+impl Stream for VecByteStream {
+    type Item = Result<Bytes, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = Pin::into_inner(self);
+        match this.queue.pop_front() {
+            Some(b) => {
+                this.remaining_bytes -= b.len();
+                Poll::Ready(Some(Ok(b)))
+            }
+            None => Poll::Ready(None),
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let cnt = self.queue.len();
+        (cnt, Some(cnt))
+    }
+}
+
+impl ByteStream for VecByteStream {
+    fn remaining_length(&self) -> RemainingLength {
+        RemainingLength::new_exact(self.remaining_bytes)
+    }
+}
+
+impl<R: AsyncRead> ByteStream for ReaderStream<R> {}
diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs
index 5f521a38..9e4b9579 100644
--- a/ecstore/src/utils/mod.rs
+++ b/ecstore/src/utils/mod.rs
@@ -1,2 +1,3 @@
 pub mod net;
+pub mod path;
 pub mod string;
diff --git a/ecstore/src/utils/path.rs b/ecstore/src/utils/path.rs
new file mode 100644
index 00000000..40559577
--- /dev/null
+++ b/ecstore/src/utils/path.rs
@@ -0,0 +1,34 @@
+const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__";
+const SLASH_SEPARATOR: char = '/';
+
+pub fn has_suffix(s: &str, suffix: &str) -> bool {
+    if cfg!(target_os = "windows") {
+        s.to_lowercase().ends_with(&suffix.to_lowercase())
+    } else {
+        s.ends_with(suffix)
+    }
+}
+
+pub fn encode_dir_object(object: &str) -> String {
+    if has_suffix(object, &SLASH_SEPARATOR.to_string()) {
+        format!(
+            "{}{}",
+            object.trim_end_matches(SLASH_SEPARATOR),
+            GLOBAL_DIR_SUFFIX
+        )
+    } else {
+        object.to_string()
+    }
+}
+
+pub fn decode_dir_object(object: &str) -> String {
+    if has_suffix(object, GLOBAL_DIR_SUFFIX) {
+        format!(
+            "{}{}",
+            object.trim_end_matches(GLOBAL_DIR_SUFFIX),
+            SLASH_SEPARATOR
+        )
+    } else {
+        object.to_string()
+    }
+}
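
For review context (not part of the patch): a minimal sketch of how the new `BucketMetadata` encoding and the `__XLDIR__` directory-object mapping are expected to behave. The decoder side is an assumption here, since this change only adds `marshal_msg`; `rmp_serde::from_slice` is the standard rmp-serde counterpart to serializing with `rmp_serde::Serializer`.

```rust
#[cfg(test)]
mod sketch {
    use crate::bucket_meta::BucketMetadata;
    use crate::utils::path::{decode_dir_object, encode_dir_object};

    #[test]
    fn bucket_metadata_roundtrip() {
        let meta = BucketMetadata::new("test-bucket");
        // marshal_msg writes MessagePack via rmp_serde::Serializer;
        // rmp_serde::from_slice is the matching generic decoder (hypothetical
        // here, assuming time's serde representation survives the round trip).
        let buf = meta.marshal_msg().unwrap();
        let decoded: BucketMetadata = rmp_serde::from_slice(&buf).unwrap();
        assert_eq!(meta, decoded);
    }

    #[test]
    fn dir_object_roundtrip() {
        // Directory-style keys trade their trailing slash for the marker suffix...
        assert_eq!(encode_dir_object("a/b/"), "a/b__XLDIR__");
        // ...and decode back to the original key.
        assert_eq!(decode_dir_object("a/b__XLDIR__"), "a/b/");
        // Plain object keys pass through unchanged.
        assert_eq!(encode_dir_object("a/b"), "a/b");
    }
}
```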