todo:put_object

This commit is contained in:
weisd
2024-06-28 18:13:50 +08:00
parent 4c7c3d4c96
commit 064a180b3a
15 changed files with 868 additions and 245 deletions

37
Cargo.lock generated
View File

@@ -170,6 +170,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.6.0"
@@ -287,6 +293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
dependencies = [
"powerfmt",
"serde",
]
[[package]]
@@ -313,11 +320,13 @@ dependencies = [
"path-absolutize",
"reed-solomon-erasure",
"regex",
"rmp-serde",
"serde",
"serde_json",
"thiserror",
"time",
"tokio",
"tokio-util",
"tracing",
"tracing-error",
"transform-stream",
@@ -855,6 +864,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "path-absolutize"
version = "3.1.1"
@@ -1027,6 +1042,28 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "rmp"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"

View File

@@ -16,7 +16,7 @@ futures = "0.3.30"
bytes = "1.6.0"
http = "1.1.0"
thiserror = "1.0.61"
time = "0.3.36"
time = { version = "0.3.36", features = ["std", "parsing", "formatting", "macros", "serde"] }
async-trait = "0.1.80"
tokio = { version = "1.38.0", features = ["fs"] }
anyhow = "1.0.86"

View File

@@ -28,6 +28,8 @@ tracing-error = "0.2.0"
serde_json.workspace = true
path-absolutize = "3.1.1"
time.workspace = true
rmp-serde = "1.3.0"
tokio-util = "0.7.11"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -0,0 +1,62 @@
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use anyhow::Result;
use crate::disk::BUCKET_META_PREFIX;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, PartialEq, Deserialize, Serialize)]
pub struct BucketMetadata {
format: u16,
version: u16,
pub name: String,
pub created: OffsetDateTime,
}
impl Default for BucketMetadata {
fn default() -> Self {
Self {
format: Default::default(),
version: Default::default(),
name: Default::default(),
created: OffsetDateTime::now_utc(),
}
}
}
impl BucketMetadata {
pub fn new(name: &str) -> Self {
BucketMetadata {
format: BUCKET_METADATA_FORMAT,
version: BUCKET_METADATA_VERSION,
name: name.to_string(),
..Default::default()
}
}
pub fn save_file_path(&self) -> String {
format!(
"{}/{}/{}",
BUCKET_META_PREFIX,
self.name.as_str(),
BUCKET_METADATA_FILE
)
// PathBuf::new()
// .join(BUCKET_META_PREFIX)
// .join(self.name.as_str())
// .join(BUCKET_METADATA_FILE)
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut Serializer::new(&mut buf))?;
Ok(buf)
}
}

252
ecstore/src/chunk_stream.rs Normal file
View File

