diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs
index 96b7fc99..3309017a 100644
--- a/ecstore/src/disk.rs
+++ b/ecstore/src/disk.rs
@@ -20,9 +20,11 @@ use uuid::Uuid;
 
 use crate::{
     disk_api::{
-        DeleteOptions, DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo,
+        DeleteOptions, DiskAPI, DiskError, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions,
+        RenameDataResp, VolumeInfo,
     },
     endpoint::{Endpoint, Endpoints},
+    erasure::ReadAt,
     file_meta::FileMeta,
     format::FormatV3,
     store_api::{FileInfo, RawFileInfo},
@@ -505,19 +507,21 @@ impl DiskAPI for LocalDisk {
         // Ok(())
     }
 
-    async fn read_file(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
+    async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
         let p = self.get_object_path(&volume, &path)?;
 
-        let mut file = File::options().read(true).open(&p).await?;
+        let file = File::options().read(true).open(&p).await?;
 
-        file.seek(SeekFrom::Start(offset as u64)).await?;
+        Ok(FileReader::new(file))
 
-        let mut buffer = vec![0; length];
+        // file.seek(SeekFrom::Start(offset as u64)).await?;
 
-        let bytes_read = file.read(&mut buffer).await?;
+        // let mut buffer = vec![0; length];
 
-        buffer.truncate(bytes_read);
+        // let bytes_read = file.read(&mut buffer).await?;
 
-        Ok((buffer, bytes_read))
+        // buffer.truncate(bytes_read);
+
+        // Ok((buffer, bytes_read))
     }
 
     async fn rename_data(
         &self,
diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs
index 69b55498..c5b4b135 100644
--- a/ecstore/src/disk_api.rs
+++ b/ecstore/src/disk_api.rs
@@ -1,12 +1,18 @@
-use std::{fmt::Debug, pin::Pin};
+use std::{fmt::Debug, io::SeekFrom, pin::Pin};
 
 use anyhow::{Error, Result};
 use bytes::Bytes;
 use time::OffsetDateTime;
-use tokio::io::AsyncWrite;
+use tokio::{
+    fs::File,
+    io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite},
+};
 use uuid::Uuid;
 
-use crate::store_api::{FileInfo, RawFileInfo};
+use crate::{
+    erasure::ReadAt,
+    store_api::{FileInfo, RawFileInfo},
+};
 
 #[async_trait::async_trait]
 pub trait DiskAPI: Debug + Send + Sync + 'static {
@@ -19,7 +25,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
     async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
     async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result<FileWriter>;
     async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter>;
-    async fn read_file(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
+    async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader>;
     async fn rename_data(
         &self,
         src_volume: &str,
@@ -139,6 +145,30 @@ impl FileWriter {
     }
 }
 
+pub struct FileReader {
+    pub inner: File,
+}
+
+impl FileReader {
+    pub fn new(inner: File) -> Self {
+        Self { inner }
+    }
+}
+
+impl ReadAt for FileReader {
+    async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
+        self.inner.seek(SeekFrom::Start(offset as u64)).await?;
+
+        let mut buffer = vec![0; length];
+
+        let bytes_read = self.inner.read(&mut buffer).await?;
+
+        buffer.truncate(bytes_read);
+
+        Ok((buffer, bytes_read))
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum DiskError {
     #[error("file not found")]
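With this change, `read_file` no longer performs the positioned read itself: it hands back a `FileReader`, and the offset/length pair moves to `ReadAt::read_at`, so one open handle can serve repeated positioned reads. A minimal caller sketch, assuming some `DiskAPI` implementation such as `LocalDisk`; the helper name and the volume/path literals are hypothetical:

```rust
use anyhow::Result;

use crate::disk_api::DiskAPI;
use crate::erasure::ReadAt;

// Hypothetical helper: open the object once, then issue positioned reads.
async fn read_range(disk: &impl DiskAPI, offset: usize, len: usize) -> Result<Vec<u8>> {
    let mut reader = disk.read_file("volume", "object/part.1").await?;
    // A short read is reported via bytes_read; read_at truncates the
    // returned buffer to the number of bytes actually read.
    let (buf, bytes_read) = reader.read_at(offset, len).await?;
    debug_assert_eq!(buf.len(), bytes_read);
    Ok(buf)
}
```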
diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs
index fb82cc8b..b3a715af 100644
--- a/ecstore/src/erasure.rs
+++ b/ecstore/src/erasure.rs
@@ -1,8 +1,11 @@
 use anyhow::anyhow;
+use anyhow::Error;
 use anyhow::Result;
 use bytes::Bytes;
+use futures::future::join_all;
 use futures::{Stream, StreamExt};
 use reed_solomon_erasure::galois_8::ReedSolomon;
+use s3s::dto::StreamingBlob;
 use s3s::StdError;
 use tokio::io::AsyncWrite;
 use tokio::io::AsyncWriteExt;
@@ -10,10 +13,12 @@ use uuid::Uuid;
 
 use crate::chunk_stream::ChunkedStream;
+use crate::disk_api::DiskError;
+use crate::disk_api::FileReader;
 
 pub struct Erasure {
-    // data_shards: usize,
-    // parity_shards: usize,
+    data_shards: usize,
+    parity_shards: usize,
     encoder: ReedSolomon,
     block_size: usize,
     id: Uuid,
@@ -22,8 +27,8 @@ impl Erasure {
     pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self {
         Erasure {
-            // data_shards,
-            // parity_shards,
+            data_shards,
+            parity_shards,
             block_size,
             encoder: ReedSolomon::new(data_shards, parity_shards).unwrap(),
             id: Uuid::new_v4(),
@@ -35,23 +40,23 @@ impl Erasure {
     pub async fn encode<S, W>(
         &self,
         body: S,
         writers: &mut Vec<W>,
         // block_size: usize,
-        data_size: usize,
+        total_size: usize,
         _write_quorum: usize,
     ) -> Result<usize>
     where
         S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
         W: AsyncWrite + Unpin,
     {
-        let mut stream = ChunkedStream::new(body, data_size, self.block_size, true);
+        let mut stream = ChunkedStream::new(body, total_size, self.block_size, true);
 
         let mut total: usize = 0;
-        let mut idx = 0;
+        // let mut idx = 0;
         while let Some(result) = stream.next().await {
             match result {
                 Ok(data) => {
                     let blocks = self.encode_data(data.as_ref())?;
                     let mut errs = Vec::new();
-                    idx += 1;
+                    // idx += 1;
 
                     for (i, w) in writers.iter_mut().enumerate() {
                         total += blocks[i].len();
@@ -95,31 +100,74 @@ impl Erasure {
         // }
     }
 
-    pub async fn decode<W, R>(&self, writer: W, readers: Vec<R>, offset: usize, length: usize, total_length: usize) -> Result<()>
-    where
-        R: ReadAt,
-        W: AsyncWrite + Unpin,
-    {
+    pub async fn decode(
+        &self,
+        writer: &StreamingBlob,
+        readers: Vec<Option<FileReader>>,
+        offset: usize,
+        length: usize,
+        total_length: usize,
+    ) -> Result<usize> {
+        if length == 0 {
+            return Ok(0);
+        }
+
+        let mut reader = ShardReader::new(readers, self, offset, total_length);
+
+        let start_block = offset / self.block_size;
+        let end_block = (offset + length) / self.block_size;
+
+        for block_idx in start_block..=end_block {
+            let mut block_offset = 0;
+            let mut block_length = 0;
+            if start_block == end_block {
+                block_offset = offset % self.block_size;
+                block_length = length;
+            } else if block_idx == start_block {
+                block_offset = offset % self.block_size;
+                block_length = self.block_size - block_offset;
+            } else if block_idx == end_block {
+                block_offset = 0;
+                block_length = (offset + length) % self.block_size;
+            } else {
+                block_offset = 0;
+                block_length = self.block_size;
+            }
+
+            if block_length == 0 {
+                break;
+            }
+
+            let mut bufs = reader.read().await?;
+
+            self.decode_data(&mut bufs)?;
+        }
+        unimplemented!()
+    }
+
+    fn write_data_blocks() -> Result<()> {
         unimplemented!()
     }
 
     pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>> {
         let (shard_size, total_size) = self.need_size(data.len());
 
+        // allocate a new buffer sized to hold all shards
         let mut data_buffer = vec![0u8; total_size];
 
         {
+            // copy in the source data
            let (left, _) = data_buffer.split_at_mut(data.len());
             left.copy_from_slice(data);
         }
 
         {
+            // EC-encode; the result is written back into data_buffer
             let data_slices: Vec<&mut [u8]> = data_buffer.chunks_mut(shard_size).collect();
             self.encoder.encode(data_slices)?;
         }
 
-        // Ok(data_buffer)
-
+        // split into shards
         let mut shards = Vec::with_capacity(self.encoder.total_shard_count());
 
         let slices: Vec<&[u8]> = data_buffer.chunks(shard_size).collect();
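The loop in `decode` maps the requested byte range `[offset, offset + length)` onto `block_size`-sized EC blocks: only the first and last blocks can be partial, and a zero-length tail means the range ended exactly on a block boundary. A standalone sketch of the same arithmetic, with a worked example; the function name is illustrative, not part of the patch:

```rust
// Map a byte range [offset, offset + length) onto block_size-sized blocks,
// yielding for each touched block its index, the offset inside the block,
// and the number of bytes to take from it.
fn block_ranges(offset: usize, length: usize, block_size: usize) -> Vec<(usize, usize, usize)> {
    let start_block = offset / block_size;
    let end_block = (offset + length) / block_size;

    let mut out = Vec::new();
    for block_idx in start_block..=end_block {
        let (block_offset, block_length) = if start_block == end_block {
            (offset % block_size, length)
        } else if block_idx == start_block {
            let o = offset % block_size;
            (o, block_size - o)
        } else if block_idx == end_block {
            (0, (offset + length) % block_size)
        } else {
            (0, block_size)
        };
        // A zero-length tail: the range ended exactly on a block boundary.
        if block_length == 0 {
            break;
        }
        out.push((block_idx, block_offset, block_length));
    }
    out
}

fn main() {
    // With 1 KiB blocks, reading 1500 bytes at offset 900 touches three blocks:
    // block 0 from byte 900 (124 bytes), all of block 1, block 2 up to byte 352.
    assert_eq!(
        block_ranges(900, 1500, 1024),
        vec![(0, 900, 124), (1, 0, 1024), (2, 0, 352)]
    );
}
```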
@@ -142,22 +190,97 @@ impl Erasure {
         (shard_size, shard_size * (self.encoder.total_shard_count()))
     }
 
+    // compute the size of each shard
     fn shard_size(&self, data_size: usize) -> usize {
         (data_size + self.encoder.data_shard_count() - 1) / self.encoder.data_shard_count()
     }
 
+    // returns final erasure size from original size.
+    fn shard_file_size(&self, total_size: usize) -> usize {
+        if total_size == 0 {
+            return 0;
+        }
+
+        let mut num_shards = total_size / self.block_size;
+        let last_block_size = total_size % self.block_size;
+        // let last_shard_size = (last_block_size + self.data_shards - 1) / self.data_shards;
+        // num_shards * self.shard_size(self.block_size) + last_shard_size
+
+        // EC pads the last block on write, so the final shard is the same length as the rest
+        if last_block_size != 0 {
+            num_shards += 1
+        }
+        num_shards * self.shard_size(self.block_size)
+    }
 }
 
 pub trait ReadAt {
-    async fn read_at(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
+    async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
 }
 
-pub struct ShardReader<R> {
-    readers: Vec<R>,
+pub struct ShardReader {
+    readers: Vec<Option<FileReader>>, // the disks
+    data_block_count: usize,          // number of data shards
+    shard_size: usize,                // per-shard block size; one block is read at a time
+    shard_file_size: usize,           // total length of the shard file
+    offset: usize,                    // current offset within the shard file
 }
 
-impl<R> ShardReader<R> {
-    pub fn new(readers: Vec<R>, offset: usize, total_length: usize) -> Self {
-        Self { readers }
+impl ShardReader {
+    pub fn new(readers: Vec<Option<FileReader>>, ec: &Erasure, offset: usize, total_length: usize) -> Self {
+        Self {
+            readers,
+            data_block_count: ec.encoder.data_shard_count(),
+            shard_size: ec.shard_size(ec.block_size),
+            shard_file_size: ec.shard_file_size(total_length),
+            offset: (offset / ec.block_size) * ec.shard_size(ec.block_size),
+        }
+    }
+
+    pub async fn read(&mut self) -> Result<Vec<Option<Vec<u8>>>> {
+        // let mut disks = self.readers;
+
+        let reader_length = self.readers.len();
+
+        let mut futures = Vec::with_capacity(reader_length);
+        let mut errors = Vec::with_capacity(reader_length);
+
+        let mut ress = Vec::with_capacity(reader_length);
+
+        for disk in self.readers.iter_mut() {
+            if disk.is_none() {
+                ress.push(None);
+                errors.push(Some(Error::new(DiskError::DiskNotFound)));
+                continue;
+            }
+
+            let disk: &mut FileReader = disk.as_mut().unwrap();
+            futures.push(disk.read_at(self.offset, self.shard_size));
+        }
+
+        let results = join_all(futures).await;
+        for result in results {
+            match result {
+                Ok((res, _)) => {
+                    ress.push(Some(res));
+                    errors.push(None);
+                }
+                Err(e) => {
+                    ress.push(None);
+                    errors.push(Some(e));
+                }
+            }
+        }
+
+        if !self.can_decode(&ress) {
+            return Err(Error::msg("shard reader read failed"));
+        }
+
+        self.offset += self.shard_size;
+
+        Ok(ress)
+    }
+
+    fn can_decode(&self, bufs: &Vec<Option<Vec<u8>>>) -> bool {
+        bufs.iter().filter(|v| v.is_some()).count() >= self.data_block_count
     }
 }
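For intuition about the two size helpers: with `k` data shards, each `block_size` block splits into shards of `ceil(block_size / k)` bytes, and because `encode` pads a trailing partial block up to a full block, every shard file holds the same number of equally sized shard blocks. A self-contained sketch of the arithmetic with concrete numbers; these are free-function mirrors of `shard_size`/`shard_file_size`, and the parameter values are assumptions for the example only:

```rust
// Mirror of Erasure::shard_size: ceiling division of a block across k data shards.
fn shard_size(block_size: usize, data_shards: usize) -> usize {
    (block_size + data_shards - 1) / data_shards
}

// Mirror of Erasure::shard_file_size: a trailing partial block is padded to a
// full block during encoding, so it still contributes one full-size shard block.
fn shard_file_size(total_size: usize, block_size: usize, data_shards: usize) -> usize {
    if total_size == 0 {
        return 0;
    }
    let num_blocks = (total_size + block_size - 1) / block_size;
    num_blocks * shard_size(block_size, data_shards)
}

fn main() {
    let (block, k) = (1 << 20, 4); // assumed: 1 MiB blocks, 4 data shards
    // Each 1 MiB block splits into 256 KiB shards.
    assert_eq!(shard_size(block, k), 256 * 1024);
    // A 2.5 MiB object spans 3 blocks (the last one padded), so every shard
    // file, data or parity, holds 3 * 256 KiB = 768 KiB.
    assert_eq!(shard_file_size(5 * (1 << 19), block, k), 3 * 256 * 1024);
}
```

`ShardReader::new` relies on the same invariant: byte offset `o` lies in block `o / block_size`, which starts at shard-file offset `(o / block_size) * shard_size`.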