diff --git a/Cargo.lock b/Cargo.lock index 58472627..704d1422 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -329,7 +329,6 @@ dependencies = [ "thiserror", "time", "tokio", - "tokio-pipe", "tokio-util", "tracing", "tracing-error", @@ -1397,16 +1396,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-pipe" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" -dependencies = [ - "libc", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.11" diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index fd332b50..792dfd48 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -33,7 +33,7 @@ tokio-util = { version = "0.7.11", features = ["io"] } s3s = "0.10.0" crc32fast = "1.4.2" siphasher = "1.0.1" -tokio-pipe = "0.2.12" + [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 63e97f3d..b05e21e1 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -4,7 +4,6 @@ use anyhow::Result; use futures::AsyncWrite; use time::OffsetDateTime; -use tokio_pipe::{PipeRead, PipeWrite}; use uuid::Uuid; use crate::{ @@ -108,13 +107,7 @@ impl StorageAPI for Sets { unimplemented!() } - async fn put_object( - &self, - bucket: &str, - object: &str, - data: &PutObjReader, - opts: &ObjectOptions, - ) -> Result<()> { + async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()> { let disks = self.get_disks_by_key(object); let mut parity_drives = self.partiy_count; @@ -128,25 +121,20 @@ impl StorageAPI for Sets { write_quorum += 1 } - let mut fi = FileInfo::new( - [bucket, object].join("/").as_str(), - data_drives, - parity_drives, - ); + let mut fi = FileInfo::new([bucket, object].join("/").as_str(), data_drives, parity_drives); fi.data_dir = Uuid::new_v4().to_string(); let parts_metadata = vec![fi.clone(); disks.len()]; - let (shuffle_disks, shuffle_parts_metadata) = - shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi); + let (shuffle_disks, shuffle_parts_metadata) = shuffle_disks_and_parts_metadata(&disks, &parts_metadata, &fi); let mut writers = Vec::with_capacity(disks.len()); for disk in disks.iter() { - let (mut r, mut w) = tokio_pipe::pipe()?; + let (reader, writer) = tokio::io::duplex(fi.erasure.block_size); - writers.push(w); + writers.push(writer); } let erasure = Erasure::new(fi.erasure.data_blocks, fi.erasure.parity_blocks); @@ -163,25 +151,6 @@ impl StorageAPI for Sets { } } -pub struct DiskWriter<'a> { - disk: &'a Option, - writer: PipeWrite, - reader: PipeRead, -} - -impl<'a> DiskWriter<'a> { - pub fn new(disk: &'a Option) -> Result { - let (mut reader, mut writer) = tokio_pipe::pipe()?; - Ok(Self { - disk, - reader, - writer, - }) - } - - pub fn wirter(&self) -> impl AsyncWrite {} -} - // 打乱顺序 fn shuffle_disks_and_parts_metadata( disks: &Vec>, diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 3a43876c..c3a2d845 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -7,7 +7,7 @@ use s3s::Body; use time::OffsetDateTime; pub const ERASURE_ALGORITHM: &str = "rs-vandermonde"; -pub const BLOCK_SIZE_V2: u64 = 1048576; // 1M +pub const BLOCK_SIZE_V2: usize = 1048576; // 1M #[derive(Debug, Clone)] pub struct FileInfo { @@ -83,7 +83,7 @@ pub struct ErasureInfo { // ParityBlocks is the number of parity blocks for erasure-coding pub parity_blocks: usize, // BlockSize is the size of one erasure-coded block - pub block_size: u64, + pub block_size: usize, // Index is the index of the current disk pub index: usize, // Distribution is the distribution of the data and parity blocks @@ -125,10 +125,7 @@ pub struct PutObjReader { impl PutObjReader { pub fn new(stream: Body, content_length: u64) -> Self { - PutObjReader { - stream, - content_length, - } + PutObjReader { stream, content_length } } } @@ -141,11 +138,5 @@ pub struct ObjectOptions { pub trait StorageAPI { async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>; - async fn put_object( - &self, - bucket: &str, - object: &str, - data: &PutObjReader, - opts: &ObjectOptions, - ) -> Result<()>; + async fn put_object(&self, bucket: &str, object: &str, data: &PutObjReader, opts: &ObjectOptions) -> Result<()>; }