chunked_stream

This commit is contained in:
weisd
2024-06-25 10:50:29 +08:00
parent 7b9d13bdd1
commit 7634eea4a2

View File

@@ -2,7 +2,6 @@ use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use std::ops::Not;
use std::pin::Pin;
use std::task::{Context, Poll};
use transform_stream::AsyncTryStream;
@@ -13,68 +12,74 @@ pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + '
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<
Bytes,
ChunkedStreamError,
SyncBoxFuture<'static, Result<(), ChunkedStreamError>>,
>,
inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
remaining_length: usize,
}
impl ChunkedStream {
pub fn new<S>(body: S, chunk_size: usize, content_length: usize) -> Self
pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), ChunkedStreamError>>>::new(
|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
// 上一次没用完的数据
let mut prev_bytes = Bytes::new();
let mut readed_size = 0;
'outer: {
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size)
.await
{
None => break 'outer,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
}
};
for bytes in data {
readed_size += bytes.len();
// 没读完
if readed_size <= content_length {
if bytes.len() < chunk_size {
prev_bytes = bytes;
} else {
y.yield_ok(bytes).await;
}
} else {
// 读完了
break 'outer;
}
loop {
let data: Vec<Bytes> = {
// 读固定大小的数据
match Self::read_data(body.as_mut(), prev_bytes, chunk_size).await {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
prev_bytes = remaining_bytes;
data
}
}
};
Ok(())
})
},
);
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
y.yield_ok(bytes).await;
}
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
// 填充0
if !need_padding {
y.yield_ok(prev_bytes).await;
break;
}
let mut bytes = vec![0u8; chunk_size];
let (left, _) = bytes.split_at_mut(prev_bytes.len());
left.copy_from_slice(&prev_bytes);
y.yield_ok(Bytes::from(bytes)).await;
break;
}
}
Ok(())
})
});
Self {
inner,
remaining_length: content_length,
@@ -85,56 +90,91 @@ impl ChunkedStream {
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), ChunkedStreamError>>
) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
{
let mut data_size = data_size;
let mut bytes_buffer = Vec::new();
// 只执行一次
let mut push_data_bytes = |mut bytes: Bytes| {
println!("bytes_buffer.len: {}", bytes_buffer.len());
if bytes.is_empty() {
return None;
}
if data_size == 0 {
return Some(bytes);
}
// 合并上一次数据
if !prev_bytes.is_empty() {
let need_size = data_size.wrapping_sub(prev_bytes.len());
// println!(
// " 上一次有剩余{},从这一次中取{},共:{}",
// prev_bytes.len(),
// need_size,
// prev_bytes.len() + need_size
// );
if bytes.len() >= need_size {
let data = bytes.split_to(need_size);
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&data);
// println!(
// "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// need_size,
// combined.len(),
// bytes.len(),
// );
bytes_buffer.push(Bytes::from(combined));
} else {
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&bytes);
// println!(
// "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// need_size,
// combined.len(),
// bytes.len(),
// );
return Some(Bytes::from(combined));
}
}
// 取到的数据比需要的块大从bytes中截取需要的块大小
if data_size <= bytes.len() {
let data = bytes.split_to(data_size);
let n = bytes.len() / data_size;
for _ in 0..n {
let data = bytes.split_to(data_size);
// println!("bytes_buffer.push: {} 剩余:{}", data.len(), bytes.len());
bytes_buffer.push(data);
}
println!("bytes_buffer.push: {}", data.len());
bytes_buffer.push(data);
data_size = 0;
Some(bytes)
} else {
// 不够
data_size = data_size.wrapping_sub(bytes.len());
if bytes.is_empty() {
return None;
}
println!("bytes_buffer.push 2: {}, need:{}", bytes.len(), data_size);
bytes_buffer.push(bytes);
None
Some(bytes)
}
};
// 剩余数据
let remaining_bytes = 'outer: {
// 如果上一次数据足够,跳出
if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
println!("从剩下的取");
break 'outer remaining_bytes;
}
// // 如果上一次数据足够,跳出
// if let Some(remaining_bytes) = push_data_bytes(prev_bytes) {
// println!("从剩下的取");
// break 'outer remaining_bytes;
// }
loop {
match body.next().await? {
Err(e) => return Some(Err(ChunkedStreamError::Underlying(e))),
Err(e) => return Some(Err(e)),
Ok(bytes) => {
println!("从body 取: {}", bytes.len());
if let Some(remaining_bytes) = push_data_bytes(bytes) {
break 'outer remaining_bytes;
}
@@ -143,27 +183,16 @@ impl ChunkedStream {
}
};
// println!(
// "bytes_buffer:{},remaining_bytes:{}",
// bytes_buffer.len(),
// remaining_bytes.len()
// );
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, ChunkedStreamError>>> {
) -> Poll<Option<Result<Bytes, StdError>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
println!(
"这次读取长度:{} 还需要:{}",
bytes.len(),
self.remaining_length
);
}
ans
}
@@ -174,7 +203,7 @@ impl ChunkedStream {
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, ChunkedStreamError>;
type Item = Result<Bytes, StdError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)
@@ -185,19 +214,6 @@ impl Stream for ChunkedStream {
}
}
#[derive(Debug, thiserror::Error)]
pub enum ChunkedStreamError {
/// Underlying error
#[error("ChunkedStreamError: Underlying: {}",.0)]
Underlying(StdError),
/// Format error
#[error("ChunkedStreamError: FormatError")]
FormatError,
/// Incomplete stream
#[error("ChunkedStreamError: Incomplete")]
Incomplete,
}
#[cfg(test)]
mod test {
@@ -205,10 +221,10 @@ mod test {
#[tokio::test]
async fn test_chunked_stream() {
let chunk_size = 1024;
let chunk_size = 4;
let data1 = vec![b'a'; 7777]; // 65536
let data2 = vec![b'a'; 7777]; // 65536
let data1 = vec![1u8; 7777]; // 65536
let data2 = vec![1u8; 7777]; // 65536
let content_length = data1.len() + data2.len();
@@ -219,7 +235,7 @@ mod test {
let stream = futures::stream::iter(chunk_results);
let mut chunked_stream = ChunkedStream::new(stream, chunk_size, content_length);
let mut chunked_stream = ChunkedStream::new(stream, content_length, chunk_size, true);
loop {
let ans1 = chunked_stream.next().await;
@@ -227,7 +243,8 @@ mod test {
break;
}
assert!(ans1.unwrap().unwrap().len() == chunk_size)
let bytes = ans1.unwrap().unwrap();
assert!(bytes.len() == chunk_size)
}
// assert_eq!(ans1.unwrap(), chunk1_data.as_slice());