ec use AsyncRead

Author: weisd
Date: 2024-11-04 17:43:05 +08:00
Parent: 69e19fe8ab
Commit: e922c70c99
14 changed files with 443 additions and 82 deletions

Cargo.lock (generated)

@@ -1851,6 +1851,7 @@ dependencies = [
"time",
"tokio",
"tokio-stream",
"tokio-util",
"tonic",
"tonic-build",
"tonic-reflection",
@@ -2333,6 +2334,7 @@ checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",

@@ -39,7 +39,7 @@ nix = { version = "0.29.0", features = ["fs"] }
path-absolutize = "3.1.1"
protos.workspace = true
rmp-serde = "1.3.0"
tokio-util = { version = "0.7.12", features = ["io"] }
tokio-util = { version = "0.7.12", features = ["io", "compat"] }
crc32fast = "1.4.2"
siphasher = "1.0.1"
base64-simd = "0.8.0"
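For context, a minimal standalone sketch (not code from this commit; `bytes` and `futures` dependencies assumed) of what the two `tokio-util` features enabled here provide: the `io` feature supplies `StreamReader`, which adapts a `Stream` of `Bytes` into a tokio `AsyncRead`, and the `compat` feature bridges between tokio and futures I/O traits.

```rust
use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // "io" feature: a Stream<Item = Result<Bytes, io::Error>> becomes an AsyncRead.
    let body = stream::iter(vec![Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello"))]);
    let mut reader = StreamReader::new(body);
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    assert_eq!(buf, b"hello".to_vec());

    // "compat" feature: wrap a tokio AsyncRead so futures-based APIs can consume it.
    let _futures_reader = tokio::io::empty().compat();
    Ok(())
}
```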

@@ -1,5 +1,5 @@
use crate::{
disk::{error::DiskError, DiskStore, FileReader, FileWriter, Reader},
disk::{error::DiskError, DiskAPI, DiskStore, FileReader, FileWriter, Reader},
erasure::{ReadAt, Writer},
error::{Error, Result},
store_api::BitrotAlgorithm,
@@ -612,7 +612,7 @@ mod test {
use crate::{
bitrot::{new_bitrot_writer, BITROT_ALGORITHMS},
disk::{endpoint::Endpoint, error::DiskError, new_disk, DiskOption},
disk::{endpoint::Endpoint, error::DiskError, new_disk, DiskAPI, DiskOption},
error::{Error, Result},
store_api::BitrotAlgorithm,
};
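A side note on the `DiskAPI` imports added in this and several files below: `DiskAPI` methods are now invoked on the `Disk` enum behind `DiskStore`, and a trait's methods only resolve at a call site when the trait is in scope. A tiny standalone illustration (names are made up):

```rust
use std::sync::Arc;

trait Api {
    fn kind(&self) -> &'static str;
}

struct Thing;

impl Api for Thing {
    fn kind(&self) -> &'static str {
        "thing"
    }
}

fn main() {
    let store: Arc<Thing> = Arc::new(Thing);
    // Compiles only because `Api` is in scope; the equivalent in this repository
    // is `use crate::disk::DiskAPI;` wherever DiskStore methods are called.
    assert_eq!(store.kind(), "thing");
}
```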

@@ -10,7 +10,7 @@ use tokio::{
};
use crate::{
disk::{DiskStore, MetaCacheEntries, MetaCacheEntry, WalkDirOptions},
disk::{DiskAPI, DiskStore, MetaCacheEntries, MetaCacheEntry, WalkDirOptions},
error::{Error, Result},
};

@@ -21,9 +21,11 @@ use crate::{
};
use endpoint::Endpoint;
use futures::StreamExt;
use local::LocalDisk;
use protos::proto_gen::node_service::{
node_service_client::NodeServiceClient, ReadAtRequest, ReadAtResponse, WriteRequest, WriteResponse,
};
use remote::RemoteDisk;
use serde::{Deserialize, Serialize};
use std::{
any::Any,
@@ -47,15 +49,304 @@ use tracing::info;
use tracing::warn;
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
pub type DiskStore = Arc<Disk>;
#[derive(Debug)]
pub enum Disk {
Local(LocalDisk),
Remote(RemoteDisk),
}
#[async_trait::async_trait]
impl DiskAPI for Disk {
fn to_string(&self) -> String {
match self {
Disk::Local(local_disk) => local_disk.to_string(),
Disk::Remote(remote_disk) => remote_disk.to_string(),
}
}
fn is_local(&self) -> bool {
match self {
Disk::Local(local_disk) => local_disk.is_local(),
Disk::Remote(remote_disk) => remote_disk.is_local(),
}
}
fn host_name(&self) -> String {
match self {
Disk::Local(local_disk) => local_disk.host_name(),
Disk::Remote(remote_disk) => remote_disk.host_name(),
}
}
async fn is_online(&self) -> bool {
match self {
Disk::Local(local_disk) => local_disk.is_online().await,
Disk::Remote(remote_disk) => remote_disk.is_online().await,
}
}
fn endpoint(&self) -> Endpoint {
match self {
Disk::Local(local_disk) => local_disk.endpoint(),
Disk::Remote(remote_disk) => remote_disk.endpoint(),
}
}
async fn close(&self) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.close().await,
Disk::Remote(remote_disk) => remote_disk.close().await,
}
}
fn path(&self) -> PathBuf {
match self {
Disk::Local(local_disk) => local_disk.path(),
Disk::Remote(remote_disk) => remote_disk.path(),
}
}
fn get_disk_location(&self) -> DiskLocation {
match self {
Disk::Local(local_disk) => local_disk.get_disk_location(),
Disk::Remote(remote_disk) => remote_disk.get_disk_location(),
}
}
async fn get_disk_id(&self) -> Result<Option<Uuid>> {
match self {
Disk::Local(local_disk) => local_disk.get_disk_id().await,
Disk::Remote(remote_disk) => remote_disk.get_disk_id().await,
}
}
async fn set_disk_id(&self, id: Option<Uuid>) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.set_disk_id(id).await,
Disk::Remote(remote_disk) => remote_disk.set_disk_id(id).await,
}
}
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>> {
match self {
Disk::Local(local_disk) => local_disk.read_all(volume, path).await,
Disk::Remote(remote_disk) => remote_disk.read_all(volume, path).await,
}
}
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.write_all(volume, path, data).await,
Disk::Remote(remote_disk) => remote_disk.write_all(volume, path, data).await,
}
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.delete(volume, path, opt).await,
Disk::Remote(remote_disk) => remote_disk.delete(volume, path, opt).await,
}
}
async fn verify_file(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
match self {
Disk::Local(local_disk) => local_disk.verify_file(volume, path, fi).await,
Disk::Remote(remote_disk) => remote_disk.verify_file(volume, path, fi).await,
}
}
async fn check_parts(&self, volume: &str, path: &str, fi: &FileInfo) -> Result<CheckPartsResp> {
match self {
Disk::Local(local_disk) => local_disk.check_parts(volume, path, fi).await,
Disk::Remote(remote_disk) => remote_disk.check_parts(volume, path, fi).await,
}
}
async fn rename_part(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str, meta: Vec<u8>) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.rename_part(src_volume, src_path, dst_volume, dst_path, meta).await,
Disk::Remote(remote_disk) => {
remote_disk
.rename_part(src_volume, src_path, dst_volume, dst_path, meta)
.await
}
}
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.rename_file(src_volume, src_path, dst_volume, dst_path).await,
Disk::Remote(remote_disk) => remote_disk.rename_file(src_volume, src_path, dst_volume, dst_path).await,
}
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
match self {
Disk::Local(local_disk) => local_disk.create_file(_origvolume, volume, path, _file_size).await,
Disk::Remote(remote_disk) => remote_disk.create_file(_origvolume, volume, path, _file_size).await,
}
}
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
match self {
Disk::Local(local_disk) => local_disk.append_file(volume, path).await,
Disk::Remote(remote_disk) => remote_disk.append_file(volume, path).await,
}
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
match self {
Disk::Local(local_disk) => local_disk.read_file(volume, path).await,
Disk::Remote(remote_disk) => remote_disk.read_file(volume, path).await,
}
}
async fn list_dir(&self, _origvolume: &str, volume: &str, _dir_path: &str, _count: i32) -> Result<Vec<String>> {
match self {
Disk::Local(local_disk) => local_disk.list_dir(_origvolume, volume, _dir_path, _count).await,
Disk::Remote(remote_disk) => remote_disk.list_dir(_origvolume, volume, _dir_path, _count).await,
}
}
async fn walk_dir(&self, opts: WalkDirOptions) -> Result<Vec<MetaCacheEntry>> {
match self {
Disk::Local(local_disk) => local_disk.walk_dir(opts).await,
Disk::Remote(remote_disk) => remote_disk.walk_dir(opts).await,
}
}
async fn rename_data(
&self,
src_volume: &str,
src_path: &str,
fi: FileInfo,
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
match self {
Disk::Local(local_disk) => local_disk.rename_data(src_volume, src_path, fi, dst_volume, dst_path).await,
Disk::Remote(remote_disk) => remote_disk.rename_data(src_volume, src_path, fi, dst_volume, dst_path).await,
}
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.make_volumes(volumes).await,
Disk::Remote(remote_disk) => remote_disk.make_volumes(volumes).await,
}
}
async fn make_volume(&self, volume: &str) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.make_volume(volume).await,
Disk::Remote(remote_disk) => remote_disk.make_volume(volume).await,
}
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
match self {
Disk::Local(local_disk) => local_disk.list_volumes().await,
Disk::Remote(remote_disk) => remote_disk.list_volumes().await,
}
}
async fn stat_volume(&self, volume: &str) -> Result<VolumeInfo> {
match self {
Disk::Local(local_disk) => local_disk.stat_volume(volume).await,
Disk::Remote(remote_disk) => remote_disk.stat_volume(volume).await,
}
}
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.delete_paths(volume, paths).await,
Disk::Remote(remote_disk) => remote_disk.delete_paths(volume, paths).await,
}
}
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: &UpdateMetadataOpts) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.update_metadata(volume, path, fi, opts).await,
Disk::Remote(remote_disk) => remote_disk.update_metadata(volume, path, fi, opts).await,
}
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.write_metadata(_org_volume, volume, path, fi).await,
Disk::Remote(remote_disk) => remote_disk.write_metadata(_org_volume, volume, path, fi).await,
}
}
async fn read_version(
&self,
_org_volume: &str,
volume: &str,
path: &str,
version_id: &str,
opts: &ReadOptions,
) -> Result<FileInfo> {
match self {
Disk::Local(local_disk) => local_disk.read_version(_org_volume, volume, path, version_id, opts).await,
Disk::Remote(remote_disk) => remote_disk.read_version(_org_volume, volume, path, version_id, opts).await,
}
}
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
match self {
Disk::Local(local_disk) => local_disk.read_xl(volume, path, read_data).await,
Disk::Remote(remote_disk) => remote_disk.read_xl(volume, path, read_data).await,
}
}
async fn delete_version(
&self,
volume: &str,
path: &str,
fi: FileInfo,
force_del_marker: bool,
opts: DeleteOptions,
) -> Result<RawFileInfo> {
match self {
Disk::Local(local_disk) => local_disk.delete_version(volume, path, fi, force_del_marker, opts).await,
Disk::Remote(remote_disk) => remote_disk.delete_version(volume, path, fi, force_del_marker, opts).await,
}
}
async fn delete_versions(
&self,
volume: &str,
versions: Vec<FileInfoVersions>,
opts: DeleteOptions,
) -> Result<Vec<Option<Error>>> {
match self {
Disk::Local(local_disk) => local_disk.delete_versions(volume, versions, opts).await,
Disk::Remote(remote_disk) => remote_disk.delete_versions(volume, versions, opts).await,
}
}
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>> {
match self {
Disk::Local(local_disk) => local_disk.read_multiple(req).await,
Disk::Remote(remote_disk) => remote_disk.read_multiple(req).await,
}
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
match self {
Disk::Local(local_disk) => local_disk.delete_volume(volume).await,
Disk::Remote(remote_disk) => remote_disk.delete_volume(volume).await,
}
}
async fn disk_info(&self, opts: &DiskInfoOptions) -> Result<DiskInfo> {
match self {
Disk::Local(local_disk) => local_disk.disk_info(opts).await,
Disk::Remote(remote_disk) => remote_disk.disk_info(opts).await,
}
}
}
pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskStore> {
if ep.is_local {
let s = local::LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
Ok(Arc::new(Disk::Local(s)))
} else {
let remote_disk = remote::RemoteDisk::new(ep, opt).await?;
Ok(Arc::new(Box::new(remote_disk)))
Ok(Arc::new(Disk::Remote(remote_disk)))
}
}
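A condensed sketch of the dispatch change above (standalone toy types, not the real `DiskAPI`): `DiskStore` stops being a boxed trait object and becomes an `Arc` around an enum of the two concrete disk implementations, with every trait method forwarded via `match`, so calls are statically dispatched.

```rust
use std::sync::Arc;

trait Api {
    fn is_local(&self) -> bool;
}

struct LocalDisk;
struct RemoteDisk;

impl Api for LocalDisk {
    fn is_local(&self) -> bool {
        true
    }
}

impl Api for RemoteDisk {
    fn is_local(&self) -> bool {
        false
    }
}

// Was: type Store = Arc<Box<dyn Api>>;
enum Disk {
    Local(LocalDisk),
    Remote(RemoteDisk),
}

impl Api for Disk {
    fn is_local(&self) -> bool {
        match self {
            Disk::Local(d) => d.is_local(),
            Disk::Remote(d) => d.is_local(),
        }
    }
}

type Store = Arc<Disk>;

fn new_store(local: bool) -> Store {
    if local {
        Arc::new(Disk::Local(LocalDisk))
    } else {
        Arc::new(Disk::Remote(RemoteDisk))
    }
}

fn main() {
    assert!(new_store(true).is_local());
    assert!(!new_store(false).is_local());
}
```

One trade-off of this pattern: every `DiskAPI` method needs a matching `match` arm in the enum impl, which accounts for most of the added lines above.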

@@ -1,4 +1,4 @@
use crate::bitrot::{close_bitrot_writers, BitrotReader, BitrotWriter};
use crate::bitrot::{BitrotReader, BitrotWriter};
use crate::error::{Error, Result, StdError};
use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs};
use bytes::Bytes;
@@ -7,15 +7,16 @@ use futures::{pin_mut, Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::any::Any;
use std::fmt::Debug;
use tokio::io::AsyncWriteExt;
use std::io::ErrorKind;
use tokio::io::DuplexStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::debug;
use tracing::warn;
// use tracing::debug;
use uuid::Uuid;
// use reader::reader::ChunkedStream;
use crate::chunk_stream::ChunkedStream;
// use crate::chunk_stream::ChunkedStream;
use crate::disk::error::DiskError;
pub struct Erasure {
@@ -24,6 +25,7 @@ pub struct Erasure {
encoder: Option<ReedSolomon>,
pub block_size: usize,
_id: Uuid,
buf: Vec<u8>,
}
impl Erasure {
@@ -43,12 +45,13 @@ impl Erasure {
block_size,
encoder,
_id: Uuid::new_v4(),
buf: vec![0u8; block_size],
}
}
#[tracing::instrument(level = "debug", skip(self, body, writers))]
pub async fn encode<S>(
&self,
&mut self,
body: S,
writers: &mut [Option<BitrotWriter>],
// block_size: usize,
@@ -58,76 +61,138 @@ impl Erasure {
where
S: Stream<Item = Result<Bytes, StdError>> + Send + Sync,
{
// let stream = ChunkedStream::new(body, self.block_size);
let stream = ChunkedStream::new(body, total_size, self.block_size, false);
pin_mut!(body);
let mut reader = tokio_util::io::StreamReader::new(
body.map(|f| f.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))),
);
let mut total: usize = 0;
// let mut idx = 0;
pin_mut!(stream);
// warn!("encode start...");
loop {
match stream.next().await {
Some(result) => match result {
Ok(data) => {
total += data.len();
// EOF
if data.is_empty() {
break;
}
// idx += 1;
// warn!("encode {} get data {:?}", data.len(), data.to_vec());
let blocks = self.encode_data(data.as_ref())?;
// warn!(
// "encode shard size: {}/{} from block_size {}, total_size {} ",
// blocks[0].len(),
// blocks.len(),
// data.len(),
// total_size
// );
let mut errs = Vec::new();
for (i, w_op) in writers.iter_mut().enumerate() {
if let Some(w) = w_op {
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
} else {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
}
}
let none_count = errs.iter().filter(|&x| x.is_none()).count();
if none_count >= write_quorum {
continue;
}
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
warn!("Erasure encode errs {:?}", &errs);
return Err(err);
}
}
Err(e) => {
warn!("poll result err {:?}", &e);
return Err(Error::msg(e.to_string()));
}
},
None => {
// warn!("poll empty result");
break;
let new_len = {
let remain = total_size - total;
if remain > self.block_size {
self.block_size
} else {
remain
}
};
if new_len == 0 {
break;
}
self.buf.resize(new_len, 0u8);
match reader.read_exact(&mut self.buf).await {
Ok(res) => res,
Err(e) => {
if let ErrorKind::UnexpectedEof = e.kind() {
break;
} else {
return Err(Error::new(e));
}
}
};
total += self.buf.len();
let blocks = self.encode_data(&self.buf)?;
let mut errs = Vec::new();
for (i, w_op) in writers.iter_mut().enumerate() {
if let Some(w) = w_op {
match w.write(blocks[i].as_ref()).await {
Ok(_) => errs.push(None),
Err(e) => errs.push(Some(e)),
}
} else {
errs.push(Some(Error::new(DiskError::DiskNotFound)));
}
}
let none_count = errs.iter().filter(|&x| x.is_none()).count();
if none_count >= write_quorum {
continue;
}
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
warn!("Erasure encode errs {:?}", &errs);
return Err(err);
}
}
let _ = close_bitrot_writers(writers).await?;
Ok(total)
// // let stream = ChunkedStream::new(body, self.block_size);
// let stream = ChunkedStream::new(body, total_size, self.block_size, false);
// let mut total: usize = 0;
// // let mut idx = 0;
// pin_mut!(stream);
// // warn!("encode start...");
// loop {
// match stream.next().await {
// Some(result) => match result {
// Ok(data) => {
// total += data.len();
// // EOF
// if data.is_empty() {
// break;
// }
// // idx += 1;
// // warn!("encode {} get data {:?}", data.len(), data.to_vec());
// let blocks = self.encode_data(data.as_ref())?;
// // warn!(
// // "encode shard size: {}/{} from block_size {}, total_size {} ",
// // blocks[0].len(),
// // blocks.len(),
// // data.len(),
// // total_size
// // );
// let mut errs = Vec::new();
// for (i, w_op) in writers.iter_mut().enumerate() {
// if let Some(w) = w_op {
// match w.write(blocks[i].as_ref()).await {
// Ok(_) => errs.push(None),
// Err(e) => errs.push(Some(e)),
// }
// } else {
// errs.push(Some(Error::new(DiskError::DiskNotFound)));
// }
// }
// let none_count = errs.iter().filter(|&x| x.is_none()).count();
// if none_count >= write_quorum {
// continue;
// }
// if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
// warn!("Erasure encode errs {:?}", &errs);
// return Err(err);
// }
// }
// Err(e) => {
// warn!("poll result err {:?}", &e);
// return Err(Error::msg(e.to_string()));
// }
// },
// None => {
// // warn!("poll empty result");
// break;
// }
// }
// }
// let _ = close_bitrot_writers(writers).await?;
// Ok(total)
}
pub async fn decode(
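The rewritten `encode` body above replaces the `ChunkedStream` wrapper with `tokio_util::io::StreamReader` plus `read_exact`. Below is a minimal standalone sketch of that read loop; the `read_in_blocks` helper and its in-memory body are illustrative only, and the real code feeds each filled buffer to `encode_data` and the per-disk `BitrotWriter`s while checking the write quorum.

```rust
use bytes::Bytes;
use futures::stream;
use std::io::ErrorKind;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

async fn read_in_blocks(total_size: usize, block_size: usize) -> std::io::Result<usize> {
    // Stand-in for the object body stream handed to encode().
    let body = stream::iter(vec![Ok::<Bytes, std::io::Error>(Bytes::from(vec![7u8; total_size]))]);
    let mut reader = StreamReader::new(body);

    let mut buf = vec![0u8; block_size];
    let mut total = 0usize;
    loop {
        // The final block may be shorter than block_size.
        let new_len = std::cmp::min(block_size, total_size - total);
        if new_len == 0 {
            break;
        }
        buf.resize(new_len, 0u8);
        match reader.read_exact(&mut buf).await {
            Ok(_) => {}
            // A short body ends the loop instead of erroring out.
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        total += buf.len();
        // Real code: let blocks = self.encode_data(&buf)?; then write each shard
        // through its BitrotWriter and enforce write_quorum.
    }
    Ok(total)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    assert_eq!(read_in_blocks(10, 4).await?, 10);
    Ok(())
}
```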

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
disk::{DeleteOptions, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
disk::{DeleteOptions, DiskAPI, DiskStore, BUCKET_META_PREFIX, RUSTFS_META_BUCKET},
error::{Error, Result},
heal::heal_ops::HEALING_TRACKER_FILENAME,
new_object_layer_fn,

@@ -7,6 +7,7 @@ use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tonic::Request;
use tracing::warn;
use crate::disk::DiskAPI;
use crate::store::all_local_disk;
use crate::{
disk::{self, error::DiskError, VolumeInfo},

@@ -11,7 +11,7 @@ use uuid::Uuid;
use crate::{
disk::{
format::{DistributionAlgoVersion, FormatV3},
DiskStore,
DiskAPI, DiskStore,
},
endpoints::PoolEndpoints,
error::{Error, Result},

@@ -5,7 +5,7 @@ use crate::bucket::metadata_sys::{self, init_bucket_metadata_sys, set_bucket_met
use crate::bucket::utils::{check_valid_bucket_name, check_valid_bucket_name_strict, is_meta_bucketname};
use crate::config::{self, storageclass, GLOBAL_ConfigSys};
use crate::disk::endpoint::EndpointType;
use crate::disk::{DiskInfo, DiskInfoOptions, MetaCacheEntry};
use crate::disk::{DiskAPI, DiskInfo, DiskInfoOptions, MetaCacheEntry};
use crate::global::{
is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE, DISK_FILL_FRACTION,
DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,

@@ -1,4 +1,5 @@
use crate::config::{storageclass, KVS};
use crate::disk::DiskAPI;
use crate::{
disk::{
error::DiskError,

@@ -35,6 +35,7 @@ s3s.workspace = true
serde_json.workspace = true
tracing.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
tokio-util = { version = "0.7.12", features = ["io", "compat"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",

@@ -2,8 +2,8 @@ use std::{error::Error, io::ErrorKind, pin::Pin};
use ecstore::{
disk::{
DeleteOptions, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader, UpdateMetadataOpts,
WalkDirOptions,
DeleteOptions, DiskAPI, DiskInfoOptions, DiskStore, FileInfoVersions, ReadMultipleReq, ReadOptions, Reader,
UpdateMetadataOpts, WalkDirOptions,
},
erasure::{ReadAt, Writer},
peer::{LocalPeerS3Client, PeerS3Client},

@@ -6,9 +6,9 @@ mkdir -p ./target/volume/test
mkdir -p ./target/volume/test{0..4}
if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug"
fi
# if [ -z "$RUST_LOG" ]; then
# export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug"
# fi
# export RUSTFS_ERASURE_SET_DRIVE_COUNT=5