todo:put_object

This commit is contained in:
weisd
2024-07-01 18:10:16 +08:00
parent 064a180b3a
commit e2f3721459
11 changed files with 372 additions and 225 deletions

20
Cargo.lock generated
View File

@@ -314,6 +314,7 @@ dependencies = [
"anyhow",
"async-trait",
"bytes",
"crc32fast",
"futures",
"lazy_static",
"netif",
@@ -321,11 +322,14 @@ dependencies = [
"reed-solomon-erasure",
"regex",
"rmp-serde",
"s3s",
"serde",
"serde_json",
"siphasher",
"thiserror",
"time",
"tokio",
"tokio-pipe",
"tokio-util",
"tracing",
"tracing-error",
@@ -1228,6 +1232,12 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
[[package]]
name = "slab"
version = "0.4.9"
@@ -1387,6 +1397,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-pipe"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
dependencies = [
"libc",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.11"

View File

@@ -30,6 +30,10 @@ path-absolutize = "3.1.1"
time.workspace = true
rmp-serde = "1.3.0"
tokio-util = "0.7.11"
s3s = "0.10.0"
crc32fast = "1.4.2"
siphasher = "1.0.1"
tokio-pipe = "0.2.12"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -1,3 +1,4 @@
use anyhow::Error;
use bytes::Bytes;
use futures::pin_mut;
use futures::stream::{Stream, StreamExt};
@@ -6,13 +7,11 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use transform_stream::AsyncTryStream;
use crate::error::StdError;
pub type SyncBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + Sync + 'a>>;
pub struct ChunkedStream {
/// inner
inner: AsyncTryStream<Bytes, StdError, SyncBoxFuture<'static, Result<(), StdError>>>,
inner: AsyncTryStream<Bytes, Error, SyncBoxFuture<'static, Result<(), Error>>>,
remaining_length: usize,
}
@@ -20,10 +19,10 @@ pub struct ChunkedStream {
impl ChunkedStream {
pub fn new<S>(body: S, content_length: usize, chunk_size: usize, need_padding: bool) -> Self
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync + 'static,
S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
{
let inner =
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), StdError>>>::new(|mut y| {
AsyncTryStream::<_, _, SyncBoxFuture<'static, Result<(), Error>>>::new(|mut y| {
#[allow(clippy::shadow_same)] // necessary for `pin_mut!`
Box::pin(async move {
pin_mut!(body);
@@ -90,9 +89,9 @@ impl ChunkedStream {
mut body: Pin<&mut S>,
prev_bytes: Bytes,
data_size: usize,
) -> Option<Result<(Vec<Bytes>, Bytes), StdError>>
) -> Option<Result<(Vec<Bytes>, Bytes), Error>>
where
S: Stream<Item = Result<Bytes, StdError>> + Send + 'static,
S: Stream<Item = Result<Bytes, Error>> + Send + 'static,
{
let mut bytes_buffer = Vec::new();
@@ -186,10 +185,7 @@ impl ChunkedStream {
Some(Ok((bytes_buffer, remaining_bytes)))
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, StdError>>> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let ans = Pin::new(&mut self.inner).poll_next(cx);
if let Poll::Ready(Some(Ok(ref bytes))) = ans {
self.remaining_length = self.remaining_length.saturating_sub(bytes.len());
@@ -203,7 +199,7 @@ impl ChunkedStream {
}
impl Stream for ChunkedStream {
type Item = Result<Bytes, StdError>;
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll(cx)

View File

@@ -1,6 +1,14 @@
use reed_solomon_erasure::{galois_8::ReedSolomon, Error};
use tokio::io::AsyncWriteExt;
struct Erasure {
use anyhow::{Error, Result};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use tokio::io::AsyncWrite;
use crate::chunk_stream::ChunkedStream;
pub struct Erasure {
data_shards: usize,
parity_shards: usize,
encoder: ReedSolomon,
@@ -15,7 +23,50 @@ impl Erasure {
}
}
pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
pub async fn encode<S, W>(
&self,
body: S,
writers: &mut Vec<W>,
block_size: usize,
data_size: usize,
write_quorum: usize,
) -> Result<()>
where
S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static,
W: AsyncWrite + Unpin,
{
let mut stream = ChunkedStream::new(body, data_size, block_size, true);
while let Some(result) = stream.next().await {
match result {
Ok(data) => {
let blocks = self.encode_data(data.as_ref())?;
let mut errs = Vec::new();
for (i, w) in writers.iter_mut().enumerate() {
match w.write_all(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
}
// TODO: reduceWriteQuorumErrs
}
Err(e) => return Err(e),
}
}
Ok(())
// loop {
// match rd.next().await {
// Some(res) => todo!(),
// None => todo!(),
// }
// }
}
pub fn encode_data(&self, data: &[u8]) -> Result<Vec<Vec<u8>>> {
let (shard_size, total_size) = self.need_size(data.len());
let mut data_buffer = vec![0u8; total_size];
@@ -43,8 +94,9 @@ impl Erasure {
Ok(shards)
}
pub fn decode_data(&self, shards: &mut Vec<Option<Vec<u8>>>) -> Result<(), Error> {
self.encoder.reconstruct(shards)
pub fn decode_data(&self, shards: &mut Vec<Option<Vec<u8>>>) -> Result<()> {
self.encoder.reconstruct(shards)?;
Ok(())
}
// Per-shard length, and the total length required

View File

@@ -13,5 +13,4 @@ mod sets;
pub mod store;
mod store_api;
mod store_init;
mod stream;
mod utils;

View File

@@ -1,25 +1,33 @@
use std::sync::Arc;
use anyhow::Result;
use futures::StreamExt;
use futures::AsyncWrite;
use time::OffsetDateTime;
use tokio_pipe::{PipeRead, PipeWrite};
use uuid::Uuid;
use crate::{
disk::DiskStore,
disk::{self, DiskStore},
endpoint::PoolEndpoints,
format::FormatV3,
store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
erasure::Erasure,
format::{DistributionAlgoVersion, FormatV3},
store_api::{FileInfo, MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
utils::hash,
};
#[derive(Debug)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
pub disk_indexs: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub disk_set: Vec<Vec<Option<DiskStore>>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
pub format: FormatV3,
pub partiy_count: usize,
pub set_count: usize,
pub set_drive_count: usize,
pub distribution_algo: DistributionAlgoVersion,
}
impl Sets {
@@ -33,7 +41,7 @@ impl Sets {
let set_count = fm.erasure.sets.len();
let set_drive_count = fm.erasure.sets[0].len();
let mut disk_indexs = Vec::with_capacity(set_count);
let mut disk_set = Vec::with_capacity(set_count);
for i in 0..set_count {
let mut set_drive = Vec::with_capacity(set_drive_count);
@@ -47,25 +55,40 @@ impl Sets {
}
}
disk_indexs.push(set_drive);
disk_set.push(set_drive);
}
let sets = Self {
id: fm.id.clone(),
// sets: todo!(),
disk_indexs,
disk_set,
pool_idx,
endpoints: endpoints.clone(),
format: fm.clone(),
partiy_count,
set_count,
set_drive_count,
distribution_algo: fm.erasure.distribution_algo.clone(),
};
Ok(sets)
}
pub fn get_disks(&self, set_idx: usize) -> Vec<Option<DiskStore>> {
self.disk_indexs[set_idx].clone()
self.disk_set[set_idx].clone()
}
    /// Returns the disks of the erasure set responsible for `key`.
    pub fn get_disks_by_key(&self, key: &str) -> Vec<Option<DiskStore>> {
        self.get_disks(self.get_hashed_set_index(key))
    }

    /// Picks the erasure-set index for `input` using the format's declared
    /// distribution algorithm: CRC32 for V1, keyed SipHash (the format id as
    /// the 16-byte key) for V2/V3. Result is always `< self.disk_set.len()`.
    fn get_hashed_set_index(&self, input: &str) -> usize {
        match self.distribution_algo {
            DistributionAlgoVersion::V1 => hash::crc_hash(input, self.disk_set.len()),
            DistributionAlgoVersion::V2 | DistributionAlgoVersion::V3 => {
                hash::sip_hash(input, self.disk_set.len(), self.id.as_bytes())
            }
        }
    }
}
@@ -92,7 +115,102 @@ impl StorageAPI for Sets {
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()> {
// data.stream.next();
let disks = self.get_disks_by_key(object);
let mut parity_drives = self.partiy_count;
if opts.max_parity {
parity_drives = disks.len() / 2;
}
let data_drives = disks.len() - parity_drives;
let mut write_quorum = data_drives;
if data_drives == parity_drives {
write_quorum += 1
}
let mut fi = FileInfo::new(
[bucket, object].join("/").as_str(),
data_drives,
parity_drives,
);
fi.data_dir = Uuid::new_v4().to_string();
let parts_metadata = vec![fi.clone(); disks.len()];
let (shuffle_disks, shuffle_parts_metadata) =
shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
let mut writers = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let (mut r, mut w) = tokio_pipe::pipe()?;
writers.push(w);
}
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
// erasure.encode(
// data.stream,
// &mut writers,
// fi.erasure.block_size,
// data.content_length,
// write_quorum,
// );
unimplemented!()
}
}
// NOTE(review): work-in-progress plumbing for streaming erasure shards to a disk.
pub struct DiskWriter<'a> {
    // Target disk; `None` means this slot of the set is offline.
    disk: &'a Option<DiskStore>,
    // Write end of the in-process pipe (producer side).
    writer: PipeWrite,
    // Read end of the pipe; presumably to be drained into `disk` — TODO confirm.
    reader: PipeRead,
}
impl<'a> DiskWriter<'a> {
    // Creates the pipe pair up front; no I/O against `disk` happens here.
    pub fn new(disk: &'a Option<DiskStore>) -> Result<Self> {
        let (mut reader, mut writer) = tokio_pipe::pipe()?;
        Ok(Self {
            disk,
            reader,
            writer,
        })
    }
    // NOTE(review): name is a typo for `writer`, and the empty body returns
    // `()`, which does not implement `AsyncWrite` — this cannot compile as
    // written. Left untouched: unfinished code, interface still in flux.
    pub fn wirter(&self) -> impl AsyncWrite {}
}
/// Shuffles `disks` and `parts_metadata` into erasure-distribution order:
/// entry `k` is placed at `distribution[k] - 1` (the distribution is a
/// 1-based permutation of the shard slots).
///
/// Slots whose disk is missing — or whose metadata is invalid on an
/// existing object — stay `None`/default. Parameters are slices instead of
/// `&Vec<_>` (clippy `ptr_arg`); callers holding `Vec`s coerce for free,
/// so this is source-compatible with the previous signature.
fn shuffle_disks_and_parts_metadata(
    disks: &[Option<DiskStore>],
    parts_metadata: &[FileInfo],
    fi: &FileInfo,
) -> (Vec<Option<DiskStore>>, Vec<FileInfo>) {
    // A fresh object (mod_time still at the epoch) has no metadata to validate yet.
    let init = fi.mod_time == OffsetDateTime::UNIX_EPOCH;
    let mut shuffled_disks = vec![None; disks.len()];
    let mut shuffled_parts_metadata = vec![FileInfo::default(); parts_metadata.len()];
    let distribution = &fi.erasure.distribution;
    for (k, disk) in disks.iter().enumerate() {
        if disk.is_none() {
            continue;
        }
        if !init && !parts_metadata[k].is_valid() {
            continue;
        }
        // NOTE(review): the xlv1 cross-check from the reference implementation
        // is not ported yet:
        // if !init && fi.xlv1 != parts_metadata[k].xlv1 { continue; }
        let block_idx = distribution[k];
        shuffled_parts_metadata[block_idx - 1] = parts_metadata[k].clone();
        shuffled_disks[block_idx - 1] = disks[k].clone();
    }
    (shuffled_disks, shuffled_parts_metadata)
}

View File

@@ -1,8 +1,8 @@
use std::{collections::HashMap, io::Cursor};
use std::collections::HashMap;
use anyhow::{Error, Result};
use tokio_util::io::ReaderStream;
use s3s::Body;
use uuid::Uuid;
use crate::{
@@ -13,9 +13,7 @@ use crate::{
peer::{PeerS3Client, S3PeerSys},
sets::Sets,
store_api::{MakeBucketOptions, ObjectOptions, PutObjReader, StorageAPI},
store_init,
stream::into_dyn,
utils,
store_init, utils,
};
#[derive(Debug)]
@@ -121,19 +119,19 @@ impl StorageAPI for ECStore {
let data = meta.marshal_msg()?;
let file_path = meta.save_file_path();
let stream = ReaderStream::new(Cursor::new(data));
// TODO: wrap hash reader
// let reader = PutObjReader::new(stream);
let content_len = data.len() as u64;
// self.put_object(
// RUSTFS_META_BUCKET,
// &file_path,
// &reader,
// &ObjectOptions { max_parity: true },
// )
// .await?;
let reader = PutObjReader::new(Body::from(data), content_len);
self.put_object(
RUSTFS_META_BUCKET,
&file_path,
&reader,
&ObjectOptions { max_parity: true },
)
.await?;
// TODO: toObjectErr

View File

@@ -1,24 +1,136 @@
use std::sync::Arc;
use std::{default, sync::Arc};
use anyhow::Result;
use bytes::Bytes;
use futures::Stream;
use s3s::Body;
use time::OffsetDateTime;
use crate::stream::DynByteStream;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
pub const BLOCK_SIZE_V2: u64 = 1048576; // 1M
#[derive(Debug, Clone)]
pub struct FileInfo {
    // Erasure-coding layout: block counts, block size, shard distribution, checksums.
    pub erasure: ErasureInfo,
    // Delete marker; a deleted FileInfo is considered valid regardless of layout.
    pub deleted: bool,
    // DataDir of the file
    pub data_dir: String,
    // Last modification time; UNIX_EPOCH doubles as the "fresh object" sentinel.
    pub mod_time: OffsetDateTime,
}
impl Default for FileInfo {
    // Manual impl because OffsetDateTime has no Default; the epoch value is
    // the sentinel checked by shuffle/validity logic.
    fn default() -> Self {
        Self {
            erasure: Default::default(),
            deleted: Default::default(),
            data_dir: Default::default(),
            mod_time: OffsetDateTime::UNIX_EPOCH,
        }
    }
}
impl FileInfo {
    /// Builds the metadata for a new object with the given erasure layout.
    ///
    /// The shard `distribution` is a 1-based rotation of `1..=cardinality`
    /// starting at `crc32(object) % cardinality`, so each object spreads its
    /// shards across the disks in a different (but deterministic) order.
    pub fn new(object: &str, data_blocks: usize, parity_blocks: usize) -> Self {
        let indexs = {
            let cardinality = data_blocks + parity_blocks;
            let mut nums = vec![0; cardinality];
            let key_crc = crc32fast::hash(object.as_bytes());
            let start = key_crc as usize % cardinality;
            for i in 1..=cardinality {
                // BUGFIX: was `(start + 1)`, which assigned the same value to
                // every slot instead of a permutation — duplicate indices would
                // make shuffle_disks_and_parts_metadata overwrite slots and
                // leave holes. `(start + i)` yields the intended rotation.
                nums[i - 1] = 1 + ((start + i) % cardinality);
            }
            nums
        };
        Self {
            erasure: ErasureInfo {
                algorithm: ERASURE_ALGORITHM,
                data_blocks,
                parity_blocks,
                block_size: BLOCK_SIZE_V2,
                distribution: indexs,
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// A FileInfo is valid when it is a delete marker, or when its erasure
    /// layout is internally consistent: enough data blocks, an in-range shard
    /// index, and a full-length distribution.
    pub fn is_valid(&self) -> bool {
        if self.deleted {
            // Delete markers carry no erasure layout to validate.
            return true;
        }
        let data_blocks = self.erasure.data_blocks;
        let parity_blocks = self.erasure.parity_blocks;
        // `parity_blocks >= 0` dropped: a tautology for usize (clippy
        // `absurd_extreme_comparisons`).
        (data_blocks >= parity_blocks)
            && (data_blocks > 0)
            && (self.erasure.index > 0
                && self.erasure.index <= data_blocks + parity_blocks
                && self.erasure.distribution.len() == (data_blocks + parity_blocks))
    }
}
#[derive(Debug, Default, Clone)]
// ErasureInfo holds erasure coding and bitrot related information.
pub struct ErasureInfo {
    // Algorithm is the String representation of erasure-coding-algorithm
    pub algorithm: &'static str,
    // DataBlocks is the number of data blocks for erasure-coding
    pub data_blocks: usize,
    // ParityBlocks is the number of parity blocks for erasure-coding
    pub parity_blocks: usize,
    // BlockSize is the size of one erasure-coded block
    pub block_size: u64,
    // Index is the index of the current disk (1-based; 0 means unset)
    pub index: usize,
    // Distribution is the distribution of the data and parity blocks
    // (a 1-based permutation of the shard slots)
    pub distribution: Vec<usize>,
    // Checksums holds all bitrot checksums of all erasure encoded blocks
    pub checksums: Vec<ChecksumInfo>,
}
#[derive(Debug, Default, Clone)]
// ChecksumInfo - carries checksums of individual scattered parts per disk.
pub struct ChecksumInfo {
    // 1-based part number the checksum belongs to
    pub part_number: usize,
    // Hash function that produced `hash`
    pub algorithm: BitrotAlgorithm,
    // Raw digest bytes
    pub hash: Vec<u8>,
}
#[derive(Debug, Default, Clone)]
// BitrotAlgorithm specifies a algorithm used for bitrot protection.
pub enum BitrotAlgorithm {
    // SHA256 represents the SHA-256 hash function
    SHA256,
    // HighwayHash256 represents the HighwayHash-256 hash function
    HighwayHash256,
    // HighwayHash256S represents the Streaming HighwayHash-256 hash function
    // (the default used for new checksums)
    #[default]
    HighwayHash256S,
    // BLAKE2b512 represents the BLAKE2b-512 hash function
    BLAKE2b512,
}
// Options controlling bucket creation.
pub struct MakeBucketOptions {
    // Create even if the bucket already exists — TODO confirm exact semantics with callers.
    pub force_create: bool,
}
// Wraps an incoming object payload for put_object.
pub struct PutObjReader {
    // pub stream: Box<dyn Stream<Item = Bytes>>,
    // Raw S3 request body to be consumed by the erasure writer.
    pub stream: Body,
    // Declared payload length in bytes.
    pub content_length: u64,
}
// impl PutObjReader {
// pub fn new<S: Stream>(stream: S) -> Self {
// PutObjReader { stream }
// }
// }
impl PutObjReader {
pub fn new(stream: Body, content_length: u64) -> Self {
PutObjReader {
stream,
content_length,
}
}
}
pub struct ObjectOptions {
// Use the maximum parity (N/2), used when saving server configuration files

View File

@@ -1,174 +0,0 @@
use anyhow::Error;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
pub trait ByteStream: Stream {
fn remaining_length(&self) -> RemainingLength {
RemainingLength::unknown()
}
}
pub type DynByteStream =
Pin<Box<dyn ByteStream<Item = Result<Bytes, Error>> + Send + Sync + 'static>>;
pub struct RemainingLength {
lower: usize,
upper: Option<usize>,
}
impl RemainingLength {
/// Creates a new `RemainingLength` with the given lower and upper bounds.
///
/// # Panics
/// This function asserts that `lower <= upper`.
#[must_use]
pub fn new(lower: usize, upper: Option<usize>) -> Self {
if let Some(upper) = upper {
assert!(lower <= upper);
}
Self { lower, upper }
}
#[must_use]
pub fn unknown() -> Self {
Self {
lower: 0,
upper: None,
}
}
#[must_use]
pub fn new_exact(n: usize) -> Self {
Self {
lower: n,
upper: Some(n),
}
}
#[must_use]
pub fn exact(&self) -> Option<usize> {
self.upper.filter(|&upper| upper == self.lower)
}
}
impl fmt::Debug for RemainingLength {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(exact) = self.exact() {
return write!(f, "{exact}");
}
match self.upper {
Some(upper) => write!(f, "({}..={})", self.lower, upper),
None => write!(f, "({}..)", self.lower),
}
}
}
pub(crate) fn into_dyn<S, E>(s: S) -> DynByteStream
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
Box::pin(Wrapper(s))
}
struct Wrapper<S>(S);
impl<S, E> Stream for Wrapper<S>
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::new(&mut self.0);
this.poll_next(cx).map_err(|e| Error::new(e))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}
impl<S, E> ByteStream for Wrapper<S>
where
S: ByteStream<Item = Result<Bytes, E>> + Send + Sync + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
fn remaining_length(&self) -> RemainingLength {
self.0.remaining_length()
}
}
// FIXME: unbounded memory allocation
pub(crate) async fn aggregate_unlimited<S, E>(stream: S) -> Result<Vec<Bytes>, E>
where
S: ByteStream<Item = Result<Bytes, E>>,
{
let mut vec = Vec::new();
pin_mut!(stream);
while let Some(result) = stream.next().await {
vec.push(result?);
}
Ok(vec)
}
pub(crate) struct VecByteStream {
queue: VecDeque<Bytes>,
remaining_bytes: usize,
}
impl VecByteStream {
pub fn new(v: Vec<Bytes>) -> Self {
let total = v
.iter()
.map(Bytes::len)
.try_fold(0, usize::checked_add)
.expect("length overflow");
Self {
queue: v.into(),
remaining_bytes: total,
}
}
pub fn exact_remaining_length(&self) -> usize {
self.remaining_bytes
}
}
impl Stream for VecByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
match this.queue.pop_front() {
Some(b) => {
this.remaining_bytes -= b.len();
Poll::Ready(Some(Ok(b)))
}
None => Poll::Ready(None),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let cnt = self.queue.len();
(cnt, Some(cnt))
}
}
impl ByteStream for VecByteStream {
fn remaining_length(&self) -> RemainingLength {
RemainingLength::new_exact(self.remaining_bytes)
}
}
impl<R: AsyncRead> ByteStream for ReaderStream<R> {}

21
ecstore/src/utils/hash.rs Normal file
View File

@@ -0,0 +1,21 @@
use crc32fast::Hasher;
use siphasher::sip::SipHasher;
/// Maps `key` to a set index in `0..cardinality` using keyed SipHash.
///
/// The caller supplies the 16-byte key (`id`), so different deployments
/// distribute the same keys differently.
pub fn sip_hash(key: &str, cardinality: usize, id: &[u8; 16]) -> usize {
    // Hash the key bytes with the caller-provided 16-byte SipHash key.
    let hasher = SipHasher::new_with_key(id);
    let digest = hasher.hash(key.as_bytes());
    (digest as usize) % cardinality
}
pub fn crc_hash(key: &str, cardinality: usize) -> usize {
let mut hasher = Hasher::new(); // 创建一个新的哈希器
hasher.update(key.as_bytes()); // 更新哈希状态,添加数据
let checksum = hasher.finalize();
checksum as usize % cardinality
}

View File

@@ -1,3 +1,4 @@
pub mod hash;
pub mod net;
pub mod path;
pub mod string;