This commit is contained in:
weisd
2024-07-02 15:07:40 +08:00
parent 9db300035a
commit e013d33691
4 changed files with 10 additions and 61 deletions

11
Cargo.lock generated
View File

@@ -329,7 +329,6 @@ dependencies = [
"thiserror",
"time",
"tokio",
"tokio-pipe",
"tokio-util",
"tracing",
"tracing-error",
@@ -1397,16 +1396,6 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-pipe"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
dependencies = [
"libc",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.11"

View File

@@ -33,7 +33,7 @@ tokio-util = { version = "0.7.11", features = ["io"] }
s3s = "0.10.0"
crc32fast = "1.4.2"
siphasher = "1.0.1"
tokio-pipe = "0.2.12"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -4,7 +4,6 @@ use anyhow::Result;
use futures::AsyncWrite;
use time::OffsetDateTime;
use tokio_pipe::{PipeRead, PipeWrite};
use uuid::Uuid;
use crate::{
@@ -108,13 +107,7 @@ impl StorageAPI for Sets {
unimplemented!()
}
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()> {
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> {
let disks = self.get_disks_by_key(object);
let mut parity_drives = self.partiy_count;
@@ -128,25 +121,20 @@ impl StorageAPI for Sets {
write_quorum += 1
}
let mut fi = FileInfo::new(
[bucket, object].join("/").as_str(),
data_drives,
parity_drives,
);
let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives);
fi.data_dir = Uuid::new_v4().to_string();
let parts_metadata = vec![fi.clone(); disks.len()];
let (shuffle_disks, shuffle_parts_metadata) =
shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
let (shuffle_disks, shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi);
let mut writers = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let (mut r, mut w) = tokio_pipe::pipe()?;
let (reader, writer) = tokio::io::duplex(fi.erasure.block_size);
writers.push(w);
writers.push(writer);
}
let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks);
@@ -163,25 +151,6 @@ impl StorageAPI for Sets {
}
}
pub struct DiskWriter<'a> {
disk: &'a Option<DiskStore>,
writer: PipeWrite,
reader: PipeRead,
}
impl<'a> DiskWriter<'a> {
pub fn new(disk: &'a Option<DiskStore>) -> Result<Self> {
let (mut reader, mut writer) = tokio_pipe::pipe()?;
Ok(Self {
disk,
reader,
writer,
})
}
pub fn wirter(&self) -> impl AsyncWrite {}
}
// 打乱顺序
fn shuffle_disks_and_parts_metadata(
disks: &Vec<Option<DiskStore>>,

View File

@@ -7,7 +7,7 @@ use s3s::Body;
use time::OffsetDateTime;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
pub const BLOCK_SIZE_V2: u64 = 1048576; // 1M
pub const BLOCK_SIZE_V2: usize = 1048576; // 1M
#[derive(Debug, Clone)]
pub struct FileInfo {
@@ -83,7 +83,7 @@ pub struct ErasureInfo {
// ParityBlocks is the number of parity blocks for erasure-coding
pub parity_blocks: usize,
// BlockSize is the size of one erasure-coded block
pub block_size: u64,
pub block_size: usize,
// Index is the index of the current disk
pub index: usize,
// Distribution is the distribution of the data and parity blocks
@@ -125,10 +125,7 @@ pub struct PutObjReader {
impl PutObjReader {
pub fn new(stream: Body, content_length: u64) -> Self {
PutObjReader {
stream,
content_length,
}
PutObjReader { stream, content_length }
}
}
@@ -141,11 +138,5 @@ pub struct ObjectOptions {
pub trait StorageAPI {
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
async fn put_object(
&self,
bucket: &str,
object: &str,
data: &PutObjReader,
opts: &ObjectOptions,
) -> Result<()>;
async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()>;
}