init:store

This commit is contained in:
weisd
2024-06-26 15:53:11 +08:00
parent 918a263fd0
commit 03572ebcab
8 changed files with 586 additions and 12 deletions

120
Cargo.lock generated
View File

@@ -161,6 +161,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -310,10 +316,13 @@ dependencies = [
"futures",
"lazy_static",
"netif",
"path-absolutize",
"reed-solomon-erasure",
"regex",
"serde",
"serde_json",
"thiserror",
"time",
"tokio",
"tracing",
"tracing-error",
@@ -835,7 +844,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core 0.9.10",
]
[[package]]
@@ -847,11 +866,42 @@ dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.5.2",
"smallvec",
"windows-targets 0.52.5",
]
[[package]]
name = "path-absolutize"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5"
dependencies = [
"path-dedot",
]
[[package]]
name = "path-dedot"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397"
dependencies = [
"once_cell",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@@ -876,6 +926,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.86"
@@ -904,13 +960,52 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags",
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
@@ -921,7 +1016,7 @@ checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
dependencies = [
"libm",
"lru",
"parking_lot",
"parking_lot 0.11.2",
"smallvec",
"spin",
]
@@ -1065,6 +1160,17 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.118"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -1264,6 +1370,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@@ -1431,6 +1538,11 @@ name = "uuid"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439"
dependencies = [
"getrandom",
"rand",
"serde",
]
[[package]]
name = "valuable"

View File

@@ -18,4 +18,4 @@ http = "1.1.0"
thiserror = "1.0.61"
time = "0.3.36"
async-trait = "0.1.80"
tokio = "1.38.0"
tokio = { version = "1.38.0", features = ["full"] }

View File

@@ -10,7 +10,7 @@ rust-version.workspace = true
[dependencies]
url = "2.5.2"
uuid = "1.8.0"
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = "6.0.0"
transform-stream = "0.3.0"
bytes.workspace = true
@@ -25,3 +25,6 @@ netif = "0.1.6"
async-trait = "0.1.80"
tracing.workspace = true
tracing-error = "0.2.0"
serde_json.workspace = true
path-absolutize = "3.1.1"
time.workspace = true

201
ecstore/src/disk.rs Normal file
View File

@@ -0,0 +1,201 @@
use std::{
fs::Metadata,
path::{Path, PathBuf},
};
use anyhow::Error;
use bytes::Bytes;
use futures::future::join_all;
use path_absolutize::Absolutize;
use time::OffsetDateTime;
use tokio::{fs, sync::RwLock};
use uuid::Uuid;
use crate::{
endpoint::{Endpoint, Endpoints},
format::{DistributionAlgoVersion, FormatV3},
};
pub const RUSTFS_META_BUCKET: &str = ".rustfs.sys";
pub const FORMAT_CONFIG_FILE: &str = "format.json";
pub struct DiskOption {
pub cleanup: bool,
pub health_check: bool,
}
pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<impl DiskAPI, Error> {
if ep.is_local {
Ok(LocalDisk::new(ep, opt.cleanup).await?)
} else {
unimplemented!()
// Ok(Disk::Remote(RemoteDisk::new(ep, opt.health_check)?))
}
}
pub async fn init_disks(
eps: &Endpoints,
opt: &DiskOption,
) -> (Vec<Option<impl DiskAPI>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.len());
for ep in eps.iter() {
futures.push(new_disk(ep, opt));
}
let mut storages = Vec::with_capacity(eps.len());
let mut errors = Vec::with_capacity(eps.len());
let results = join_all(futures).await;
for result in results {
match result {
Ok(s) => {
storages.push(Some(s));
errors.push(None);
}
Err(e) => {
storages.push(None);
errors.push(Some(e));
}
}
}
(storages, errors)
}
// pub async fn load_format(&self, heal: bool) -> Result<FormatV3, Error> {
// unimplemented!()
// }
pub struct LocalDisk {
root: PathBuf,
id: Uuid,
format_data: Vec<u8>,
format_meta: Option<Metadata>,
format_path: PathBuf,
format_legacy: bool,
format_last_check: OffsetDateTime,
}
impl LocalDisk {
pub async fn new(ep: &Endpoint, cleanup: bool) -> Result<Self, Error> {
let root = fs::canonicalize(ep.url.path()).await?;
if cleanup {
// TODO: 删除tmp数据
}
let format_path = Path::new(RUSTFS_META_BUCKET)
.join(Path::new(FORMAT_CONFIG_FILE))
.absolutize_virtually(&root)?
.into_owned();
let (format_data, format_meta) = read_file_exists(&format_path).await?;
let mut id = Uuid::nil();
let mut format_legacy = false;
let mut format_last_check = OffsetDateTime::UNIX_EPOCH;
if !format_data.is_empty() {
let s = format_data.as_slice();
let fm = FormatV3::try_from(s)?;
let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx {
return Err(Error::new(DiskError::InconsistentDisk));
}
id = fm.erasure.this;
format_legacy = fm.erasure.distribution_algo == DistributionAlgoVersion::V1;
format_last_check = OffsetDateTime::now_utc();
}
let disk = Self {
root,
id,
format_meta,
format_data: format_data,
format_path,
format_legacy,
format_last_check,
};
Ok(disk)
}
pub fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf, Error> {
Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
}
pub fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf, Error> {
let dir = Path::new(&bucket);
let file_path = Path::new(&key);
self.resolve_abs_path(dir.join(file_path))
}
pub fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf, Error> {
let dir = Path::new(&bucket);
self.resolve_abs_path(dir)
}
// pub async fn load_format(&self) -> Result<Option<FormatV3>, Error> {
// let p = self.get_object_path(RUSTFS_META_BUCKET, FORMAT_CONFIG_FILE)?;
// let content = fs::read(&p).await?;
// unimplemented!()
// }
}
// 过滤 std::io::ErrorKind::NotFound
pub async fn read_file_exists(
path: impl AsRef<Path>,
) -> Result<(Vec<u8>, Option<Metadata>), Error> {
let p = path.as_ref();
let meta = match fs::metadata(&p).await {
Ok(meta) => Some(meta),
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => None,
_ => return Err(e.into()),
},
};
let mut data = Vec::new();
if meta.is_some() {
data = fs::read(&p).await?;
}
Ok((data, meta))
}
#[async_trait::async_trait]
impl DiskAPI for LocalDisk {
#[must_use]
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>, Error> {
let p = self.get_object_path(&volume, &path)?;
if p.exists() {}
let content = fs::read(p).await?;
unimplemented!()
}
}
pub struct RemoteDisk {}
impl RemoteDisk {
pub fn new(ep: &Endpoint, health_check: bool) -> Result<Self, Error> {
Ok(Self {})
}
}
#[async_trait::async_trait]
pub trait DiskAPI {
async fn read_all(&self, volume: &str, path: &str) -> Result<Vec<u8>, Error>;
}
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
#[error("disk not found")]
DiskNotFound,
#[error("InconsistentDisk")]
InconsistentDisk,
}

