test: AsyncWrite

This commit is contained in:
weisd
2024-07-10 11:44:58 +08:00
parent 90ac002c42
commit d7264dc7d2
5 changed files with 90 additions and 34 deletions

View File

@@ -37,13 +37,13 @@ impl ChunkedStream {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
debug!(
"content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ",
content_length,
readed_size,
data.len(),
remaining_bytes.len()
);
// debug!(
// "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ",
// content_length,
// readed_size,
// data.len(),
// remaining_bytes.len()
// );
prev_bytes = remaining_bytes;
data
@@ -107,7 +107,7 @@ impl ChunkedStream {
// 只执行一次
let mut push_data_bytes = |mut bytes: Bytes| {
debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
// debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
if bytes.is_empty() {
return None;
@@ -132,12 +132,12 @@ impl ChunkedStream {
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&data);
debug!(
"取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
need_size,
combined.len(),
bytes.len(),
);
// debug!(
// "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// need_size,
// combined.len(),
// bytes.len(),
// );
bytes_buffer.push(Bytes::from(combined));
} else {
@@ -145,12 +145,12 @@ impl ChunkedStream {
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&bytes);
debug!(
"取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
need_size,
combined.len(),
bytes.len(),
);
// debug!(
// "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// need_size,
// combined.len(),
// bytes.len(),
// );
return Some(Bytes::from(combined));
}

View File

@@ -346,6 +346,10 @@ impl DiskAPI for LocalDisk {
true
}
fn id(&self) -> Uuid {
self.id
}
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let p = self.get_object_path(&volume, &path)?;
@@ -427,19 +431,33 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<()> {
async fn append_file(&self, volume: &str, path: &str, buf: &[u8]) -> Result<()> {
let p = self.get_object_path(&volume, &path)?;
debug!("append_file start {} {:?}", self.id(), &p);
if let Some(dir_path) = p.parent() {
fs::create_dir_all(&dir_path).await?;
}
debug!("append_file open {} {:?}", self.id(), &p);
let mut file = File::options()
.read(true)
.create(true)
.write(true)
.append(true) // 设置为追加模式
.open(p)
// .append(true)
.open(&p)
.await?;
let mut writer = BufWriter::new(file);
debug!("append_file opened {} {:?}", self.id(), &p);
io::copy(&mut r, &mut writer).await?;
// let mut writer = BufWriter::new(file);
file.write(&buf).await?;
debug!("append_file end {} {}", self.id(), path);
// io::copy(&mut r, &mut file).await?;
Ok(())
}

View File

@@ -11,12 +11,13 @@ use crate::store_api::{FileInfo, RawFileInfo};
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
fn id(&self) -> Uuid;
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes>;
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize, r: DuplexStream) -> Result<()>;
async fn append_file(&self, volume: &str, path: &str, r: DuplexStream) -> Result<()>;
async fn append_file(&self, volume: &str, path: &str, r: &[u8]) -> Result<()>;
async fn rename_data(
&self,
src_volume: &str,

View File

@@ -39,19 +39,20 @@ impl Erasure {
{
let mut stream = ChunkedStream::new(body, data_size, block_size, true);
let mut total: usize = 0;
let mut idx = 0;
while let Some(result) = stream.next().await {
match result {
Ok(data) => {
let blocks = self.encode_data(data.as_ref())?;
let mut errs = Vec::new();
idx += 1;
for (i, w) in writers.iter_mut().enumerate() {
total += blocks[i].len();
debug!("encode write {}", blocks[i].len());
debug!("{}-{} encode write {} , total:{}", idx, i, blocks[i].len(), total);
match w.write_all(blocks[i].as_ref()).await {
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}

View File

@@ -1,8 +1,9 @@
use std::{io, task::Poll};
use futures::Future;
use futures::{ready, Future};
use tokio::io::AsyncWrite;
use tracing::debug;
use uuid::Uuid;
use crate::disk::DiskStore;
@@ -14,13 +15,18 @@ pub struct AppendWriter<'a> {
impl<'a> AppendWriter<'a> {
pub fn new(disk: DiskStore, volume: &'a str, path: &'a str) -> Self {
debug!("AppendWriter new {}: {}/{}", disk.id(), volume, path);
Self { disk, volume, path }
}
async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
debug!("async_write {}: {}", &self.path, buf.len());
debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len());
unimplemented!()
self.disk
.append_file(&self.volume, &self.path, buf)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(())
}
}
@@ -31,10 +37,40 @@ impl<'a> AsyncWrite for AppendWriter<'a> {
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
let mut fut = Box::pin(self.async_write(buf));
debug!("AsyncWrite poll_write {}, buf:{}", self.disk.id(), buf.len());
// while let Poll::Ready(e) = fut.as_mut().poll(cx) {
// let a = match e {
// Ok(_) => {
// debug!("Ready ok {}", self.disk.id());
// Poll::Ready(Ok(buf.len()))
// }
// Err(e) => {
// debug!("Ready err {}", self.disk.id());
// Poll::Ready(Err(e))
// }
// };
// return a;
// }
// Poll::Pending
match fut.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Ok(buf.len())),
Poll::Pending => {
debug!("Pending {}", self.disk.id());
Poll::Pending
}
Poll::Ready(e) => match e {
Ok(_) => {
debug!("Ready ok {}", self.disk.id());
Poll::Ready(Ok(buf.len()))
}
Err(e) => {
debug!("Ready err {}", self.disk.id());
Poll::Ready(Err(e))
}
},
}
}