This commit is contained in:
weisd
2024-06-24 18:14:39 +08:00
parent 86f089478d
commit 7b9d13bdd1
5 changed files with 386 additions and 1 deletions

143
Cargo.lock generated
View File

@@ -55,6 +55,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytes"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "cc"
version = "1.0.100"
@@ -85,6 +91,95 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.15"
@@ -284,6 +379,12 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -380,6 +481,15 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.2"
@@ -396,8 +506,12 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
name = "store"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"reed-solomon-erasure",
"thiserror",
"tokio",
"transform-stream",
"url",
"uuid",
]
@@ -413,6 +527,26 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.8"
@@ -527,6 +661,15 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "transform-stream"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05034de7a8fcb11796a36478a2a8b16dca6772644dec5f49f709d5c66a38d359"
dependencies = [
"futures-core",
]
[[package]]
name = "unicode-bidi"
version = "0.3.15"

View File

@@ -9,7 +9,11 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
url = "2.5.2"
uuid = "1.8.0"
reed-solomon-erasure = "6.0.0"
transform-stream = "0.3.0"
bytes.workspace = true
tokio.workspace = true
thiserror.workspace = true
futures.workspace = true

1
store/src/error.rs Normal file
View File

@@ -0,0 +1 @@
pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

View File

@@ -1,3 +1,5 @@
mod endpoint;
mod erasure;
mod error;
mod store;
mod stream;

View File

@@ -0,0 +1,235 @@
use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use std::ops::Not;
use std::pin::Pin;
use std::task::{Context, Poll};
use transform_stream::AsyncTryStream;
use crate::error::StdError;
pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<
Bytes,
ChunkedStreamError,
SyncBoxFuture<'static, Result<(), ChunkedStreamError>>,
>,
remaining_length: usize,
}
impl ChunkedStream {
pub fn new<S>(body: S, chunk_size: usize, content_length: usize) -> Self
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), ChunkedStreamError>>>::new(
|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
'outer: {
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size)
.await
{
None => break 'outer,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
}
};
for bytes in data {
readed_size += bytes.len();
// 没读完
if readed_size <= content_length {
if bytes.len() < chunk_size {
prev_bytes = bytes;
} else {
y.yield_ok(bytes).await;
}
} else {
// 读完了
break 'outer;
}
}
}
};
Ok(())
})
},
);
Self {
inner,
remaining_length: content_length,
}
}
/// read data and return remaining bytes
async fn read_data<S>(
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), ChunkedStreamError>>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
{
let mut data_size = data_size;
let mut bytes_buffer = Vec::new();
let mut push_data_bytes = |mut bytes: Bytes| {
println!("bytes_buffer.len: {}", bytes_buffer.len());
if data_size == 0 {
return Some(bytes);
}
// 取到的数据比需要的块大从bytes中截取需要的块大小
if data_size <= bytes.len() {
let data = bytes.split_to(data_size);
println!("bytes_buffer.push: {}", data.len());
bytes_buffer.push(data);
data_size = 0;
Some(bytes)
} else {
// 不够
data_size = data_size.wrapping_sub(bytes.len());
if bytes.is_empty() {
return None;
}
println!("bytes_buffer.push 2: {}, need:{}", bytes.len(), data_size);
bytes_buffer.push(bytes);
None
}
};
// 剩余数据
let remaining_bytes = 'outer: {
// 如果上一次数据足够,跳出
if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
println!("从剩下的取");
break 'outer remaining_bytes;
}
loop {
match body.next().await? {
Err(e) => return Some(Err(ChunkedStreamError::Underlying(e))),
Ok(bytes) => {
println!("从body 取: {}", bytes.len());
if let Some(remaining_bytes) = push_data_bytes(bytes) {
break 'outer remaining_bytes;
}
}
}
}
};
// println!(
// "bytes_buffer:{},remaining_bytes:{}",
// bytes_buffer.len(),
// remaining_bytes.len()
// );
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, ChunkedStreamError>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
println!(
"这次读取长度:{} 还需要:{}",
bytes.len(),
self.remaining_length
);
}
ans
}
// pub fn exact_remaining_length(&self) -> usize {
// self.remaining_length
// }
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, ChunkedStreamError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
#[derive(Debug, thiserror::Error)]
pub enum ChunkedStreamError {
/// Underlying error
#[error("ChunkedStreamError: Underlying: {}",.0)]
Underlying(StdError),
/// Format error
#[error("ChunkedStreamError: FormatError")]
FormatError,
/// Incomplete stream
#[error("ChunkedStreamError: Incomplete")]
Incomplete,
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_chunked_stream() {
let chunk_size = 1024;
let data1 = vec![b'a'; 7777]; // 65536
let data2 = vec![b'a'; 7777]; // 65536
let content_length = data1.len() + data2.len();
let chunk1 = Bytes::from(data1);
let chunk2 = Bytes::from(data2);
let chunk_results: Vec<Result<Bytes, _>> = vec![Ok(chunk1), Ok(chunk2)];
let stream = futures::stream::iter(chunk_results);
let mut chunked_stream = ChunkedStream::new(stream, chunk_size, content_length);
loop {
let ans1 = chunked_stream.next().await;
if ans1.is_none() {
break;
}
assert!(ans1.unwrap().unwrap().len() == chunk_size)
}
// assert_eq!(ans1.unwrap(), chunk1_data.as_slice());
}
}