init:store

This commit is contained in:
weisd
2024-06-28 10:54:58 +08:00
parent 8d820c3096
commit 3463fbec81
7 changed files with 515 additions and 418 deletions

View File

@@ -77,13 +77,13 @@ pub async fn init_disks(
#[derive(Debug)]
pub struct LocalDisk {
root: PathBuf,
id: Uuid,
format_data: Vec<u8>,
format_meta: Option<Metadata>,
format_path: PathBuf,
format_legacy: bool,
format_last_check: OffsetDateTime,
pub root: PathBuf,
pub id: Uuid,
pub format_data: Vec<u8>,
pub format_meta: Option<Metadata>,
pub format_path: PathBuf,
pub format_legacy: bool,
pub format_last_check: OffsetDateTime,
}
impl LocalDisk {
@@ -180,7 +180,7 @@ pub async fn read_file_exists(
let (data, meta) = match read_file_all(&p).await {
Ok((data, meta)) => (data, Some(meta)),
Err(e) => {
if is_err(&e, &DiskError::FileNotFound) {
if DiskError::is_err(&e, &DiskError::FileNotFound) {
(Vec::new(), None)
} else {
return Err(e);
@@ -336,13 +336,13 @@ impl DiskAPI for LocalDisk {
}
}
pub struct RemoteDisk {}
// pub struct RemoteDisk {}
impl RemoteDisk {
pub fn new(_ep: &Endpoint, _health_check: bool) -> Result<Self, Error> {
Ok(Self {})
}
}
// impl RemoteDisk {
// pub fn new(_ep: &Endpoint, _health_check: bool) -> Result<Self, Error> {
// Ok(Self {})
// }
// }
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
@@ -373,88 +373,89 @@ pub enum DiskError {
VolumeNotFound,
}
impl DiskError {
pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<(), Error> {
if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() {
return Err(Error::new(DiskError::UnsupportedDisk));
}
// if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() {
// return Err(Error::new(DiskError::DiskAccessDenied));
// }
if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() {
return Err(Error::new(DiskError::FileAccessDenied));
}
// if count_errs(errs, &DiskError::FaultyDisk) == errs.len() {
// return Err(Error::new(DiskError::FaultyDisk));
// }
if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() {
return Err(Error::new(DiskError::DiskNotDir));
}
// if count_errs(errs, &DiskError::XLBackend) == errs.len() {
// return Err(Error::new(DiskError::XLBackend));
// }
Ok(())
}
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
return errs
.iter()
.filter(|&e| {
if e.is_some() {
let e = e.as_ref().unwrap();
let cast = e.downcast_ref::<DiskError>();
if cast.is_some() {
let cast = cast.unwrap();
return cast == err;
}
}
true
})
.count();
}
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
}
pub fn is_err(err: &Error, disk_err: &DiskError) -> bool {
let cast = err.downcast_ref::<DiskError>();
if cast.is_none() {
return false;
}
let e = cast.unwrap();
e == disk_err
}
// pub fn match_err(err: Error, matchs: Vec<DiskError>) -> bool {
// let cast = err.downcast_ref::<DiskError>();
// if cast.is_none() {
// return false;
// }
// let e = cast.unwrap();
// for i in matchs.iter() {
// if e == i {
// return true;
// }
// }
// return false;
// }
}
impl PartialEq for DiskError {
fn eq(&self, other: &Self) -> bool {
core::mem::discriminant(self) == core::mem::discriminant(other)
}
}
pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<(), Error> {
if count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() {
return Err(Error::new(DiskError::UnsupportedDisk));
}
// if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() {
// return Err(Error::new(DiskError::DiskAccessDenied));
// }
if count_errs(errs, &DiskError::FileAccessDenied) == errs.len() {
return Err(Error::new(DiskError::FileAccessDenied));
}
// if count_errs(errs, &DiskError::FaultyDisk) == errs.len() {
// return Err(Error::new(DiskError::FaultyDisk));
// }
if count_errs(errs, &DiskError::DiskNotDir) == errs.len() {
return Err(Error::new(DiskError::DiskNotDir));
}
// if count_errs(errs, &DiskError::XLBackend) == errs.len() {
// return Err(Error::new(DiskError::XLBackend));
// }
Ok(())
}
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
}
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
return errs
.iter()
.filter(|&e| {
if e.is_some() {
let e = e.as_ref().unwrap();
let cast = e.downcast_ref::<DiskError>();
if cast.is_some() {
let cast = cast.unwrap();
return cast == err;
}
}
true
})
.count();
}
pub fn is_err(err: &Error, disk_err: &DiskError) -> bool {
let cast = err.downcast_ref::<DiskError>();
if cast.is_none() {
return false;
}
let e = cast.unwrap();
e == disk_err
}
pub fn match_err(err: Error, matchs: Vec<DiskError>) -> bool {
let cast = err.downcast_ref::<DiskError>();
if cast.is_none() {
return false;
}
let e = cast.unwrap();
for i in matchs.iter() {
if e == i {
return true;
}
}
return false;
}
#[cfg(test)]
mod test {

View File

@@ -311,7 +311,7 @@ impl PoolEndpointList {
// PoolEndpoints represent endpoints in a given pool
// along with its setCount and setDriveCount.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PoolEndpoints {
// indicates if endpoints are provided in non-ellipses style
pub legacy: bool,
@@ -331,6 +331,36 @@ impl EndpointServerPools {
Self(Vec::new())
}
// create_server_endpoints
pub fn create_server_endpoints(
server_addr: String,
pool_args: &Vec<PoolDisksLayout>,
legacy: bool,
) -> Result<(EndpointServerPools, SetupType), Error> {
if pool_args.is_empty() {
return Err(Error::msg("无效参数"));
}
let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?;
let mut ret = EndpointServerPools::new();
for (i, eps) in pooleps.iter().enumerate() {
let ep = PoolEndpoints {
legacy: legacy,
set_count: pool_args[i].layout.len(),
drives_per_set: pool_args[i].layout[0].len(),
endpoints: eps.clone(),
cmd_line: pool_args[i].cmdline.clone(),
platform: String::new(),
};
ret.add(ep)?;
}
Ok((ret, setup_type))
}
pub fn first_is_local(&self) -> bool {
if self.0.is_empty() {
return false;
@@ -537,7 +567,7 @@ pub fn create_pool_endpoints(
}
// create_server_endpoints
pub fn create_server_endpoints(
fn create_server_endpoints(
server_addr: String,
pool_args: &Vec<PoolDisksLayout>,
legacy: bool,

View File

@@ -6,6 +6,9 @@ mod endpoint;
mod erasure;
pub mod error;
mod format;
mod sets;
pub mod store;
mod store_api;
mod store_init;
mod stream;
mod utils;

68
ecstore/src/sets.rs Normal file
View File

@@ -0,0 +1,68 @@
use anyhow::Error;
use uuid::Uuid;
use crate::{endpoint::PoolEndpoints, format::FormatV3};
#[derive(Debug)]
pub struct Sets {
pub id: Uuid,
// pub sets: Vec<Objects>,
pub disk_indexs: Vec<Vec<usize>>, // [set_count_idx][set_drive_count_idx] = disk_idx
pub pool_idx: usize,
pub endpoints: PoolEndpoints,
pub format: FormatV3,
pub partiy_count: usize,
pub set_count: usize,
pub set_drive_count: usize,
}
impl Sets {
pub fn new(
endpoints: &PoolEndpoints,
fm: &FormatV3,
pool_idx: usize,
partiy_count: usize,
) -> Result<Self, Error> {
let set_count = fm.erasure.sets.len();
let set_drive_count = fm.erasure.sets[0].len();
let mut disk_indexs = Vec::with_capacity(set_count);
for i in 0..set_count {
let mut set_indexs = Vec::with_capacity(set_drive_count);
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
set_indexs.push(idx);
}
disk_indexs.push(set_indexs);
}
let sets = Self {
id: fm.id.clone(),
// sets: todo!(),
disk_indexs,
pool_idx,
endpoints: endpoints.clone(),
format: fm.clone(),
partiy_count,
set_count,
set_drive_count,
};
Ok(sets)
}
pub fn get_disks(&self, set_idx: usize) -> Vec<usize> {
self.disk_indexs[set_idx].clone()
}
}
// #[derive(Debug)]
// pub struct Objects {
// pub endpoints: Vec<Endpoint>,
// pub disks: Vec<usize>,
// pub set_index: usize,
// pub pool_index: usize,
// pub set_drive_count: usize,
// pub default_parity_count: usize,
// }

View File

@@ -1,52 +1,44 @@
use bytes::Bytes;
use futures::future::join_all;
use std::collections::HashMap;
use anyhow::Error;
use uuid::Uuid;
use crate::{
disk::{
self, check_disk_fatal_errs, count_errs, quorum_unformatted_disks, DiskError, DiskOption,
DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
},
disk_api::DiskAPI,
disk::{self, DiskError, DiskOption, DiskStore},
disks_layout::DisksLayout,
endpoint::create_server_endpoints,
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
endpoint::EndpointServerPools,
sets::Sets,
store_api::StorageAPI,
store_init,
};
use super::endpoint::Endpoint;
use anyhow::{Error, Result};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};
type StoreDisk = dyn DiskAPI;
#[derive(Debug)]
pub struct ECStore {
pub id: uuid::Uuid,
// pub disks: Vec<Box<dyn DiskAPI>>,
disk_map: HashMap<usize, Vec<Option<Box<dyn DiskAPI>>>>,
// pub disks: Vec<DiskStore>,
pub disk_map: HashMap<usize, Vec<Option<DiskStore>>>,
pub pools: Vec<Sets>,
pub peer: Vec<String>,
}
impl ECStore {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self, Error> {
let layouts = DisksLayout::new(&endpoints)?;
let mut deployment_id = None;
let mut setsv = Vec::with_capacity(endpoints.len());
let (endpoint_pools, _) =
EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let (pools, _) = create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let mut pools = Vec::with_capacity(endpoint_pools.len());
let mut disk_map = HashMap::with_capacity(endpoint_pools.len());
let mut disk_map = HashMap::with_capacity(pools.len());
let first_is_local = endpoint_pools.first_is_local();
let first_is_local = pools.first_is_local();
for (i, pool_eps) in endpoint_pools.iter().enumerate() {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
for (i, pool_eps) in pools.iter().enumerate() {
let (disks, errs) = disk::init_disks(
&pool_eps.endpoints,
&DiskOption {
@@ -56,9 +48,9 @@ impl ECStore {
)
.await;
check_disk_fatal_errs(&errs)?;
DiskError::check_disk_fatal_errs(&errs)?;
let fm = do_init_format_file(
let fm = store_init::do_init_format_file(
first_is_local,
&disks,
pool_eps.set_count,
@@ -81,313 +73,22 @@ impl ECStore {
disk_map.insert(i, disks);
let sets = Sets::new()?;
let sets = Sets::new(pool_eps, &fm, i, partiy_count)?;
setsv.push(sets);
pools.push(sets);
}
Ok(ECStore {
id: Uuid::nil(),
id: deployment_id.unwrap(),
disk_map,
pools: Vec::new(),
pools,
peer: Vec::new(),
})
}
}
async fn do_init_format_file(
first_disk: bool,
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
let (formats, errs) = read_format_file_all(&disks, false).await;
check_disk_fatal_errs(&errs)?;
check_format_erasure_values(&formats, &disks, set_drive_count)?;
if first_disk && should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
// new format and save
let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id);
let _errs = save_format_file_all(&disks, &fms).await;
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
let fm = get_format_file_in_quorum(&fms)?;
return Ok(fm);
}
let unformatted = quorum_unformatted_disks(&errs);
if unformatted && !first_disk {
return Err(Error::new(ErasureError::NotFirstDisk));
}
if unformatted && first_disk {
return Err(Error::new(ErasureError::FirstDiskWait));
}
let fm = get_format_file_in_quorum(&formats)?;
return Ok(fm);
}
fn init_format_files(
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Vec<Option<FormatV3>> {
let fm = FormatV3::new(set_count, set_drive_count);
let mut fms = vec![None; disks.len()];
for i in 0..set_count {
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
let mut newfm = fm.clone();
newfm.erasure.this = fm.erasure.sets[i][j];
if deployment_id.is_some() {
newfm.id = deployment_id.unwrap();
}
fms[idx] = Some(newfm);
}
}
fms
}
fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
if f.is_some() {
let ds = f.as_ref().unwrap().drives();
let v = countmap.entry(ds);
match v {
Entry::Occupied(mut entry) => *entry.get_mut() += 1,
Entry::Vacant(vacant) => {
vacant.insert(1);
}
};
}
}
let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0));
if *max_drives == 0 || *max_count < formats.len() / 2 {
return Err(Error::new(ErasureError::ErasureReadQuorum));
}
let format = formats
.iter()
.filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives)
.next()
.ok_or(Error::new(ErasureError::ErasureReadQuorum))?;
let mut format = format.as_ref().unwrap().clone();
format.erasure.this = Uuid::nil();
Ok(format)
}
fn should_init_erasure_disks(errs: &Vec<Option<Error>>) -> bool {
count_errs(errs, &DiskError::UnformattedDisk) == errs.len()
}
fn check_format_erasure_values(
formats: &Vec<Option<FormatV3>>,
disks: &Vec<Option<DiskStore>>,
set_drive_count: usize,
) -> Result<()> {
for f in formats.iter() {
if f.is_none() {
continue;
}
let f = f.as_ref().unwrap();
check_format_erasure_value(f)?;
if formats.len() != f.erasure.sets.len() * f.erasure.sets[0].len() {
return Err(Error::msg("formats length for erasure.sets not mtach"));
}
if f.erasure.sets[0].len() != set_drive_count {
return Err(Error::msg("erasure set length not match set_drive_count"));
}
}
Ok(())
}
fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
if format.version != FormatMetaVersion::V1 {
return Err(Error::msg("invalid FormatMetaVersion"));
}
if format.erasure.version != FormatErasureVersion::V3 {
return Err(Error::msg("invalid FormatErasureVersion"));
}
Ok(())
}
fn default_partiy_blocks(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(
disks: &Vec<Option<DiskStore>>,
heal: bool,
) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for ep in disks.iter() {
futures.push(read_format_file(ep, heal));
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
datas.push(Some(s));
errors.push(None);
}
Err(e) => {
datas.push(None);
errors.push(Some(e));
}
}
}
(datas, errors)
}
async fn read_format_file(disk: &Option<DiskStore>, heal: bool) -> Result<FormatV3, Error> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let disk = disk.as_ref().unwrap();
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await
.map_err(|e| match &e.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk),
Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk),
Some(_) => e,
None => e,
})?;
let fm = FormatV3::try_from(data.as_ref())?;
// TODO: heal
Ok(fm)
}
async fn save_format_file_all(
disks: &Vec<Option<DiskStore>>,
formats: &Vec<Option<FormatV3>>,
) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, ep) in disks.iter().enumerate() {
futures.push(save_format_file(ep, &formats[i]));
}
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
errors
}
async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> Result<()> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let format = format.as_ref().unwrap();
let json_data = format.to_json().map(|data| Bytes::from(data))?;
let tmpfile = Uuid::new_v4().to_string();
let disk = disk.as_ref().unwrap();
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data)
.await?;
disk.rename_file(
RUSTFS_META_BUCKET,
tmpfile.as_str(),
RUSTFS_META_BUCKET,
FORMAT_CONFIG_FILE,
)
.await?;
// let mut disk = disk;
// disk.set_disk_id(format.erasure.this);
Ok(())
}
#[derive(Debug)]
pub struct Sets {
pub sets: Vec<Objects>,
}
impl Sets {
fn new() -> Result<Self, Error> {
impl StorageAPI for ECStore {
async fn put_object(&self, bucket: &str, objcet: &str) -> Result<(), Error> {
unimplemented!()
}
}
#[derive(Debug)]
pub struct Objects {
pub endpoints: Vec<Endpoint>,
pub disks: Vec<usize>,
pub set_index: usize,
pub pool_index: usize,
pub set_drive_count: usize,
pub default_parity_count: usize,
}
pub trait StorageAPI: Debug + Send + Sync + 'static {}
#[derive(Debug, thiserror::Error)]
pub enum ErasureError {
#[error("erasure read quorum")]
ErasureReadQuorum,
#[error("erasure write quorum")]
ErasureWriteQuorum,
#[error("not first disk")]
NotFirstDisk,
#[error("first disk wiat")]
FirstDiskWait,
}

5
ecstore/src/store_api.rs Normal file
View File

@@ -0,0 +1,5 @@
use anyhow::Error;
pub trait StorageAPI {
async fn put_object(&self, bucket: &str, objcet: &str) -> Result<(), Error>;
}

289
ecstore/src/store_init.rs Normal file
View File

@@ -0,0 +1,289 @@
use bytes::Bytes;
use futures::future::join_all;
use uuid::Uuid;
use crate::{
disk::{DiskError, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
};
use anyhow::{Error, Result};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};
pub async fn do_init_format_file(
first_disk: bool,
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
let (formats, errs) = read_format_file_all(&disks, false).await;
DiskError::check_disk_fatal_errs(&errs)?;
check_format_erasure_values(&formats, set_drive_count)?;
if first_disk && should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
// new format and save
let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id);
let _errs = save_format_file_all(&disks, &fms).await;
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
let fm = get_format_file_in_quorum(&fms)?;
return Ok(fm);
}
let unformatted = DiskError::quorum_unformatted_disks(&errs);
if unformatted && !first_disk {
return Err(Error::new(ErasureError::NotFirstDisk));
}
if unformatted && first_disk {
return Err(Error::new(ErasureError::FirstDiskWait));
}
let fm = get_format_file_in_quorum(&formats)?;
return Ok(fm);
}
fn init_format_files(
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Vec<Option<FormatV3>> {
let fm = FormatV3::new(set_count, set_drive_count);
let mut fms = vec![None; disks.len()];
for i in 0..set_count {
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
let mut newfm = fm.clone();
newfm.erasure.this = fm.erasure.sets[i][j];
if deployment_id.is_some() {
newfm.id = deployment_id.unwrap();
}
fms[idx] = Some(newfm);
}
}
fms
}
fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
if f.is_some() {
let ds = f.as_ref().unwrap().drives();
let v = countmap.entry(ds);
match v {
Entry::Occupied(mut entry) => *entry.get_mut() += 1,
Entry::Vacant(vacant) => {
vacant.insert(1);
}
};
}
}
let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0));
if *max_drives == 0 || *max_count < formats.len() / 2 {
return Err(Error::new(ErasureError::ErasureReadQuorum));
}
let format = formats
.iter()
.filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives)
.next()
.ok_or(Error::new(ErasureError::ErasureReadQuorum))?;
let mut format = format.as_ref().unwrap().clone();
format.erasure.this = Uuid::nil();
Ok(format)
}
fn should_init_erasure_disks(errs: &Vec<Option<Error>>) -> bool {
DiskError::count_errs(errs, &DiskError::UnformattedDisk) == errs.len()
}
fn check_format_erasure_values(
formats: &Vec<Option<FormatV3>>,
// disks: &Vec<Option<DiskStore>>,
set_drive_count: usize,
) -> Result<()> {
for f in formats.iter() {
if f.is_none() {
continue;
}
let f = f.as_ref().unwrap();
check_format_erasure_value(f)?;
if formats.len() != f.erasure.sets.len() * f.erasure.sets[0].len() {
return Err(Error::msg("formats length for erasure.sets not mtach"));
}
if f.erasure.sets[0].len() != set_drive_count {
return Err(Error::msg("erasure set length not match set_drive_count"));
}
}
Ok(())
}
fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
if format.version != FormatMetaVersion::V1 {
return Err(Error::msg("invalid FormatMetaVersion"));
}
if format.erasure.version != FormatErasureVersion::V3 {
return Err(Error::msg("invalid FormatErasureVersion"));
}
Ok(())
}
pub fn default_partiy_count(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(
disks: &Vec<Option<DiskStore>>,
heal: bool,
) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for ep in disks.iter() {
futures.push(read_format_file(ep, heal));
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
datas.push(Some(s));
errors.push(None);
}
Err(e) => {
datas.push(None);
errors.push(Some(e));
}
}
}
(datas, errors)
}
async fn read_format_file(disk: &Option<DiskStore>, _heal: bool) -> Result<FormatV3, Error> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let disk = disk.as_ref().unwrap();
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await
.map_err(|e| match &e.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk),
Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk),
Some(_) => e,
None => e,
})?;
let fm = FormatV3::try_from(data.as_ref())?;
// TODO: heal
Ok(fm)
}
async fn save_format_file_all(
disks: &Vec<Option<DiskStore>>,
formats: &Vec<Option<FormatV3>>,
) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, ep) in disks.iter().enumerate() {
futures.push(save_format_file(ep, &formats[i]));
}
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
errors
}
async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> Result<()> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let format = format.as_ref().unwrap();
let json_data = format.to_json().map(|data| Bytes::from(data))?;
let tmpfile = Uuid::new_v4().to_string();
let disk = disk.as_ref().unwrap();
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data)
.await?;
disk.rename_file(
RUSTFS_META_BUCKET,
tmpfile.as_str(),
RUSTFS_META_BUCKET,
FORMAT_CONFIG_FILE,
)
.await?;
// let mut disk = disk;
// disk.set_disk_id(format.erasure.this);
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum ErasureError {
#[error("erasure read quorum")]
ErasureReadQuorum,
#[error("erasure write quorum")]
_ErasureWriteQuorum,
#[error("not first disk")]
NotFirstDisk,
#[error("first disk wiat")]
FirstDiskWait,
}