init:store

This commit is contained in:
weisd
2024-06-27 18:18:38 +08:00
parent fd626ec94d
commit 8d820c3096
7 changed files with 476 additions and 76 deletions

View File

@@ -4,6 +4,7 @@ use std::{
};
use anyhow::Error;
use bytes::Bytes;
use futures::future::join_all;
use path_absolutize::Absolutize;
use time::OffsetDateTime;
@@ -12,20 +13,26 @@ use tokio::io::ErrorKind;
use uuid::Uuid;
use crate::{
disk_api::DiskAPI,
endpoint::{Endpoint, Endpoints},
format::{DistributionAlgoVersion, FormatV3},
};
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const RUSTFS_META_MULTIPART_BUCKET: &str = ".rustfs.sys/multipart";
pub const RUSTFS_META_TMP_BUCKET: &str = ".rustfs.sys/tmp";
pub const RUSTFS_META_TMP_DELETED_BUCKET: &str = ".rustfs.sys/tmp/.trash";
pub const BUCKET_META_PREFIX: &str = "buckets";
pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub type DiskStore = Box<dyn DiskAPI>;
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,
}
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<impl DiskAPI, Error> {
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore, Error> {
if ep.is_local {
Ok(LocalDisk::new(ep, opt.cleanup).await?)
} else {
@@ -37,7 +44,7 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<impl DiskAPI, E
pub async fn init_disks(
eps: &Endpoints,
opt: &DiskOption,
) -> (Vec<Option<impl DiskAPI>>, Vec<Option<Error>>) {
) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.len());
for ep in eps.iter() {
@@ -68,6 +75,7 @@ pub async fn init_disks(
// unimplemented!()
// }
#[derive(Debug)]
pub struct LocalDisk {
root: PathBuf,
id: Uuid,
@@ -79,7 +87,7 @@ pub struct LocalDisk {
}
impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self, Error> {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Box<Self>, Error> {
let root = fs::canonicalize(ep.url.path()).await?;
if cleanup {
@@ -123,7 +131,7 @@ impl LocalDisk {
disk.make_meta_volumes().await?;
Ok(disk)
Ok(Box::new(disk))
}
async fn make_meta_volumes(&self) -> Result<(), Error> {
@@ -190,25 +198,112 @@ pub async fn read_file_exists(
pub async fn read_file_all(path: impl AsRef<Path>) -> Result<(Vec<u8>, Metadata), Error> {
let p = path.as_ref();
let meta = fs::metadata(&p).await.map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::new(DiskError::FileNotFound),
ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
_ => Error::new(e),
})?;
let meta = read_file_metadata(&path).await?;
let data = fs::read(&p).await?;
Ok((data, meta))
}
pub async fn read_file_metadata(p: impl AsRef<Path>) -> Result<Metadata, Error> {
let meta = fs::metadata(&p).await.map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::new(DiskError::FileNotFound),
ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
_ => Error::new(e),
})?;
Ok(meta)
}
pub async fn check_volume_exists(p: impl AsRef<Path>) -> Result<(), Error> {
fs::metadata(&p).await.map_err(|e| match e.kind() {
ErrorKind::NotFound => Error::new(DiskError::VolumeNotFound),
ErrorKind::PermissionDenied => Error::new(DiskError::FileAccessDenied),
_ => Error::new(e),
})?;
Ok(())
}
fn skip_access_checks(p: impl AsRef<str>) -> bool {
let vols = vec![
RUSTFS_META_TMP_DELETED_BUCKET,
RUSTFS_META_TMP_BUCKET,
RUSTFS_META_MULTIPART_BUCKET,
RUSTFS_META_BUCKET,
];
for v in vols.iter() {
if p.as_ref().starts_with(v) {
return true;
}
}
false
}
#[async_trait::async_trait]
impl DiskAPI for LocalDisk {
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>, Error> {
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes, Error> {
let p = self.get_object_path(&volume, &path)?;
let (data, _) = read_file_all(&p).await?;
Ok(data)
Ok(Bytes::from(data))
}
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<(), Error> {
let p = self.get_object_path(&volume, &path)?;
// create top dir if not exists
fs::create_dir_all(&p.parent().unwrap_or_else(|| Path::new("."))).await?;
fs::write(&p, data).await?;
Ok(())
}
async fn rename_file(
&self,
src_volume: &str,
src_path: &str,
dst_volume: &str,
dst_path: &str,
) -> Result<(), Error> {
if !skip_access_checks(&src_volume) {
check_volume_exists(&src_volume).await?;
}
if !skip_access_checks(&dst_volume) {
check_volume_exists(&dst_volume).await?;
}
let srcp = self.get_object_path(&src_volume, &src_path)?;
let dstp = self.get_object_path(&dst_volume, &dst_path)?;
let src_is_dir = srcp.is_dir();
let dst_is_dir = dstp.is_dir();
if !(src_is_dir && dst_is_dir || !src_is_dir && !dst_is_dir) {
return Err(Error::new(DiskError::FileAccessDenied));
}
// TODO: check path length
if src_is_dir {
// TODO: remove dst_dir
}
fs::create_dir_all(dstp.parent().unwrap_or_else(|| Path::new("."))).await?;
let mut idx = 0;
loop {
if let Err(e) = fs::rename(&srcp, &dstp).await {
if e.kind() == ErrorKind::NotFound && idx == 0 {
idx += 1;
continue;
}
};
break;
}
Ok(())
}
async fn make_volumes(&self, volumes: Vec<&str>) -> Result<(), Error> {
@@ -249,14 +344,6 @@ impl RemoteDisk {
}
}
#[async_trait::async_trait]
pub trait DiskAPI {
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>, Error>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<(), Error>;
async fn make_volume(&self, volume: &str) -> Result<(), Error>;
}
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
#[error("file not found")]
@@ -281,6 +368,9 @@ pub enum DiskError {
#[error("disk not a dir")]
DiskNotDir,
#[error("volume not found")]
VolumeNotFound,
}
impl PartialEq for DiskError {
@@ -316,6 +406,10 @@ pub fn check_disk_fatal_errs(errs: &Vec<Option<Error>>) -> Result<(), Error> {
Ok(())
}
pub fn quorum_unformatted_disks(errs: &Vec<Option<Error>>) -> bool {
count_errs(errs, &DiskError::UnformattedDisk) >= (errs.len() / 2) + 1
}
pub fn count_errs(errs: &Vec<Option<Error>>, err: &DiskError) -> usize {
return errs
.iter()
@@ -367,7 +461,22 @@ mod test {
use super::*;
#[tokio::test]
async fn test_check_errs() {}
async fn test_skip_access_checks() {
// let arr = Vec::new();
let vols = vec![
RUSTFS_META_TMP_DELETED_BUCKET,
RUSTFS_META_TMP_BUCKET,
RUSTFS_META_MULTIPART_BUCKET,
RUSTFS_META_BUCKET,
];
let paths: Vec<_> = vols.iter().map(|v| Path::new(v).join("test")).collect();
for p in paths.iter() {
assert!(skip_access_checks(p.to_str().unwrap()));
}
}
#[tokio::test]
async fn test_make_volume() {
@@ -384,6 +493,12 @@ mod test {
let disk = LocalDisk::new(&ep, false).await.unwrap();
let tmpp = disk
.resolve_abs_path(Path::new(RUSTFS_META_TMP_DELETED_BUCKET))
.unwrap();
println!("ppp :{:?}", &tmpp);
let volumes = vec!["a", "b", "c"];
disk.make_volumes(volumes.clone()).await.unwrap();

20
ecstore/src/disk_api.rs Normal file
View File

@@ -0,0 +1,20 @@
use std::fmt::Debug;
use anyhow::Error;
use bytes::Bytes;
#[async_trait::async_trait]
pub trait DiskAPI: Debug + Send + Sync + 'static {
async fn read_all(&self, volume: &str, path: &str) -> Result<Bytes, Error>;
async fn write_all(&self, volume: &str, path: &str, data: Bytes) -> Result<(), Error>;
async fn rename_file(
&self,
src_volume: &str,
src_path: &str,
dst_volume: &str,
dst_path: &str,
) -> Result<(), Error>;
async fn make_volumes(&self, volume: Vec<&str>) -> Result<(), Error>;
async fn make_volume(&self, volume: &str) -> Result<(), Error>;
}

View File

@@ -18,7 +18,7 @@ pub struct DisksLayout {
}
impl DisksLayout {
pub fn new(args: Vec<String>) -> Result<DisksLayout, Error> {
pub fn new(args: &Vec<String>) -> Result<DisksLayout, Error> {
if args.is_empty() {
return Err(Error::msg("Invalid argument"));
}
@@ -358,7 +358,7 @@ mod test {
let mut args = Vec::new();
args.push(pattern);
match DisksLayout::new(args) {
match DisksLayout::new(&args) {
Ok(set) => {
for pool in set.pools {
println!("cmd: {:?}", pool.cmdline);

View File

@@ -601,7 +601,7 @@ mod test {
)];
for (addr, args) in cases {
let layouts = DisksLayout::new(args).unwrap();
let layouts = DisksLayout::new(&args).unwrap();
println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy);

View File

@@ -70,3 +70,5 @@ macro_rules! try_ {
}
};
}

View File

@@ -1,4 +1,5 @@
mod disk;
mod disk_api;
mod disks_layout;
mod ellipses;
mod endpoint;

View File

@@ -1,31 +1,51 @@
use bytes::Bytes;
use futures::future::join_all;
use uuid::Uuid;
use crate::{
disk::{self, DiskAPI, DiskError, DiskOption, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET},
disk::{
self, check_disk_fatal_errs, count_errs, quorum_unformatted_disks, DiskError, DiskOption,
DiskStore, FORMAT_CONFIG_FILE, RUSTFS_META_BUCKET,
},
disk_api::DiskAPI,
disks_layout::DisksLayout,
endpoint::create_server_endpoints,
format::FormatV3,
format::{FormatErasureVersion, FormatMetaVersion, FormatV3},
};
use super::endpoint::Endpoint;
use anyhow::{Error, Result};
use std::fmt::Debug;
use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
};
type StoreDisk = dyn DiskAPI;
#[derive(Debug)]
pub struct ECStore {
pub id: uuid::Uuid,
// pub disks: Vec<Box<dyn DiskAPI>>,
disk_map: HashMap<usize, Vec<Option<Box<dyn DiskAPI>>>>,
pub pools: Vec<Sets>,
pub peer: Vec<String>,
}
impl ECStore {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let layouts = DisksLayout::new(endpoints)?;
let layouts = DisksLayout::new(&endpoints)?;
let mut deployment_id = None;
let mut setsv = Vec::with_capacity(endpoints.len());
let (pools, _) = create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let mut disk_map = HashMap::with_capacity(pools.len());
let first_is_local = pools.first_is_local();
for (i, pool_eps) in pools.iter().enumerate() {
let (disks, errs) = disk::init_disks(
&pool_eps.endpoints,
@@ -35,82 +55,303 @@ impl ECStore {
},
)
.await;
check_disk_fatal_errs(&errs)?;
let fm = do_init_format_file(
first_is_local,
&disks,
pool_eps.set_count,
pool_eps.drives_per_set,
deployment_id,
)
.await?;
if deployment_id.is_none() {
deployment_id = Some(fm.id.clone());
}
if deployment_id != Some(fm.id) {
return Err(Error::msg("deployment_id not same in one pool"));
}
if deployment_id.is_some() && deployment_id.unwrap().is_nil() {
deployment_id = Some(Uuid::new_v4());
}
disk_map.insert(i, disks);
let sets = Sets::new()?;
setsv.push(sets);
}
Ok(ECStore {
id: Uuid::nil(),
disk_map,
pools: Vec::new(),
peer: Vec::new(),
})
}
}
async fn init_format(disks: Vec<Option<impl DiskAPI>>) -> Result<FormatV3, Error> {
let (formats, errs) = Self::load_format_all(&disks, false).await;
unimplemented!()
async fn do_init_format_file(
first_disk: bool,
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Result<FormatV3, Error> {
let (formats, errs) = read_format_file_all(&disks, false).await;
check_disk_fatal_errs(&errs)?;
check_format_erasure_values(&formats, &disks, set_drive_count)?;
if first_disk && should_init_erasure_disks(&errs) {
// UnformattedDisk, not format file create
// new format and save
let fms = init_format_files(&disks, set_count, set_drive_count, deployment_id);
let _errs = save_format_file_all(&disks, &fms).await;
// TODO: check quorum
// reduceWriteQuorumErrs(&errs)?;
let fm = get_format_file_in_quorum(&fms)?;
return Ok(fm);
}
async fn load_format_all(
disks: &Vec<Option<impl DiskAPI>>,
heal: bool,
) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
let unformatted = quorum_unformatted_disks(&errs);
if unformatted && !first_disk {
return Err(Error::new(ErasureError::NotFirstDisk));
}
for ep in disks.iter() {
futures.push(Self::load_format(ep, heal));
if unformatted && first_disk {
return Err(Error::new(ErasureError::FirstDiskWait));
}
let fm = get_format_file_in_quorum(&formats)?;
return Ok(fm);
}
fn init_format_files(
disks: &Vec<Option<DiskStore>>,
set_count: usize,
set_drive_count: usize,
deployment_id: Option<Uuid>,
) -> Vec<Option<FormatV3>> {
let fm = FormatV3::new(set_count, set_drive_count);
let mut fms = vec![None; disks.len()];
for i in 0..set_count {
for j in 0..set_drive_count {
let idx = i * set_drive_count + j;
let mut newfm = fm.clone();
newfm.erasure.this = fm.erasure.sets[i][j];
if deployment_id.is_some() {
newfm.id = deployment_id.unwrap();
}
fms[idx] = Some(newfm);
}
}
fms
}
fn get_format_file_in_quorum(formats: &Vec<Option<FormatV3>>) -> Result<FormatV3> {
let mut countmap = HashMap::new();
for f in formats.iter() {
if f.is_some() {
let ds = f.as_ref().unwrap().drives();
let v = countmap.entry(ds);
match v {
Entry::Occupied(mut entry) => *entry.get_mut() += 1,
Entry::Vacant(vacant) => {
vacant.insert(1);
}
};
}
}
let (max_drives, max_count) = countmap.iter().max_by_key(|&(_, c)| c).unwrap_or((&0, &0));
if *max_drives == 0 || *max_count < formats.len() / 2 {
return Err(Error::new(ErasureError::ErasureReadQuorum));
}
let format = formats
.iter()
.filter(|f| f.is_some() && f.as_ref().unwrap().drives() == *max_drives)
.next()
.ok_or(Error::new(ErasureError::ErasureReadQuorum))?;
let mut format = format.as_ref().unwrap().clone();
format.erasure.this = Uuid::nil();
Ok(format)
}
fn should_init_erasure_disks(errs: &Vec<Option<Error>>) -> bool {
count_errs(errs, &DiskError::UnformattedDisk) == errs.len()
}
fn check_format_erasure_values(
formats: &Vec<Option<FormatV3>>,
disks: &Vec<Option<DiskStore>>,
set_drive_count: usize,
) -> Result<()> {
for f in formats.iter() {
if f.is_none() {
continue;
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let f = f.as_ref().unwrap();
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
datas.push(Some(s));
errors.push(None);
}
Err(e) => {
datas.push(None);
errors.push(Some(e));
}
check_format_erasure_value(f)?;
if formats.len() != f.erasure.sets.len() * f.erasure.sets[0].len() {
return Err(Error::msg("formats length for erasure.sets not mtach"));
}
if f.erasure.sets[0].len() != set_drive_count {
return Err(Error::msg("erasure set length not match set_drive_count"));
}
}
Ok(())
}
fn check_format_erasure_value(format: &FormatV3) -> Result<()> {
if format.version != FormatMetaVersion::V1 {
return Err(Error::msg("invalid FormatMetaVersion"));
}
if format.erasure.version != FormatErasureVersion::V3 {
return Err(Error::msg("invalid FormatErasureVersion"));
}
Ok(())
}
fn default_partiy_blocks(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
// read_format_file_all 读取所有foramt.json
async fn read_format_file_all(
disks: &Vec<Option<DiskStore>>,
heal: bool,
) -> (Vec<Option<FormatV3>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(disks.len());
for ep in disks.iter() {
futures.push(read_format_file(ep, heal));
}
let mut datas = Vec::with_capacity(disks.len());
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
datas.push(Some(s));
errors.push(None);
}
Err(e) => {
datas.push(None);
errors.push(Some(e));
}
}
(datas, errors)
}
async fn load_format(disk: &Option<impl DiskAPI>, heal: bool) -> Result<FormatV3, Error> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let disk = disk.as_ref().unwrap();
(datas, errors)
}
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await
.map_err(|e| match &e.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk),
Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk),
Some(_) => e,
None => e,
})?;
async fn read_format_file(disk: &Option<DiskStore>, heal: bool) -> Result<FormatV3, Error> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let disk = disk.as_ref().unwrap();
let fm = FormatV3::try_from(data.as_slice())?;
let data = disk
.read_all(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)
.await
.map_err(|e| match &e.downcast_ref::<DiskError>() {
Some(DiskError::FileNotFound) => Error::new(DiskError::UnformattedDisk),
Some(DiskError::DiskNotFound) => Error::new(DiskError::UnformattedDisk),
Some(_) => e,
None => e,
})?;
// TODO: heal
let fm = FormatV3::try_from(data.as_ref())?;
Ok(fm)
// TODO: heal
Ok(fm)
}
async fn save_format_file_all(
disks: &Vec<Option<DiskStore>>,
formats: &Vec<Option<FormatV3>>,
) -> Vec<Option<Error>> {
let mut futures = Vec::with_capacity(disks.len());
for (i, ep) in disks.iter().enumerate() {
futures.push(save_format_file(ep, &formats[i]));
}
fn default_partiy_blocks(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
let mut errors = Vec::with_capacity(disks.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(_) => {
errors.push(None);
}
Err(e) => {
errors.push(Some(e));
}
}
}
errors
}
async fn save_format_file(disk: &Option<DiskStore>, format: &Option<FormatV3>) -> Result<()> {
if disk.is_none() {
return Err(Error::new(DiskError::DiskNotFound));
}
let format = format.as_ref().unwrap();
let json_data = format.to_json().map(|data| Bytes::from(data))?;
let tmpfile = Uuid::new_v4().to_string();
let disk = disk.as_ref().unwrap();
disk.write_all(RUSTFS_META_BUCKET, tmpfile.as_str(), json_data)
.await?;
disk.rename_file(
RUSTFS_META_BUCKET,
tmpfile.as_str(),
RUSTFS_META_BUCKET,
FORMAT_CONFIG_FILE,
)
.await?;
// let mut disk = disk;
// disk.set_disk_id(format.erasure.this);
Ok(())
}
#[derive(Debug)]
@@ -118,6 +359,12 @@ pub struct Sets {
pub sets: Vec<Objects>,
}
impl Sets {
fn new() -> Result<Self, Error> {
unimplemented!()
}
}
#[derive(Debug)]
pub struct Objects {
pub endpoints: Vec<Endpoint>,
@@ -129,3 +376,18 @@ pub struct Objects {
}
pub trait StorageAPI: Debug + Send + Sync + 'static {}
#[derive(Debug, thiserror::Error)]
pub enum ErasureError {
#[error("erasure read quorum")]
ErasureReadQuorum,
#[error("erasure write quorum")]
ErasureWriteQuorum,
#[error("not first disk")]
NotFirstDisk,
#[error("first disk wiat")]
FirstDiskWait,
}