mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
fix: 去除anyhow提供的错误处理,统一使用自定义的Error和Result
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -311,7 +311,6 @@ dependencies = [
|
||||
name = "ecstore"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"base64-simd",
|
||||
"bytes",
|
||||
|
||||
@@ -16,7 +16,6 @@ futures.workspace = true
|
||||
async-trait.workspace = true
|
||||
tracing.workspace = true
|
||||
serde.workspace = true
|
||||
anyhow.workspace = true
|
||||
time.workspace = true
|
||||
serde_json.workspace = true
|
||||
url = "2.5.2"
|
||||
|
||||
@@ -2,7 +2,7 @@ use rmp_serde::Serializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use anyhow::Result;
|
||||
use crate::error::Result;
|
||||
|
||||
use crate::disk::BUCKET_META_PREFIX;
|
||||
|
||||
@@ -40,12 +40,7 @@ impl BucketMetadata {
|
||||
}
|
||||
|
||||
pub fn save_file_path(&self) -> String {
|
||||
format!(
|
||||
"{}/{}/{}",
|
||||
BUCKET_META_PREFIX,
|
||||
self.name.as_str(),
|
||||
BUCKET_METADATA_FILE
|
||||
)
|
||||
format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE)
|
||||
// PathBuf::new()
|
||||
// .join(BUCKET_META_PREFIX)
|
||||
// .join(self.name.as_str())
|
||||
|
||||
99
ecstore/src/disk/error.rs
Normal file
99
ecstore/src/disk/error.rs
Normal file
@@ -0,0 +1,99 @@
|
||||
use crate::error::{Error, Result, StdError};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DiskError {
|
||||
#[error("file not found")]
|
||||
FileNotFound,
|
||||
|
||||
#[error("file version not found")]
|
||||
FileVersionNotFound,
|
||||
|
||||
#[error("disk not found")]
|
||||
DiskNotFound,
|
||||
|
||||
#[error("disk access denied")]
|
||||
FileAccessDenied,
|
||||
|
||||
#[error("InconsistentDisk")]
|
||||
InconsistentDisk,
|
||||
|
||||
#[error("volume already exists")]
|
||||
VolumeExists,
|
||||
|
||||
#[error("unformatted disk error")]
|
||||
UnformattedDisk,
|
||||
|
||||
#[error("unsupport disk")]
|
||||
UnsupportedDisk,
|
||||
|
||||
#[error("disk not a dir")]
|
||||
DiskNotDir,
|
||||
|
||||
#[error("volume not found")]
|
||||
VolumeNotFound,
|
||||
}
|
||||
|
||||
impl DiskError {
|
||||
pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<()> {
|
||||
if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() {
|
||||
return Err(DiskError::UnsupportedDisk.into());
|
||||
}
|
||||
|
||||
if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() {
|
||||
return Err(DiskError::FileAccessDenied.into());
|
||||
}
|
||||
|
||||
if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() {
|
||||
return Err(DiskError::DiskNotDir.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
|
||||
return errs
|
||||
.iter()
|
||||
.filter(|&e| {
|
||||
if e.is_some() {
|
||||
let e = e.as_ref().unwrap();
|
||||
let cast = e.downcast_ref::<DiskError>();
|
||||
if cast.is_some() {
|
||||
let cast = cast.unwrap();
|
||||
return cast == err;
|
||||
}
|
||||
}
|
||||
false
|
||||
})
|
||||
.count();
|
||||
}
|
||||
|
||||
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
|
||||
Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
|
||||
}
|
||||
|
||||
pub fn is(&self, err: &Error) -> bool {
|
||||
if let Some(e) = err.downcast_ref::<DiskError>() {
|
||||
e == self
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_err<T: std::error::Error + ?Sized>(err: &T, disk_err: &DiskError) -> bool {
|
||||
return false;
|
||||
// let cast = err.
|
||||
// if cast.is_none() {
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// let e = cast.unwrap();
|
||||
|
||||
// e == disk_err
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for DiskError {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
core::mem::discriminant(self) == core::mem::discriminant(other)
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,9 @@
|
||||
use anyhow::{Error, Result};
|
||||
use super::error::DiskError;
|
||||
use crate::error::{Error, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Error as JsonError;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::disk_api::DiskError;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub enum FormatMetaVersion {
|
||||
#[serde(rename = "1")]
|
||||
@@ -167,7 +166,7 @@ impl FormatV3 {
|
||||
/// - j'th position is the disk index in the current set
|
||||
pub fn find_disk_index_by_disk_id(&self, disk_id: Uuid) -> Result<(usize, usize)> {
|
||||
if disk_id == Uuid::nil() {
|
||||
return Err(Error::new(DiskError::DiskNotFound));
|
||||
return Err(Error::from(DiskError::DiskNotFound));
|
||||
}
|
||||
if disk_id == Uuid::max() {
|
||||
return Err(Error::msg("disk offline"));
|
||||
@@ -1,15 +1,13 @@
|
||||
use std::{
|
||||
fs::Metadata,
|
||||
io::{self, SeekFrom},
|
||||
os::unix::ffi::OsStringExt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use super::{error::DiskError, format::FormatV3};
|
||||
use bytes::Bytes;
|
||||
use futures::future::join_all;
|
||||
use path_absolutize::Absolutize;
|
||||
use std::{
|
||||
fs::Metadata,
|
||||
io::{self, SeekFrom},
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::io::{AsyncReadExt, ErrorKind};
|
||||
use tokio::{
|
||||
@@ -21,27 +19,19 @@ use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
disk_api::{
|
||||
DeleteOptions, DiskAPI, DiskError, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions,
|
||||
RenameDataResp, VolumeInfo,
|
||||
DeleteOptions, DiskAPI, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp,
|
||||
VolumeInfo,
|
||||
},
|
||||
endpoint::{Endpoint, Endpoints},
|
||||
erasure::ReadAt,
|
||||
error::{Error, Result},
|
||||
file_meta::FileMeta,
|
||||
format::FormatV3,
|
||||
store_api::{FileInfo, RawFileInfo},
|
||||
utils,
|
||||
};
|
||||
|
||||
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
|
||||
|
||||
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
|
||||
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
|
||||
pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp";
|
||||
pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
|
||||
pub const BUCKET_META_PREFIX: &str = "buckets";
|
||||
pub const FORMAT_CONFIG_FILE: &str = "format.json";
|
||||
const STORAGE_FORMAT_FILE: &str = "xl.meta";
|
||||
|
||||
pub struct DiskOption {
|
||||
pub cleanup: bool,
|
||||
pub health_check: bool,
|
||||
@@ -108,8 +98,8 @@ impl LocalDisk {
|
||||
// TODO: 删除tmp数据
|
||||
}
|
||||
|
||||
let format_path = Path::new(RUSTFS_META_BUCKET)
|
||||
.join(Path::new(FORMAT_CONFIG_FILE))
|
||||
let format_path = Path::new(super::RUSTFS_META_BUCKET)
|
||||
.join(Path::new(super::FORMAT_CONFIG_FILE))
|
||||
.absolutize_virtually(&root)?
|
||||
.into_owned();
|
||||
|
||||
@@ -125,7 +115,7 @@ impl LocalDisk {
|
||||
let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
|
||||
|
||||
if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx {
|
||||
return Err(Error::new(DiskError::InconsistentDisk));
|
||||
return Err(Error::from(DiskError::InconsistentDisk));
|
||||
}
|
||||
|
||||
id = fm.erasure.this;
|
||||
@@ -149,10 +139,10 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
async fn make_meta_volumes(&self) -> Result<()> {
|
||||
let buckets = format!("{}/{}", RUSTFS_META_BUCKET, BUCKET_META_PREFIX);
|
||||
let multipart = format!("{}/{}", RUSTFS_META_BUCKET, "multipart");
|
||||
let config = format!("{}/{}", RUSTFS_META_BUCKET, "config");
|
||||
let tmp = format!("{}/{}", RUSTFS_META_BUCKET, "tmp");
|
||||
let buckets = format!("{}/{}", super::RUSTFS_META_BUCKET, super::BUCKET_META_PREFIX);
|
||||
let multipart = format!("{}/{}", super::RUSTFS_META_BUCKET, "multipart");
|
||||
let config = format!("{}/{}", super::RUSTFS_META_BUCKET, "config");
|
||||
let tmp = format!("{}/{}", super::RUSTFS_META_BUCKET, "tmp");
|
||||
let defaults = vec![buckets.as_str(), multipart.as_str(), config.as_str(), tmp.as_str()];
|
||||
|
||||
self.make_volumes(defaults).await
|
||||
@@ -217,7 +207,7 @@ impl LocalDisk {
|
||||
}
|
||||
|
||||
if recursive {
|
||||
let trash_path = self.get_object_path(RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
|
||||
let trash_path = self.get_object_path(super::RUSTFS_META_TMP_DELETED_BUCKET, Uuid::new_v4().to_string().as_str())?;
|
||||
// fs::create_dir_all(&trash_path).await?;
|
||||
fs::rename(&delete_path, &trash_path).await.map_err(|err| {
|
||||
// 使用文件路径自定义错误信息
|
||||
@@ -253,7 +243,7 @@ impl LocalDisk {
|
||||
path: impl AsRef<Path>,
|
||||
read_data: bool,
|
||||
) -> Result<(Vec<u8>, OffsetDateTime)> {
|
||||
let meta_path = path.as_ref().join(Path::new(STORAGE_FORMAT_FILE));
|
||||
let meta_path = path.as_ref().join(Path::new(super::STORAGE_FORMAT_FILE));
|
||||
if read_data {
|
||||
self.read_all_data(bucket, volume_dir, meta_path).await
|
||||
} else {
|
||||
@@ -299,7 +289,7 @@ pub async fn read_file_exists(path: impl AsRef<Path>) -> Result<(Vec<u8>, Option
|
||||
let (data, meta) = match read_file_all(&p).await {
|
||||
Ok((data, meta)) => (data, Some(meta)),
|
||||
Err(e) => {
|
||||
if DiskError::is_err(&e, &DiskError::FileNotFound) {
|
||||
if DiskError::FileNotFound.is(&e) {
|
||||
(Vec::new(), None)
|
||||
} else {
|
||||
return Err(e);
|
||||
@@ -334,9 +324,9 @@ pub async fn read_file_all(path: impl AsRef<Path>) -> Result<(Vec<u8>, Metadata)
|
||||
|
||||
pub async fn read_file_metadata(p: impl AsRef<Path>) -> Result<Metadata> {
|
||||
let meta = fs::metadata(&p).await.map_err(|e| match e.kind() {
|
||||
ErrorKind::NotFound => Error::new(DiskError::FileNotFound),
|
||||
ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
|
||||
_ => Error::new(e),
|
||||
ErrorKind::NotFound => Error::from(DiskError::FileNotFound),
|
||||
ErrorKind::PermissionDenied => Error::from(DiskError::FileAccessDenied),
|
||||
_ => Error::from(e),
|
||||
})?;
|
||||
|
||||
Ok(meta)
|
||||
@@ -344,19 +334,19 @@ pub async fn read_file_metadata(p: impl AsRef<Path>) -> Result<Metadata> {
|
||||
|
||||
pub async fn check_volume_exists(p: impl AsRef<Path>) -> Result<()> {
|
||||
fs::metadata(&p).await.map_err(|e| match e.kind() {
|
||||
ErrorKind::NotFound => Error::new(DiskError::VolumeNotFound),
|
||||
ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
|
||||
_ => Error::new(e),
|
||||
ErrorKind::NotFound => Error::from(DiskError::VolumeNotFound),
|
||||
ErrorKind::PermissionDenied => Error::from(DiskError::FileAccessDenied),
|
||||
_ => Error::from(e),
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn skip_access_checks(p: impl AsRef<str>) -> bool {
|
||||
let vols = vec![
|
||||
RUSTFS_META_TMP_DELETED_BUCKET,
|
||||
RUSTFS_META_TMP_BUCKET,
|
||||
RUSTFS_META_MULTIPART_BUCKET,
|
||||
RUSTFS_META_BUCKET,
|
||||
super::RUSTFS_META_TMP_DELETED_BUCKET,
|
||||
super::RUSTFS_META_TMP_BUCKET,
|
||||
super::RUSTFS_META_MULTIPART_BUCKET,
|
||||
super::RUSTFS_META_BUCKET,
|
||||
];
|
||||
|
||||
for v in vols.iter() {
|
||||
@@ -435,7 +425,7 @@ impl DiskAPI for LocalDisk {
|
||||
let src_is_dir = srcp.is_dir();
|
||||
let dst_is_dir = dstp.is_dir();
|
||||
if !(src_is_dir && dst_is_dir || !src_is_dir && !dst_is_dir) {
|
||||
return Err(Error::new(DiskError::FileAccessDenied));
|
||||
return Err(Error::from(DiskError::FileAccessDenied));
|
||||
}
|
||||
|
||||
// TODO: check path length
|
||||
@@ -541,16 +531,11 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if let Ok(metadata) = entry.metadata().await {
|
||||
let vec = entry.file_name().into_vec();
|
||||
|
||||
// if !metadata.is_dir() {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
let name = match String::from_utf8(vec) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")),
|
||||
};
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
|
||||
// let created = match metadata.created() {
|
||||
// Ok(md) => OffsetDateTime::from(md),
|
||||
@@ -584,8 +569,10 @@ impl DiskAPI for LocalDisk {
|
||||
check_volume_exists(&dst_volume_path).await?;
|
||||
}
|
||||
|
||||
let src_file_path = self.get_object_path(&src_volume, format!("{}/{}", &src_path, STORAGE_FORMAT_FILE).as_str())?;
|
||||
let dst_file_path = self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, STORAGE_FORMAT_FILE).as_str())?;
|
||||
let src_file_path =
|
||||
self.get_object_path(&src_volume, format!("{}/{}", &src_path, super::STORAGE_FORMAT_FILE).as_str())?;
|
||||
let dst_file_path =
|
||||
self.get_object_path(&dst_volume, format!("{}/{}", &dst_path, super::STORAGE_FORMAT_FILE).as_str())?;
|
||||
|
||||
let (src_data_path, dst_data_path) = {
|
||||
let mut data_dir = String::new();
|
||||
@@ -639,7 +626,7 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
self.rename_all(&src_file_path, &dst_file_path, &skip_parent).await?;
|
||||
|
||||
if src_volume != RUSTFS_META_MULTIPART_BUCKET {
|
||||
if src_volume != super::RUSTFS_META_MULTIPART_BUCKET {
|
||||
fs::remove_dir(&src_file_path.parent().unwrap()).await?;
|
||||
} else {
|
||||
self.delete_file(&src_volume_path, &PathBuf::from(src_file_path.parent().unwrap()), true, false)
|
||||
@@ -673,11 +660,11 @@ impl DiskAPI for LocalDisk {
|
||||
fs::create_dir_all(&p).await?;
|
||||
return Ok(());
|
||||
}
|
||||
_ => return Err(Error::new(e)),
|
||||
_ => return Err(Error::from(e)),
|
||||
},
|
||||
}
|
||||
|
||||
Err(Error::new(DiskError::VolumeExists))
|
||||
Err(Error::from(DiskError::VolumeExists))
|
||||
}
|
||||
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
|
||||
let mut entries = fs::read_dir(&self.root).await?;
|
||||
@@ -686,16 +673,11 @@ impl DiskAPI for LocalDisk {
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if let Ok(metadata) = entry.metadata().await {
|
||||
let vec = entry.file_name().into_vec();
|
||||
|
||||
// if !metadata.is_dir() {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
let name = match String::from_utf8(vec) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Err(Error::msg("Not supported utf8 file name on this platform")),
|
||||
};
|
||||
let name = entry.file_name().to_string_lossy().to_string();
|
||||
|
||||
let created = match metadata.created() {
|
||||
Ok(md) => OffsetDateTime::from(md),
|
||||
@@ -724,7 +706,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
|
||||
async fn write_metadata(&self, _org_volume: &str, volume: &str, path: &str, fi: FileInfo) -> Result<()> {
|
||||
let p = self.get_object_path(&volume, format!("{}/{}", path, STORAGE_FORMAT_FILE).as_str())?;
|
||||
let p = self.get_object_path(&volume, format!("{}/{}", path, super::STORAGE_FORMAT_FILE).as_str())?;
|
||||
|
||||
warn!("write_metadata {:?} {:?}", &p, &fi);
|
||||
|
||||
@@ -811,7 +793,7 @@ impl DiskAPI for LocalDisk {
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if !(DiskError::is_err(&e, &DiskError::FileNotFound) || DiskError::is_err(&e, &DiskError::VolumeNotFound)) {
|
||||
if !(DiskError::FileNotFound.is(&e) || DiskError::VolumeNotFound.is(&e)) {
|
||||
res.exists = true;
|
||||
res.error = e.to_string();
|
||||
}
|
||||
@@ -848,10 +830,10 @@ mod test {
|
||||
// let arr = Vec::new();
|
||||
|
||||
let vols = vec![
|
||||
RUSTFS_META_TMP_DELETED_BUCKET,
|
||||
RUSTFS_META_TMP_BUCKET,
|
||||
RUSTFS_META_MULTIPART_BUCKET,
|
||||
RUSTFS_META_BUCKET,
|
||||
super::super::RUSTFS_META_TMP_DELETED_BUCKET,
|
||||
super::super::RUSTFS_META_TMP_BUCKET,
|
||||
super::super::RUSTFS_META_MULTIPART_BUCKET,
|
||||
super::super::RUSTFS_META_BUCKET,
|
||||
];
|
||||
|
||||
let paths: Vec<_> = vols.iter().map(|v| Path::new(v).join("test")).collect();
|
||||
@@ -876,7 +858,9 @@ mod test {
|
||||
|
||||
let disk = LocalDisk::new(&ep, false).await.unwrap();
|
||||
|
||||
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
|
||||
let tmpp = disk
|
||||
.resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET))
|
||||
.unwrap();
|
||||
|
||||
println!("ppp :{:?}", &tmpp);
|
||||
|
||||
@@ -904,7 +888,9 @@ mod test {
|
||||
|
||||
let disk = LocalDisk::new(&ep, false).await.unwrap();
|
||||
|
||||
let tmpp = disk.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET)).unwrap();
|
||||
let tmpp = disk
|
||||
.resolve_abs_path(Path::new(super::super::RUSTFS_META_TMP_DELETED_BUCKET))
|
||||
.unwrap();
|
||||
|
||||
println!("ppp :{:?}", &tmpp);
|
||||
|
||||
13
ecstore/src/disk/mod.rs
Normal file
13
ecstore/src/disk/mod.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
pub mod error;
|
||||
pub mod format;
|
||||
mod local;
|
||||
|
||||
pub use local::{init_disks, DiskOption, DiskStore};
|
||||
|
||||
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
|
||||
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
|
||||
pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp";
|
||||
pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
|
||||
pub const BUCKET_META_PREFIX: &str = "buckets";
|
||||
pub const FORMAT_CONFIG_FILE: &str = "format.json";
|
||||
const STORAGE_FORMAT_FILE: &str = "xl.meta";
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{fmt::Debug, io::SeekFrom, pin::Pin};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use bytes::Bytes;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::{
|
||||
@@ -11,6 +10,7 @@ use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
erasure::ReadAt,
|
||||
error::{Error, Result},
|
||||
store_api::{FileInfo, RawFileInfo},
|
||||
};
|
||||
|
||||
@@ -175,119 +175,3 @@ impl ReadAt for FileReader {
|
||||
Ok((buffer, bytes_read))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DiskError {
|
||||
#[error("file not found")]
|
||||
FileNotFound,
|
||||
|
||||
#[error("file version not found")]
|
||||
FileVersionNotFound,
|
||||
|
||||
#[error("disk not found")]
|
||||
DiskNotFound,
|
||||
|
||||
#[error("disk access denied")]
|
||||
FileAccessDenied,
|
||||
|
||||
#[error("InconsistentDisk")]
|
||||
InconsistentDisk,
|
||||
|
||||
#[error("volume already exists")]
|
||||
VolumeExists,
|
||||
|
||||
#[error("unformatted disk error")]
|
||||
UnformattedDisk,
|
||||
|
||||
#[error("unsupport disk")]
|
||||
UnsupportedDisk,
|
||||
|
||||
#[error("disk not a dir")]
|
||||
DiskNotDir,
|
||||
|
||||
#[error("volume not found")]
|
||||
VolumeNotFound,
|
||||
}
|
||||
|
||||
impl DiskError {
|
||||
pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<()> {
|
||||
if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() {
|
||||
return Err(Error::new(DiskError::UnsupportedDisk));
|
||||
}
|
||||
|
||||
// if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() {
|
||||
// return Err(Error::new(DiskError::DiskAccessDenied));
|
||||
// }
|
||||
|
||||
if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() {
|
||||
return Err(Error::new(DiskError::FileAccessDenied));
|
||||
}
|
||||
|
||||
// if count_errs(errs, &DiskError::FaultyDisk) == errs.len() {
|
||||
// return Err(Error::new(DiskError::FaultyDisk));
|
||||
// }
|
||||
|
||||
if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() {
|
||||
return Err(Error::new(DiskError::DiskNotDir));
|
||||
}
|
||||
|
||||
// if count_errs(errs, &DiskError::XLBackend) == errs.len() {
|
||||
// return Err(Error::new(DiskError::XLBackend));
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
|
||||
return errs
|
||||
.iter()
|
||||
.filter(|&e| {
|
||||
if e.is_some() {
|
||||
let e = e.as_ref().unwrap();
|
||||
let cast = e.downcast_ref::<DiskError>();
|
||||
if cast.is_some() {
|
||||
let cast = cast.unwrap();
|
||||
return cast == err;
|
||||
}
|
||||
}
|
||||
false
|
||||
})
|
||||
.count();
|
||||
}
|
||||
|
||||
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
|
||||
Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
|
||||
}
|
||||
|
||||
pub fn is_err(err: &Error, disk_err: &DiskError) -> bool {
|
||||
let cast = err.downcast_ref::<DiskError>();
|
||||
if cast.is_none() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let e = cast.unwrap();
|
||||
|
||||
e == disk_err
|
||||
}
|
||||
|
||||
// pub fn match_err(err: Error, matchs: Vec<DiskError>) -> bool {
|
||||
// let cast = err.downcast_ref::<DiskError>();
|
||||
// if cast.is_none() {
|
||||
// return false;
|
||||
// }
|
||||
|
||||
// let e = cast.unwrap();
|
||||
|
||||
// for i in matchs.iter() {
|
||||
// if e == i {
|
||||
// return true;
|
||||
// }
|
||||
// }
|
||||
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
|
||||
impl PartialEq for DiskError {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
core::mem::discriminant(self) == core::mem::discriminant(other)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
use crate::error::StdError;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use crate::error::{Error, Result, StdError};
|
||||
use bytes::Bytes;
|
||||
use futures::future::join_all;
|
||||
use futures::{Stream, StreamExt};
|
||||
@@ -16,7 +13,7 @@ use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::chunk_stream::ChunkedStream;
|
||||
use crate::disk_api::DiskError;
|
||||
use crate::disk::error::DiskError;
|
||||
use crate::disk_api::FileReader;
|
||||
|
||||
pub struct Erasure {
|
||||
@@ -98,7 +95,7 @@ impl Erasure {
|
||||
// }
|
||||
// }
|
||||
}
|
||||
Err(e) => return Err(anyhow!(e)),
|
||||
Err(e) => return Err(Error::from_std_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,9 +14,16 @@ impl Error {
|
||||
/// Create a new error from a `std::error::Error`.
|
||||
#[must_use]
|
||||
#[track_caller]
|
||||
pub fn new(source: StdError) -> Self {
|
||||
pub fn new<T: std::error::Error + Send + Sync + 'static>(source: T) -> Self {
|
||||
Self::from_std_error(source.into())
|
||||
}
|
||||
|
||||
/// Create a new error from a `std::error::Error`.
|
||||
#[must_use]
|
||||
#[track_caller]
|
||||
pub fn from_std_error(inner: StdError) -> Self {
|
||||
Self {
|
||||
inner: source,
|
||||
inner,
|
||||
span_trace: SpanTrace::capture(),
|
||||
}
|
||||
}
|
||||
@@ -25,7 +32,14 @@ impl Error {
|
||||
#[must_use]
|
||||
#[track_caller]
|
||||
pub fn from_string(s: impl Into<String>) -> Self {
|
||||
Self::new(s.into().into())
|
||||
Self::msg(s)
|
||||
}
|
||||
|
||||
/// Create a new error from a string.
|
||||
#[must_use]
|
||||
#[track_caller]
|
||||
pub fn msg(s: impl Into<String>) -> Self {
|
||||
Self::from_std_error(s.into().into())
|
||||
}
|
||||
|
||||
/// Returns `true` if the inner type is the same as `T`.
|
||||
@@ -51,7 +65,7 @@ impl Error {
|
||||
|
||||
impl<T: std::error::Error + Send + Sync + 'static> From<T> for Error {
|
||||
fn from(e: T) -> Self {
|
||||
Self::new(e.into())
|
||||
Self::new(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
mod bucket_meta;
|
||||
mod chunk_stream;
|
||||
mod disk;
|
||||
pub mod disk;
|
||||
pub mod disk_api;
|
||||
mod disks_layout;
|
||||
mod ellipses;
|
||||
@@ -8,7 +8,6 @@ mod endpoint;
|
||||
mod erasure;
|
||||
pub mod error;
|
||||
mod file_meta;
|
||||
mod format;
|
||||
mod peer;
|
||||
pub mod set_disk;
|
||||
mod sets;
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
use anyhow::{Error, Result};
|
||||
use async_trait::async_trait;
|
||||
use futures::future::join_all;
|
||||
use std::{collections::HashMap, fmt::Debug, sync::Arc};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{
|
||||
disk::DiskStore,
|
||||
disk_api::{DiskError, VolumeInfo},
|
||||
disk::{error::DiskError, DiskStore},
|
||||
disk_api::VolumeInfo,
|
||||
endpoint::{EndpointServerPools, Node},
|
||||
error::{Error, Result},
|
||||
store_api::{BucketInfo, BucketOptions, MakeBucketOptions},
|
||||
};
|
||||
|
||||
@@ -284,7 +284,7 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
match disk.make_volume(bucket).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
if opts.force_create && DiskError::is_err(&e, &DiskError::VolumeExists) {
|
||||
if opts.force_create && DiskError::VolumeExists.is(&e) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use anyhow::Result;
|
||||
use http::HeaderMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
disk::format::{DistributionAlgoVersion, FormatV3},
|
||||
disk::DiskStore,
|
||||
endpoint::PoolEndpoints,
|
||||
format::{DistributionAlgoVersion, FormatV3},
|
||||
error::Result,
|
||||
set_disk::SetDisks,
|
||||
store_api::{
|
||||
BucketInfo, BucketOptions, CompletePart, FileInfo, GetObjectReader, HTTPRangeSpec, MakeBucketOptions,
|
||||
|
||||
@@ -1,18 +1,10 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use http::HeaderMap;
|
||||
use s3s::{dto::StreamingBlob, Body};
|
||||
use tracing::debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
bucket_meta::BucketMetadata,
|
||||
disk::error::DiskError,
|
||||
disk::{self, DiskOption, DiskStore, RUSTFS_META_BUCKET},
|
||||
disk_api::DiskError,
|
||||
disks_layout::DisksLayout,
|
||||
endpoint::EndpointServerPools,
|
||||
error::{Error, Result},
|
||||
peer::{PeerS3Client, S3PeerSys},
|
||||
sets::Sets,
|
||||
store_api::{
|
||||
@@ -21,6 +13,11 @@ use crate::{
|
||||
},
|
||||
store_init, utils,
|
||||
};
|
||||
use http::HeaderMap;
|
||||
use s3s::{dto::StreamingBlob, Body};
|
||||
use std::collections::HashMap;
|
||||
use tracing::debug;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ECStore {
|
||||
@@ -34,12 +31,11 @@ pub struct ECStore {
|
||||
|
||||
impl ECStore {
|
||||
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
|
||||
let layouts = DisksLayout::try_from(endpoints.as_slice()).map_err(|v| Error::msg(v))?;
|
||||
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
|
||||
|
||||
let mut deployment_id = None;
|
||||
|
||||
let (endpoint_pools, _) =
|
||||
EndpointServerPools::create_server_endpoints(address.as_str(), &layouts).map_err(|v| Error::msg(v))?;
|
||||
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?;
|
||||
|
||||
let mut pools = Vec::with_capacity(endpoint_pools.as_ref().len());
|
||||
let mut disk_map = HashMap::with_capacity(endpoint_pools.as_ref().len());
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use http::HeaderMap;
|
||||
use rmp_serde::Serializer;
|
||||
use s3s::dto::StreamingBlob;
|
||||
|
||||
@@ -3,13 +3,12 @@ use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
disk::error::DiskError,
|
||||
disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
|
||||
disk::{DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
|
||||
disk_api::DiskError,
|
||||
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
|
||||
error::{Error, Result},
|
||||
};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
fmt::Debug,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
mod config;
|
||||
mod storage;
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use ecstore::error::Result;
|
||||
use hyper_util::{
|
||||
rt::{TokioExecutor, TokioIo},
|
||||
server::conn::auto::Builder as ConnBuilder,
|
||||
@@ -38,9 +38,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("opt: {:?}", &opt);
|
||||
// Setup S3 service
|
||||
let service = {
|
||||
let mut b = S3ServiceBuilder::new(
|
||||
storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?,
|
||||
);
|
||||
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?);
|
||||
|
||||
// Enable authentication
|
||||
if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use bytes::Bytes;
|
||||
use ecstore::disk_api::DiskError;
|
||||
use ecstore::disk::error::DiskError;
|
||||
use ecstore::store_api::BucketOptions;
|
||||
use ecstore::store_api::CompletePart;
|
||||
use ecstore::store_api::HTTPRangeSpec;
|
||||
@@ -23,7 +23,7 @@ use std::str::FromStr;
|
||||
use time::OffsetDateTime;
|
||||
use transform_stream::AsyncTryStream;
|
||||
|
||||
use anyhow::Result;
|
||||
use ecstore::error::Result;
|
||||
use ecstore::store::ECStore;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -112,7 +112,7 @@ impl S3 for FS {
|
||||
let input = req.input;
|
||||
|
||||
if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
|
||||
if DiskError::is_err(&e, &DiskError::VolumeNotFound) {
|
||||
if DiskError::VolumeNotFound.is(&e) {
|
||||
return Err(s3_error!(NoSuchBucket));
|
||||
} else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
|
||||
@@ -179,7 +179,7 @@ impl S3 for FS {
|
||||
let input = req.input;
|
||||
|
||||
if let Err(e) = self.store.get_bucket_info(&input.bucket, &BucketOptions {}).await {
|
||||
if DiskError::is_err(&e, &DiskError::VolumeNotFound) {
|
||||
if DiskError::VolumeNotFound.is(&e) {
|
||||
return Err(s3_error!(NoSuchBucket));
|
||||
} else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
|
||||
|
||||
Reference in New Issue
Block a user