diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs index d475332d..0b5e3630 100644 --- a/ecstore/src/disk/error.rs +++ b/ecstore/src/disk/error.rs @@ -1,4 +1,4 @@ -use crate::error::{Error, Result, StdError}; +use crate::error::{Error, Result}; #[derive(Debug, thiserror::Error)] pub enum DiskError { @@ -34,43 +34,51 @@ pub enum DiskError { } impl DiskError { - pub fn check_disk_fatal_errs(errs: &Vec>) -> Result<()> { - if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() { + /// Checks if the given array of errors contains fatal disk errors. + /// If all errors are of the same fatal disk error type, returns the corresponding error. + /// Otherwise, returns Ok. + /// + /// # Parameters + /// - `errs`: A slice of optional errors. + /// + /// # Returns + /// If all errors are of the same fatal disk error type, returns the corresponding error. + /// Otherwise, returns Ok. + pub fn check_disk_fatal_errs(errs: &[Option]) -> Result<()> { + if DiskError::UnsupportedDisk.count_errs(errs) == errs.len() { return Err(DiskError::UnsupportedDisk.into()); } - if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() { + if DiskError::FileAccessDenied.count_errs(errs) == errs.len() { return Err(DiskError::FileAccessDenied.into()); } - if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() { + if DiskError::DiskNotDir.count_errs(errs) == errs.len() { return Err(DiskError::DiskNotDir.into()); } Ok(()) } - pub fn count_errs(errs: &Vec>, err: &DiskError) -> usize { + pub fn count_errs(&self, errs: &[Option]) -> usize { return errs .iter() - .filter(|&e| { - if e.is_some() { - let e = e.as_ref().unwrap(); - let cast = e.downcast_ref::(); - if cast.is_some() { - let cast = cast.unwrap(); - return cast == err; - } - } - false + .filter(|&err| match err { + None => false, + Some(e) => self.is(e), }) .count(); } - pub fn quorum_unformatted_disks(errs: &Vec>) -> bool { - Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1 + pub fn quorum_unformatted_disks(errs: &[Option]) -> bool { + DiskError::UnformattedDisk.count_errs(errs) > (errs.len() / 2) } + pub fn should_init_erasure_disks(errs: &[Option]) -> bool { + DiskError::UnformattedDisk.count_errs(errs) == errs.len() + } + + /// Check if the error is a disk error pub fn is(&self, err: &Error) -> bool { if let Some(e) = err.downcast_ref::() { e == self @@ -78,18 +86,6 @@ impl DiskError { false } } - - pub fn is_err(err: &T, disk_err: &DiskError) -> bool { - return false; - // let cast = err. - // if cast.is_none() { - // return false; - // } - - // let e = cast.unwrap(); - - // e == disk_err - } } impl PartialEq for DiskError { diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 6c9822a8..4b0b77f2 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -4,16 +4,12 @@ use futures::future::join_all; use path_absolutize::Absolutize; use std::{ fs::Metadata, - io::{self, SeekFrom}, path::{Path, PathBuf}, sync::Arc, }; use time::OffsetDateTime; -use tokio::io::{AsyncReadExt, ErrorKind}; -use tokio::{ - fs::{self, File}, - io::AsyncSeekExt, -}; +use tokio::fs::{self, File}; +use tokio::io::ErrorKind; use tracing::{debug, warn}; use uuid::Uuid; @@ -23,7 +19,6 @@ use crate::{ VolumeInfo, }, endpoint::{Endpoint, Endpoints}, - erasure::ReadAt, error::{Error, Result}, file_meta::FileMeta, store_api::{FileInfo, RawFileInfo}, @@ -127,7 +122,7 @@ impl LocalDisk { root, id, format_meta, - format_data: format_data, + format_data, format_path, // format_legacy, format_last_check, @@ -181,7 +176,7 @@ impl LocalDisk { // } pub async fn rename_all(&self, src_data_path: &PathBuf, dst_data_path: &PathBuf, skip: &PathBuf) -> Result<()> { - if !skip.starts_with(&src_data_path) { + if !skip.starts_with(src_data_path) { fs::create_dir_all(dst_data_path.parent().unwrap_or(Path::new("/"))).await?; } @@ -211,7 +206,7 @@ impl LocalDisk { // fs::create_dir_all(&trash_path).await?; fs::rename(&delete_path, &trash_path).await.map_err(|err| { // 使用文件路径自定义错误信息 - io::Error::new( + std::io::Error::new( err.kind(), format!("Failed to rename file '{:?}' to '{:?}': {}", &delete_path, &trash_path, err), ) @@ -279,8 +274,8 @@ impl LocalDisk { } } -fn is_root_path(path: &PathBuf) -> bool { - path.components().count() == 1 && path.has_root() +fn is_root_path(path: impl AsRef) -> bool { + path.as_ref().components().count() == 1 && path.as_ref().has_root() } // 过滤 std::io::ErrorKind::NotFound @@ -342,7 +337,7 @@ pub async fn check_volume_exists(p: impl AsRef) -> Result<()> { } fn skip_access_checks(p: impl AsRef) -> bool { - let vols = vec![ + let vols = [ super::RUSTFS_META_TMP_DELETED_BUCKET, super::RUSTFS_META_TMP_BUCKET, super::RUSTFS_META_MULTIPART_BUCKET, @@ -370,14 +365,14 @@ impl DiskAPI for LocalDisk { #[must_use] async fn read_all(&self, volume: &str, path: &str) -> Result { - let p = self.get_object_path(&volume, &path)?; + let p = self.get_object_path(volume, path)?; let (data, _) = read_file_all(&p).await?; Ok(Bytes::from(data)) } async fn write_all(&self, volume: &str, path: &str, data: Vec) -> Result<()> { - let p = self.get_object_path(&volume, &path)?; + let p = self.get_object_path(volume, path)?; write_all_internal(p, data).await?; @@ -385,12 +380,12 @@ impl DiskAPI for LocalDisk { } async fn delete(&self, volume: &str, path: &str, opt: DeleteOptions) -> Result<()> { - let vol_path = self.get_bucket_path(&volume)?; - if !skip_access_checks(&volume) { + let vol_path = self.get_bucket_path(volume)?; + if !skip_access_checks(volume) { check_volume_exists(&vol_path).await?; } - let fpath = self.get_object_path(&volume, &path)?; + let fpath = self.get_object_path(volume, path)?; self.delete_file(&vol_path, &fpath, opt.recursive, opt.immediate).await?; @@ -410,17 +405,17 @@ impl DiskAPI for LocalDisk { } async fn rename_file(&self, src_volume: &str, src_path: &str, dst_volume: &str, dst_path: &str) -> Result<()> { - let src_volume_path = self.get_bucket_path(&src_volume)?; - if !skip_access_checks(&src_volume) { + let src_volume_path = self.get_bucket_path(src_volume)?; + if !skip_access_checks(src_volume) { check_volume_exists(&src_volume_path).await?; } - if !skip_access_checks(&dst_volume) { - let vol_path = self.get_bucket_path(&dst_volume)?; + if !skip_access_checks(dst_volume) { + let vol_path = self.get_bucket_path(dst_volume)?; check_volume_exists(&vol_path).await?; } - let srcp = self.get_object_path(&src_volume, &src_path)?; - let dstp = self.get_object_path(&dst_volume, &dst_path)?; + let srcp = self.get_object_path(src_volume, src_path)?; + let dstp = self.get_object_path(dst_volume, dst_path)?; let src_is_dir = srcp.is_dir(); let dst_is_dir = dstp.is_dir(); @@ -477,7 +472,7 @@ impl DiskAPI for LocalDisk { } // async fn append_file(&self, volume: &str, path: &str, mut r: DuplexStream) -> Result { async fn append_file(&self, volume: &str, path: &str) -> Result { - let p = self.get_object_path(&volume, &path)?; + let p = self.get_object_path(volume, path)?; if let Some(dir_path) = p.parent() { fs::create_dir_all(&dir_path).await?; @@ -505,7 +500,7 @@ impl DiskAPI for LocalDisk { // Ok(()) } async fn read_file(&self, volume: &str, path: &str) -> Result { - let p = self.get_object_path(&volume, &path)?; + let p = self.get_object_path(volume, path)?; debug!("read_file {:?}", &p); let file = File::options().read(true).open(&p).await?; @@ -523,7 +518,7 @@ impl DiskAPI for LocalDisk { // Ok((buffer, bytes_read)) } async fn list_dir(&self, origvolume: &str, volume: &str, dir_path: &str, count: usize) -> Result> { - let p = self.get_bucket_path(&volume)?; + let p = self.get_bucket_path(volume)?; let mut entries = fs::read_dir(&p).await?; @@ -559,20 +554,18 @@ impl DiskAPI for LocalDisk { dst_volume: &str, dst_path: &str, ) -> Result { - let src_volume_path = self.get_bucket_path(&src_volume)?; - if !skip_access_checks(&src_volume) { + let src_volume_path = self.get_bucket_path(src_volume)?; + if !skip_access_checks(src_volume) { check_volume_exists(&src_volume_path).await?; } - let dst_volume_path = self.get_bucket_path(&dst_volume)?; - if !skip_access_checks(&dst_volume) { + let dst_volume_path = self.get_bucket_path(dst_volume)?; + if !skip_access_checks(dst_volume) { check_volume_exists(&dst_volume_path).await?; } - let src_file_path = - self.get_object_path(&src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?; - let dst_file_path = - self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?; + let src_file_path = self.get_object_path(src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?; + let dst_file_path = self.get_object_path(dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?; let (src_data_path, dst_data_path) = { let mut data_dir = String::new(); @@ -582,10 +575,10 @@ impl DiskAPI for LocalDisk { if !data_dir.is_empty() { let src_data_path = self.get_object_path( - &src_volume, + src_volume, utils::path::retain_slash(format!("{}/{}", &src_path, data_dir).as_str()).as_str(), )?; - let dst_data_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?; + let dst_data_path = self.get_object_path(dst_volume, format!("{}/{}", &dst_path, data_dir).as_str())?; (src_data_path, dst_data_path) } else { @@ -652,7 +645,7 @@ impl DiskAPI for LocalDisk { Ok(()) } async fn make_volume(&self, volume: &str) -> Result<()> { - let p = self.get_bucket_path(&volume)?; + let p = self.get_bucket_path(volume)?; match File::open(&p).await { Ok(_) => (), Err(e) => match e.kind() { @@ -706,7 +699,7 @@ impl DiskAPI for LocalDisk { } async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { - let p = self.get_object_path(&volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; + let p = self.get_object_path(volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; warn!("write_metadata {:?} {:?}", &p, &fi); @@ -812,7 +805,7 @@ impl DiskAPI for LocalDisk { } async fn delete_volume(&self, volume: &str) -> Result<()> { - let p = self.get_bucket_path(&volume)?; + let p = self.get_bucket_path(volume)?; fs::remove_dir_all(&p).await?; @@ -829,7 +822,7 @@ mod test { async fn test_skip_access_checks() { // let arr = Vec::new(); - let vols = vec![ + let vols = [ super::super::RUSTFS_META_TMP_DELETED_BUCKET, super::super::RUSTFS_META_TMP_BUCKET, super::super::RUSTFS_META_MULTIPART_BUCKET, diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index 5d59a199..fcc0cb1a 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -4,13 +4,13 @@ use bytes::Bytes; use time::OffsetDateTime; use tokio::{ fs::File, - io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWrite}, }; use uuid::Uuid; use crate::{ erasure::ReadAt, - error::{Error, Result}, + error::Result, store_api::{FileInfo, RawFileInfo}, }; diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index 88364c00..359e3304 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -1,5 +1,5 @@ -use super::ellipses::*; use super::error::{Error, Result}; +use crate::utils::ellipses::*; use serde::Deserialize; use std::collections::HashSet; @@ -163,7 +163,7 @@ fn get_all_sets>(is_ellipses: bool, args: &[T]) -> Result, + pub _arg_patterns: Vec, pub endpoints: Vec, pub set_indexes: Vec>, } @@ -190,7 +190,7 @@ impl> TryFrom<&[T]> for EndpointSet { Ok(EndpointSet { set_indexes, - arg_patterns, + _arg_patterns: arg_patterns, endpoints, }) } @@ -357,11 +357,11 @@ fn get_total_sizes(arg_patterns: &[ArgPattern]) -> Vec { mod test { use super::*; - use crate::ellipses; + use crate::utils::ellipses; impl PartialEq for EndpointSet { fn eq(&self, other: &Self) -> bool { - self.arg_patterns == other.arg_patterns && self.set_indexes == other.set_indexes + self._arg_patterns == other._arg_patterns && self.set_indexes == other.set_indexes } } @@ -621,7 +621,7 @@ mod test { num: 6, arg: "{1...27}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![Pattern { + _arg_patterns: vec![ArgPattern::new(vec![Pattern { seq: get_sequences(1, 27, 0), ..Default::default() }])], @@ -634,7 +634,7 @@ mod test { num: 7, arg: "/export/set{1...64}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![Pattern { + _arg_patterns: vec![ArgPattern::new(vec![Pattern { seq: get_sequences(1, 64, 0), prefix: "/export/set".to_owned(), ..Default::default() @@ -649,7 +649,7 @@ mod test { num: 8, arg: "http://rustfs{2...3}/export/set{1...64}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![ + _arg_patterns: vec![ArgPattern::new(vec![ Pattern { seq: get_sequences(1, 64, 0), ..Default::default() @@ -670,7 +670,7 @@ mod test { num: 9, arg: "http://rustfs{1...64}.mydomain.net/data", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![Pattern { + _arg_patterns: vec![ArgPattern::new(vec![Pattern { seq: get_sequences(1, 64, 0), prefix: "http://rustfs".to_owned(), suffix: ".mydomain.net/data".to_owned(), @@ -684,7 +684,7 @@ mod test { num: 10, arg: "http://rack{1...4}.mydomain.rustfs{1...16}/data", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![ + _arg_patterns: vec![ArgPattern::new(vec![ Pattern { seq: get_sequences(1, 16, 0), suffix: "/data".to_owned(), @@ -706,7 +706,7 @@ mod test { num: 11, arg: "http://rustfs{0...15}.mydomain.net/data{0...1}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![ + _arg_patterns: vec![ArgPattern::new(vec![ Pattern { seq: get_sequences(0, 1, 0), ..Default::default() @@ -727,7 +727,7 @@ mod test { num: 12, arg: "http://server1/data{1...32}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![Pattern { + _arg_patterns: vec![ArgPattern::new(vec![Pattern { seq: get_sequences(1, 32, 0), prefix: "http://server1/data".to_owned(), ..Default::default() @@ -742,7 +742,7 @@ mod test { num: 13, arg: "http://server1/data{01...32}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![Pattern { + _arg_patterns: vec![ArgPattern::new(vec![Pattern { seq: get_sequences(1, 32, 2), prefix: "http://server1/data".to_owned(), ..Default::default() @@ -757,7 +757,7 @@ mod test { num: 14, arg: "http://rustfs{2...3}/export/set{1...64}/test{1...2}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![ + _arg_patterns: vec![ArgPattern::new(vec![ Pattern { seq: get_sequences(1, 2, 0), ..Default::default() @@ -783,7 +783,7 @@ mod test { num: 15, arg: "/export{1...10}/disk{1...10}", es: EndpointSet { - arg_patterns: vec![ArgPattern::new(vec![ + _arg_patterns: vec![ArgPattern::new(vec![ Pattern { seq: get_sequences(1, 10, 0), ..Default::default() diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 41e03919..0a18e50c 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -3,7 +3,6 @@ use bytes::Bytes; use futures::future::join_all; use futures::{Stream, StreamExt}; use reed_solomon_erasure::galois_8::ReedSolomon; -use s3s::dto::StreamingBlob; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::io::DuplexStream; diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 671da0a6..57ffa1f7 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -3,7 +3,6 @@ mod chunk_stream; pub mod disk; pub mod disk_api; mod disks_layout; -mod ellipses; mod endpoint; mod erasure; pub mod error; diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 8188bde7..d1e6eb6b 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -16,7 +16,6 @@ use crate::{ use http::HeaderMap; use s3s::{dto::StreamingBlob, Body}; use std::collections::HashMap; -use tracing::debug; use uuid::Uuid; #[derive(Debug)] diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index c2eac586..f7d16569 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -27,7 +27,7 @@ pub async fn do_init_format_file( check_format_erasure_values(&formats, set_drive_count)?; - if first_disk && should_init_erasure_disks(&errs) { + if first_disk && DiskError::should_init_erasure_disks(&errs) { // UnformattedDisk, not format file create // new format and save let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id); @@ -115,10 +115,6 @@ fn get_format_file_in_quorum(formats: &Vec>) -> Result>) -> bool { - DiskError::count_errs(errs, &DiskError::UnformattedDisk) == errs.len() -} - fn check_format_erasure_values( formats: &Vec>, // disks: &Vec>, diff --git a/ecstore/src/ellipses.rs b/ecstore/src/utils/ellipses.rs similarity index 99% rename from ecstore/src/ellipses.rs rename to ecstore/src/utils/ellipses.rs index fdb647ea..9badb80c 100644 --- a/ecstore/src/ellipses.rs +++ b/ecstore/src/utils/ellipses.rs @@ -1,6 +1,5 @@ +use crate::error::{Error, Result}; use lazy_static::*; - -use super::error::{Error, Result}; use regex::Regex; lazy_static! { diff --git a/ecstore/src/utils/mod.rs b/ecstore/src/utils/mod.rs index 20d8332a..d0a295df 100644 --- a/ecstore/src/utils/mod.rs +++ b/ecstore/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod crypto; +pub mod ellipses; pub mod hash; pub mod net; pub mod path;