fix quorum err

This commit is contained in:
weisd
2024-09-24 10:39:12 +08:00
parent 29100d5250
commit dbb6980e96
11 changed files with 156 additions and 141 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@
.DS_Store
.idea
.vscode
/test

View File

@@ -100,9 +100,6 @@ pub enum DiskError {
#[error("more data was sent than what was advertised")]
MoreData,
#[error("other io err {0}")]
IoError(io::Error),
}
impl DiskError {
@@ -172,95 +169,89 @@ impl CheckErrorFn for DiskError {
}
}
pub fn clone_err(err: &Error) -> Error {
if let Some(e) = err.downcast_ref::<DiskError>() {
match e {
DiskError::MaxVersionsExceeded => Error::new(DiskError::MaxVersionsExceeded),
DiskError::Unexpected => Error::new(DiskError::Unexpected),
DiskError::CorruptedFormat => Error::new(DiskError::CorruptedFormat),
DiskError::CorruptedBackend => Error::new(DiskError::CorruptedBackend),
DiskError::UnformattedDisk => Error::new(DiskError::UnformattedDisk),
DiskError::InconsistentDisk => Error::new(DiskError::InconsistentDisk),
DiskError::UnsupportedDisk => Error::new(DiskError::UnsupportedDisk),
DiskError::DiskFull => Error::new(DiskError::DiskFull),
DiskError::DiskNotDir => Error::new(DiskError::DiskNotDir),
DiskError::DiskNotFound => Error::new(DiskError::DiskNotFound),
DiskError::DiskOngoingReq => Error::new(DiskError::DiskOngoingReq),
DiskError::DriveIsRoot => Error::new(DiskError::DriveIsRoot),
DiskError::FaultyRemoteDisk => Error::new(DiskError::FaultyRemoteDisk),
DiskError::FaultyDisk => Error::new(DiskError::FaultyDisk),
DiskError::DiskAccessDenied => Error::new(DiskError::DiskAccessDenied),
DiskError::FileNotFound => Error::new(DiskError::FileNotFound),
DiskError::FileVersionNotFound => Error::new(DiskError::FileVersionNotFound),
DiskError::TooManyOpenFiles => Error::new(DiskError::TooManyOpenFiles),
DiskError::FileNameTooLong => Error::new(DiskError::FileNameTooLong),
DiskError::VolumeExists => Error::new(DiskError::VolumeExists),
DiskError::IsNotRegular => Error::new(DiskError::IsNotRegular),
DiskError::PathNotFound => Error::new(DiskError::PathNotFound),
DiskError::VolumeNotFound => Error::new(DiskError::VolumeNotFound),
DiskError::VolumeNotEmpty => Error::new(DiskError::VolumeNotEmpty),
DiskError::VolumeAccessDenied => Error::new(DiskError::VolumeAccessDenied),
DiskError::FileAccessDenied => Error::new(DiskError::FileAccessDenied),
DiskError::FileCorrupt => Error::new(DiskError::FileCorrupt),
DiskError::BitrotHashAlgoInvalid => Error::new(DiskError::BitrotHashAlgoInvalid),
DiskError::CrossDeviceLink => Error::new(DiskError::CrossDeviceLink),
DiskError::LessData => Error::new(DiskError::LessData),
DiskError::MoreData => Error::new(DiskError::MoreData),
DiskError::IoError(ioerr) => Error::msg(ioerr.to_string()),
}
} else {
Error::msg(err.to_string())
pub fn clone_disk_err(e: &DiskError) -> Error {
match e {
DiskError::MaxVersionsExceeded => Error::new(DiskError::MaxVersionsExceeded),
DiskError::Unexpected => Error::new(DiskError::Unexpected),
DiskError::CorruptedFormat => Error::new(DiskError::CorruptedFormat),
DiskError::CorruptedBackend => Error::new(DiskError::CorruptedBackend),
DiskError::UnformattedDisk => Error::new(DiskError::UnformattedDisk),
DiskError::InconsistentDisk => Error::new(DiskError::InconsistentDisk),
DiskError::UnsupportedDisk => Error::new(DiskError::UnsupportedDisk),
DiskError::DiskFull => Error::new(DiskError::DiskFull),
DiskError::DiskNotDir => Error::new(DiskError::DiskNotDir),
DiskError::DiskNotFound => Error::new(DiskError::DiskNotFound),
DiskError::DiskOngoingReq => Error::new(DiskError::DiskOngoingReq),
DiskError::DriveIsRoot => Error::new(DiskError::DriveIsRoot),
DiskError::FaultyRemoteDisk => Error::new(DiskError::FaultyRemoteDisk),
DiskError::FaultyDisk => Error::new(DiskError::FaultyDisk),
DiskError::DiskAccessDenied => Error::new(DiskError::DiskAccessDenied),
DiskError::FileNotFound => Error::new(DiskError::FileNotFound),
DiskError::FileVersionNotFound => Error::new(DiskError::FileVersionNotFound),
DiskError::TooManyOpenFiles => Error::new(DiskError::TooManyOpenFiles),
DiskError::FileNameTooLong => Error::new(DiskError::FileNameTooLong),
DiskError::VolumeExists => Error::new(DiskError::VolumeExists),
DiskError::IsNotRegular => Error::new(DiskError::IsNotRegular),
DiskError::PathNotFound => Error::new(DiskError::PathNotFound),
DiskError::VolumeNotFound => Error::new(DiskError::VolumeNotFound),
DiskError::VolumeNotEmpty => Error::new(DiskError::VolumeNotEmpty),
DiskError::VolumeAccessDenied => Error::new(DiskError::VolumeAccessDenied),
DiskError::FileAccessDenied => Error::new(DiskError::FileAccessDenied),
DiskError::FileCorrupt => Error::new(DiskError::FileCorrupt),
DiskError::BitrotHashAlgoInvalid => Error::new(DiskError::BitrotHashAlgoInvalid),
DiskError::CrossDeviceLink => Error::new(DiskError::CrossDeviceLink),
DiskError::LessData => Error::new(DiskError::LessData),
DiskError::MoreData => Error::new(DiskError::MoreData),
}
}
pub fn ioerr_to_diskerr(e:io::Error)->DiskError{
match e.kind(){
io::ErrorKind::NotFound => DiskError::FileNotFound,
io::ErrorKind::PermissionDenied => DiskError::FileAccessDenied,
// io::ErrorKind::ConnectionRefused => todo!(),
// io::ErrorKind::ConnectionReset => todo!(),
// io::ErrorKind::HostUnreachable => todo!(),
// io::ErrorKind::NetworkUnreachable => todo!(),
// io::ErrorKind::ConnectionAborted => todo!(),
// io::ErrorKind::NotConnected => todo!(),
// io::ErrorKind::AddrInUse => todo!(),
// io::ErrorKind::AddrNotAvailable => todo!(),
// io::ErrorKind::NetworkDown => todo!(),
// io::ErrorKind::BrokenPipe => todo!(),
// io::ErrorKind::AlreadyExists => todo!(),
// io::ErrorKind::WouldBlock => todo!(),
// io::ErrorKind::NotADirectory => DiskError::FileNotFound,
// io::ErrorKind::IsADirectory => DiskError::FileNotFound,
// io::ErrorKind::DirectoryNotEmpty => DiskError::VolumeNotEmpty,
// io::ErrorKind::ReadOnlyFilesystem => todo!(),
// io::ErrorKind::FilesystemLoop => todo!(),
// io::ErrorKind::StaleNetworkFileHandle => todo!(),
// io::ErrorKind::InvalidInput => todo!(),
// io::ErrorKind::InvalidData => todo!(),
// io::ErrorKind::TimedOut => todo!(),
// io::ErrorKind::WriteZero => todo!(),
// io::ErrorKind::StorageFull => DiskError::DiskFull,
// io::ErrorKind::NotSeekable => todo!(),
// io::ErrorKind::FilesystemQuotaExceeded => todo!(),
// io::ErrorKind::FileTooLarge => todo!(),
// io::ErrorKind::ResourceBusy => todo!(),
// io::ErrorKind::ExecutableFileBusy => todo!(),
// io::ErrorKind::Deadlock => todo!(),
// io::ErrorKind::CrossesDevices => todo!(),
// io::ErrorKind::TooManyLinks =>DiskError::TooManyOpenFiles,
// io::ErrorKind::InvalidFilename => todo!(),
// io::ErrorKind::ArgumentListTooLong => todo!(),
// io::ErrorKind::Interrupted => todo!(),
// io::ErrorKind::Unsupported => todo!(),
// io::ErrorKind::UnexpectedEof => todo!(),
// io::ErrorKind::OutOfMemory => todo!(),
// io::ErrorKind::Other => todo!(),
// TODO: 把不支持的king用字符串处理
_ => DiskError::IoError(e),
}
pub fn ioerr_to_err(e: io::Error) -> Error {
match e.kind() {
io::ErrorKind::NotFound => Error::new(DiskError::FileNotFound),
io::ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
// io::ErrorKind::ConnectionRefused => todo!(),
// io::ErrorKind::ConnectionReset => todo!(),
// io::ErrorKind::HostUnreachable => todo!(),
// io::ErrorKind::NetworkUnreachable => todo!(),
// io::ErrorKind::ConnectionAborted => todo!(),
// io::ErrorKind::NotConnected => todo!(),
// io::ErrorKind::AddrInUse => todo!(),
// io::ErrorKind::AddrNotAvailable => todo!(),
// io::ErrorKind::NetworkDown => todo!(),
// io::ErrorKind::BrokenPipe => todo!(),
// io::ErrorKind::AlreadyExists => todo!(),
// io::ErrorKind::WouldBlock => todo!(),
// io::ErrorKind::NotADirectory => DiskError::FileNotFound,
// io::ErrorKind::IsADirectory => DiskError::FileNotFound,
// io::ErrorKind::DirectoryNotEmpty => DiskError::VolumeNotEmpty,
// io::ErrorKind::ReadOnlyFilesystem => todo!(),
// io::ErrorKind::FilesystemLoop => todo!(),
// io::ErrorKind::StaleNetworkFileHandle => todo!(),
// io::ErrorKind::InvalidInput => todo!(),
// io::ErrorKind::InvalidData => todo!(),
// io::ErrorKind::TimedOut => todo!(),
// io::ErrorKind::WriteZero => todo!(),
// io::ErrorKind::StorageFull => DiskError::DiskFull,
// io::ErrorKind::NotSeekable => todo!(),
// io::ErrorKind::FilesystemQuotaExceeded => todo!(),
// io::ErrorKind::FileTooLarge => todo!(),
// io::ErrorKind::ResourceBusy => todo!(),
// io::ErrorKind::ExecutableFileBusy => todo!(),
// io::ErrorKind::Deadlock => todo!(),
// io::ErrorKind::CrossesDevices => todo!(),
// io::ErrorKind::TooManyLinks =>DiskError::TooManyOpenFiles,
// io::ErrorKind::InvalidFilename => todo!(),
// io::ErrorKind::ArgumentListTooLong => todo!(),
// io::ErrorKind::Interrupted => todo!(),
// io::ErrorKind::Unsupported => todo!(),
// io::ErrorKind::UnexpectedEof => todo!(),
// io::ErrorKind::OutOfMemory => todo!(),
// io::ErrorKind::Other => todo!(),
// TODO: 把不支持的king用字符串处理
_ => Error::new(e),
}
}
pub fn os_is_not_exist(e:io::Error) -> bool{
e.kind() == ErrorKind::NotFound
}
pub fn os_is_not_exist(e: io::Error) -> bool {
e.kind() == ErrorKind::NotFound
}

View File

@@ -1,4 +1,4 @@
use super::error::{ioerr_to_diskerr, os_is_not_exist};
use super::error::os_is_not_exist;
use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
os, DeleteOptions, DiskAPI, DiskLocation, FileInfoVersions, FileReader, FileWriter, MetaCacheEntry, ReadMultipleReq,
@@ -971,7 +971,7 @@ impl DiskAPI for LocalDisk {
})?;
for entry in entries {
if utils::path::has_suffix(&entry, SLASH_SEPARATOR) || !Self::is_valid_volname(&entry) {
if !utils::path::has_suffix(&entry, SLASH_SEPARATOR) || !Self::is_valid_volname(&entry) {
continue;
}

View File

@@ -8,7 +8,7 @@ use crate::{
utils,
};
use super::error::{ioerr_to_diskerr, DiskError};
use super::error::{ioerr_to_err, DiskError};
fn check_path_length(path_name: &str) -> Result<()> {
// Apple OS X path length is limited to 1016
@@ -51,7 +51,7 @@ fn check_path_length(path_name: &str) -> Result<()> {
pub async fn make_dir_all(path: impl AsRef<Path>) -> Result<()> {
check_path_length(path.as_ref().to_string_lossy().to_string().as_str())?;
utils::fs::make_dir_all(path.as_ref()).map_err(ioerr_to_diskerr).await?;
utils::fs::make_dir_all(path.as_ref()).map_err(ioerr_to_err).await?;
Ok(())
}

View File

@@ -401,10 +401,12 @@ impl DiskAPI for RemoteDisk {
Ok(volume_info)
}
async fn delete_paths(&self, volume: &str, paths: &[&str]) -> Result<()> {
async fn delete_paths(&self, _volume: &str, _paths: &[&str]) -> Result<()> {
// TODO:
unimplemented!()
}
async fn update_metadata(&self, volume: &str, path: &str, fi: FileInfo, opts: UpdateMetadataOpts) {
async fn update_metadata(&self, _volume: &str, _path: &str, _fi: FileInfo, _opts: UpdateMetadataOpts) {
// TODO:
unimplemented!()
}

View File

@@ -94,11 +94,14 @@ impl Erasure {
}
}
warn!("Erasure encode errs {:?}", errs);
warn!("Erasure encode errs {:?}", &errs);
let err_idx = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum)?;
if errs[err_idx].is_some() {
let err = errs[err_idx].take().unwrap();
let none_count = errs.iter().filter(|&x| x.is_none()).count();
if none_count >= write_quorum {
continue;
}
if let Some(err) = reduce_write_quorum_errs(&errs, object_op_ignored_errs().as_ref(), write_quorum) {
return Err(err);
}
}

View File

@@ -1,5 +1,9 @@
use std::io;
use tracing_error::{SpanTrace, SpanTraceStatus};
use crate::disk::error::{clone_disk_err, DiskError};
pub type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
@@ -80,3 +84,16 @@ impl std::fmt::Display for Error {
Ok(())
}
}
impl Clone for Error {
fn clone(&self) -> Self {
if let Some(e) = self.downcast_ref::<DiskError>() {
clone_disk_err(e)
} else if let Some(e) = self.downcast_ref::<io::Error>() {
Error::new(io::Error::new(e.kind(), e.to_string()))
} else {
// TODO: 优化其他类型
Error::msg(self.to_string())
}
}
}

View File

@@ -262,6 +262,7 @@ impl PeerS3Client for LocalPeerS3Client {
}
}
warn!("list_bucket ress {:?}", &ress);
warn!("list_bucket errs {:?}", &errs);
let mut uniq_map: HashMap<&String, &VolumeInfo> = HashMap::new();

View File

@@ -1,4 +1,7 @@
use crate::{disk::error::DiskError, error::Error};
use crate::{
disk::error::DiskError,
error::{Error, Result},
};
use std::{collections::HashMap, fmt::Debug};
// pub type CheckErrorFn = fn(e: &Error) -> bool;
@@ -27,8 +30,7 @@ pub fn base_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
pub fn object_op_ignored_errs() -> Vec<Box<dyn CheckErrorFn>> {
let mut base = base_ignored_errs();
let ext:Vec<Box<dyn CheckErrorFn>> = vec![
let ext: Vec<Box<dyn CheckErrorFn>> = vec![
// Box::new(DiskError::DiskNotFound),
// Box::new(DiskError::FaultyDisk),
// Box::new(DiskError::FaultyRemoteDisk),
@@ -47,7 +49,7 @@ fn is_err_ignored(err: &Error, ignored_errs: &Vec<Box<dyn CheckErrorFn>>) -> boo
}
// 减少错误数量并返回出现次数最多的错误
fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorFn>>) -> (usize, Option<usize>) {
fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorFn>>) -> (usize, Option<Error>) {
let mut error_counts: HashMap<String, usize> = HashMap::new();
let mut error_map: HashMap<String, usize> = HashMap::new(); // 存err位置
let nil = "nil".to_string();
@@ -80,8 +82,10 @@ fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorF
}
if let Some(&c) = error_counts.get(&max_err) {
if let Some(&err) = error_map.get(&max_err) {
return (c, Some(err));
if let Some(&err_idx) = error_map.get(&max_err) {
let err = errs[err_idx].clone();
return (c, err);
}
return (c, None);
@@ -91,12 +95,17 @@ fn reduce_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorF
}
// 根据quorum验证错误数量
fn reduce_quorum_errs(errs: &Vec<Option<Error>>, ignored_errs: &Vec<Box<dyn CheckErrorFn>>, quorum: usize) -> Option<usize> {
fn reduce_quorum_errs(
errs: &Vec<Option<Error>>,
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
quorum: usize,
quorum_err: QuorumError,
) -> Option<Error> {
let (max_count, max_err) = reduce_errs(errs, ignored_errs);
if max_count >= quorum {
max_err
} else {
None
Some(Error::new(quorum_err))
}
}
@@ -106,13 +115,8 @@ pub fn reduce_read_quorum_errs(
errs: &mut Vec<Option<Error>>,
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
read_quorum: usize,
) -> Result<usize, Error> {
let idx = reduce_quorum_errs(errs, ignored_errs, read_quorum);
if idx.is_none() {
return Err(Error::new(QuorumError::Read));
}
Ok(idx.unwrap())
) -> Option<Error> {
reduce_quorum_errs(errs, ignored_errs, read_quorum, QuorumError::Read)
}
// 根据写quorum验证错误数量
@@ -121,12 +125,6 @@ pub fn reduce_write_quorum_errs(
errs: &Vec<Option<Error>>,
ignored_errs: &Vec<Box<dyn CheckErrorFn>>,
write_quorum: usize,
) -> Result<usize, Error> {
let idx = reduce_quorum_errs(errs, ignored_errs, write_quorum);
if idx.is_none() {
return Err(Error::new(QuorumError::Write));
}
Ok(idx.unwrap())
) -> Option<Error> {
reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write)
}

View File

@@ -1,10 +1,10 @@
use std::{fs::Metadata, path::Path};
use std::{fs::Metadata, path::Path};
use tokio::{fs, io};
#[cfg(target_os = "linux")]
#[cfg(not(target_os = "windows"))]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
use os::unix::fs::MetadataExt;
use std::os::unix::fs::MetadataExt;
if f1.dev() != f2.dev() {
return false;
@@ -31,12 +31,11 @@ pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
#[cfg(target_os = "windows")]
pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
if f1.permissions() != f2.permissions() {
return false;
return false;
}
if f1.file_type() != f2.file_type() {
return false;
return false;
}
if f1.len() != f2.len() {
@@ -45,11 +44,11 @@ pub fn same_file(f1: &Metadata, f2: &Metadata) -> bool {
true
}
pub async fn access(path: impl AsRef<Path>) -> io::Result<()>{
pub async fn access(path: impl AsRef<Path>) -> io::Result<()> {
fs::metadata(path).await?;
Ok(())
}
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()>{
pub async fn make_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
fs::create_dir_all(path.as_ref()).await
}
}

View File

@@ -1,24 +1,27 @@
#!/bin/bash
current_dir=$(pwd)
mkdir -p ./target/volume/test
mkdir -p ./target/volume/test{0..4}
# DATA_DIR="./target/volume/test"
DATA_DIR="./target/volume/test{0...4}"
if [ -n "$1" ]; then
DATA_DIR="$1"
fi
if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
fi
# cargo run "$DATA_DIR"
DATA_DIR_ARG="./target/volume/test{0...4}"
if [ -n "$1" ]; then
DATA_DIR_ARG="$1"
fi
# cargo run "$DATA_DIR_ARG"
# -- --access-key AKEXAMPLERUSTFS \
# --secret-key SKEXAMPLERUSTFS \
# --address 0.0.0.0:9010 \
# --domain-name 127.0.0.1:9010 \
# "$DATA_DIR"
# "$DATA_DIR_ARG"
cargo run "$DATA_DIR"
cargo run "$DATA_DIR_ARG"