diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs index 5a783ba0..f8ec02ae 100644 --- a/ecstore/src/chunk_stream.rs +++ b/ecstore/src/chunk_stream.rs @@ -37,13 +37,13 @@ impl ChunkedStream { None => break, Some(Err(e)) => return Err(e), Some(Ok((data, remaining_bytes))) => { - // debug!( - // "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ", - // content_length, - // readed_size, - // data.len(), - // remaining_bytes.len() - // ); + debug!( + "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ", + content_length, + readed_size, + data.len(), + remaining_bytes.len() + ); prev_bytes = remaining_bytes; data @@ -53,20 +53,17 @@ impl ChunkedStream { for bytes in data { readed_size += bytes.len(); - // println!( - // "readed_size {}, content_length {}", - // readed_size, content_length, - // ); + println!("readed_size {}, content_length {}", readed_size, content_length,); y.yield_ok(bytes).await; } if readed_size + prev_bytes.len() >= content_length { - // println!( - // "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}", - // readed_size, - // prev_bytes.len(), - // content_length, - // ); + println!( + "读完了 readed_size:{} + prev_bytes.len({}) == content_length {}", + readed_size, + prev_bytes.len(), + content_length, + ); // 填充0? if !need_padding { @@ -107,7 +104,7 @@ impl ChunkedStream { // 只执行一次 let mut push_data_bytes = |mut bytes: Bytes| { - // debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len()); + debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len()); if bytes.is_empty() { return None; @@ -120,24 +117,24 @@ impl ChunkedStream { // 合并上一次数据 if !prev_bytes.is_empty() { let need_size = data_size.wrapping_sub(prev_bytes.len()); - // println!( - // " 上一次有剩余{},从这一次中取{},共:{}", - // prev_bytes.len(), - // need_size, - // prev_bytes.len() + need_size - // ); + println!( + " 上一次有剩余{},从这一次中取{},共:{}", + prev_bytes.len(), + need_size, + prev_bytes.len() + need_size + ); if bytes.len() >= need_size { let data = bytes.split_to(need_size); let mut combined = Vec::new(); combined.extend_from_slice(&prev_bytes); combined.extend_from_slice(&data); - // debug!( - // "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{}", - // need_size, - // combined.len(), - // bytes.len(), - // ); + debug!( + "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{}", + need_size, + combined.len(), + bytes.len(), + ); bytes_buffer.push(Bytes::from(combined)); } else { @@ -145,12 +142,12 @@ impl ChunkedStream { combined.extend_from_slice(&prev_bytes); combined.extend_from_slice(&bytes); - // debug!( - // "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{},直接返回", - // need_size, - // combined.len(), - // bytes.len(), - // ); + debug!( + "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{},直接返回", + need_size, + combined.len(), + bytes.len(), + ); return Some(Bytes::from(combined)); } diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 9df80c69..f5bbfde8 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -431,16 +431,16 @@ impl DiskAPI for LocalDisk { Ok(()) } - async fn append_file(&self, volume: &str, path: &str, buf: &[u8]) -> Result<()> { + async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<()> { let p = self.get_object_path(&volume, &path)?; debug!("append_file start {} {:?}", self.id(), &p); - if let Some(dir_path) = p.parent() { - fs::create_dir_all(&dir_path).await?; - } + // if let Some(dir_path) = p.parent() { + // fs::create_dir_all(&dir_path).await?; + // } - debug!("append_file open {} {:?}", self.id(), &p); + // debug!("append_file open {} {:?}", self.id(), &p); let mut file = File::options() .read(true) @@ -450,11 +450,9 @@ impl DiskAPI for LocalDisk { .open(&p) .await?; - debug!("append_file opened {} {:?}", self.id(), &p); + let mut writer = BufWriter::new(file); - // let mut writer = BufWriter::new(file); - - file.write(&buf).await?; + io::copy(&mut r, &mut writer).await?; debug!("append_file end {} {}", self.id(), path); // io::copy(&mut r, &mut file).await?; diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index 8e716b52..56e1f4b4 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -17,7 +17,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()>; async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize, r: DuplexStream) -> Result<()>; - async fn append_file(&self, volume: &str, path: &str, r: &[u8]) -> Result<()>; + async fn append_file(&self, volume: &str, path: &str, r: DuplexStream) -> Result<()>; async fn rename_data( &self, src_volume: &str, diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 5de84484..f371cc2a 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use anyhow::Error; use anyhow::Result; use bytes::Bytes; use futures::{Stream, StreamExt}; @@ -7,6 +8,7 @@ use s3s::StdError; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tracing::debug; +use uuid::Uuid; use crate::chunk_stream::ChunkedStream; @@ -14,6 +16,7 @@ pub struct Erasure { // data_shards: usize, // parity_shards: usize, encoder: ReedSolomon, + id: Uuid, } impl Erasure { @@ -22,6 +25,7 @@ impl Erasure { // data_shards, // parity_shards, encoder: ReedSolomon::new(data_shards, parity_shards).unwrap(), + id: Uuid::new_v4(), } } @@ -50,7 +54,15 @@ impl Erasure { for (i, w) in writers.iter_mut().enumerate() { total += blocks[i].len(); - debug!("{}-{} encode write {} , total:{}", idx, i, blocks[i].len(), total); + debug!( + "{} {}-{} encode write {} , total:{}, readed:{}", + self.id, + idx, + i, + blocks[i].len(), + data_size, + total + ); match w.write(blocks[i].as_ref()).await { Ok(_) => errs.push(None), @@ -58,13 +70,20 @@ impl Erasure { } } - debug!("encode_data write errs:{:?}", errs); + debug!("{} encode_data write errs:{:?}", self.id, errs); // TODO: reduceWriteQuorumErrs + for err in errs.iter() { + if err.is_some() { + return Err(Error::msg("message")); + } + } } Err(e) => return Err(anyhow!(e)), } } + debug!("{} encode_data done {}", self.id, total); + Ok(total) // loop { diff --git a/ecstore/src/writer.rs b/ecstore/src/writer.rs index beb189c7..48257dc0 100644 --- a/ecstore/src/writer.rs +++ b/ecstore/src/writer.rs @@ -1,7 +1,7 @@ use std::{io, task::Poll}; use futures::{ready, Future}; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncWrite, BufWriter}; use tracing::debug; use uuid::Uuid; @@ -22,10 +22,10 @@ impl<'a> AppendWriter<'a> { async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> { debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len()); - self.disk - .append_file(&self.volume, &self.path, buf) - .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + // self.disk + // .append_file(&self.volume, &self.path, buf) + // .await + // .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; Ok(()) } }