@@ -0,0 +1,252 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use transform_stream::AsyncTryStream;
use crate::error::StdError;
pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
remaining_length: usize,
}
impl ChunkedStream {
pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
}
};
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
y.yield_ok(bytes).await;
}
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
// 填充0
if !need_padding {
y.yield_ok(prev_bytes).await;
break;
}
let mut bytes = vec![0u8; chunk_size];
let (left, _) = bytes.split_at_mut(prev_bytes.len());
left.copy_from_slice(&prev_bytes);
y.yield_ok(Bytes::from(bytes)).await;
break;
}
}
Ok(())
})
});
Self {
inner,
remaining_length: content_length,
}
}
/// read data and return remaining bytes
async fn read_data<S>(
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
{
let mut bytes_buffer = Vec::new();
// 只执行一次
let mut push_data_bytes = |mut bytes: Bytes| {
if bytes.is_empty() {
return None;
}
if data_size == 0 {
return Some(bytes);
}
// 合并上一次数据
if !prev_bytes.is_empty() {
let need_size = data_size.wrapping_sub(prev_bytes.len());
// println!(
// " 上一次有剩余{},从这一次中取{},共:{}",
// prev_bytes.len(),
// need_size,
// prev_bytes.len() + need_size
// );
if bytes.len() >= need_size {
let data = bytes.split_to(need_size);
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&data);
// println!(
// "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// need_size,
// combined.len(),
// bytes.len(),
// );
bytes_buffer.push(Bytes::from(combined));
} else {
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&bytes);
// println!(
// "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// need_size,
// combined.len(),
// bytes.len(),
// );
return Some(Bytes::from(combined));
}
}
// 取到的数据比需要的块大从bytes中截取需要的块大小
if data_size <= bytes.len() {
let n = bytes.len() / data_size;
for _ in 0..n {
let data = bytes.split_to(data_size);
// println!("bytes_buffer.push: {} 剩余:{}", data.len(), bytes.len());
bytes_buffer.push(data);
}
Some(bytes)
} else {
// 不够
Some(bytes)
}
};
// 剩余数据
let remaining_bytes = 'outer: {
// // 如果上一次数据足够,跳出
// if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
// println!("从剩下的取");
// break 'outer remaining_bytes;
// }
loop {
match body.next().await? {
Err(e) => return Some(Err(e)),
Ok(bytes) => {
if let Some(remaining_bytes) = push_data_bytes(bytes) {
break 'outer remaining_bytes;
}
}
}
}
};
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, StdError>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
}
ans
}
// pub fn exact_remaining_length(&self) -> usize {
// self.remaining_length
// }
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, StdError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_chunked_stream() {
let chunk_size = 4;
let data1 = vec![1u8; 7777]; // 65536
let data2 = vec![1u8; 7777]; // 65536
let content_length = data1.len() + data2.len();
let chunk1 = Bytes::from(data1);
let chunk2 = Bytes::from(data2);
let chunk_results: Vec<Result<Bytes, _>> = vec![Ok(chunk1), Ok(chunk2)];
let stream = futures::stream::iter(chunk_results);
let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
loop {
let ans1 = chunked_stream.next().await;
if ans1.is_none() {
break;
}
let bytes = ans1.unwrap().unwrap();
assert!(bytes.len() == chunk_size)
}
// assert_eq!(ans1.unwrap(), chunk1_data.as_slice());
}
}

View File

