fix: 调整init_disks位置

This commit is contained in:
shiro.lee
2024-08-05 23:06:37 +08:00
parent d73499ac79
commit 669808069d
4 changed files with 69 additions and 82 deletions

View File

@@ -2,13 +2,17 @@ use super::{endpoint::Endpoint, error::DiskError, format::FormatV3};
use super::{
DeleteOptions, DiskAPI, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo,
};
use crate::{
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
utils,
};
use bytes::Bytes;
use futures::future::join_all;
use path_absolutize::Absolutize;
use std::{
fs::Metadata,
path::{Path, PathBuf},
sync::Arc,
};
use time::OffsetDateTime;
use tokio::fs::{self, File};
@@ -16,63 +20,6 @@ use tokio::io::ErrorKind;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::{
endpoints::Endpoints,
error::{Error, Result},
file_meta::FileMeta,
store_api::{FileInfo, RawFileInfo},
utils,
};
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,
}
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
if ep.is_local {
let s = LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
} else {
let _ = opt.health_check;
unimplemented!()
// Ok(Disk::Remote(RemoteDisk::new(ep, opt.health_check)?))
}
}
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.as_ref().len());
for ep in eps.as_ref().iter() {
futures.push(new_disk(ep, opt));
}
let mut res = Vec::with_capacity(eps.as_ref().len());
let mut errors = Vec::with_capacity(eps.as_ref().len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
res.push(Some(s));
errors.push(None);
}
Err(e) => {
res.push(None);
errors.push(Some(e));
}
}
}
(res, errors)
}
// pub async fn load_format(&self, heal: bool) -> Result<FormatV3> {
// unimplemented!()
// }
#[derive(Debug)]
pub struct LocalDisk {
pub root: PathBuf,

View File

@@ -3,8 +3,6 @@ pub mod error;
pub mod format;
mod local;
pub use local::{init_disks, DiskOption, DiskStore};
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp";
@@ -19,7 +17,7 @@ use crate::{
store_api::{FileInfo, RawFileInfo},
};
use bytes::Bytes;
use std::{fmt::Debug, io::SeekFrom, pin::Pin};
use std::{fmt::Debug, io::SeekFrom, pin::Pin, sync::Arc};
use time::OffsetDateTime;
use tokio::{
fs::File,
@@ -27,6 +25,18 @@ use tokio::{
};
use uuid::Uuid;
pub type DiskStore = Arc<Box<dyn DiskAPI>>;
pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result<DiskStore> {
if ep.is_local {
let s = local::LocalDisk::new(ep, opt.cleanup).await?;
Ok(Arc::new(Box::new(s)))
} else {
let _ = opt.health_check;
unimplemented!()
}
}
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
fn is_local(&self) -> bool;
@@ -71,6 +81,11 @@ pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_multiple(&self, req: ReadMultipleReq) -> Result<Vec<ReadMultipleResp>>;
}
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,
}
pub struct RenameDataResp {
pub old_data_dir: String,
}

View File

@@ -1,7 +1,7 @@
use crate::{
bucket_meta::BucketMetadata,
disk::error::DiskError,
disk::{self, DiskOption, DiskStore, RUSTFS_META_BUCKET},
disk::{DiskOption, DiskStore, RUSTFS_META_BUCKET},
disks_layout::DisksLayout,
endpoints::EndpointServerPools,
error::{Error, Result},
@@ -47,7 +47,7 @@ impl ECStore {
// TODO: read from config parseStorageClass
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
let (disks, errs) = disk::init_disks(
let (disks, errs) = crate::store_init::init_disks(
&pool_eps.endpoints,
&DiskOption {
cleanup: true,

View File

@@ -1,27 +1,53 @@
use futures::future::join_all;
use tracing::warn;
use uuid::Uuid;
use crate::{
disk::error::DiskError,
disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
disk::{DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
disk::{new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
endpoints::Endpoints,
error::{Error, Result},
};
use futures::future::join_all;
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};
use tracing::warn;
use uuid::Uuid;
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.as_ref().len());
for ep in eps.as_ref().iter() {
futures.push(new_disk(ep, opt));
}
let mut res = Vec::with_capacity(eps.as_ref().len());
let mut errors = Vec::with_capacity(eps.as_ref().len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
res.push(Some(s));
errors.push(None);
}
Err(e) => {
res.push(None);
errors.push(Some(e));
}
}
}
(res, errors)
}
pub async fn do_init_format_file(
first_disk: bool,
disks: &Vec<Option<DiskStore>>,
disks: &[Option<DiskStore>],
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
let (formats, errs) = read_format_file_all(&disks, false).await;
let (formats, errs) = read_format_file_all(disks, false).await;
DiskError::check_disk_fatal_errs(&errs)?;
@@ -30,9 +56,9 @@ pub async fn do_init_format_file(
if first_disk && DiskError::should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
// new format and save
let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id);
let fms = init_format_files(disks, set_count, set_drive_count, deployment_id);
let _errs = save_format_file_all(&disks, &fms).await;
let _errs = save_format_file_all(disks, &fms).await;
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
@@ -53,11 +79,11 @@ pub async fn do_init_format_file(
let fm = get_format_file_in_quorum(&formats)?;
return Ok(fm);
Ok(fm)
}
fn init_format_files(
disks: &Vec<Option<DiskStore>>,
disks: &[Option<DiskStore>],
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
@@ -80,7 +106,7 @@ fn init_format_files(
fms
}
fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3> {
fn get_format_file_in_quorum(formats: &[Option<FormatV3>]) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
@@ -105,8 +131,7 @@ fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3
let format = formats
.iter()
.filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives)
.next()
.find(|f| f.as_ref().map_or(false, |v| v.drives().eq(max_drives)))
.ok_or(Error::new(ErasureError::ErasureReadQuorum))?;
let mut format = format.as_ref().unwrap().clone();
@@ -116,7 +141,7 @@ fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3
}
fn check_format_erasure_values(
formats: &Vec<Option<FormatV3>>,
formats: &[Option<FormatV3>],
// disks: &Vec<Option<DiskStore>>,
set_drive_count: usize,
) -> Result<()> {
@@ -160,7 +185,7 @@ pub fn default_partiy_count(drive: usize) -> usize {
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(disks: &Vec<Option<DiskStore>>, heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
async fn read_format_file_all(disks: &[Option<DiskStore>], heal: bool) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for ep in disks.iter() {
@@ -210,7 +235,7 @@ async fn read_format_file(disk: &Option<DiskStore>, _heal: bool) -> Result<Forma
Ok(fm)
}
async fn save_format_file_all(disks: &Vec<Option<DiskStore>>, formats: &Vec<Option<FormatV3>>) -> Vec<Option<Error>> {
async fn save_format_file_all(disks: &[Option<DiskStore>], formats: &[Option<FormatV3>]) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, ep) in disks.iter().enumerate() {