test: erasure

This commit is contained in:
weisd
2024-07-10 16:12:01 +08:00
parent 3e77ca6497
commit f991e7b5f3
5 changed files with 67 additions and 53 deletions

View File

@@ -37,13 +37,13 @@ impl ChunkedStream {
None => break,
Some(Err(e)) => return Err(e),
Some(Ok((data, remaining_bytes))) => {
// debug!(
// "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ",
// content_length,
// readed_size,
// data.len(),
// remaining_bytes.len()
// );
debug!(
"content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ",
content_length,
readed_size,
data.len(),
remaining_bytes.len()
);
prev_bytes = remaining_bytes;
data
@@ -53,20 +53,17 @@ impl ChunkedStream {
for bytes in data {
readed_size += bytes.len();
// println!(
// "readed_size {}, content_length {}",
// readed_size, content_length,
// );
println!("readed_size {}, content_length {}", readed_size, content_length,);
y.yield_ok(bytes).await;
}
if readed_size + prev_bytes.len() >= content_length {
// println!(
// "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
// readed_size,
// prev_bytes.len(),
// content_length,
// );
println!(
"读完了 readed_size:{} + prev_bytes.len({}) == content_length {}",
readed_size,
prev_bytes.len(),
content_length,
);
// 填充0
if !need_padding {
@@ -107,7 +104,7 @@ impl ChunkedStream {
// 只执行一次
let mut push_data_bytes = |mut bytes: Bytes| {
// debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len());
if bytes.is_empty() {
return None;
@@ -120,24 +117,24 @@ impl ChunkedStream {
// 合并上一次数据
if !prev_bytes.is_empty() {
let need_size = data_size.wrapping_sub(prev_bytes.len());
// println!(
// " 上一次有剩余{},从这一次中取{},共:{}",
// prev_bytes.len(),
// need_size,
// prev_bytes.len() + need_size
// );
println!(
" 上一次有剩余{},从这一次中取{},共:{}",
prev_bytes.len(),
need_size,
prev_bytes.len() + need_size
);
if bytes.len() >= need_size {
let data = bytes.split_to(need_size);
let mut combined = Vec::new();
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&data);
// debug!(
// "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
// need_size,
// combined.len(),
// bytes.len(),
// );
debug!(
"取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{}",
need_size,
combined.len(),
bytes.len(),
);
bytes_buffer.push(Bytes::from(combined));
} else {
@@ -145,12 +142,12 @@ impl ChunkedStream {
combined.extend_from_slice(&prev_bytes);
combined.extend_from_slice(&bytes);
// debug!(
// "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
// need_size,
// combined.len(),
// bytes.len(),
// );
debug!(
"取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{}bytes剩余{},直接返回",
need_size,
combined.len(),
bytes.len(),
);
return Some(Bytes::from(combined));
}

View File

@@ -431,16 +431,16 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn append_file(&self, volume: &str, path: &str, buf: &[u8]) -> Result<()> {
async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<()> {
let p = self.get_object_path(&volume, &path)?;
debug!("append_file start {} {:?}", self.id(), &p);
if let Some(dir_path) = p.parent() {
fs::create_dir_all(&dir_path).await?;
}
// if let Some(dir_path) = p.parent() {
// fs::create_dir_all(&dir_path).await?;
// }
debug!("append_file open {} {:?}", self.id(), &p);
// debug!("append_file open {} {:?}", self.id(), &p);
let mut file = File::options()
.read(true)
@@ -450,11 +450,9 @@ impl DiskAPI for LocalDisk {
.open(&p)
.await?;
debug!("append_file opened {} {:?}", self.id(), &p);
let mut writer = BufWriter::new(file);
// let mut writer = BufWriter::new(file);
file.write(&buf).await?;
io::copy(&mut r, &mut writer).await?;
debug!("append_file end {} {}", self.id(), path);
// io::copy(&mut r, &mut file).await?;

View File

@@ -17,7 +17,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()>;
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize, r: DuplexStream) -> Result<()>;
async fn append_file(&self, volume: &str, path: &str, r: &[u8]) -> Result<()>;
async fn append_file(&self, volume: &str, path: &str, r: DuplexStream) -> Result<()>;
async fn rename_data(
&self,
src_volume: &str,

View File

@@ -1,4 +1,5 @@
use anyhow::anyhow;
use anyhow::Error;
use anyhow::Result;
use bytes::Bytes;
use futures::{Stream, StreamExt};
@@ -7,6 +8,7 @@ use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tracing::debug;
use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;
@@ -14,6 +16,7 @@ pub struct Erasure {
// data_shards: usize,
// parity_shards: usize,
encoder: ReedSolomon,
id: Uuid,
}
impl Erasure {
@@ -22,6 +25,7 @@ impl Erasure {
// data_shards,
// parity_shards,
encoder: ReedSolomon::new(data_shards, parity_shards).unwrap(),
id: Uuid::new_v4(),
}
}
@@ -50,7 +54,15 @@ impl Erasure {
for (i, w) in writers.iter_mut().enumerate() {
total += blocks[i].len();
debug!("{}-{} encode write {} , total:{}", idx, i, blocks[i].len(), total);
debug!(
"{} {}-{} encode write {} , total:{}, readed:{}",
self.id,
idx,
i,
blocks[i].len(),
data_size,
total
);
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
@@ -58,13 +70,20 @@ impl Erasure {
}
}
debug!("encode_data write errs:{:?}", errs);
debug!("{} encode_data write errs:{:?}", self.id, errs);
// TODO: reduceWriteQuorumErrs
for err in errs.iter() {
if err.is_some() {
return Err(Error::msg("message"));
}
}
}
Err(e) => return Err(anyhow!(e)),
}
}
debug!("{} encode_data done {}", self.id, total);
Ok(total)
// loop {

View File

@@ -1,7 +1,7 @@
use std::{io, task::Poll};
use futures::{ready, Future};
use tokio::io::AsyncWrite;
use tokio::io::{AsyncWrite, BufWriter};
use tracing::debug;
use uuid::Uuid;
@@ -22,10 +22,10 @@ impl<'a> AppendWriter<'a> {
async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len());
self.disk
.append_file(&self.volume, &self.path, buf)
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
// self.disk
// .append_file(&self.volume, &self.path, buf)
// .await
// .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(())
}
}