@@ -1,6 +1,7 @@
use std::{
fs::Metadata,
path::{Path, PathBuf},
sync::{Arc, RwLock},
};
use anyhow::{Error, Result};
@@ -25,7 +26,7 @@ pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
pub const BUCKET_META_PREFIX: &str = "buckets";
pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub type DiskStore = Box<dyn DiskAPI>;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
pub struct DiskOption {
pub cleanup: bool,
@@ -34,7 +35,8 @@ pub struct DiskOption {
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
if ep.is_local {
Ok(LocalDisk::new(ep, opt.cleanup).await?)
let s = LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
} else {
unimplemented!()
// Ok(Disk::Remote(RemoteDisk::new(ep, opt.health_check)?))
@@ -51,24 +53,24 @@ pub async fn init_disks(
futures.push(new_disk(ep, opt));
}
let mut storages = Vec::with_capacity(eps.len());
let mut res = Vec::with_capacity(eps.len());
let mut errors = Vec::with_capacity(eps.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
storages.push(Some(s));
res.push(Some(s));
errors.push(None);
}
Err(e) => {
storages.push(None);
res.push(None);
errors.push(Some(e));
}
}
}
(storages, errors)
(res, errors)
}
// pub async fn load_format(&self, heal: bool) -> Result<FormatV3> {
@@ -87,7 +89,7 @@ pub struct LocalDisk {
}
impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Box<Self>> {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self> {
let root = fs::canonicalize(ep.url.path()).await?;
if cleanup {
@@ -131,7 +133,7 @@ impl LocalDisk {
disk.make_meta_volumes().await?;
Ok(Box::new(disk))
Ok(disk)
}
async fn make_meta_volumes(&self) -> Result<()> {
@@ -241,6 +243,10 @@ fn skip_access_checks(p: impl AsRef<str>) -> bool {
#[async_trait::async_trait]
impl DiskAPI for LocalDisk {
fn is_local(&self) -> bool {
true
}
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let p = self.get_object_path(&volume, &path)?;

View File

@@ -5,6 +5,8 @@ use bytes::Bytes;
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<()>;
async fn rename_file(

View File

@@ -1,3 +1,5 @@
mod bucket_meta;
mod chunk_stream;
mod disk;
mod disk_api;
mod disks_layout;
@@ -6,6 +8,7 @@ mod endpoint;
mod erasure;
pub mod error;
mod format;
mod peer;
mod sets;
pub mod store;
mod store_api;

178
ecstore/src/peer.rs Normal file
View File

@@ -0,0 +1,178 @@
use anyhow::Result;
use async_trait::async_trait;
use futures::future::join_all;
use std::{fmt::Debug, sync::Arc};
use crate::{
disk::{DiskError, DiskStore},
endpoint::{EndpointServerPools, Node},
store_api::MakeBucketOptions,
};
type Client = Arc<Box<dyn PeerS3Client>>;
#[async_trait]
pub trait PeerS3Client: Debug + Sync + Send + 'static {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
fn get_pools(&self) -> Vec<i32>;
}
#[derive(Debug)]
pub struct S3PeerSys {
pub clients: Vec<Client>,
pub pools_count: usize,
}
impl S3PeerSys {
pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
Self {
clients: Self::new_clients(eps, local_disks),
pools_count: eps.len(),
}
}
fn new_clients(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Vec<Client> {
let nodes = eps.get_nodes();
let v: Vec<Client> = nodes
.iter()
.map(|e| {
if e.is_local {
let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(
local_disks.clone(),
e.clone(),
e.pools.clone(),
));
Arc::new(cli)
} else {
let cli: Box<dyn PeerS3Client> =
Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
Arc::new(cli)
}
})
.collect();
v
}
}
#[async_trait]
impl PeerS3Client for S3PeerSys {
fn get_pools(&self) -> Vec<i32> {
unimplemented!()
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.clients.len());
for cli in self.clients.iter() {
futures.push(cli.make_bucket(bucket, opts));
}
let mut errors = Vec::with_capacity(self.clients.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
for i in 0..self.pools_count {
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
for (j, cli) in self.clients.iter().enumerate() {
let pools = cli.get_pools();
let idx = i as i32;
if pools.contains(&idx) {
per_pool_errs.push(errors[j].as_ref());
}
// TODO: reduceWriteQuorumErrs
}
}
// TODO:
Ok(())
}
}
#[derive(Debug)]
pub struct LocalPeerS3Client {
pub local_disks: Vec<DiskStore>,
pub node: Node,
pub pools: Vec<i32>,
}
impl LocalPeerS3Client {
fn new(local_disks: Vec<DiskStore>, node: Node, pools: Vec<i32>) -> Self {
Self {
local_disks,
node,
pools,
}
}
}
#[async_trait]
impl PeerS3Client for LocalPeerS3Client {
fn get_pools(&self) -> Vec<i32> {
self.pools.clone()
}
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
let mut futures = Vec::with_capacity(self.local_disks.len());
for disk in self.local_disks.iter() {
futures.push(async move {
match disk.make_volume(bucket).await {
Ok(_) => Ok(()),
Err(e) => {
if opts.force_create && DiskError::is_err(&e, &DiskError::VolumeExists) {
return Ok(());
}
Err(e)
}
}
});
}
let results = join_all(futures).await;
let mut errs = Vec::new();
for res in results {
match res {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
}
// TODO: reduceWriteQuorumErrs
Ok(())
}
}
#[derive(Debug)]
pub struct RemotePeerS3Client {
pub node: Node,
pub pools: Vec<i32>,
}
impl RemotePeerS3Client {
fn new(node: Node, pools: Vec<i32>) -> Self {
Self { node, pools }
}
}
#[async_trait]
impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Vec<i32> {
unimplemented!()
}
async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
unimplemented!()
}
}

View File

@@ -1,13 +1,19 @@
use anyhow::Result;
use futures::StreamExt;
use uuid::Uuid;
use crate::{endpoint::PoolEndpoints, format::FormatV3};
use crate::{
disk::DiskStore,
endpoint::PoolEndpoints,
format::FormatV3,
store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
};
#[derive(Debug)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
pub disk_indexs: Vec<Vec<usize>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub disk_indexs: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
pub format: FormatV3,
@@ -18,6 +24,7 @@ pub struct Sets {
impl Sets {
pub fn new(
disks: Vec<Option<DiskStore>>,
endpoints: &PoolEndpoints,
fm: &FormatV3,
pool_idx: usize,
@@ -29,13 +36,18 @@ impl Sets {
let mut disk_indexs = Vec::with_capacity(set_count);
for i in 0..set_count {
let mut set_indexs = Vec::with_capacity(set_drive_count);
let mut set_drive = Vec::with_capacity(set_drive_count);
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
set_indexs.push(idx);
if disks[idx].is_none() {
set_drive.push(None);
} else {
let disk = disks[idx].clone();
set_drive.push(disk);
}
}
disk_indexs.push(set_indexs);
disk_indexs.push(set_drive);
}
let sets = Self {
@@ -52,7 +64,7 @@ impl Sets {
Ok(sets)
}
pub fn get_disks(&self, set_idx: usize) -> Vec<usize> {
pub fn get_disks(&self, set_idx: usize) -> Vec<Option<DiskStore>> {
self.disk_indexs[set_idx].clone()
}
}
@@ -66,3 +78,21 @@ impl Sets {
// pub set_drive_count: usize,
// pub default_parity_count: usize,
// }
#[async_trait::async_trait]
impl StorageAPI for Sets {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
unimplemented!()
}
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()> {
// data.stream.next();
unimplemented!()
}
}

View File

@@ -1,15 +1,21 @@
use std::collections::HashMap;
use std::{collections::HashMap, io::Cursor};
use anyhow::{Error, Result};
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use crate::{
disk::{self, DiskError, DiskOption, DiskStore},
bucket_meta::BucketMetadata,
disk::{self, DiskError, DiskOption, DiskStore, RUSTFS_META_BUCKET},
disks_layout::DisksLayout,
endpoint::EndpointServerPools,
peer::{PeerS3Client, S3PeerSys},
sets::Sets,
store_api::{MakeBucketOptions, StorageAPI},
store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
store_init,
stream::into_dyn,
utils,
};
#[derive(Debug)]
@@ -18,7 +24,8 @@ pub struct ECStore {
// pub disks: Vec<DiskStore>,
pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
pub pools: Vec<Sets>,
pub peer: Vec<String>,
pub peer_sys: S3PeerSys,
pub local_disks: Vec<DiskStore>,
}
impl ECStore {
@@ -35,6 +42,8 @@ impl ECStore {
let first_is_local = endpoint_pools.first_is_local();
let mut local_disks = Vec::new();
for (i, pool_eps) in endpoint_pools.iter().enumerate() {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
@@ -71,28 +80,83 @@ impl ECStore {
deployment_id = Some(Uuid::new_v4());
}
disk_map.insert(i, disks);
for disk in disks.iter() {
if disk.is_some() && disk.as_ref().unwrap().is_local() {
local_disks.push(disk.as_ref().unwrap().clone());
}
}
let sets = Sets::new(pool_eps, &fm, i, partiy_count)?;
let sets = Sets::new(disks.clone(), pool_eps, &fm, i, partiy_count)?;
pools.push(sets);
disk_map.insert(i, disks);
}
let peer_sys = S3PeerSys::new(&endpoint_pools, local_disks.clone());
Ok(ECStore {
id: deployment_id.unwrap(),
disk_map,
pools,
peer: Vec::new(),
local_disks,
peer_sys,
})
}
fn single_pool(&self) -> bool {
self.pools.len() == 1
}
}
#[async_trait::async_trait]
impl StorageAPI for ECStore {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
// TODO: check valid bucket name
unimplemented!()
// TODO: delete created bucket when error
self.peer_sys.make_bucket(bucket, opts).await?;
let meta = BucketMetadata::new(bucket);
let data = meta.marshal_msg()?;
let file_path = meta.save_file_path();
let stream = ReaderStream::new(Cursor::new(data));
// TODO: wrap hash reader
// let reader = PutObjReader::new(stream);
// self.put_object(
// RUSTFS_META_BUCKET,
// &file_path,
// &reader,
// &ObjectOptions { max_parity: true },
// )
// .await?;
// TODO: toObjectErr
Ok(())
}
async fn put_object(&self, bucket: &str, objcet: &str) -> Result<()> {
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()> {
// checkPutObjectArgs
let object = utils::path::encode_dir_object(object);
if self.single_pool() {
self.pools[0]
.put_object(bucket, object.as_str(), data, opts)
.await?;
return Ok(());
}
unimplemented!()
}
}

View File

@@ -1,9 +1,39 @@
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use futures::Stream;
pub struct MakeBucketOptions {}
use crate::stream::DynByteStream;
pub struct MakeBucketOptions {
pub force_create: bool,
}
pub struct PutObjReader {
// pub stream: Box<dyn Stream<Item = Bytes>>,
}
// impl PutObjReader {
// pub fn new<S: Stream>(stream: S) -> Self {
// PutObjReader { stream }
// }
// }
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files
pub max_parity: bool,
}
#[async_trait::async_trait]
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn put_object(&self, bucket: &str, objcet: &str) -> Result<()>;
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()>;
}

View File

@@ -1,252 +1,174 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use anyhow::Error;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use transform_stream::AsyncTryStream;
use crate::error::StdError;
use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
remaining_length: usize,
pub trait ByteStream: Stream {
fn remaining_length(&self) -> RemainingLength {
RemainingLength::unknown()
}
}
impl ChunkedStream {
pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
pub type DynByteStream =
Pin<Box<dyn ByteStream<Item = Result<Bytes, Error>> + Send + Sync + 'static>>;
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
}
};
pub struct RemainingLength {
lower: usize,
upper: Option<usize>,
}
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
y.yield_ok(bytes).await;
}
impl RemainingLength {
/// Creates a new `RemainingLength` with the given lower and upper bounds.
///
/// # Panics
/// This function asserts that `lower <= upper`.
#[must_use]
pub fn new(lower: usize, upper: Option<usize>) -> Self {
if let Some(upper) = upper {
assert!(lower <= upper);
}
Self { lower, upper }
}
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
// 填充0
if !need_padding {
y.yield_ok(prev_bytes).await;
break;
}
let mut bytes = vec![0u8; chunk_size];
let (left, _) = bytes.split_at_mut(prev_bytes.len());
left.copy_from_slice(&prev_bytes);
y.yield_ok(Bytes::from(bytes)).await;
break;
}
}
Ok(())
})
});
#[must_use]
pub fn unknown() -> Self {
Self {
inner,
remaining_length: content_length,
lower: 0,
upper: None,
}
}
/// read data and return remaining bytes
async fn read_data<S>(
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
{
let mut bytes_buffer = Vec::new();
// 只执行一次
let mut push_data_bytes = |mut bytes: Bytes| {
if bytes.is_empty() {
return None;
}
if data_size == 0 {
return Some(bytes);
}
// 合并上一次数据
if !prev_bytes.is_empty() {
let need_size = data_size.wrapping_sub(prev_bytes.len());
// println!(
// " 上一次有剩余{},从这一次中取{},共:{}",
// prev_bytes.len(),
// need_size,
// prev_bytes.len() + need_size
// );
if bytes.len() >= need_size {
let data = bytes.split_to(need_size);
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&data);
// println!(
// "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// need_size,
// combined.len(),
// bytes.len(),
// );
bytes_buffer.push(Bytes::from(combined));
} else {
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&bytes);
// println!(
// "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// need_size,
// combined.len(),
// bytes.len(),
// );
return Some(Bytes::from(combined));
}
}
// 取到的数据比需要的块大从bytes中截取需要的块大小
if data_size <= bytes.len() {
let n = bytes.len() / data_size;
for _ in 0..n {
let data = bytes.split_to(data_size);
// println!("bytes_buffer.push: {} 剩余:{}", data.len(), bytes.len());
bytes_buffer.push(data);
}
Some(bytes)
} else {
// 不够
Some(bytes)
}
};
// 剩余数据
let remaining_bytes = 'outer: {
// // 如果上一次数据足够,跳出
// if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
// println!("从剩下的取");
// break 'outer remaining_bytes;
// }
loop {
match body.next().await? {
Err(e) => return Some(Err(e)),
Ok(bytes) => {
if let Some(remaining_bytes) = push_data_bytes(bytes) {
break 'outer remaining_bytes;
}
}
}
}
};
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, StdError>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
#[must_use]
pub fn new_exact(n: usize) -> Self {
Self {
lower: n,
upper: Some(n),
}
ans
}
// pub fn exact_remaining_length(&self) -> usize {
// self.remaining_length
// }
#[must_use]
pub fn exact(&self) -> Option<usize> {
self.upper.filter(|&upper| upper == self.lower)
}
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, StdError>;
impl fmt::Debug for RemainingLength {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(exact) = self.exact() {
return write!(f, "{exact}");
}
match self.upper {
Some(upper) => write!(f, "({}..={})", self.lower, upper),
None => write!(f, "({}..)", self.lower),
}
}
}
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)
pub(crate) fn into_dyn<S, E>(s: S) -> DynByteStream
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
Box::pin(Wrapper(s))
}
struct Wrapper<S>(S);
impl<S, E> Stream for Wrapper<S>
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::new(&mut self.0);
this.poll_next(cx).map_err(|e| Error::new(e))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
self.0.size_hint()
}
}
#[cfg(test)]
mod test {
impl<S, E> ByteStream for Wrapper<S>
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
fn remaining_length(&self) -> RemainingLength {
self.0.remaining_length()
}
}
use super::*;
// FIXME: unbounded memory allocation
pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
where
S: ByteStream<Item = Result<Bytes, E>>,
{
let mut vec = Vec::new();
pin_mut!(stream);
while let Some(result) = stream.next().await {
vec.push(result?);
}
Ok(vec)
}
#[tokio::test]
async fn test_chunked_stream() {
let chunk_size = 4;
pub(crate) struct VecByteStream {
queue: VecDeque<Bytes>,
remaining_bytes: usize,
}
let data1 = vec![1u8; 7777]; // 65536
let data2 = vec![1u8; 7777]; // 65536
impl VecByteStream {
pub fn new(v: Vec<Bytes>) -> Self {
let total = v
.iter()
.map(Bytes::len)
.try_fold(0, usize::checked_add)
.expect("length overflow");
let content_length = data1.len() + data2.len();
let chunk1 = Bytes::from(data1);
let chunk2 = Bytes::from(data2);
let chunk_results: Vec<Result<Bytes, _>> = vec![Ok(chunk1), Ok(chunk2)];
let stream = futures::stream::iter(chunk_results);
let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
loop {
let ans1 = chunked_stream.next().await;
if ans1.is_none() {
break;
}
let bytes = ans1.unwrap().unwrap();
assert!(bytes.len() == chunk_size)
Self {
queue: v.into(),
remaining_bytes: total,
}
}
// assert_eq!(ans1.unwrap(), chunk1_data.as_slice());
pub fn exact_remaining_length(&self) -> usize {
self.remaining_bytes
}
}
impl Stream for VecByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
match this.queue.pop_front() {
Some(b) => {
this.remaining_bytes -= b.len();
Poll::Ready(Some(Ok(b)))
}
None => Poll::Ready(None),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let cnt = self.queue.len();
(cnt, Some(cnt))
}
}
impl ByteStream for VecByteStream {
fn remaining_length(&self) -> RemainingLength {
RemainingLength::new_exact(self.remaining_bytes)
}
}
impl<R: AsyncRead> ByteStream for ReaderStream<R> {}

View File

@@ -1,2 +1,3 @@
pub mod net;
pub mod path;
pub mod string;

34
ecstore/src/utils/path.rs Normal file
View File

@@ -0,0 +1,34 @@
const GLOBAL_DIR_SUFFIX: &str = "__XLDIR__";
const SLASH_SEPARATOR: char = '/';
pub fn has_suffix(s: &str, suffix: &str) -> bool {
if cfg!(target_os = "windows") {
s.to_lowercase().ends_with(&suffix.to_lowercase())
} else {
s.ends_with(suffix)
}
}
pub fn encode_dir_object(object: &str) -> String {
if has_suffix(object, &SLASH_SEPARATOR.to_string()) {
format!(
"{}{}",
object.trim_end_matches(SLASH_SEPARATOR),
GLOBAL_DIR_SUFFIX
)
} else {
object.to_string()
}
}
pub fn decode_dir_object(object: &str) -> String {
if has_suffix(object, GLOBAL_DIR_SUFFIX) {
format!(
"{}{}",
object.trim_end_matches(GLOBAL_DIR_SUFFIX),
SLASH_SEPARATOR
)
} else {
object.to_string()
}
}