fix: minor optimizations

Author: shiro.lee
Date: 2024-08-05 22:28:04 +08:00
parent 7b8c11ecf3
commit 4735cedba2
10 changed files with 80 additions and 98 deletions

View File

@@ -1,4 +1,4 @@
use crate::error::{Error, Result, StdError};
use crate::error::{Error, Result};
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
@@ -34,43 +34,51 @@ pub enum DiskError {
}
impl DiskError {
pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<()> {
if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() {
/// Checks whether the given slice of errors contains fatal disk errors.
///
/// # Parameters
/// - `errs`: A slice of optional errors.
///
/// # Returns
/// If all errors are of the same fatal disk error type, returns the corresponding error.
/// Otherwise, returns `Ok(())`.
pub fn check_disk_fatal_errs(errs: &[Option<Error>]) -> Result<()> {
if DiskError::UnsupportedDisk.count_errs(errs) == errs.len() {
return Err(DiskError::UnsupportedDisk.into());
}
if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() {
if DiskError::FileAccessDenied.count_errs(errs) == errs.len() {
return Err(DiskError::FileAccessDenied.into());
}
if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() {
if DiskError::DiskNotDir.count_errs(errs) == errs.len() {
return Err(DiskError::DiskNotDir.into());
}
Ok(())
}
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
pub fn count_errs(&self, errs: &[Option<Error>]) -> usize {
return errs
.iter()
.filter(|&e| {
if e.is_some() {
let e = e.as_ref().unwrap();
let cast = e.downcast_ref::<DiskError>();
if cast.is_some() {
let cast = cast.unwrap();
return cast == err;
}
}
false
.filter(|&err| match err {
None => false,
Some(e) => self.is(e),
})
.count();
}
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
pub fn quorum_unformatted_disks(errs: &[Option<Error>]) -> bool {
DiskError::UnformattedDisk.count_errs(errs) > (errs.len() / 2)
}
pub fn should_init_erasure_disks(errs: &[Option<Error>]) -> bool {
DiskError::UnformattedDisk.count_errs(errs) == errs.len()
}
/// Checks whether `err` downcasts to this exact `DiskError` variant.
pub fn is(&self, err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<DiskError>() {
e == self
@@ -78,18 +86,6 @@ impl DiskError {
false
}
}
pub fn is_err<T: std::error::Error + ?Sized>(err: &T, disk_err: &DiskError) -> bool {
return false;
// let cast = err.
// if cast.is_none() {
// return false;
// }
// let e = cast.unwrap();
// e == disk_err
}
}
impl PartialEq for DiskError {
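For orientation: the refactor above turns the static `count_errs(errs, &variant)` helper into a method on the variant itself and switches the signatures from `&Vec<Option<Error>>` to `&[Option<Error>]`. A minimal usage sketch of the new API follows; the sample error vector is an assumption for illustration, relying only on the `Into<Error>` conversion and `downcast_ref` behavior visible in this diff.

// Illustrative only: a mixed slice where 2 of 3 slots are UnformattedDisk.
let errs: Vec<Option<Error>> = vec![
    Some(DiskError::UnformattedDisk.into()),
    Some(DiskError::UnformattedDisk.into()),
    None,
];

// The variant now counts its own occurrences
// (old form: DiskError::count_errs(&errs, &DiskError::UnformattedDisk)).
assert_eq!(DiskError::UnformattedDisk.count_errs(&errs), 2);

// `is` checks whether an Error downcasts to this exact variant.
let e: Error = DiskError::DiskNotDir.into();
assert!(DiskError::DiskNotDir.is(&e));
assert!(!DiskError::UnformattedDisk.is(&e));

// Quorum: 2 > 3 / 2, so a majority of disks are unformatted
// (`> len / 2` is the same condition as the old `>= len / 2 + 1` for integer division).
assert!(DiskError::quorum_unformatted_disks(&errs));

// Full initialization requires *every* entry to be UnformattedDisk; one slot is None here.
assert!(!DiskError::should_init_erasure_disks(&errs));

// No single fatal error type covers the whole slice, so this returns Ok(()).
assert!(DiskError::check_disk_fatal_errs(&errs).is_ok());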

View File

@@ -4,16 +4,12 @@ use futures::future::join_all;
use path_absolutize::Absolutize;
use std::{
fs::Metadata,
io::{self, SeekFrom},
path::{Path, PathBuf},
sync::Arc,
};
use time::OffsetDateTime;
use tokio::io::{AsyncReadExt, ErrorKind};
use tokio::{
fs::{self, File},
io::AsyncSeekExt,
};
use tokio::fs::{self, File};
use tokio::io::ErrorKind;
use tracing::{debug, warn};
use uuid::Uuid;
@@ -23,7 +19,6 @@ use crate::{
VolumeInfo,
},
endpoint::{Endpoint, Endpoints},
erasure::ReadAt,
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
@@ -127,7 +122,7 @@ impl LocalDisk {
root,
id,
format_meta,
format_data: format_data,
format_data,
format_path,
// format_legacy,
format_last_check,
@@ -181,7 +176,7 @@ impl LocalDisk {
// }
pub async fn rename_all(&self, src_data_path: &PathBuf, dst_data_path: &PathBuf, skip: &PathBuf) -> Result<()> {
if !skip.starts_with(&src_data_path) {
if !skip.starts_with(src_data_path) {
fs::create_dir_all(dst_data_path.parent().unwrap_or(Path::new("/"))).await?;
}
@@ -211,7 +206,7 @@ impl LocalDisk {
// fs::create_dir_all(&trash_path).await?;
fs::rename(&delete_path, &trash_path).await.map_err(|err| {
// Customize the error message with the file paths
io::Error::new(
std::io::Error::new(
err.kind(),
format!("Failed to rename file '{:?}' to '{:?}': {}", &delete_path, &trash_path, err),
)
@@ -279,8 +274,8 @@ impl LocalDisk {
}
}
fn is_root_path(path: &PathBuf) -> bool {
path.components().count() == 1 && path.has_root()
fn is_root_path(path: impl AsRef<Path>) -> bool {
path.as_ref().components().count() == 1 && path.as_ref().has_root()
}
// Filter out std::io::ErrorKind::NotFound
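A small aside on the `is_root_path` change above: taking `impl AsRef<Path>` instead of `&PathBuf` lets callers pass string slices, `&Path`, or an owned `PathBuf` directly. Illustrative calls (not from this commit), assuming Unix-style paths:

use std::path::{Path, PathBuf};

assert!(is_root_path("/"));                  // &str: single root component
assert!(is_root_path(Path::new("/")));       // &Path
assert!(is_root_path(PathBuf::from("/")));   // owned PathBuf
assert!(!is_root_path("/var/data"));         // more than one component
assert!(!is_root_path("relative"));          // no root component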
@@ -342,7 +337,7 @@ pub async fn check_volume_exists(p: impl AsRef<Path>) -> Result<()> {
}
fn skip_access_checks(p: impl AsRef<str>) -> bool {
let vols = vec![
let vols = [
super::RUSTFS_META_TMP_DELETED_BUCKET,
super::RUSTFS_META_TMP_BUCKET,
super::RUSTFS_META_MULTIPART_BUCKET,
@@ -370,14 +365,14 @@ impl DiskAPI for LocalDisk {
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes> {
let p = self.get_object_path(&volume, &path)?;
let p = self.get_object_path(volume, path)?;
let (data, _) = read_file_all(&p).await?;
Ok(Bytes::from(data))
}
async fn write_all(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
let p = self.get_object_path(&volume, &path)?;
let p = self.get_object_path(volume, path)?;
write_all_internal(p, data).await?;
@@ -385,12 +380,12 @@ impl DiskAPI for LocalDisk {
}
async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> {
let vol_path = self.get_bucket_path(&volume)?;
if !skip_access_checks(&volume) {
let vol_path = self.get_bucket_path(volume)?;
if !skip_access_checks(volume) {
check_volume_exists(&vol_path).await?;
}
let fpath = self.get_object_path(&volume, &path)?;
let fpath = self.get_object_path(volume, path)?;
self.delete_file(&vol_path, &fpath, opt.recursive, opt.immediate).await?;
@@ -410,17 +405,17 @@ impl DiskAPI for LocalDisk {
}
async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> {
let src_volume_path = self.get_bucket_path(&src_volume)?;
if !skip_access_checks(&src_volume) {
let src_volume_path = self.get_bucket_path(src_volume)?;
if !skip_access_checks(src_volume) {
check_volume_exists(&src_volume_path).await?;
}
if !skip_access_checks(&dst_volume) {
let vol_path = self.get_bucket_path(&dst_volume)?;
if !skip_access_checks(dst_volume) {
let vol_path = self.get_bucket_path(dst_volume)?;
check_volume_exists(&vol_path).await?;
}
let srcp = self.get_object_path(&src_volume, &src_path)?;
let dstp = self.get_object_path(&dst_volume, &dst_path)?;
let srcp = self.get_object_path(src_volume, src_path)?;
let dstp = self.get_object_path(dst_volume, dst_path)?;
let src_is_dir = srcp.is_dir();
let dst_is_dir = dstp.is_dir();
@@ -477,7 +472,7 @@ impl DiskAPI for LocalDisk {
}
// async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result<File> {
async fn append_file(&self, volume: &str, path: &str) -> Result<FileWriter> {
let p = self.get_object_path(&volume, &path)?;
let p = self.get_object_path(volume, path)?;
if let Some(dir_path) = p.parent() {
fs::create_dir_all(&dir_path).await?;
@@ -505,7 +500,7 @@ impl DiskAPI for LocalDisk {
// Ok(())
}
async fn read_file(&self, volume: &str, path: &str) -> Result<FileReader> {
let p = self.get_object_path(&volume, &path)?;
let p = self.get_object_path(volume, path)?;
debug!("read_file {:?}", &p);
let file = File::options().read(true).open(&p).await?;
@@ -523,7 +518,7 @@ impl DiskAPI for LocalDisk {
// Ok((buffer, bytes_read))
}
async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: usize) -> Result<Vec<String>> {
let p = self.get_bucket_path(&volume)?;
let p = self.get_bucket_path(volume)?;
let mut entries = fs::read_dir(&p).await?;
@@ -559,20 +554,18 @@ impl DiskAPI for LocalDisk {
dst_volume: &str,
dst_path: &str,
) -> Result<RenameDataResp> {
let src_volume_path = self.get_bucket_path(&src_volume)?;
if !skip_access_checks(&src_volume) {
let src_volume_path = self.get_bucket_path(src_volume)?;
if !skip_access_checks(src_volume) {
check_volume_exists(&src_volume_path).await?;
}
let dst_volume_path = self.get_bucket_path(&dst_volume)?;
if !skip_access_checks(&dst_volume) {
let dst_volume_path = self.get_bucket_path(dst_volume)?;
if !skip_access_checks(dst_volume) {
check_volume_exists(&dst_volume_path).await?;
}
let src_file_path =
self.get_object_path(&src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?;
let dst_file_path =
self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?;
let src_file_path = self.get_object_path(src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?;
let dst_file_path = self.get_object_path(dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?;
let (src_data_path, dst_data_path) = {
let mut data_dir = String::new();
@@ -582,10 +575,10 @@ impl DiskAPI for LocalDisk {
if !data_dir.is_empty() {
let src_data_path = self.get_object_path(
&src_volume,
src_volume,
utils::path::retain_slash(format!("{}/{}", &src_path, data_dir).as_str()).as_str(),
)?;
let dst_data_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?;
let dst_data_path = self.get_object_path(dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?;
(src_data_path, dst_data_path)
} else {
@@ -652,7 +645,7 @@ impl DiskAPI for LocalDisk {
Ok(())
}
async fn make_volume(&self, volume: &str) -> Result<()> {
let p = self.get_bucket_path(&volume)?;
let p = self.get_bucket_path(volume)?;
match File::open(&p).await {
Ok(_) => (),
Err(e) => match e.kind() {
@@ -706,7 +699,7 @@ impl DiskAPI for LocalDisk {
}
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
let p = self.get_object_path(&volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
let p = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
warn!("write_metadata {:?} {:?}", &p, &fi);
@@ -812,7 +805,7 @@ impl DiskAPI for LocalDisk {
}
async fn delete_volume(&self, volume: &str) -> Result<()> {
let p = self.get_bucket_path(&volume)?;
let p = self.get_bucket_path(volume)?;
fs::remove_dir_all(&p).await?;
@@ -829,7 +822,7 @@ mod test {
async fn test_skip_access_checks() {
// let arr = Vec::new();
let vols = vec![
let vols = [
super::super::RUSTFS_META_TMP_DELETED_BUCKET,
super::super::RUSTFS_META_TMP_BUCKET,
super::super::RUSTFS_META_MULTIPART_BUCKET,

View File

@@ -4,13 +4,13 @@ use bytes::Bytes;
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite},
io::{AsyncReadExt, AsyncSeekExt, AsyncWrite},
};
use uuid::Uuid;
use crate::{
erasure::ReadAt,
error::{Error, Result},
error::Result,
store_api::{FileInfo, RawFileInfo},
};

View File

@@ -1,5 +1,5 @@
use super::ellipses::*;
use super::error::{Error, Result};
use crate::utils::ellipses::*;
use serde::Deserialize;
use std::collections::HashSet;
@@ -163,7 +163,7 @@ fn get_all_sets<T: AsRef<str>>(is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<
/// methods to get the sets of endpoints.
#[derive(Debug, Default)]
pub struct EndpointSet {
pub arg_patterns: Vec<ArgPattern>,
pub _arg_patterns: Vec<ArgPattern>,
pub endpoints: Vec<String>,
pub set_indexes: Vec<Vec<usize>>,
}
@@ -190,7 +190,7 @@ impl<T: AsRef<str>> TryFrom<&[T]> for EndpointSet {
Ok(EndpointSet {
set_indexes,
arg_patterns,
_arg_patterns: arg_patterns,
endpoints,
})
}
@@ -357,11 +357,11 @@ fn get_total_sizes(arg_patterns: &[ArgPattern]) -> Vec<usize> {
mod test {
use super::*;
use crate::ellipses;
use crate::utils::ellipses;
impl PartialEq for EndpointSet {
fn eq(&self, other: &Self) -> bool {
self.arg_patterns == other.arg_patterns && self.set_indexes == other.set_indexes
self._arg_patterns == other._arg_patterns && self.set_indexes == other.set_indexes
}
}
@@ -621,7 +621,7 @@ mod test {
num: 6,
arg: "{1...27}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![Pattern {
_arg_patterns: vec![ArgPattern::new(vec![Pattern {
seq: get_sequences(1, 27, 0),
..Default::default()
}])],
@@ -634,7 +634,7 @@ mod test {
num: 7,
arg: "/export/set{1...64}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![Pattern {
_arg_patterns: vec![ArgPattern::new(vec![Pattern {
seq: get_sequences(1, 64, 0),
prefix: "/export/set".to_owned(),
..Default::default()
@@ -649,7 +649,7 @@ mod test {
num: 8,
arg: "http://rustfs{2...3}/export/set{1...64}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![
_arg_patterns: vec![ArgPattern::new(vec![
Pattern {
seq: get_sequences(1, 64, 0),
..Default::default()
@@ -670,7 +670,7 @@ mod test {
num: 9,
arg: "http://rustfs{1...64}.mydomain.net/data",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![Pattern {
_arg_patterns: vec![ArgPattern::new(vec![Pattern {
seq: get_sequences(1, 64, 0),
prefix: "http://rustfs".to_owned(),
suffix: ".mydomain.net/data".to_owned(),
@@ -684,7 +684,7 @@ mod test {
num: 10,
arg: "http://rack{1...4}.mydomain.rustfs{1...16}/data",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![
_arg_patterns: vec![ArgPattern::new(vec![
Pattern {
seq: get_sequences(1, 16, 0),
suffix: "/data".to_owned(),
@@ -706,7 +706,7 @@ mod test {
num: 11,
arg: "http://rustfs{0...15}.mydomain.net/data{0...1}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![
_arg_patterns: vec![ArgPattern::new(vec![
Pattern {
seq: get_sequences(0, 1, 0),
..Default::default()
@@ -727,7 +727,7 @@ mod test {
num: 12,
arg: "http://server1/data{1...32}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![Pattern {
_arg_patterns: vec![ArgPattern::new(vec![Pattern {
seq: get_sequences(1, 32, 0),
prefix: "http://server1/data".to_owned(),
..Default::default()
@@ -742,7 +742,7 @@ mod test {
num: 13,
arg: "http://server1/data{01...32}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![Pattern {
_arg_patterns: vec![ArgPattern::new(vec![Pattern {
seq: get_sequences(1, 32, 2),
prefix: "http://server1/data".to_owned(),
..Default::default()
@@ -757,7 +757,7 @@ mod test {
num: 14,
arg: "http://rustfs{2...3}/export/set{1...64}/test{1...2}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![
_arg_patterns: vec![ArgPattern::new(vec![
Pattern {
seq: get_sequences(1, 2, 0),
..Default::default()
@@ -783,7 +783,7 @@ mod test {
num: 15,
arg: "/export{1...10}/disk{1...10}",
es: EndpointSet {
arg_patterns: vec![ArgPattern::new(vec![
_arg_patterns: vec![ArgPattern::new(vec![
Pattern {
seq: get_sequences(1, 10, 0),
..Default::default()

View File

@@ -3,7 +3,6 @@ use bytes::Bytes;
use futures::future::join_all;
use futures::{Stream, StreamExt};
use reed_solomon_erasure::galois_8::ReedSolomon;
use s3s::dto::StreamingBlob;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::DuplexStream;

View File

@@ -3,7 +3,6 @@ mod chunk_stream;
pub mod disk;
pub mod disk_api;
mod disks_layout;
mod ellipses;
mod endpoint;
mod erasure;
pub mod error;

View File

@@ -16,7 +16,6 @@ use crate::{
use http::HeaderMap;
use s3s::{dto::StreamingBlob, Body};
use std::collections::HashMap;
use tracing::debug;
use uuid::Uuid;
#[derive(Debug)]

View File

@@ -27,7 +27,7 @@ pub async fn do_init_format_file(
check_format_erasure_values(&formats, set_drive_count)?;
if first_disk && should_init_erasure_disks(&errs) {
if first_disk && DiskError::should_init_erasure_disks(&errs) {
// All disks report UnformattedDisk, so no format file exists yet:
// create a new format and save it.
let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id);
@@ -115,10 +115,6 @@ fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3
Ok(format)
}
fn should_init_erasure_disks(errs: &Vec<Option<Error>>) -> bool {
DiskError::count_errs(errs, &DiskError::UnformattedDisk) == errs.len()
}
fn check_format_erasure_values(
formats: &Vec<Option<FormatV3>>,
// disks: &Vec<Option<DiskStore>>,

View File

@@ -1,6 +1,5 @@
use crate::error::{Error, Result};
use lazy_static::*;
use super::error::{Error, Result};
use regex::Regex;
lazy_static! {

View File

@@ -1,4 +1,5 @@
pub mod crypto;
pub mod ellipses;
pub mod hash;
pub mod net;
pub mod path;