init:store

This commit is contained in:
weisd
2024-06-26 16:02:25 +08:00
11 changed files with 121 additions and 137 deletions

47
Cargo.lock generated
View File

@@ -161,12 +161,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "block-buffer"
version = "0.10.4"
@@ -844,17 +838,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core 0.9.10",
"parking_lot_core",
]
[[package]]
@@ -866,24 +850,11 @@ dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.5.2",
"smallvec",
"windows-targets 0.52.5",
]
[[package]]
name = "path-absolutize"
version = "3.1.1"
@@ -996,16 +967,7 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd"
dependencies = [
"bitflags 2.6.0",
"bitflags",
]
[[package]]
@@ -1016,7 +978,7 @@ checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f"
dependencies = [
"libm",
"lru",
"parking_lot 0.11.2",
"parking_lot",
"smallvec",
"spin",
]
@@ -1370,7 +1332,6 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",

View File

@@ -18,4 +18,5 @@ http = "1.1.0"
thiserror = "1.0.61"
time = "0.3.36"
async-trait = "0.1.80"
tokio = { version = "1.38.0", features = ["full"] }
tokio = { version = "1.38.0", features = ["fs"] }
anyhow = "1.0.86"

View File

@@ -9,22 +9,25 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio.workspace = true
bytes.workspace = true
thiserror.workspace = true
futures.workspace = true
async-trait.workspace = true
tracing.workspace = true
serde.workspace = true
anyhow.workspace = true
url = "2.5.2"
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = "6.0.0"
transform-stream = "0.3.0"
bytes.workspace = true
tokio.workspace = true
thiserror.workspace = true
futures.workspace = true
anyhow = "1.0.86"
serde.workspace = true
lazy_static = "1.5.0"
regex = "1.10.5"
netif = "0.1.6"
async-trait = "0.1.80"
tracing.workspace = true
tracing-error = "0.2.0"
serde_json.workspace = true
path-absolutize = "3.1.1"
time.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@@ -8,7 +8,7 @@ use bytes::Bytes;
use futures::future::join_all;
use path_absolutize::Absolutize;
use time::OffsetDateTime;
use tokio::{fs, sync::RwLock};
use tokio::fs;
use uuid::Uuid;
use crate::{

View File

@@ -1,31 +1,30 @@
use std::{collections::HashMap, net::IpAddr, path::Path, usize};
use super::disks_layout::PoolDisksLayout;
use super::utils::{
net::{is_local_host, split_host_port},
string::new_string_set,
};
use anyhow::Error;
use url::Url;
use std::fmt::Display;
use std::{collections::HashMap, net::IpAddr, path::Path, usize};
use url::{ParseError, Url};
pub const DEFAULT_PORT: u16 = 9000;
// #[derive(Debug, Clone)]
// struct Node {
// url: url::Url,
// pools: Vec<usize>,
// is_local: bool,
// grid_host: String,
// }
/// enum for endpoint type.
#[derive(PartialEq, Eq)]
pub enum EndpointType {
Undefiend,
PathEndpointType,
URLEndpointType,
/// path style endpoint type enum.
Path,
/// URL style endpoint type enum.
Url,
/// Unknown endpoint type enum.
UnKnow,
}
#[derive(Debug, Clone, PartialEq, Eq, Ord)]
/// holds information about a node in this cluster
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
pub url: url::Url,
pub pools: Vec<i32>,
@@ -33,24 +32,57 @@ pub struct Node {
pub grid_host: String, // TODO "scheme://host:port"
}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.grid_host.partial_cmp(&other.grid_host)
}
}
// impl PartialOrd for Node {
// fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// self.grid_host.partial_cmp(&other.grid_host)
// }
// }
/// any type of endpoint.
#[derive(Debug, Clone)]
pub struct Endpoint {
pub url: url::Url,
pub is_local: bool,
pub pool_idx: i32,
pub set_idx: i32,
pub disk_idx: i32,
}
// 检查给定路径是否为空或根路径
impl Display for Endpoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.url.has_host() {
write!(f, "{}", self.url)
} else {
write!(f, "{}", self.url.path())
}
}
}
impl TryFrom<&str> for Endpoint {
/// The type returned in the event of a conversion error.
type Error = String;
/// Performs the conversion.
fn try_from(value: &str) -> Result<Self, Self::Error> {
if is_empty_path(value) {
return Err("empty or root endpoint is not supported".into());
}
// match Url::parse(value) {
// Ok(u) => u,
// Err(e) => match e {
// ParseError::EmptyHost => Err("")
// },
// }
unimplemented!()
}
}
/// check whether given path is not empty.
fn is_empty_path(path: &str) -> bool {
path == "" || path == "/" || path == "\\"
["", "/", "\\"].iter().any(|&v| v.eq(path))
}
// 检查给定字符串是否是IP地址
@@ -58,14 +90,6 @@ fn is_host_ip(ip_str: &str) -> bool {
ip_str.parse::<IpAddr>().is_ok()
}
#[tokio::test]
async fn test_new_endpont() {
let arg = "./data";
let ep = Endpoint::new(arg).unwrap();
println!("{:?}", ep);
}
impl Endpoint {
fn new(arg: &str) -> Result<Self, Error> {
if is_empty_path(arg) {
@@ -73,16 +97,16 @@ impl Endpoint {
}
let url = Url::parse(arg).or_else(|e| match e {
url::ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")),
url::ParseError::IdnaError => Err(Error::msg("域名格式不正确")),
url::ParseError::InvalidPort => Err(Error::msg("端口格式不正确")),
url::ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")),
url::ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")),
url::ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")),
ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")),
ParseError::IdnaError => Err(Error::msg("域名格式不正确")),
ParseError::InvalidPort => Err(Error::msg("端口格式不正确")),
ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")),
ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")),
ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")),
// url::ParseError::RelativeUrlWithoutBase => todo!(),
// url::ParseError::RelativeUrlWithCannotBeABaseBase => todo!(),
// url::ParseError::SetHostOnCannotBeABaseUrl => todo!(),
url::ParseError::Overflow => Err(Error::msg("长度过长")),
ParseError::Overflow => Err(Error::msg("长度过长")),
_ => {
if is_host_ip(arg) {
return Err(Error::msg("无效的URL endpoint格式: 缺少 http 或 https"));
@@ -160,15 +184,11 @@ impl Endpoint {
// }
pub fn get_type(&self) -> EndpointType {
if self.url.scheme() == "file" {
return EndpointType::PathEndpointType;
if self.url.has_host() {
EndpointType::Url
} else {
EndpointType::Path
}
EndpointType::URLEndpointType
}
pub fn to_string(&self) -> String {
self.url.as_str().to_string()
}
// pub fn get_scheme(&self) -> String {
@@ -234,7 +254,7 @@ impl Endpoints {
self.0.as_slice()[start..end].to_vec()
}
pub fn from_args(args: Vec<String>) -> Result<Self, Error> {
let mut ep_type = EndpointType::Undefiend;
let mut ep_type = EndpointType::UnKnow;
let mut scheme = String::new();
let mut eps = Vec::new();
let mut uniq_args = new_string_set();
@@ -369,7 +389,7 @@ impl EndpointServerPools {
let mut nodes: Vec<Node> = node_map.into_iter().map(|(_, n)| n).collect();
nodes.sort_by(|a, b| a.cmp(b));
// nodes.sort_by(|a, b| a.cmp(b));
nodes
}
@@ -433,7 +453,7 @@ pub fn create_pool_endpoints(
let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?;
endpoint.update_islocal()?;
if endpoint.get_type() != EndpointType::PathEndpointType {
if endpoint.get_type() != EndpointType::Path {
return Err(Error::msg("use path style endpoint for single node setup"));
}
@@ -497,12 +517,12 @@ pub fn create_pool_endpoints(
let erasure_type = uniq_host.to_slice().len() == 1;
for eps in ret.iter() {
if eps.0[0].get_type() == EndpointType::PathEndpointType {
if eps.0[0].get_type() == EndpointType::Path {
setup_type = SetupType::ErasureSetupType;
break;
}
if eps.0[0].get_type() == EndpointType::URLEndpointType {
if eps.0[0].get_type() == EndpointType::Url {
if erasure_type {
setup_type = SetupType::ErasureSetupType;
} else {
@@ -553,6 +573,23 @@ mod test {
use super::*;
#[test]
fn test_url() {
let path = "/dir/sss";
let u = url::Url::parse(path);
println!("{:#?}", u)
}
#[test]
fn test_new_endpont() {
let arg = "./data";
let ep = Endpoint::new(arg).unwrap();
println!("{:?}", ep);
}
#[test]
fn test_create_server_endpoints() {
let cases = vec![(

View File

@@ -73,8 +73,8 @@ mod test {
use super::*;
#[tokio::test]
async fn test_erasure() {
#[test]
fn test_erasure() {
let data_shards = 3;
let parity_shards = 2;
let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];

View File

@@ -8,23 +8,16 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "rustfs"
# required-features = ["binary"]
# [features]
# binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper-util"]
[dependencies]
ecstore = { path = "../ecstore" }
clap = { version = "4.5.7", features = ["derive"] }
s3s = { version = "0.10.0" }
anyhow = { version = "1.0.86" }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
async-trait.workspace = true
tracing.workspace = true
anyhow.workspace = true
time = { workspace = true, features = ["parsing", "formatting"] }
async-trait = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] }
ecstore = { path = "../ecstore" }
s3s = "0.10.0"
clap = { version = "4.5.7", features = ["derive"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
hyper-util = { version = "0.1.5", features = ["tokio", "server-auto", "server-graceful"] }

View File

@@ -3,7 +3,6 @@ mod storage;
use anyhow::Result;
use clap::Parser;
use ecstore::store::ECStore;
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as ConnBuilder,
@@ -39,10 +38,9 @@ async fn run(opt: config::Opt) -> Result<()> {
debug!("opt: {:?}", &opt);
// Setup S3 service
let service = {
let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new(
opt.address.clone(),
opt.volumes.clone(),
)?);
let mut b = S3ServiceBuilder::new(
storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?,
);
// Enable authentication
if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) {

View File

@@ -15,8 +15,8 @@ pub struct FS {
}
impl FS {
pub fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let store: ECStore = ECStore::new(address, endpoints)?;
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
let store: ECStore = ECStore::new(address, endpoints).await?;
Ok(Self { store })
}
}

View File

@@ -1,4 +1 @@
pub mod ecfs;
mod simple_fs;
pub use simple_fs::SimpleFS;

View File

@@ -1,6 +0,0 @@
use s3s::S3;
pub struct SimpleFS {}
#[async_trait::async_trait]
impl S3 for SimpleFS {}