fix:reduce_write_quorum_errs

This commit is contained in:
weisd
2024-09-03 13:34:03 +08:00
parent ced17f6deb
commit 2935945585
8 changed files with 55 additions and 49 deletions

View File

@@ -182,7 +182,9 @@ impl LocalDisk {
self.move_to_trash(delete_path, recursive, immediate_purge).await?;
} else {
if delete_path.is_dir() {
debug!("delete_file remove_dir {:?}", &delete_path);
if let Err(err) = fs::remove_dir(&delete_path).await {
debug!("delete_file remove_dir err {:?} err: {:?}", &delete_path, err);
match err.kind() {
ErrorKind::NotFound => (),
// ErrorKind::DirectoryNotEmpty => (),
@@ -194,6 +196,7 @@ impl LocalDisk {
}
}
}
debug!("delete_file remove_dir done {:?}", &delete_path);
} else {
if let Err(err) = fs::remove_file(&delete_path).await {
match err.kind() {
@@ -483,6 +486,13 @@ impl DiskAPI for LocalDisk {
}
async fn create_file(&self, _origvolume: &str, volume: &str, path: &str, _file_size: usize) -> Result<FileWriter> {
let volpath = self.get_bucket_path(&volume)?;
// check exists
fs::metadata(&volpath).await.map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::new(DiskError::VolumeNotFound),
_ => Error::new(e),
})?;
let fpath = self.get_object_path(volume, path)?;
debug!("CreateFile fpath: {:?}", fpath);
@@ -678,8 +688,6 @@ impl DiskAPI for LocalDisk {
}
}
warn!("get meta {:?}", &meta);
let mut skip_parent = dst_volume_path.clone();
if !dst_buf.is_empty() {
skip_parent = PathBuf::from(&dst_file_path.parent().unwrap_or(Path::new("/")));

View File

@@ -92,6 +92,8 @@ impl Erasure {
}
}
warn!("Erasure encode errs {:?}", errs);
let err_idx = reduce_write_quorum_errs(&errs, object_ignored_errs().as_slice(), write_quorum)?;
if errs[err_idx].is_some() {
let err = errs[err_idx].take().unwrap();

View File

@@ -10,6 +10,7 @@ mod peer;
mod quorum;
pub mod set_disk;
mod sets;
mod storage_class;
pub mod store;
pub mod store_api;
mod store_init;

View File

@@ -0,0 +1,28 @@
// use crate::error::{Error, Result};
// default_partiy_count 默认配置,根据磁盘总数分配校验磁盘数量
pub fn default_partiy_count(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
// Define the minimum number of parity drives required.
// const MIN_PARITY_DRIVES: usize = 0;
// // ValidateParity validates standard storage class parity.
// pub fn validate_parity(ss_parity: usize, set_drive_count: usize) -> Result<()> {
// // if ss_parity > 0 && ss_parity < MIN_PARITY_DRIVES {
// // return Err(Error::msg(format!("parity {} 应该大于等于 {}", ss_parity, MIN_PARITY_DRIVES)));
// // }
// if ss_parity > set_drive_count / 2 {
// return Err(Error::msg(format!("parity {} 应该小于等于 {}", ss_parity, set_drive_count / 2)));
// }
// Ok(())
// }

View File

@@ -6,6 +6,7 @@ use crate::{
error::{Error, Result},
peer::{PeerS3Client, S3PeerSys},
sets::Sets,
storage_class::default_partiy_count,
store_api::{
BucketInfo, BucketOptions, CompletePart, DeletedObject, GetObjectReader, HTTPRangeSpec, ListObjectsInfo,
ListObjectsV2Info, MakeBucketOptions, MultipartUploadResult, ObjectInfo, ObjectOptions, ObjectToDelete, PartInfo,
@@ -48,7 +49,9 @@ impl ECStore {
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
let partiy_count = default_partiy_count(pool_eps.drives_per_set);
// validate_parity(partiy_count, pool_eps.drives_per_set)?;
let (disks, errs) = crate::store_init::init_disks(
&pool_eps.endpoints,

View File

@@ -175,15 +175,6 @@ fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
Ok(())
}
pub fn default_partiy_count(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());

View File

@@ -39,38 +39,11 @@ impl<'a> AsyncWrite for AppendWriter<'a> {
let mut fut = Box::pin(self.async_write(buf));
debug!("AsyncWrite poll_write {}, buf:{}", self.disk.id(), buf.len());
// while let Poll::Ready(e) = fut.as_mut().poll(cx) {
// let a = match e {
// Ok(_) => {
// debug!("Ready ok {}", self.disk.id());
// Poll::Ready(Ok(buf.len()))
// }
// Err(e) => {
// debug!("Ready err {}", self.disk.id());
// Poll::Ready(Err(e))
// }
// };
// return a;
// }
// Poll::Pending
match fut.as_mut().poll(cx) {
Poll::Pending => {
debug!("Pending {}", self.disk.id());
Poll::Pending
}
Poll::Ready(e) => match e {
Ok(_) => {
debug!("Ready ok {}", self.disk.id());
Poll::Ready(Ok(buf.len()))
}
Err(e) => {
debug!("Ready err {}", self.disk.id());
Poll::Ready(Err(e))
}
},
let mut fut = self.get_mut().async_write(buf);
match futures::future::poll_fn(|cx| fut.as_mut().poll(cx)).start(cx) {
Ready(Ok(n)) => Ready(Ok(n)),
Ready(Err(e)) => Ready(Err(e)),
Pending => Pending,
}
}

View File

@@ -3,8 +3,8 @@
mkdir -p ./target/volume/test
mkdir -p ./target/volume/test{0..4}
DATA_DIR="./target/volume/test"
# DATA_DIR="./target/volume/test{0...4}"
# DATA_DIR="./target/volume/test"
DATA_DIR="./target/volume/test{0...4}"
if [ -n "$1" ]; then
DATA_DIR="$1"
@@ -14,11 +14,11 @@ if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
fi
cargo run "$DATA_DIR"
# cargo run "$DATA_DIR"
# -- --access-key AKEXAMPLERUSTFS \
# --secret-key SKEXAMPLERUSTFS \
# --address 0.0.0.0:9010 \
# --domain-name 127.0.0.1:9010 \
"$DATA_DIR"
# "$DATA_DIR"
# cargo run "$DATA_DIR"
cargo run "$DATA_DIR"