diff --git a/Cargo.lock b/Cargo.lock index 09ba10ff..5de2d546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,7 +311,6 @@ dependencies = [ name = "ecstore" version = "0.1.0" dependencies = [ - "anyhow", "async-trait", "base64-simd", "bytes", diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index 29dc282b..2c97f1a2 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -16,7 +16,6 @@ futures.workspace = true async-trait.workspace = true tracing.workspace = true serde.workspace = true -anyhow.workspace = true time.workspace = true serde_json.workspace = true url = "2.5.2" diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs index 1a9a1313..f089de50 100644 --- a/ecstore/src/bucket_meta.rs +++ b/ecstore/src/bucket_meta.rs @@ -2,7 +2,7 @@ use rmp_serde::Serializer; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use anyhow::Result; +use crate::error::Result; use crate::disk::BUCKET_META_PREFIX; @@ -40,12 +40,7 @@ impl BucketMetadata { } pub fn save_file_path(&self) -> String { - format!( - "{}/{}/{}", - BUCKET_META_PREFIX, - self.name.as_str(), - BUCKET_METADATA_FILE - ) + format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE) // PathBuf::new() // .join(BUCKET_META_PREFIX) // .join(self.name.as_str()) diff --git a/ecstore/src/disk/error.rs b/ecstore/src/disk/error.rs new file mode 100644 index 00000000..d475332d --- /dev/null +++ b/ecstore/src/disk/error.rs @@ -0,0 +1,99 @@ +use crate::error::{Error, Result, StdError}; + +#[derive(Debug, thiserror::Error)] +pub enum DiskError { + #[error("file not found")] + FileNotFound, + + #[error("file version not found")] + FileVersionNotFound, + + #[error("disk not found")] + DiskNotFound, + + #[error("disk access denied")] + FileAccessDenied, + + #[error("InconsistentDisk")] + InconsistentDisk, + + #[error("volume already exists")] + VolumeExists, + + #[error("unformatted disk error")] + UnformattedDisk, + + #[error("unsupport disk")] + UnsupportedDisk, + + #[error("disk not a dir")] + DiskNotDir, + + #[error("volume not found")] + VolumeNotFound, +} + +impl DiskError { + pub fn check_disk_fatal_errs(errs: &Vec>) -> Result<()> { + if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() { + return Err(DiskError::UnsupportedDisk.into()); + } + + if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() { + return Err(DiskError::FileAccessDenied.into()); + } + + if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() { + return Err(DiskError::DiskNotDir.into()); + } + + Ok(()) + } + + pub fn count_errs(errs: &Vec>, err: &DiskError) -> usize { + return errs + .iter() + .filter(|&e| { + if e.is_some() { + let e = e.as_ref().unwrap(); + let cast = e.downcast_ref::(); + if cast.is_some() { + let cast = cast.unwrap(); + return cast == err; + } + } + false + }) + .count(); + } + + pub fn quorum_unformatted_disks(errs: &Vec>) -> bool { + Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1 + } + + pub fn is(&self, err: &Error) -> bool { + if let Some(e) = err.downcast_ref::() { + e == self + } else { + false + } + } + + pub fn is_err(err: &T, disk_err: &DiskError) -> bool { + return false; + // let cast = err. + // if cast.is_none() { + // return false; + // } + + // let e = cast.unwrap(); + + // e == disk_err + } +} + +impl PartialEq for DiskError { + fn eq(&self, other: &Self) -> bool { + core::mem::discriminant(self) == core::mem::discriminant(other) + } +} diff --git a/ecstore/src/format.rs b/ecstore/src/disk/format.rs similarity index 97% rename from ecstore/src/format.rs rename to ecstore/src/disk/format.rs index b384097b..0c37e086 100644 --- a/ecstore/src/format.rs +++ b/ecstore/src/disk/format.rs @@ -1,10 +1,9 @@ -use anyhow::{Error, Result}; +use super::error::DiskError; +use crate::error::{Error, Result}; use serde::{Deserialize, Serialize}; use serde_json::Error as JsonError; use uuid::Uuid; -use crate::disk_api::DiskError; - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub enum FormatMetaVersion { #[serde(rename = "1")] @@ -167,7 +166,7 @@ impl FormatV3 { /// - j'th position is the disk index in the current set pub fn find_disk_index_by_disk_id(&self, disk_id: Uuid) -> Result<(usize, usize)> { if disk_id == Uuid::nil() { - return Err(Error::new(DiskError::DiskNotFound)); + return Err(Error::from(DiskError::DiskNotFound)); } if disk_id == Uuid::max() { return Err(Error::msg("disk offline")); diff --git a/ecstore/src/disk.rs b/ecstore/src/disk/local.rs similarity index 88% rename from ecstore/src/disk.rs rename to ecstore/src/disk/local.rs index c793d899..6c9822a8 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk/local.rs @@ -1,15 +1,13 @@ -use std::{ - fs::Metadata, - io::{self, SeekFrom}, - os::unix::ffi::OsStringExt, - path::{Path, PathBuf}, - sync::Arc, -}; - -use anyhow::{Error, Result}; +use super::{error::DiskError, format::FormatV3}; use bytes::Bytes; use futures::future::join_all; use path_absolutize::Absolutize; +use std::{ + fs::Metadata, + io::{self, SeekFrom}, + path::{Path, PathBuf}, + sync::Arc, +}; use time::OffsetDateTime; use tokio::io::{AsyncReadExt, ErrorKind}; use tokio::{ @@ -21,27 +19,19 @@ use uuid::Uuid; use crate::{ disk_api::{ - DeleteOptions, DiskAPI, DiskError, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, - RenameDataResp, VolumeInfo, + DeleteOptions, DiskAPI, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, + VolumeInfo, }, endpoint::{Endpoint, Endpoints}, erasure::ReadAt, + error::{Error, Result}, file_meta::FileMeta, - format::FormatV3, store_api::{FileInfo, RawFileInfo}, utils, }; pub type DiskStore = Arc>; -pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; -pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; -pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp"; -pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash"; -pub const BUCKET_META_PREFIX: &str = "buckets"; -pub const FORMAT_CONFIG_FILE: &str = "format.json"; -const STORAGE_FORMAT_FILE: &str = "xl.meta"; - pub struct DiskOption { pub cleanup: bool, pub health_check: bool, @@ -108,8 +98,8 @@ impl LocalDisk { // TODO: 删除tmp数据 } - let format_path = Path::new(RUSTFS_META_BUCKET) - .join(Path::new(FORMAT_CONFIG_FILE)) + let format_path = Path::new(super::RUSTFS_META_BUCKET) + .join(Path::new(super::FORMAT_CONFIG_FILE)) .absolutize_virtually(&root)? .into_owned(); @@ -125,7 +115,7 @@ impl LocalDisk { let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?; if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx { - return Err(Error::new(DiskError::InconsistentDisk)); + return Err(Error::from(DiskError::InconsistentDisk)); } id = fm.erasure.this; @@ -149,10 +139,10 @@ impl LocalDisk { } async fn make_meta_volumes(&self) -> Result<()> { - let buckets = format!("{}/{}", RUSTFS_META_BUCKET, BUCKET_META_PREFIX); - let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart"); - let config = format!("{}/{}", RUSTFS_META_BUCKET, "config"); - let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp"); + let buckets = format!("{}/{}", super::RUSTFS_META_BUCKET, super::BUCKET_META_PREFIX); + let multipart = format!("{}/{}", super::RUSTFS_META_BUCKET, "multipart"); + let config = format!("{}/{}", super::RUSTFS_META_BUCKET, "config"); + let tmp = format!("{}/{}", super::RUSTFS_META_BUCKET, "tmp"); let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()]; self.make_volumes(defaults).await @@ -217,7 +207,7 @@ impl LocalDisk { } if recursive { - let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; + let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?; // fs::create_dir_all(&trash_path).await?; fs::rename(&delete_path, &trash_path).await.map_err(|err| { // 使用文件路径自定义错误信息 @@ -253,7 +243,7 @@ impl LocalDisk { path: impl AsRef, read_data: bool, ) -> Result<(Vec, OffsetDateTime)> { - let meta_path = path.as_ref().join(Path::new(STORAGE_FORMAT_FILE)); + let meta_path = path.as_ref().join(Path::new(super::STORAGE_FORMAT_FILE)); if read_data { self.read_all_data(bucket, volume_dir, meta_path).await } else { @@ -299,7 +289,7 @@ pub async fn read_file_exists(path: impl AsRef) -> Result<(Vec, Option let (data, meta) = match read_file_all(&p).await { Ok((data, meta)) => (data, Some(meta)), Err(e) => { - if DiskError::is_err(&e, &DiskError::FileNotFound) { + if DiskError::FileNotFound.is(&e) { (Vec::new(), None) } else { return Err(e); @@ -334,9 +324,9 @@ pub async fn read_file_all(path: impl AsRef) -> Result<(Vec, Metadata) pub async fn read_file_metadata(p: impl AsRef) -> Result { let meta = fs::metadata(&p).await.map_err(|e| match e.kind() { - ErrorKind::NotFound => Error::new(DiskError::FileNotFound), - ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied), - _ => Error::new(e), + ErrorKind::NotFound => Error::from(DiskError::FileNotFound), + ErrorKind::PermissionDenied => Error::from(DiskError::FileAccessDenied), + _ => Error::from(e), })?; Ok(meta) @@ -344,19 +334,19 @@ pub async fn read_file_metadata(p: impl AsRef) -> Result { pub async fn check_volume_exists(p: impl AsRef) -> Result<()> { fs::metadata(&p).await.map_err(|e| match e.kind() { - ErrorKind::NotFound => Error::new(DiskError::VolumeNotFound), - ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied), - _ => Error::new(e), + ErrorKind::NotFound => Error::from(DiskError::VolumeNotFound), + ErrorKind::PermissionDenied => Error::from(DiskError::FileAccessDenied), + _ => Error::from(e), })?; Ok(()) } fn skip_access_checks(p: impl AsRef) -> bool { let vols = vec![ - RUSTFS_META_TMP_DELETED_BUCKET, - RUSTFS_META_TMP_BUCKET, - RUSTFS_META_MULTIPART_BUCKET, - RUSTFS_META_BUCKET, + super::RUSTFS_META_TMP_DELETED_BUCKET, + super::RUSTFS_META_TMP_BUCKET, + super::RUSTFS_META_MULTIPART_BUCKET, + super::RUSTFS_META_BUCKET, ]; for v in vols.iter() { @@ -435,7 +425,7 @@ impl DiskAPI for LocalDisk { let src_is_dir = srcp.is_dir(); let dst_is_dir = dstp.is_dir(); if !(src_is_dir && dst_is_dir || !src_is_dir && !dst_is_dir) { - return Err(Error::new(DiskError::FileAccessDenied)); + return Err(Error::from(DiskError::FileAccessDenied)); } // TODO: check path length @@ -541,16 +531,11 @@ impl DiskAPI for LocalDisk { while let Some(entry) = entries.next_entry().await? { if let Ok(metadata) = entry.metadata().await { - let vec = entry.file_name().into_vec(); - // if !metadata.is_dir() { // continue; // } - let name = match String::from_utf8(vec) { - Ok(s) => s, - Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")), - }; + let name = entry.file_name().to_string_lossy().to_string(); // let created = match metadata.created() { // Ok(md) => OffsetDateTime::from(md), @@ -584,8 +569,10 @@ impl DiskAPI for LocalDisk { check_volume_exists(&dst_volume_path).await?; } - let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?; - let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?; + let src_file_path = + self.get_object_path(&src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?; + let dst_file_path = + self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?; let (src_data_path, dst_data_path) = { let mut data_dir = String::new(); @@ -639,7 +626,7 @@ impl DiskAPI for LocalDisk { self.rename_all(&src_file_path, &dst_file_path, &skip_parent).await?; - if src_volume != RUSTFS_META_MULTIPART_BUCKET { + if src_volume != super::RUSTFS_META_MULTIPART_BUCKET { fs::remove_dir(&src_file_path.parent().unwrap()).await?; } else { self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()), true, false) @@ -673,11 +660,11 @@ impl DiskAPI for LocalDisk { fs::create_dir_all(&p).await?; return Ok(()); } - _ => return Err(Error::new(e)), + _ => return Err(Error::from(e)), }, } - Err(Error::new(DiskError::VolumeExists)) + Err(Error::from(DiskError::VolumeExists)) } async fn list_volumes(&self) -> Result> { let mut entries = fs::read_dir(&self.root).await?; @@ -686,16 +673,11 @@ impl DiskAPI for LocalDisk { while let Some(entry) = entries.next_entry().await? { if let Ok(metadata) = entry.metadata().await { - let vec = entry.file_name().into_vec(); - // if !metadata.is_dir() { // continue; // } - let name = match String::from_utf8(vec) { - Ok(s) => s, - Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")), - }; + let name = entry.file_name().to_string_lossy().to_string(); let created = match metadata.created() { Ok(md) => OffsetDateTime::from(md), @@ -724,7 +706,7 @@ impl DiskAPI for LocalDisk { } async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> { - let p = self.get_object_path(&volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str())?; + let p = self.get_object_path(&volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?; warn!("write_metadata {:?} {:?}", &p, &fi); @@ -811,7 +793,7 @@ impl DiskAPI for LocalDisk { } } Err(e) => { - if !(DiskError::is_err(&e, &DiskError::FileNotFound) || DiskError::is_err(&e, &DiskError::VolumeNotFound)) { + if !(DiskError::FileNotFound.is(&e) || DiskError::VolumeNotFound.is(&e)) { res.exists = true; res.error = e.to_string(); } @@ -848,10 +830,10 @@ mod test { // let arr = Vec::new(); let vols = vec![ - RUSTFS_META_TMP_DELETED_BUCKET, - RUSTFS_META_TMP_BUCKET, - RUSTFS_META_MULTIPART_BUCKET, - RUSTFS_META_BUCKET, + super::super::RUSTFS_META_TMP_DELETED_BUCKET, + super::super::RUSTFS_META_TMP_BUCKET, + super::super::RUSTFS_META_MULTIPART_BUCKET, + super::super::RUSTFS_META_BUCKET, ]; let paths: Vec<_> = vols.iter().map(|v| Path::new(v).join("test")).collect(); @@ -876,7 +858,9 @@ mod test { let disk = LocalDisk::new(&ep, false).await.unwrap(); - let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap(); + let tmpp = disk + .resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET)) + .unwrap(); println!("ppp :{:?}", &tmpp); @@ -904,7 +888,9 @@ mod test { let disk = LocalDisk::new(&ep, false).await.unwrap(); - let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap(); + let tmpp = disk + .resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET)) + .unwrap(); println!("ppp :{:?}", &tmpp); diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs new file mode 100644 index 00000000..0106baf5 --- /dev/null +++ b/ecstore/src/disk/mod.rs @@ -0,0 +1,13 @@ +pub mod error; +pub mod format; +mod local; + +pub use local::{init_disks, DiskOption, DiskStore}; + +pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; +pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; +pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp"; +pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash"; +pub const BUCKET_META_PREFIX: &str = "buckets"; +pub const FORMAT_CONFIG_FILE: &str = "format.json"; +const STORAGE_FORMAT_FILE: &str = "xl.meta"; diff --git a/ecstore/src/disk_api.rs b/ecstore/src/disk_api.rs index 01822872..5d59a199 100644 --- a/ecstore/src/disk_api.rs +++ b/ecstore/src/disk_api.rs @@ -1,6 +1,5 @@ use std::{fmt::Debug, io::SeekFrom, pin::Pin}; -use anyhow::{Error, Result}; use bytes::Bytes; use time::OffsetDateTime; use tokio::{ @@ -11,6 +10,7 @@ use uuid::Uuid; use crate::{ erasure::ReadAt, + error::{Error, Result}, store_api::{FileInfo, RawFileInfo}, }; @@ -175,119 +175,3 @@ impl ReadAt for FileReader { Ok((buffer, bytes_read)) } } - -#[derive(Debug, thiserror::Error)] -pub enum DiskError { - #[error("file not found")] - FileNotFound, - - #[error("file version not found")] - FileVersionNotFound, - - #[error("disk not found")] - DiskNotFound, - - #[error("disk access denied")] - FileAccessDenied, - - #[error("InconsistentDisk")] - InconsistentDisk, - - #[error("volume already exists")] - VolumeExists, - - #[error("unformatted disk error")] - UnformattedDisk, - - #[error("unsupport disk")] - UnsupportedDisk, - - #[error("disk not a dir")] - DiskNotDir, - - #[error("volume not found")] - VolumeNotFound, -} - -impl DiskError { - pub fn check_disk_fatal_errs(errs: &Vec>) -> Result<()> { - if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() { - return Err(Error::new(DiskError::UnsupportedDisk)); - } - - // if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() { - // return Err(Error::new(DiskError::DiskAccessDenied)); - // } - - if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() { - return Err(Error::new(DiskError::FileAccessDenied)); - } - - // if count_errs(errs, &DiskError::FaultyDisk) == errs.len() { - // return Err(Error::new(DiskError::FaultyDisk)); - // } - - if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() { - return Err(Error::new(DiskError::DiskNotDir)); - } - - // if count_errs(errs, &DiskError::XLBackend) == errs.len() { - // return Err(Error::new(DiskError::XLBackend)); - // } - Ok(()) - } - pub fn count_errs(errs: &Vec>, err: &DiskError) -> usize { - return errs - .iter() - .filter(|&e| { - if e.is_some() { - let e = e.as_ref().unwrap(); - let cast = e.downcast_ref::(); - if cast.is_some() { - let cast = cast.unwrap(); - return cast == err; - } - } - false - }) - .count(); - } - - pub fn quorum_unformatted_disks(errs: &Vec>) -> bool { - Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1 - } - - pub fn is_err(err: &Error, disk_err: &DiskError) -> bool { - let cast = err.downcast_ref::(); - if cast.is_none() { - return false; - } - - let e = cast.unwrap(); - - e == disk_err - } - - // pub fn match_err(err: Error, matchs: Vec) -> bool { - // let cast = err.downcast_ref::(); - // if cast.is_none() { - // return false; - // } - - // let e = cast.unwrap(); - - // for i in matchs.iter() { - // if e == i { - // return true; - // } - // } - - // return false; - // } -} - -impl PartialEq for DiskError { - fn eq(&self, other: &Self) -> bool { - core::mem::discriminant(self) == core::mem::discriminant(other) - } -} diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index f3282293..41e03919 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,7 +1,4 @@ -use crate::error::StdError; -use anyhow::anyhow; -use anyhow::Error; -use anyhow::Result; +use crate::error::{Error, Result, StdError}; use bytes::Bytes; use futures::future::join_all; use futures::{Stream, StreamExt}; @@ -16,7 +13,7 @@ use tracing::warn; use uuid::Uuid; use crate::chunk_stream::ChunkedStream; -use crate::disk_api::DiskError; +use crate::disk::error::DiskError; use crate::disk_api::FileReader; pub struct Erasure { @@ -98,7 +95,7 @@ impl Erasure { // } // } } - Err(e) => return Err(anyhow!(e)), + Err(e) => return Err(Error::from_std_error(e)), } } diff --git a/ecstore/src/error.rs b/ecstore/src/error.rs index 92489bfd..24d2936b 100644 --- a/ecstore/src/error.rs +++ b/ecstore/src/error.rs @@ -14,9 +14,16 @@ impl Error { /// Create a new error from a `std::error::Error`. #[must_use] #[track_caller] - pub fn new(source: StdError) -> Self { + pub fn new(source: T) -> Self { + Self::from_std_error(source.into()) + } + + /// Create a new error from a `std::error::Error`. + #[must_use] + #[track_caller] + pub fn from_std_error(inner: StdError) -> Self { Self { - inner: source, + inner, span_trace: SpanTrace::capture(), } } @@ -25,7 +32,14 @@ impl Error { #[must_use] #[track_caller] pub fn from_string(s: impl Into) -> Self { - Self::new(s.into().into()) + Self::msg(s) + } + + /// Create a new error from a string. + #[must_use] + #[track_caller] + pub fn msg(s: impl Into) -> Self { + Self::from_std_error(s.into().into()) } /// Returns `true` if the inner type is the same as `T`. @@ -51,7 +65,7 @@ impl Error { impl From for Error { fn from(e: T) -> Self { - Self::new(e.into()) + Self::new(e) } } diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 67cd5f45..671da0a6 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -1,6 +1,6 @@ mod bucket_meta; mod chunk_stream; -mod disk; +pub mod disk; pub mod disk_api; mod disks_layout; mod ellipses; @@ -8,7 +8,6 @@ mod endpoint; mod erasure; pub mod error; mod file_meta; -mod format; mod peer; pub mod set_disk; mod sets; diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index aed57746..c56b4128 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -1,13 +1,13 @@ -use anyhow::{Error, Result}; use async_trait::async_trait; use futures::future::join_all; use std::{collections::HashMap, fmt::Debug, sync::Arc}; use tracing::warn; use crate::{ - disk::DiskStore, - disk_api::{DiskError, VolumeInfo}, + disk::{error::DiskError, DiskStore}, + disk_api::VolumeInfo, endpoint::{EndpointServerPools, Node}, + error::{Error, Result}, store_api::{BucketInfo, BucketOptions, MakeBucketOptions}, }; @@ -284,7 +284,7 @@ impl PeerS3Client for LocalPeerS3Client { match disk.make_volume(bucket).await { Ok(_) => Ok(()), Err(e) => { - if opts.force_create && DiskError::is_err(&e, &DiskError::VolumeExists) { + if opts.force_create && DiskError::VolumeExists.is(&e) { return Ok(()); } diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs index 9fbd7e40..d3aa9301 100644 --- a/ecstore/src/sets.rs +++ b/ecstore/src/sets.rs @@ -1,11 +1,11 @@ -use anyhow::Result; use http::HeaderMap; use uuid::Uuid; use crate::{ + disk::format::{DistributionAlgoVersion, FormatV3}, disk::DiskStore, endpoint::PoolEndpoints, - format::{DistributionAlgoVersion, FormatV3}, + error::Result, set_disk::SetDisks, store_api::{ BucketInfo, BucketOptions, CompletePart, FileInfo, GetObjectReader, HTTPRangeSpec, MakeBucketOptions, diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 5011fc1b..8188bde7 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,18 +1,10 @@ -use std::collections::HashMap; - -use anyhow::{Error, Result}; - -use http::HeaderMap; -use s3s::{dto::StreamingBlob, Body}; -use tracing::debug; -use uuid::Uuid; - use crate::{ bucket_meta::BucketMetadata, + disk::error::DiskError, disk::{self, DiskOption, DiskStore, RUSTFS_META_BUCKET}, - disk_api::DiskError, disks_layout::DisksLayout, endpoint::EndpointServerPools, + error::{Error, Result}, peer::{PeerS3Client, S3PeerSys}, sets::Sets, store_api::{ @@ -21,6 +13,11 @@ use crate::{ }, store_init, utils, }; +use http::HeaderMap; +use s3s::{dto::StreamingBlob, Body}; +use std::collections::HashMap; +use tracing::debug; +use uuid::Uuid; #[derive(Debug)] pub struct ECStore { @@ -34,12 +31,11 @@ pub struct ECStore { impl ECStore { pub async fn new(address: String, endpoints: Vec) -> Result { - let layouts = DisksLayout::try_from(endpoints.as_slice()).map_err(|v| Error::msg(v))?; + let layouts = DisksLayout::try_from(endpoints.as_slice())?; let mut deployment_id = None; - let (endpoint_pools, _) = - EndpointServerPools::create_server_endpoints(address.as_str(), &layouts).map_err(|v| Error::msg(v))?; + let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?; let mut pools = Vec::with_capacity(endpoint_pools.as_ref().len()); let mut disk_map = HashMap::with_capacity(endpoint_pools.as_ref().len()); diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 3bfda728..f407f321 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -1,5 +1,4 @@ -use anyhow::{Error, Result}; - +use crate::error::{Error, Result}; use http::HeaderMap; use rmp_serde::Serializer; use s3s::dto::StreamingBlob; diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index 0190c6a1..c2eac586 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -3,13 +3,12 @@ use tracing::warn; use uuid::Uuid; use crate::{ + disk::error::DiskError, + disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, disk::{DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET}, - disk_api::DiskError, - format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, + error::{Error, Result}, }; -use anyhow::{Error, Result}; - use std::{ collections::{hash_map::Entry, HashMap}, fmt::Debug, diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index b446339f..916a2ab3 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -1,8 +1,8 @@ mod config; mod storage; -use anyhow::Result; use clap::Parser; +use ecstore::error::Result; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, @@ -38,9 +38,7 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); // Setup S3 service let service = { - let mut b = S3ServiceBuilder::new( - storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?, - ); + let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?); // Enable authentication if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) { diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 4680a330..ff9df3e4 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use ecstore::disk_api::DiskError; +use ecstore::disk::error::DiskError; use ecstore::store_api::BucketOptions; use ecstore::store_api::CompletePart; use ecstore::store_api::HTTPRangeSpec; @@ -23,7 +23,7 @@ use std::str::FromStr; use time::OffsetDateTime; use transform_stream::AsyncTryStream; -use anyhow::Result; +use ecstore::error::Result; use ecstore::store::ECStore; use tracing::debug; @@ -112,7 +112,7 @@ impl S3 for FS { let input = req.input; if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await { - if DiskError::is_err(&e, &DiskError::VolumeNotFound) { + if DiskError::VolumeNotFound.is(&e) { return Err(s3_error!(NoSuchBucket)); } else { return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e))); @@ -179,7 +179,7 @@ impl S3 for FS { let input = req.input; if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await { - if DiskError::is_err(&e, &DiskError::VolumeNotFound) { + if DiskError::VolumeNotFound.is(&e) { return Err(s3_error!(NoSuchBucket)); } else { return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));