diff --git a/ecstore/src/chunk_stream.rs b/ecstore/src/chunk_stream.rs index 5fb46625..5a783ba0 100644 --- a/ecstore/src/chunk_stream.rs +++ b/ecstore/src/chunk_stream.rs @@ -37,13 +37,13 @@ impl ChunkedStream { None => break, Some(Err(e)) => return Err(e), Some(Ok((data, remaining_bytes))) => { - debug!( - "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ", - content_length, - readed_size, - data.len(), - remaining_bytes.len() - ); + // debug!( + // "content_length:{},readed_size:{}, read_data data:{}, remaining_bytes: {} ", + // content_length, + // readed_size, + // data.len(), + // remaining_bytes.len() + // ); prev_bytes = remaining_bytes; data @@ -107,7 +107,7 @@ impl ChunkedStream { // 只执行一次 let mut push_data_bytes = |mut bytes: Bytes| { - debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len()); + // debug!("read from body {} split per {}, prev_bytes: {}", bytes.len(), data_size, prev_bytes.len()); if bytes.is_empty() { return None; @@ -132,12 +132,12 @@ impl ChunkedStream { combined.extend_from_slice(&prev_bytes); combined.extend_from_slice(&data); - debug!( - "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{}", - need_size, - combined.len(), - bytes.len(), - ); + // debug!( + // "取到的长度大于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{}", + // need_size, + // combined.len(), + // bytes.len(), + // ); bytes_buffer.push(Bytes::from(combined)); } else { @@ -145,12 +145,12 @@ impl ChunkedStream { combined.extend_from_slice(&prev_bytes); combined.extend_from_slice(&bytes); - debug!( - "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{},直接返回", - need_size, - combined.len(), - bytes.len(), - ); + // debug!( + // "取到的长度小于所需,取出需要的长度:{},与上一次合并得到:{},bytes剩余:{},直接返回", + // need_size, + // combined.len(), + // bytes.len(), + // ); return Some(Bytes::from(combined)); } diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 147630c0..9df80c69 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -346,6 +346,10 @@ impl DiskAPI for LocalDisk { true } + fn id(&self) -> Uuid { + self.id + } + #[must_use] async fn read_all(&self, volume: &str, path: &str) -> Result { let p = self.get_object_path(&volume, &path)?; @@ -427,19 +431,33 @@ impl DiskAPI for LocalDisk { Ok(()) } - async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<()> { + async fn append_file(&self, volume: &str, path: &str, buf: &[u8]) -> Result<()> { let p = self.get_object_path(&volume, &path)?; + debug!("append_file start {} {:?}", self.id(), &p); + + if let Some(dir_path) = p.parent() { + fs::create_dir_all(&dir_path).await?; + } + + debug!("append_file open {} {:?}", self.id(), &p); + let mut file = File::options() + .read(true) .create(true) .write(true) - .append(true) // 设置为追加模式 - .open(p) + // .append(true) + .open(&p) .await?; - let mut writer = BufWriter::new(file); + debug!("append_file opened {} {:?}", self.id(), &p); - io::copy(&mut r, &mut writer).await?; + // let mut writer = BufWriter::new(file); + + file.write(&buf).await?; + + debug!("append_file end {} {}", self.id(), path); + // io::copy(&mut r, &mut file).await?; Ok(()) } diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index b1ea4a33..8e716b52 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -11,12 +11,13 @@ use crate::store_api::{FileInfo, RawFileInfo}; #[async_trait::async_trait] pub trait DiskAPI: Debug + Send + Sync + 'static { fn is_local(&self) -> bool; + fn id(&self) -> Uuid; async fn read_all(&self, volume: &str, path: &str) -> Result; async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()>; async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>; async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize, r: DuplexStream) -> Result<()>; - async fn append_file(&self, volume: &str, path: &str, r: DuplexStream) -> Result<()>; + async fn append_file(&self, volume: &str, path: &str, r: &[u8]) -> Result<()>; async fn rename_data( &self, src_volume: &str, diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 7c7da1ff..5de84484 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -39,19 +39,20 @@ impl Erasure { { let mut stream = ChunkedStream::new(body, data_size, block_size, true); let mut total: usize = 0; + let mut idx = 0; while let Some(result) = stream.next().await { match result { Ok(data) => { let blocks = self.encode_data(data.as_ref())?; let mut errs = Vec::new(); - + idx += 1; for (i, w) in writers.iter_mut().enumerate() { total += blocks[i].len(); - debug!("encode write {}", blocks[i].len()); + debug!("{}-{} encode write {} , total:{}", idx, i, blocks[i].len(), total); - match w.write_all(blocks[i].as_ref()).await { + match w.write(blocks[i].as_ref()).await { Ok(_) => errs.push(None), Err(e) => errs.push(Some(e)), } diff --git a/ecstore/src/writer.rs b/ecstore/src/writer.rs index 877b7a27..beb189c7 100644 --- a/ecstore/src/writer.rs +++ b/ecstore/src/writer.rs @@ -1,8 +1,9 @@ use std::{io, task::Poll}; -use futures::Future; +use futures::{ready, Future}; use tokio::io::AsyncWrite; use tracing::debug; +use uuid::Uuid; use crate::disk::DiskStore; @@ -14,13 +15,18 @@ pub struct AppendWriter<'a> { impl<'a> AppendWriter<'a> { pub fn new(disk: DiskStore, volume: &'a str, path: &'a str) -> Self { + debug!("AppendWriter new {}: {}/{}", disk.id(), volume, path); Self { disk, volume, path } } async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> { - debug!("async_write {}: {}", &self.path, buf.len()); + debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len()); - unimplemented!() + self.disk + .append_file(&self.volume, &self.path, buf) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + Ok(()) } } @@ -31,10 +37,40 @@ impl<'a> AsyncWrite for AppendWriter<'a> { buf: &[u8], ) -> std::task::Poll> { let mut fut = Box::pin(self.async_write(buf)); + debug!("AsyncWrite poll_write {}, buf:{}", self.disk.id(), buf.len()); + + // while let Poll::Ready(e) = fut.as_mut().poll(cx) { + // let a = match e { + // Ok(_) => { + // debug!("Ready ok {}", self.disk.id()); + // Poll::Ready(Ok(buf.len())) + // } + // Err(e) => { + // debug!("Ready err {}", self.disk.id()); + // Poll::Ready(Err(e)) + // } + // }; + + // return a; + // } + + // Poll::Pending match fut.as_mut().poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Ok(buf.len())), + Poll::Pending => { + debug!("Pending {}", self.disk.id()); + Poll::Pending + } + Poll::Ready(e) => match e { + Ok(_) => { + debug!("Ready ok {}", self.disk.id()); + Poll::Ready(Ok(buf.len())) + } + Err(e) => { + debug!("Ready err {}", self.disk.id()); + Poll::Ready(Err(e)) + } + }, } }