todo: ec decode

weisd
2024-07-23 17:02:52 +08:00
parent 815bc126a7
commit 22c85612c7
3 changed files with 190 additions and 33 deletions

View File

@@ -20,9 +20,11 @@ use uuid::Uuid;
use crate::{
disk_api::{
- DeleteOptions, DiskAPI, DiskError, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo,
+ DeleteOptions, DiskAPI, DiskError, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions,
+ RenameDataResp, VolumeInfo,
},
endpoint::{Endpoint, Endpoints},
erasure::ReadAt,
file_meta::FileMeta,
format::FormatV3,
store_api::{FileInfo, RawFileInfo},
@@ -505,19 +507,21 @@ impl DiskAPI for LocalDisk {
// Ok(())
}
- async fn read_file(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
+ async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
let p = self.get_object_path(&volume, &path)?;
- let mut file = File::options().read(true).open(&p).await?;
- file.seek(SeekFrom::Start(offset as u64)).await?;
- let mut buffer = vec![0; length];
- let bytes_read = file.read(&mut buffer).await?;
- buffer.truncate(bytes_read);
- Ok((buffer, bytes_read))
+ let file = File::options().read(true).open(&p).await?;
+ Ok(FileReader::new(file))
+ // file.seek(SeekFrom::Start(offset as u64)).await?;
+ // let mut buffer = vec![0; length];
+ // let bytes_read = file.read(&mut buffer).await?;
+ // buffer.truncate(bytes_read);
+ // Ok((buffer, bytes_read))
}
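For reference, this is roughly how a caller uses the new signature: open the shard file once, then issue positioned reads through the ReadAt trait instead of passing offset/length into read_file. The call site below is a hypothetical sketch (the volume/path values and the helper name are made up, not code from this commit):

    use anyhow::Result;

    use crate::{disk_api::DiskAPI, erasure::ReadAt};

    // Hypothetical call site: one open, many positioned reads.
    async fn read_shard_prefix(disk: &impl DiskAPI) -> Result<()> {
        let mut reader = disk.read_file("bucket", "object/part.1").await?;
        // Read the first 4 KiB of the shard file.
        let (buf, n) = reader.read_at(0, 4096).await?;
        println!("read {n} bytes ({} in buffer)", buf.len());
        Ok(())
    }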
async fn rename_data(
&self,

View File

@@ -1,12 +1,18 @@
- use std::{fmt::Debug, pin::Pin};
+ use std::{fmt::Debug, io::SeekFrom, pin::Pin};
use anyhow::{Error, Result};
use bytes::Bytes;
use time::OffsetDateTime;
- use tokio::io::AsyncWrite;
+ use tokio::{
+ fs::File,
+ io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite},
+ };
use uuid::Uuid;
- use crate::store_api::{FileInfo, RawFileInfo};
+ use crate::{
+ erasure::ReadAt,
+ store_api::{FileInfo, RawFileInfo},
+ };
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
@@ -19,7 +25,7 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()>;
async fn create_file(&self, origvolume: &str, volume: &str, path: &str, file_size: usize) -> Result<FileWriter>;
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter>;
- async fn read_file(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
+ async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader>;
async fn rename_data(
&self,
src_volume: &str,
@@ -139,6 +145,30 @@ impl FileWriter {
}
}
pub struct FileReader {
pub inner: File,
}
impl FileReader {
pub fn new(inner: File) -> Self {
Self { inner }
}
}
impl ReadAt for FileReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
self.inner.seek(SeekFrom::Start(offset as u64)).await?;
let mut buffer = vec![0; length];
let bytes_read = self.inner.read(&mut buffer).await?;
buffer.truncate(bytes_read);
Ok((buffer, bytes_read))
}
}
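One subtlety in read_at as written: AsyncReadExt::read may return fewer than length bytes even when the file still has data, so a caller can receive a short shard. A variant that keeps reading until the buffer is full or EOF is reached could look like the sketch below (an illustration of the issue, not part of the commit):

    use std::io::SeekFrom;

    use anyhow::Result;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, AsyncSeekExt},
    };

    // Sketch: fill the buffer with repeated read() calls so a short read does
    // not silently truncate the shard.
    async fn read_at_full(file: &mut File, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
        file.seek(SeekFrom::Start(offset as u64)).await?;
        let mut buffer = vec![0u8; length];
        let mut filled = 0;
        while filled < length {
            let n = file.read(&mut buffer[filled..]).await?;
            if n == 0 {
                break; // EOF reached before `length` bytes were available
            }
            filled += n;
        }
        buffer.truncate(filled);
        Ok((buffer, filled))
    }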
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
#[error("file not found")]

View File

@@ -1,8 +1,11 @@
use anyhow::anyhow;
use anyhow::Error;
use anyhow::Result;
use bytes::Bytes;
use futures::future::join_all;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use s3s::dto::StreamingBlob;
use s3s::StdError;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
@@ -10,10 +13,12 @@ use tokio::io::AsyncWriteExt;
use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;
use crate::disk_api::DiskError;
use crate::disk_api::FileReader;
pub struct Erasure {
- // data_shards: usize,
- // parity_shards: usize,
+ data_shards: usize,
+ parity_shards: usize,
encoder: ReedSolomon,
block_size: usize,
id: Uuid,
@@ -22,8 +27,8 @@ pub struct Erasure {
impl Erasure {
pub fn new(data_shards: usize, parity_shards: usize, block_size: usize) -> Self {
Erasure {
- // data_shards,
- // parity_shards,
+ data_shards,
+ parity_shards,
block_size,
encoder: ReedSolomon::new(data_shards, parity_shards).unwrap(),
id: Uuid::new_v4(),
@@ -35,23 +40,23 @@ impl Erasure {
body: S,
writers: &mut Vec<W>,
// block_size: usize,
- data_size: usize,
+ total_size: usize,
_write_quorum: usize,
) -> Result<usize>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
- let mut stream = ChunkedStream::new(body, data_size, self.block_size, true);
+ let mut stream = ChunkedStream::new(body, total_size, self.block_size, true);
let mut total: usize = 0;
- let mut idx = 0;
+ // let mut idx = 0;
while let Some(result) = stream.next().await {
match result {
Ok(data) => {
let blocks = self.encode_data(data.as_ref())?;
let mut errs = Vec::new();
- idx += 1;
+ // idx += 1;
for (i, w) in writers.iter_mut().enumerate() {
total += blocks[i].len();
@@ -95,31 +100,74 @@ impl Erasure {
// }
}
- pub async fn decode<R, W>(&self, writer: W, readers: Vec<R>, offset: usize, length: usize, total_length: usize) -> Result<()>
- where
- R: ReadAt,
- W: AsyncWrite + Unpin,
- {
+ pub async fn decode(
+ &self,
+ writer: &StreamingBlob,
+ readers: Vec<Option<FileReader>>,
+ offset: usize,
+ length: usize,
+ total_length: usize,
+ ) -> Result<usize> {
if length == 0 {
return Ok(0);
}
let mut reader = ShardReader::new(readers, self, offset, total_length);
let start_block = offset / self.block_size;
let end_block = (offset + length) / self.block_size;
for block_idx in start_block..=end_block {
let mut block_offset = 0;
let mut block_length = 0;
if start_block == end_block {
block_offset = offset % self.block_size;
block_length = length;
} else if block_idx == start_block {
block_offset = offset % self.block_size;
block_length = self.block_size - block_offset;
} else if block_idx == end_block {
block_offset = 0;
block_length = (offset + length) % self.block_size;
} else {
block_offset = 0;
block_length = self.block_size;
}
if block_length == 0 {
break;
}
let mut bufs = reader.read().await?;
self.decode_data(&mut bufs)?;
}
unimplemented!()
}
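The start_block/end_block arithmetic above is easiest to check with concrete numbers. The helper below repeats the same per-block offset/length computation as a free function (the names and the 1 MiB block size are illustrative only):

    // Same arithmetic as the loop in decode(), extracted for a worked example.
    fn block_range(
        block_idx: usize,
        start_block: usize,
        end_block: usize,
        offset: usize,
        length: usize,
        block_size: usize,
    ) -> (usize, usize) {
        if start_block == end_block {
            (offset % block_size, length)
        } else if block_idx == start_block {
            let off = offset % block_size;
            (off, block_size - off)
        } else if block_idx == end_block {
            (0, (offset + length) % block_size)
        } else {
            (0, block_size)
        }
    }

    fn main() {
        const MIB: usize = 1 << 20;
        // offset = 1.5 MiB, length = 2 MiB, block_size = 1 MiB
        // => start_block = 1, end_block = 3
        let (offset, length) = (MIB + MIB / 2, 2 * MIB);
        assert_eq!(block_range(1, 1, 3, offset, length, MIB), (512 * 1024, 512 * 1024)); // tail of block 1
        assert_eq!(block_range(2, 1, 3, offset, length, MIB), (0, MIB));                 // full middle block
        assert_eq!(block_range(3, 1, 3, offset, length, MIB), (0, 512 * 1024));          // head of block 3
    }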
fn write_data_blocks() -> Result<()> {
unimplemented!()
}
pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>> {
let (shard_size, total_size) = self.need_size(data.len());
// Allocate a new buffer sized to hold all shards (data + parity)
let mut data_buffer = vec![0u8; total_size];
{
// Copy the source data into the front of the buffer
let (left, _) = data_buffer.split_at_mut(data.len());
left.copy_from_slice(data);
}
{
// EC-encode; the parity shards are written into data_buffer in place
let data_slices: Vec<&mut [u8]> = data_buffer.chunks_mut(shard_size).collect();
self.encoder.encode(data_slices)?;
}
// Ok(data_buffer)
// Split into shards
let mut shards = Vec::with_capacity(self.encoder.total_shard_count());
let slices: Vec<&[u8]> = data_buffer.chunks(shard_size).collect();
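For concreteness, here is the buffer layout encode_data builds for a small input, assuming 4 data and 2 parity shards (the numbers are illustrative, not taken from the commit):

    // Ceiling division, mirroring Erasure::shard_size.
    fn shard_size(data_size: usize, data_shards: usize) -> usize {
        (data_size + data_shards - 1) / data_shards
    }

    fn main() {
        // 10 bytes of input, 4 data shards + 2 parity shards.
        let shard = shard_size(10, 4); // ceil(10 / 4) = 3
        let total = shard * (4 + 2);   // 18-byte data_buffer
        assert_eq!((shard, total), (3, 18));
        // Layout after the copy, before encode():
        //   [d0 d1 d2][d3 d4 d5][d6 d7 d8][d9 0 0][- parity -][- parity -]
        // encode() then overwrites the last two 3-byte shards with parity
        // computed over the first four.
    }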
@@ -142,22 +190,97 @@ impl Erasure {
(shard_size, shard_size * (self.encoder.total_shard_count()))
}
// Compute the size of each shard
fn shard_size(&self, data_size: usize) -> usize {
(data_size + self.encoder.data_shard_count() - 1) / self.encoder.data_shard_count()
}
// returns final erasure size from original size.
fn shard_file_size(&self, total_size: usize) -> usize {
if total_size == 0 {
return 0;
}
let mut num_shards = total_size / self.block_size;
let last_block_size = total_size % self.block_size;
// let last_shard_size = (last_block_size + self.data_shards - 1) / self.data_shards;
// num_shards * self.shard_size(self.block_size) + last_shard_size
// Because EC pads each block on write, the last block occupies a full shard size as well
if last_block_size != 0 {
num_shards += 1
}
num_shards * self.shard_size(self.block_size)
}
}
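shard_file_size can likewise be checked with a small example. Assuming a 1 MiB block size and 4 data shards (illustrative values), a 2.5 MiB object becomes 2 full blocks plus 1 padded partial block, so every shard file ends up 3 * 256 KiB = 768 KiB long:

    fn shard_size(block_size: usize, data_shards: usize) -> usize {
        (block_size + data_shards - 1) / data_shards
    }

    // Same logic as Erasure::shard_file_size: the trailing partial block is
    // padded on write, so it occupies a full shard-sized slot in every shard file.
    fn shard_file_size(total_size: usize, block_size: usize, data_shards: usize) -> usize {
        if total_size == 0 {
            return 0;
        }
        let mut num_blocks = total_size / block_size;
        if total_size % block_size != 0 {
            num_blocks += 1;
        }
        num_blocks * shard_size(block_size, data_shards)
    }

    fn main() {
        const MIB: usize = 1 << 20;
        assert_eq!(shard_file_size(2 * MIB + MIB / 2, MIB, 4), 768 * 1024);
    }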
pub trait ReadAt {
- async fn read_at(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
+ async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
}
- pub struct ShardReader<R> {
- readers: Vec<R>,
+ pub struct ShardReader {
+ readers: Vec<Option<FileReader>>, // one reader per disk
data_block_count: usize, // number of data shards
shard_size: usize, // per-block shard size; one block is read per call
shard_file_size: usize, // total length of each shard file
offset: usize, // current offset within the shard file
}
- impl<R: ReadAt> ShardReader<R> {
- pub fn new(readers: Vec<R>, offset: usize, total_length: usize) -> Self {
- Self { readers }
+ impl ShardReader {
+ pub fn new(readers: Vec<Option<FileReader>>, ec: &Erasure, offset: usize, total_length: usize) -> Self {
Self {
readers,
data_block_count: ec.encoder.data_shard_count(),
shard_size: ec.shard_size(ec.block_size),
shard_file_size: ec.shard_file_size(total_length),
offset: (offset / ec.block_size) * ec.shard_size(ec.block_size),
}
}
pub async fn read(&mut self) -> Result<Vec<Option<Vec<u8>>>> {
// let mut disks = self.readers;
let reader_length = self.readers.len();
let mut futures = Vec::with_capacity(reader_length);
let mut errors = Vec::with_capacity(reader_length);
let mut ress = Vec::with_capacity(reader_length);
for disk in self.readers.iter_mut() {
if disk.is_none() {
ress.push(None);
errors.push(Some(Error::new(DiskError::DiskNotFound)));
continue;
}
let disk: &mut FileReader = disk.as_mut().unwrap();
futures.push(disk.read_at(self.offset, self.shard_size));
}
let results = join_all(futures).await;
for result in results {
match result {
Ok((res, _)) => {
ress.push(Some(res));
errors.push(None);
}
Err(e) => {
ress.push(None);
errors.push(Some(e));
}
}
}
if !self.can_decode(&ress) {
return Err(Error::msg("shard reader read failed"));
}
self.offset += self.shard_size;
Ok(ress)
}
fn can_decode(&self, bufs: &Vec<Option<Vec<u8>>>) -> bool {
bufs.iter().filter(|v| v.is_some()).count() >= self.data_block_count
}
}
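Finally, the offset that ShardReader::new stores is a shard-file offset, not an object offset: it maps the requested object offset to the start of its containing block and scales it down to per-shard bytes. With the same illustrative numbers (1 MiB blocks, 4 data shards):

    fn main() {
        const MIB: usize = 1 << 20;
        let block_size = MIB;
        let per_block_shard = (block_size + 4 - 1) / 4; // 256 KiB per shard per block

        // An object offset of 2.5 MiB lies in block 2 (0-based), so each shard
        // file is read starting at 2 * 256 KiB = 512 KiB, exactly what
        // ShardReader::new computes.
        let offset = 2 * MIB + MIB / 2;
        let shard_offset = (offset / block_size) * per_block_shard;
        assert_eq!(shard_offset, 512 * 1024);

        // Each ShardReader::read() then pulls one 256 KiB chunk from every
        // available reader and advances the shard offset by per_block_shard,
        // provided enough per-disk reads succeed for can_decode to pass.
    }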