View File

@@ -218,6 +218,10 @@ impl Endpoints {
Self(Vec::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> core::slice::Iter<Endpoint> {
self.0.iter()
}

223
ecstore/src/format.rs Normal file
View File

@@ -0,0 +1,223 @@
use anyhow::Error;
use serde::{Deserialize, Serialize};
use serde_json::Error as JsonError;
use uuid::Uuid;
use crate::disk;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum FormatMetaVersion {
#[serde(rename = "1")]
V1,
#[serde(other)]
Unknown,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum FormatBackend {
#[serde(rename = "xl")]
Erasure,
#[serde(rename = "xl-single")]
ErasureSingle,
#[serde(other)]
Unknown,
}
/// Represents the V3 backend disk structure version
/// under `.rustfs.sys` and actual data namespace.
///
/// FormatErasureV3 - structure holds format config version '3'.
///
/// The V3 format to support "large bucket" support where a bucket
/// can span multiple erasure sets.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FormatErasureV3 {
/// Version of 'xl' format.
pub version: FormatErasureVersion,
/// This field carries assigned disk uuid.
pub this: Uuid,
/// Sets field carries the input disk order generated the first
/// time when fresh disks were supplied, it is a two dimensional
/// array second dimension represents list of disks used per set.
pub sets: Vec<Vec<Uuid>>,
/// Distribution algorithm represents the hashing algorithm
/// to pick the right set index for an object.
#[serde(rename = "distributionAlgo")]
pub distribution_algo: DistributionAlgoVersion,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum FormatErasureVersion {
#[serde(rename = "1")]
V1,
#[serde(rename = "2")]
V2,
#[serde(rename = "3")]
V3,
#[serde(other)]
Unknown,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum DistributionAlgoVersion {
#[serde(rename = "CRCMOD")]
V1,
#[serde(rename = "SIPMOD")]
V2,
#[serde(rename = "SIPMOD+PARITY")]
V3,
}
/// format.json currently has the format:
///
/// ```json
/// {
/// "version": "1",
/// "format": "XXXXX",
/// "id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
/// "XXXXX": {
//
/// }
/// }
/// ```
///
/// Ideally we will never have a situation where we will have to change the
/// fields of this struct and deal with related migration.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct FormatV3 {
/// Version of the format config.
pub version: FormatMetaVersion,
/// Format indicates the backend format type, supports two values 'xl' and 'xl-single'.
pub format: FormatBackend,
/// ID is the identifier for the rustfs deployment
pub id: Uuid,
#[serde(rename = "xl")]
pub erasure: FormatErasureV3,
// /// DiskInfo is an extended type which returns current
// /// disk usage per path.
// #[serde(skip)]
// pub disk_info: Option<data_types::DeskInfo>,
}
impl TryFrom<&[u8]> for FormatV3 {
type Error = JsonError;
fn try_from(data: &[u8]) -> Result<Self, JsonError> {
serde_json::from_slice(data)
}
}
impl TryFrom<&str> for FormatV3 {
type Error = JsonError;
fn try_from(data: &str) -> Result<Self, JsonError> {
serde_json::from_str(data)
}
}
impl FormatV3 {
/// Create a new format config with the given number of sets and set length.
pub fn new(num_sets: usize, set_len: usize) -> Self {
let format = if set_len == 1 {
FormatBackend::ErasureSingle
} else {
FormatBackend::Erasure
};
let erasure = FormatErasureV3 {
version: FormatErasureVersion::V3,
this: Uuid::nil(),
sets: (0..num_sets)
.map(|_| (0..set_len).map(|_| Uuid::new_v4()).collect())
.collect(),
distribution_algo: DistributionAlgoVersion::V3,
};
Self {
version: FormatMetaVersion::V1,
format,
id: Uuid::new_v4(),
erasure,
// disk_info: None,
}
}
/// Returns the number of drives in the erasure set.
pub fn drives(&self) -> usize {
self.erasure.sets.iter().map(|v| v.len()).sum()
}
pub fn to_json(&self) -> Result<String, JsonError> {
serde_json::to_string(self)
}
/// returns the i,j'th position of the input `diskID` against the reference
///
/// format, after successful validation.
/// - i'th position is the set index
/// - j'th position is the disk index in the current set
pub fn find_disk_index_by_disk_id(&self, disk_id: Uuid) -> Result<(usize, usize), Error> {
if disk_id == Uuid::nil() {
return Err(Error::new(disk::DiskError::DiskNotFound));
}
if disk_id == Uuid::max() {
return Err(Error::msg("disk offline"));
}
for (i, set) in self.erasure.sets.iter().enumerate() {
for (j, d) in set.iter().enumerate() {
if disk_id.eq(d) {
return Ok((i, j));
}
}
}
Err(Error::msg(format!("disk id not found {}", disk_id)))
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_format_v1() {
let format = FormatV3::new(1, 4);
let str = serde_json::to_string(&format);
println!("{:?}", str);
let data = r#"
{
"version": "1",
"format": "xl",
"id": "321b3874-987d-4c15-8fa5-757c956b1243",
"xl": {
"version": "1",
"this": null,
"sets": [
[
"8ab9a908-f869-4f1f-8e42-eb067ffa7eb5",
"c26315da-05cf-4778-a9ea-b44ea09f58c5",
"fb87a891-18d3-44cf-a46f-bcc15093a038",
"356a925c-57b9-4313-88b3-053edf1104dc"
]
],
"distributionAlgo": "CRCMOD"
}
}"#;
let p = FormatV3::try_from(data);
println!("{:?}", p);
}
}

View File

@@ -1,8 +1,10 @@
mod disk;
mod disks_layout;
mod ellipses;
mod endpoint;
mod erasure;
pub mod error;
mod format;
pub mod store;
mod stream;
mod utils;

View File

@@ -1,9 +1,14 @@
use uuid::Uuid;
use crate::{disks_layout::DisksLayout, endpoint::create_server_endpoints};
use crate::{
disk::{self, DiskAPI, DiskOption},
disks_layout::DisksLayout,
endpoint::create_server_endpoints,
format::FormatV3,
};
use super::endpoint::Endpoint;
use anyhow::Result;
use anyhow::{Error, Result};
use std::fmt::Debug;
@@ -16,10 +21,20 @@ pub struct ECStore {
}
impl ECStore {
pub fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let layouts = DisksLayout::new(endpoints)?;
let (pools, _) = create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
for (i, pool_eps) in pools.iter().enumerate() {
let (disks, errs) = disk::init_disks(
&pool_eps.endpoints,
&DiskOption {
cleanup: true,
health_check: true,
},
)
.await;
}
Ok(ECStore {
id: Uuid::nil(),
@@ -27,6 +42,23 @@ impl ECStore {
peer: Vec::new(),
})
}
async fn load_formats(
disks: Vec<Option<impl DiskAPI>>,
heal: bool,
) -> (Vec<FormatV3>, Vec<Error>) {
unimplemented!()
}
fn default_partiy_blocks(drive: usize) -> usize {
match drive {
1 => 0,
2 | 3 => 1,
4 | 5 => 2,
6 | 7 => 3,
_ => 4,
}
}
}
#[derive(Debug)]
@@ -44,7 +76,4 @@ pub struct Objects {
pub default_parity_count: usize,
}
#[async_trait::async_trait]
trait DiskAPI: Debug + Send + Sync + 'static {}
pub trait StorageAPI: Debug + Send + Sync + 'static {}