From 669808069dc41d4cbfbef27fe77f2bc36ab33e0a Mon Sep 17 00:00:00 2001 From: "shiro.lee" Date: Mon, 5 Aug 2024 23:06:37 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B0=83=E6=95=B4init=5Fdisks=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecstore/src/disk/local.rs | 65 ++++----------------------------------- ecstore/src/disk/mod.rs | 21 +++++++++++-- ecstore/src/store.rs | 4 +-- ecstore/src/store_init.rs | 61 +++++++++++++++++++++++++----------- 4 files changed, 69 insertions(+), 82 deletions(-) diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 269831b5..4c4a2714 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -2,13 +2,17 @@ use super::{endpoint::Endpoint, error::DiskError, format::FormatV3}; use super::{ DeleteOptions, DiskAPI, FileReader, FileWriter, ReadMultipleReq, ReadMultipleResp, ReadOptions, RenameDataResp, VolumeInfo, }; +use crate::{ + error::{Error, Result}, + file_meta::FileMeta, + store_api::{FileInfo, RawFileInfo}, + utils, +}; use bytes::Bytes; -use futures::future::join_all; use path_absolutize::Absolutize; use std::{ fs::Metadata, path::{Path, PathBuf}, - sync::Arc, }; use time::OffsetDateTime; use tokio::fs::{self, File}; @@ -16,63 +20,6 @@ use tokio::io::ErrorKind; use tracing::{debug, warn}; use uuid::Uuid; -use crate::{ - endpoints::Endpoints, - error::{Error, Result}, - file_meta::FileMeta, - store_api::{FileInfo, RawFileInfo}, - utils, -}; - -pub type DiskStore = Arc>; - -pub struct DiskOption { - pub cleanup: bool, - pub health_check: bool, -} - -pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { - if ep.is_local { - let s = LocalDisk::new(ep, opt.cleanup).await?; - Ok(Arc::new(Box::new(s))) - } else { - let _ = opt.health_check; - unimplemented!() - // Ok(Disk::Remote(RemoteDisk::new(ep, opt.health_check)?)) - } -} - -pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { - let mut futures = Vec::with_capacity(eps.as_ref().len()); - - for ep in eps.as_ref().iter() { - futures.push(new_disk(ep, opt)); - } - - let mut res = Vec::with_capacity(eps.as_ref().len()); - let mut errors = Vec::with_capacity(eps.as_ref().len()); - - let results = join_all(futures).await; - for result in results { - match result { - Ok(s) => { - res.push(Some(s)); - errors.push(None); - } - Err(e) => { - res.push(None); - errors.push(Some(e)); - } - } - } - - (res, errors) -} - -// pub async fn load_format(&self, heal: bool) -> Result { -// unimplemented!() -// } - #[derive(Debug)] pub struct LocalDisk { pub root: PathBuf, diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index 31bf4872..17377a08 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -3,8 +3,6 @@ pub mod error; pub mod format; mod local; -pub use local::{init_disks, DiskOption, DiskStore}; - pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys"; pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart"; pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp"; @@ -19,7 +17,7 @@ use crate::{ store_api::{FileInfo, RawFileInfo}, }; use bytes::Bytes; -use std::{fmt::Debug, io::SeekFrom, pin::Pin}; +use std::{fmt::Debug, io::SeekFrom, pin::Pin, sync::Arc}; use time::OffsetDateTime; use tokio::{ fs::File, @@ -27,6 +25,18 @@ use tokio::{ }; use uuid::Uuid; +pub type DiskStore = Arc>; + +pub async fn new_disk(ep: &endpoint::Endpoint, opt: &DiskOption) -> Result { + if ep.is_local { + let s = local::LocalDisk::new(ep, opt.cleanup).await?; + Ok(Arc::new(Box::new(s))) + } else { + let _ = opt.health_check; + unimplemented!() + } +} + #[async_trait::async_trait] pub trait DiskAPI: Debug + Send + Sync + 'static { fn is_local(&self) -> bool; @@ -71,6 +81,11 @@ pub trait DiskAPI: Debug + Send + Sync + 'static { async fn read_multiple(&self, req: ReadMultipleReq) -> Result>; } +pub struct DiskOption { + pub cleanup: bool, + pub health_check: bool, +} + pub struct RenameDataResp { pub old_data_dir: String, } diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index cb991550..cf86481a 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -1,7 +1,7 @@ use crate::{ bucket_meta::BucketMetadata, disk::error::DiskError, - disk::{self, DiskOption, DiskStore, RUSTFS_META_BUCKET}, + disk::{DiskOption, DiskStore, RUSTFS_META_BUCKET}, disks_layout::DisksLayout, endpoints::EndpointServerPools, error::{Error, Result}, @@ -47,7 +47,7 @@ impl ECStore { // TODO: read from config parseStorageClass let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set); - let (disks, errs) = disk::init_disks( + let (disks, errs) = crate::store_init::init_disks( &pool_eps.endpoints, &DiskOption { cleanup: true, diff --git a/ecstore/src/store_init.rs b/ecstore/src/store_init.rs index f7d16569..a1178d88 100644 --- a/ecstore/src/store_init.rs +++ b/ecstore/src/store_init.rs @@ -1,27 +1,53 @@ -use futures::future::join_all; -use tracing::warn; -use uuid::Uuid; - use crate::{ disk::error::DiskError, disk::format::{FormatErasureVersion, FormatMetaVersion, FormatV3}, - disk::{DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET}, + disk::{new_disk, DiskOption, DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET}, + endpoints::Endpoints, error::{Error, Result}, }; - +use futures::future::join_all; use std::{ collections::{hash_map::Entry, HashMap}, fmt::Debug, }; +use tracing::warn; +use uuid::Uuid; + +pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { + let mut futures = Vec::with_capacity(eps.as_ref().len()); + + for ep in eps.as_ref().iter() { + futures.push(new_disk(ep, opt)); + } + + let mut res = Vec::with_capacity(eps.as_ref().len()); + let mut errors = Vec::with_capacity(eps.as_ref().len()); + + let results = join_all(futures).await; + for result in results { + match result { + Ok(s) => { + res.push(Some(s)); + errors.push(None); + } + Err(e) => { + res.push(None); + errors.push(Some(e)); + } + } + } + + (res, errors) +} pub async fn do_init_format_file( first_disk: bool, - disks: &Vec>, + disks: &[Option], set_count: usize, set_drive_count: usize, deployment_id: Option, ) -> Result { - let (formats, errs) = read_format_file_all(&disks, false).await; + let (formats, errs) = read_format_file_all(disks, false).await; DiskError::check_disk_fatal_errs(&errs)?; @@ -30,9 +56,9 @@ pub async fn do_init_format_file( if first_disk && DiskError::should_init_erasure_disks(&errs) { // UnformattedDisk, not format file create // new format and save - let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id); + let fms = init_format_files(disks, set_count, set_drive_count, deployment_id); - let _errs = save_format_file_all(&disks, &fms).await; + let _errs = save_format_file_all(disks, &fms).await; // TODO: check quorum // reduceWriteQuorumErrs(&errs)?; @@ -53,11 +79,11 @@ pub async fn do_init_format_file( let fm = get_format_file_in_quorum(&formats)?; - return Ok(fm); + Ok(fm) } fn init_format_files( - disks: &Vec>, + disks: &[Option], set_count: usize, set_drive_count: usize, deployment_id: Option, @@ -80,7 +106,7 @@ fn init_format_files( fms } -fn get_format_file_in_quorum(formats: &Vec>) -> Result { +fn get_format_file_in_quorum(formats: &[Option]) -> Result { let mut countmap = HashMap::new(); for f in formats.iter() { @@ -105,8 +131,7 @@ fn get_format_file_in_quorum(formats: &Vec>) -> Result>) -> Result>, + formats: &[Option], // disks: &Vec>, set_drive_count: usize, ) -> Result<()> { @@ -160,7 +185,7 @@ pub fn default_partiy_count(drive: usize) -> usize { } } // read_format_file_all 读取所有foramt.json -async fn read_format_file_all(disks: &Vec>, heal: bool) -> (Vec>, Vec>) { +async fn read_format_file_all(disks: &[Option], heal: bool) -> (Vec>, Vec>) { let mut futures = Vec::with_capacity(disks.len()); for ep in disks.iter() { @@ -210,7 +235,7 @@ async fn read_format_file(disk: &Option, _heal: bool) -> Result>, formats: &Vec>) -> Vec> { +async fn save_format_file_all(disks: &[Option], formats: &[Option]) -> Vec> { let mut futures = Vec::with_capacity(disks.len()); for (i, ep) in disks.iter().enumerate() {