From 3463fbec818a727c6ef61ac67bc6a57f8ccf7d70 Mon Sep 17 00:00:00 2001 From: weisd Date: Fri, 28 Jun 2024 10:54:58 +0800 Subject: [PATCH] init:store --- ecstore/src/disk.rs | 181 +++++++++---------- ecstore/src/endpoint.rs | 34 +++- ecstore/src/lib.rs | 3 + ecstore/src/sets.rs | 68 ++++++++ ecstore/src/store.rs | 353 +++----------------------------------- ecstore/src/store_api.rs | 5 + ecstore/src/store_init.rs | 289 +++++++++++++++++++++++++++++++ 7 files changed, 515 insertions(+), 418 deletions(-) create mode 100644 ecstore/src/sets.rs create mode 100644 ecstore/src/store_api.rs create mode 100644 ecstore/src/store_init.rs diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index cea24b95..f043b70e 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -77,13 +77,13 @@ pub async fn init_disks( #[derive(Debug)] pub struct LocalDisk { - root: PathBuf, - id: Uuid, - format_data: Vec, - format_meta: Option, - format_path: PathBuf, - format_legacy: bool, - format_last_check: OffsetDateTime, + pub root: PathBuf, + pub id: Uuid, + pub format_data: Vec, + pub format_meta: Option, + pub format_path: PathBuf, + pub format_legacy: bool, + pub format_last_check: OffsetDateTime, } impl LocalDisk { @@ -180,7 +180,7 @@ pub async fn read_file_exists( let (data, meta) = match read_file_all(&p).await { Ok((data, meta)) => (data, Some(meta)), Err(e) => { - if is_err(&e, &DiskError::FileNotFound) { + if DiskError::is_err(&e, &DiskError::FileNotFound) { (Vec::new(), None) } else { return Err(e); @@ -336,13 +336,13 @@ impl DiskAPI for LocalDisk { } } -pub struct RemoteDisk {} +// pub struct RemoteDisk {} -impl RemoteDisk { - pub fn new(_ep: &Endpoint, _health_check: bool) -> Result { - Ok(Self {}) - } -} +// impl RemoteDisk { +// pub fn new(_ep: &Endpoint, _health_check: bool) -> Result { +// Ok(Self {}) +// } +// } #[derive(Debug, thiserror::Error)] pub enum DiskError { @@ -373,88 +373,89 @@ pub enum DiskError { VolumeNotFound, } +impl DiskError { + pub fn check_disk_fatal_errs(errs: &Vec>) -> Result<(), Error> { + if Self::count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() { + return Err(Error::new(DiskError::UnsupportedDisk)); + } + + // if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() { + // return Err(Error::new(DiskError::DiskAccessDenied)); + // } + + if Self::count_errs(errs, &DiskError::FileAccessDenied) == errs.len() { + return Err(Error::new(DiskError::FileAccessDenied)); + } + + // if count_errs(errs, &DiskError::FaultyDisk) == errs.len() { + // return Err(Error::new(DiskError::FaultyDisk)); + // } + + if Self::count_errs(errs, &DiskError::DiskNotDir) == errs.len() { + return Err(Error::new(DiskError::DiskNotDir)); + } + + // if count_errs(errs, &DiskError::XLBackend) == errs.len() { + // return Err(Error::new(DiskError::XLBackend)); + // } + Ok(()) + } + pub fn count_errs(errs: &Vec>, err: &DiskError) -> usize { + return errs + .iter() + .filter(|&e| { + if e.is_some() { + let e = e.as_ref().unwrap(); + let cast = e.downcast_ref::(); + if cast.is_some() { + let cast = cast.unwrap(); + return cast == err; + } + } + true + }) + .count(); + } + + pub fn quorum_unformatted_disks(errs: &Vec>) -> bool { + Self::count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1 + } + + pub fn is_err(err: &Error, disk_err: &DiskError) -> bool { + let cast = err.downcast_ref::(); + if cast.is_none() { + return false; + } + + let e = cast.unwrap(); + + e == disk_err + } + + // pub fn match_err(err: Error, matchs: Vec) -> bool { + // let cast = err.downcast_ref::(); + // if cast.is_none() { + // return false; + // } + + // let e = cast.unwrap(); + + // for i in matchs.iter() { + // if e == i { + // return true; + // } + // } + + // return false; + // } +} + impl PartialEq for DiskError { fn eq(&self, other: &Self) -> bool { core::mem::discriminant(self) == core::mem::discriminant(other) } } -pub fn check_disk_fatal_errs(errs: &Vec>) -> Result<(), Error> { - if count_errs(errs, &DiskError::UnsupportedDisk) == errs.len() { - return Err(Error::new(DiskError::UnsupportedDisk)); - } - - // if count_errs(errs, &DiskError::DiskAccessDenied) == errs.len() { - // return Err(Error::new(DiskError::DiskAccessDenied)); - // } - - if count_errs(errs, &DiskError::FileAccessDenied) == errs.len() { - return Err(Error::new(DiskError::FileAccessDenied)); - } - - // if count_errs(errs, &DiskError::FaultyDisk) == errs.len() { - // return Err(Error::new(DiskError::FaultyDisk)); - // } - - if count_errs(errs, &DiskError::DiskNotDir) == errs.len() { - return Err(Error::new(DiskError::DiskNotDir)); - } - - // if count_errs(errs, &DiskError::XLBackend) == errs.len() { - // return Err(Error::new(DiskError::XLBackend)); - // } - Ok(()) -} - -pub fn quorum_unformatted_disks(errs: &Vec>) -> bool { - count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1 -} - -pub fn count_errs(errs: &Vec>, err: &DiskError) -> usize { - return errs - .iter() - .filter(|&e| { - if e.is_some() { - let e = e.as_ref().unwrap(); - let cast = e.downcast_ref::(); - if cast.is_some() { - let cast = cast.unwrap(); - return cast == err; - } - } - true - }) - .count(); -} - -pub fn is_err(err: &Error, disk_err: &DiskError) -> bool { - let cast = err.downcast_ref::(); - if cast.is_none() { - return false; - } - - let e = cast.unwrap(); - - e == disk_err -} - -pub fn match_err(err: Error, matchs: Vec) -> bool { - let cast = err.downcast_ref::(); - if cast.is_none() { - return false; - } - - let e = cast.unwrap(); - - for i in matchs.iter() { - if e == i { - return true; - } - } - - return false; -} - #[cfg(test)] mod test { diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index cc2c8f6b..7a88e88c 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -311,7 +311,7 @@ impl PoolEndpointList { // PoolEndpoints represent endpoints in a given pool // along with its setCount and setDriveCount. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PoolEndpoints { // indicates if endpoints are provided in non-ellipses style pub legacy: bool, @@ -331,6 +331,36 @@ impl EndpointServerPools { Self(Vec::new()) } + // create_server_endpoints + pub fn create_server_endpoints( + server_addr: String, + pool_args: &Vec, + legacy: bool, + ) -> Result<(EndpointServerPools, SetupType), Error> { + if pool_args.is_empty() { + return Err(Error::msg("无效参数")); + } + + let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?; + + let mut ret = EndpointServerPools::new(); + + for (i, eps) in pooleps.iter().enumerate() { + let ep = PoolEndpoints { + legacy: legacy, + set_count: pool_args[i].layout.len(), + drives_per_set: pool_args[i].layout[0].len(), + endpoints: eps.clone(), + cmd_line: pool_args[i].cmdline.clone(), + platform: String::new(), + }; + + ret.add(ep)?; + } + + Ok((ret, setup_type)) + } + pub fn first_is_local(&self) -> bool { if self.0.is_empty() { return false; @@ -537,7 +567,7 @@ pub fn create_pool_endpoints( } // create_server_endpoints -pub fn create_server_endpoints( +fn create_server_endpoints( server_addr: String, pool_args: &Vec, legacy: bool, diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index 34f42ddd..828e6ed3 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -6,6 +6,9 @@ mod endpoint; mod erasure; pub mod error; mod format; +mod sets; pub mod store; +mod store_api; +mod store_init; mod stream; mod utils; diff --git a/ecstore/src/sets.rs b/ecstore/src/sets.rs new file mode 100644 index 00000000..de4d20ad --- /dev/null +++ b/ecstore/src/sets.rs @@ -0,0 +1,68 @@ +use anyhow::Error; +use uuid::Uuid; + +use crate::{endpoint::PoolEndpoints, format::FormatV3}; + +#[derive(Debug)] +pub struct Sets { + pub id: Uuid, + // pub sets: Vec, + pub disk_indexs: Vec>, // [set_count_idx][set_drive_count_idx] = disk_idx + pub pool_idx: usize, + pub endpoints: PoolEndpoints, + pub format: FormatV3, + pub partiy_count: usize, + pub set_count: usize, + pub set_drive_count: usize, +} + +impl Sets { + pub fn new( + endpoints: &PoolEndpoints, + fm: &FormatV3, + pool_idx: usize, + partiy_count: usize, + ) -> Result { + let set_count = fm.erasure.sets.len(); + let set_drive_count = fm.erasure.sets[0].len(); + + let mut disk_indexs = Vec::with_capacity(set_count); + + for i in 0..set_count { + let mut set_indexs = Vec::with_capacity(set_drive_count); + for j in 0..set_drive_count { + let idx = i * set_drive_count + j; + set_indexs.push(idx); + } + + disk_indexs.push(set_indexs); + } + + let sets = Self { + id: fm.id.clone(), + // sets: todo!(), + disk_indexs, + pool_idx, + endpoints: endpoints.clone(), + format: fm.clone(), + partiy_count, + set_count, + set_drive_count, + }; + + Ok(sets) + } + pub fn get_disks(&self, set_idx: usize) -> Vec { + self.disk_indexs[set_idx].clone() + } +} + +// #[derive(Debug)] +// pub struct Objects { +// pub endpoints: Vec, +// pub disks: Vec, +// pub set_index: usize, +// pub pool_index: usize, +// pub set_drive_count: usize, +// pub default_parity_count: usize, +// } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index f3293df8..c3573841 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,52 +1,44 @@ -use bytes::Bytes; -use futures::future::join_all; +use std::collections::HashMap; + +use anyhow::Error; use uuid::Uuid; use crate::{ - disk::{ - self, check_disk_fatal_errs, count_errs, quorum_unformatted_disks, DiskError, DiskOption, - DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET, - }, - disk_api::DiskAPI, + disk::{self, DiskError, DiskOption, DiskStore}, disks_layout::DisksLayout, - endpoint::create_server_endpoints, - format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, + endpoint::EndpointServerPools, + sets::Sets, + store_api::StorageAPI, + store_init, }; -use super::endpoint::Endpoint; -use anyhow::{Error, Result}; - -use std::{ - collections::{hash_map::Entry, HashMap}, - fmt::Debug, -}; - -type StoreDisk = dyn DiskAPI; - #[derive(Debug)] pub struct ECStore { pub id: uuid::Uuid, - // pub disks: Vec>, - disk_map: HashMap>>>, + // pub disks: Vec, + pub disk_map: HashMap>>, pub pools: Vec, pub peer: Vec, } impl ECStore { - pub async fn new(address: String, endpoints: Vec) -> Result { + pub async fn new(address: String, endpoints: Vec) -> Result { let layouts = DisksLayout::new(&endpoints)?; let mut deployment_id = None; - let mut setsv = Vec::with_capacity(endpoints.len()); + let (endpoint_pools, _) = + EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?; - let (pools, _) = create_server_endpoints(address, &layouts.pools, layouts.legacy)?; + let mut pools = Vec::with_capacity(endpoint_pools.len()); + let mut disk_map = HashMap::with_capacity(endpoint_pools.len()); - let mut disk_map = HashMap::with_capacity(pools.len()); + let first_is_local = endpoint_pools.first_is_local(); - let first_is_local = pools.first_is_local(); + for (i, pool_eps) in endpoint_pools.iter().enumerate() { + // TODO: read from config parseStorageClass + let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set); - for (i, pool_eps) in pools.iter().enumerate() { let (disks, errs) = disk::init_disks( &pool_eps.endpoints, &DiskOption { @@ -56,9 +48,9 @@ impl ECStore { ) .await; - check_disk_fatal_errs(&errs)?; + DiskError::check_disk_fatal_errs(&errs)?; - let fm = do_init_format_file( + let fm = store_init::do_init_format_file( first_is_local, &disks, pool_eps.set_count, @@ -81,313 +73,22 @@ impl ECStore { disk_map.insert(i, disks); - let sets = Sets::new()?; + let sets = Sets::new(pool_eps, &fm, i, partiy_count)?; - setsv.push(sets); + pools.push(sets); } Ok(ECStore { - id: Uuid::nil(), + id: deployment_id.unwrap(), disk_map, - pools: Vec::new(), + pools, peer: Vec::new(), }) } } -async fn do_init_format_file( - first_disk: bool, - disks: &Vec>, - set_count: usize, - set_drive_count: usize, - deployment_id: Option, -) -> Result { - let (formats, errs) = read_format_file_all(&disks, false).await; - - check_disk_fatal_errs(&errs)?; - - check_format_erasure_values(&formats, &disks, set_drive_count)?; - - if first_disk && should_init_erasure_disks(&errs) { - // UnformattedDisk, not format file create - // new format and save - let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id); - - let _errs = save_format_file_all(&disks, &fms).await; - - // TODO: check quorum - // reduceWriteQuorumErrs(&errs)?; - - let fm = get_format_file_in_quorum(&fms)?; - - return Ok(fm); - } - - let unformatted = quorum_unformatted_disks(&errs); - if unformatted && !first_disk { - return Err(Error::new(ErasureError::NotFirstDisk)); - } - - if unformatted && first_disk { - return Err(Error::new(ErasureError::FirstDiskWait)); - } - - let fm = get_format_file_in_quorum(&formats)?; - - return Ok(fm); -} - -fn init_format_files( - disks: &Vec>, - set_count: usize, - set_drive_count: usize, - deployment_id: Option, -) -> Vec> { - let fm = FormatV3::new(set_count, set_drive_count); - let mut fms = vec![None; disks.len()]; - for i in 0..set_count { - for j in 0..set_drive_count { - let idx = i * set_drive_count + j; - let mut newfm = fm.clone(); - newfm.erasure.this = fm.erasure.sets[i][j]; - if deployment_id.is_some() { - newfm.id = deployment_id.unwrap(); - } - - fms[idx] = Some(newfm); - } - } - - fms -} - -fn get_format_file_in_quorum(formats: &Vec>) -> Result { - let mut countmap = HashMap::new(); - - for f in formats.iter() { - if f.is_some() { - let ds = f.as_ref().unwrap().drives(); - let v = countmap.entry(ds); - match v { - Entry::Occupied(mut entry) => *entry.get_mut() += 1, - Entry::Vacant(vacant) => { - vacant.insert(1); - } - }; - } - } - - let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0)); - - if *max_drives == 0 || *max_count < formats.len() / 2 { - return Err(Error::new(ErasureError::ErasureReadQuorum)); - } - - let format = formats - .iter() - .filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives) - .next() - .ok_or(Error::new(ErasureError::ErasureReadQuorum))?; - - let mut format = format.as_ref().unwrap().clone(); - format.erasure.this = Uuid::nil(); - - Ok(format) -} - -fn should_init_erasure_disks(errs: &Vec>) -> bool { - count_errs(errs, &DiskError::UnformattedDisk) == errs.len() -} - -fn check_format_erasure_values( - formats: &Vec>, - disks: &Vec>, - set_drive_count: usize, -) -> Result<()> { - for f in formats.iter() { - if f.is_none() { - continue; - } - - let f = f.as_ref().unwrap(); - - check_format_erasure_value(f)?; - - if formats.len() != f.erasure.sets.len() * f.erasure.sets[0].len() { - return Err(Error::msg("formats length for erasure.sets not mtach")); - } - - if f.erasure.sets[0].len() != set_drive_count { - return Err(Error::msg("erasure set length not match set_drive_count")); - } - } - Ok(()) -} -fn check_format_erasure_value(format: &FormatV3) -> Result<()> { - if format.version != FormatMetaVersion::V1 { - return Err(Error::msg("invalid FormatMetaVersion")); - } - - if format.erasure.version != FormatErasureVersion::V3 { - return Err(Error::msg("invalid FormatErasureVersion")); - } - Ok(()) -} - -fn default_partiy_blocks(drive: usize) -> usize { - match drive { - 1 => 0, - 2 | 3 => 1, - 4 | 5 => 2, - 6 | 7 => 3, - _ => 4, - } -} -// read_format_file_all 读取所有foramt.json -async fn read_format_file_all( - disks: &Vec>, - heal: bool, -) -> (Vec>, Vec>) { - let mut futures = Vec::with_capacity(disks.len()); - - for ep in disks.iter() { - futures.push(read_format_file(ep, heal)); - } - - let mut datas = Vec::with_capacity(disks.len()); - let mut errors = Vec::with_capacity(disks.len()); - - let results = join_all(futures).await; - for result in results { - match result { - Ok(s) => { - datas.push(Some(s)); - errors.push(None); - } - Err(e) => { - datas.push(None); - errors.push(Some(e)); - } - } - } - - (datas, errors) -} - -async fn read_format_file(disk: &Option, heal: bool) -> Result { - if disk.is_none() { - return Err(Error::new(DiskError::DiskNotFound)); - } - let disk = disk.as_ref().unwrap(); - - let data = disk - .read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE) - .await - .map_err(|e| match &e.downcast_ref::() { - Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk), - Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk), - Some(_) => e, - None => e, - })?; - - let fm = FormatV3::try_from(data.as_ref())?; - - // TODO: heal - - Ok(fm) -} - -async fn save_format_file_all( - disks: &Vec>, - formats: &Vec>, -) -> Vec> { - let mut futures = Vec::with_capacity(disks.len()); - - for (i, ep) in disks.iter().enumerate() { - futures.push(save_format_file(ep, &formats[i])); - } - - let mut errors = Vec::with_capacity(disks.len()); - - let results = join_all(futures).await; - for result in results { - match result { - Ok(_) => { - errors.push(None); - } - Err(e) => { - errors.push(Some(e)); - } - } - } - - errors -} - -async fn save_format_file(disk: &Option, format: &Option) -> Result<()> { - if disk.is_none() { - return Err(Error::new(DiskError::DiskNotFound)); - } - - let format = format.as_ref().unwrap(); - - let json_data = format.to_json().map(|data| Bytes::from(data))?; - - let tmpfile = Uuid::new_v4().to_string(); - - let disk = disk.as_ref().unwrap(); - disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data) - .await?; - - disk.rename_file( - RUSTFS_META_BUCKET, - tmpfile.as_str(), - RUSTFS_META_BUCKET, - FORMAT_CONFIG_FILE, - ) - .await?; - - // let mut disk = disk; - - // disk.set_disk_id(format.erasure.this); - - Ok(()) -} - -#[derive(Debug)] -pub struct Sets { - pub sets: Vec, -} - -impl Sets { - fn new() -> Result { +impl StorageAPI for ECStore { + async fn put_object(&self, bucket: &str, objcet: &str) -> Result<(), Error> { unimplemented!() } } - -#[derive(Debug)] -pub struct Objects { - pub endpoints: Vec, - pub disks: Vec, - pub set_index: usize, - pub pool_index: usize, - pub set_drive_count: usize, - pub default_parity_count: usize, -} - -pub trait StorageAPI: Debug + Send + Sync + 'static {} - -#[derive(Debug, thiserror::Error)] -pub enum ErasureError { - #[error("erasure read quorum")] - ErasureReadQuorum, - - #[error("erasure write quorum")] - ErasureWriteQuorum, - - #[error("not first disk")] - NotFirstDisk, - - #[error("first disk wiat")] - FirstDiskWait, -} diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs new file mode 100644 index 00000000..fdb42106 --- /dev/null +++ b/ecstore/src/store_api.rs @@ -0,0 +1,5 @@ +use anyhow::Error; + +pub trait StorageAPI { + async fn put_object(&self, bucket: &str, objcet: &str) -> Result<(), Error>; +} diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs new file mode 100644 index 00000000..9b54a7a6 --- /dev/null +++ b/ecstore/src/store_init.rs @@ -0,0 +1,289 @@ +use bytes::Bytes; +use futures::future::join_all; +use uuid::Uuid; + +use crate::{ + disk::{DiskError, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET}, + format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, +}; + +use anyhow::{Error, Result}; + +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt::Debug, +}; + +pub async fn do_init_format_file( + first_disk: bool, + disks: &Vec>, + set_count: usize, + set_drive_count: usize, + deployment_id: Option, +) -> Result { + let (formats, errs) = read_format_file_all(&disks, false).await; + + DiskError::check_disk_fatal_errs(&errs)?; + + check_format_erasure_values(&formats, set_drive_count)?; + + if first_disk && should_init_erasure_disks(&errs) { + // UnformattedDisk, not format file create + // new format and save + let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id); + + let _errs = save_format_file_all(&disks, &fms).await; + + // TODO: check quorum + // reduceWriteQuorumErrs(&errs)?; + + let fm = get_format_file_in_quorum(&fms)?; + + return Ok(fm); + } + + let unformatted = DiskError::quorum_unformatted_disks(&errs); + if unformatted && !first_disk { + return Err(Error::new(ErasureError::NotFirstDisk)); + } + + if unformatted && first_disk { + return Err(Error::new(ErasureError::FirstDiskWait)); + } + + let fm = get_format_file_in_quorum(&formats)?; + + return Ok(fm); +} + +fn init_format_files( + disks: &Vec>, + set_count: usize, + set_drive_count: usize, + deployment_id: Option, +) -> Vec> { + let fm = FormatV3::new(set_count, set_drive_count); + let mut fms = vec![None; disks.len()]; + for i in 0..set_count { + for j in 0..set_drive_count { + let idx = i * set_drive_count + j; + let mut newfm = fm.clone(); + newfm.erasure.this = fm.erasure.sets[i][j]; + if deployment_id.is_some() { + newfm.id = deployment_id.unwrap(); + } + + fms[idx] = Some(newfm); + } + } + + fms +} + +fn get_format_file_in_quorum(formats: &Vec>) -> Result { + let mut countmap = HashMap::new(); + + for f in formats.iter() { + if f.is_some() { + let ds = f.as_ref().unwrap().drives(); + let v = countmap.entry(ds); + match v { + Entry::Occupied(mut entry) => *entry.get_mut() += 1, + Entry::Vacant(vacant) => { + vacant.insert(1); + } + }; + } + } + + let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0)); + + if *max_drives == 0 || *max_count < formats.len() / 2 { + return Err(Error::new(ErasureError::ErasureReadQuorum)); + } + + let format = formats + .iter() + .filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives) + .next() + .ok_or(Error::new(ErasureError::ErasureReadQuorum))?; + + let mut format = format.as_ref().unwrap().clone(); + format.erasure.this = Uuid::nil(); + + Ok(format) +} + +fn should_init_erasure_disks(errs: &Vec>) -> bool { + DiskError::count_errs(errs, &DiskError::UnformattedDisk) == errs.len() +} + +fn check_format_erasure_values( + formats: &Vec>, + // disks: &Vec>, + set_drive_count: usize, +) -> Result<()> { + for f in formats.iter() { + if f.is_none() { + continue; + } + + let f = f.as_ref().unwrap(); + + check_format_erasure_value(f)?; + + if formats.len() != f.erasure.sets.len() * f.erasure.sets[0].len() { + return Err(Error::msg("formats length for erasure.sets not mtach")); + } + + if f.erasure.sets[0].len() != set_drive_count { + return Err(Error::msg("erasure set length not match set_drive_count")); + } + } + Ok(()) +} +fn check_format_erasure_value(format: &FormatV3) -> Result<()> { + if format.version != FormatMetaVersion::V1 { + return Err(Error::msg("invalid FormatMetaVersion")); + } + + if format.erasure.version != FormatErasureVersion::V3 { + return Err(Error::msg("invalid FormatErasureVersion")); + } + Ok(()) +} + +pub fn default_partiy_count(drive: usize) -> usize { + match drive { + 1 => 0, + 2 | 3 => 1, + 4 | 5 => 2, + 6 | 7 => 3, + _ => 4, + } +} +// read_format_file_all 读取所有foramt.json +async fn read_format_file_all( + disks: &Vec>, + heal: bool, +) -> (Vec>, Vec>) { + let mut futures = Vec::with_capacity(disks.len()); + + for ep in disks.iter() { + futures.push(read_format_file(ep, heal)); + } + + let mut datas = Vec::with_capacity(disks.len()); + let mut errors = Vec::with_capacity(disks.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(s) => { + datas.push(Some(s)); + errors.push(None); + } + Err(e) => { + datas.push(None); + errors.push(Some(e)); + } + } + } + + (datas, errors) +} + +async fn read_format_file(disk: &Option, _heal: bool) -> Result { + if disk.is_none() { + return Err(Error::new(DiskError::DiskNotFound)); + } + let disk = disk.as_ref().unwrap(); + + let data = disk + .read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE) + .await + .map_err(|e| match &e.downcast_ref::() { + Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk), + Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk), + Some(_) => e, + None => e, + })?; + + let fm = FormatV3::try_from(data.as_ref())?; + + // TODO: heal + + Ok(fm) +} + +async fn save_format_file_all( + disks: &Vec>, + formats: &Vec>, +) -> Vec> { + let mut futures = Vec::with_capacity(disks.len()); + + for (i, ep) in disks.iter().enumerate() { + futures.push(save_format_file(ep, &formats[i])); + } + + let mut errors = Vec::with_capacity(disks.len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(_) => { + errors.push(None); + } + Err(e) => { + errors.push(Some(e)); + } + } + } + + errors +} + +async fn save_format_file(disk: &Option, format: &Option) -> Result<()> { + if disk.is_none() { + return Err(Error::new(DiskError::DiskNotFound)); + } + + let format = format.as_ref().unwrap(); + + let json_data = format.to_json().map(|data| Bytes::from(data))?; + + let tmpfile = Uuid::new_v4().to_string(); + + let disk = disk.as_ref().unwrap(); + disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data) + .await?; + + disk.rename_file( + RUSTFS_META_BUCKET, + tmpfile.as_str(), + RUSTFS_META_BUCKET, + FORMAT_CONFIG_FILE, + ) + .await?; + + // let mut disk = disk; + + // disk.set_disk_id(format.erasure.this); + + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum ErasureError { + #[error("erasure read quorum")] + ErasureReadQuorum, + + #[error("erasure write quorum")] + _ErasureWriteQuorum, + + #[error("not first disk")] + NotFirstDisk, + + #[error("first disk wiat")] + FirstDiskWait, +}