diff --git a/Cargo.lock b/Cargo.lock index a3bf3952..d1a8696c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,12 +161,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bitflags" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" - [[package]] name = "block-buffer" version = "0.10.4" @@ -844,17 +838,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core 0.8.6", -] - -[[package]] -name = "parking_lot" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" -dependencies = [ - "lock_api", - "parking_lot_core 0.9.10", + "parking_lot_core", ] [[package]] @@ -866,24 +850,11 @@ dependencies = [ "cfg-if", "instant", "libc", - "redox_syscall 0.2.16", + "redox_syscall", "smallvec", "winapi", ] -[[package]] -name = "parking_lot_core" -version = "0.9.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall 0.5.2", - "smallvec", - "windows-targets 0.52.5", -] - [[package]] name = "path-absolutize" version = "3.1.1" @@ -996,16 +967,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags 1.3.2", -] - -[[package]] -name = "redox_syscall" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" -dependencies = [ - "bitflags 2.6.0", + "bitflags", ] [[package]] @@ -1016,7 +978,7 @@ checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" dependencies = [ "libm", "lru", - "parking_lot 0.11.2", + "parking_lot", "smallvec", "spin", ] @@ -1370,7 +1332,6 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/Cargo.toml b/Cargo.toml index 525fcdbc..3e5de4b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,4 +18,5 @@ http = "1.1.0" thiserror = "1.0.61" time = "0.3.36" async-trait = "0.1.80" -tokio = { version = "1.38.0", features = ["full"] } +tokio = { version = "1.38.0", features = ["fs"] } +anyhow = "1.0.86" diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index a132f4f2..0eb5a1ab 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -9,22 +9,25 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio.workspace = true +bytes.workspace = true +thiserror.workspace = true +futures.workspace = true +async-trait.workspace = true +tracing.workspace = true +serde.workspace = true +anyhow.workspace = true url = "2.5.2" uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] } reed-solomon-erasure = "6.0.0" transform-stream = "0.3.0" -bytes.workspace = true -tokio.workspace = true -thiserror.workspace = true -futures.workspace = true -anyhow = "1.0.86" -serde.workspace = true lazy_static = "1.5.0" regex = "1.10.5" netif = "0.1.6" -async-trait = "0.1.80" -tracing.workspace = true tracing-error = "0.2.0" serde_json.workspace = true path-absolutize = "3.1.1" time.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 8477a4c5..606a1df6 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use futures::future::join_all; use path_absolutize::Absolutize; use time::OffsetDateTime; -use tokio::{fs, sync::RwLock}; +use tokio::fs; use uuid::Uuid; use crate::{ diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index b2aacb6a..00136663 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -1,31 +1,30 @@ -use std::{collections::HashMap, net::IpAddr, path::Path, usize}; - use super::disks_layout::PoolDisksLayout; use super::utils::{ net::{is_local_host, split_host_port}, string::new_string_set, }; use anyhow::Error; -use url::Url; +use std::fmt::Display; +use std::{collections::HashMap, net::IpAddr, path::Path, usize}; +use url::{ParseError, Url}; pub const DEFAULT_PORT: u16 = 9000; -// #[derive(Debug, Clone)] -// struct Node { -// url: url::Url, -// pools: Vec, -// is_local: bool, -// grid_host: String, -// } - +/// enum for endpoint type. #[derive(PartialEq, Eq)] pub enum EndpointType { - Undefiend, - PathEndpointType, - URLEndpointType, + /// path style endpoint type enum. + Path, + + /// URL style endpoint type enum. + Url, + + /// Unknown endpoint type enum. + UnKnow, } -#[derive(Debug, Clone, PartialEq, Eq, Ord)] +/// holds information about a node in this cluster +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { pub url: url::Url, pub pools: Vec, @@ -33,24 +32,57 @@ pub struct Node { pub grid_host: String, // TODO "scheme://host:port" } -impl PartialOrd for Node { - fn partial_cmp(&self, other: &Self) -> Option { - self.grid_host.partial_cmp(&other.grid_host) - } -} +// impl PartialOrd for Node { +// fn partial_cmp(&self, other: &Self) -> Option { +// self.grid_host.partial_cmp(&other.grid_host) +// } +// } +/// any type of endpoint. #[derive(Debug, Clone)] pub struct Endpoint { pub url: url::Url, pub is_local: bool, + pub pool_idx: i32, pub set_idx: i32, pub disk_idx: i32, } -// 检查给定路径是否为空或根路径 +impl Display for Endpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.url.has_host() { + write!(f, "{}", self.url) + } else { + write!(f, "{}", self.url.path()) + } + } +} + +impl TryFrom<&str> for Endpoint { + /// The type returned in the event of a conversion error. + type Error = String; + + /// Performs the conversion. + fn try_from(value: &str) -> Result { + if is_empty_path(value) { + return Err("empty or root endpoint is not supported".into()); + } + + // match Url::parse(value) { + // Ok(u) => u, + // Err(e) => match e { + // ParseError::EmptyHost => Err("") + // }, + // } + + unimplemented!() + } +} + +/// check whether given path is not empty. fn is_empty_path(path: &str) -> bool { - path == "" || path == "/" || path == "\\" + ["", "/", "\\"].iter().any(|&v| v.eq(path)) } // 检查给定字符串是否是IP地址 @@ -58,14 +90,6 @@ fn is_host_ip(ip_str: &str) -> bool { ip_str.parse::().is_ok() } -#[tokio::test] -async fn test_new_endpont() { - let arg = "./data"; - let ep = Endpoint::new(arg).unwrap(); - - println!("{:?}", ep); -} - impl Endpoint { fn new(arg: &str) -> Result { if is_empty_path(arg) { @@ -73,16 +97,16 @@ impl Endpoint { } let url = Url::parse(arg).or_else(|e| match e { - url::ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")), - url::ParseError::IdnaError => Err(Error::msg("域名格式不正确")), - url::ParseError::InvalidPort => Err(Error::msg("端口格式不正确")), - url::ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")), - url::ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")), - url::ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")), + ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")), + ParseError::IdnaError => Err(Error::msg("域名格式不正确")), + ParseError::InvalidPort => Err(Error::msg("端口格式不正确")), + ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")), + ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")), + ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")), // url::ParseError::RelativeUrlWithoutBase => todo!(), // url::ParseError::RelativeUrlWithCannotBeABaseBase => todo!(), // url::ParseError::SetHostOnCannotBeABaseUrl => todo!(), - url::ParseError::Overflow => Err(Error::msg("长度过长")), + ParseError::Overflow => Err(Error::msg("长度过长")), _ => { if is_host_ip(arg) { return Err(Error::msg("无效的URL endpoint格式: 缺少 http 或 https")); @@ -160,15 +184,11 @@ impl Endpoint { // } pub fn get_type(&self) -> EndpointType { - if self.url.scheme() == "file" { - return EndpointType::PathEndpointType; + if self.url.has_host() { + EndpointType::Url + } else { + EndpointType::Path } - - EndpointType::URLEndpointType - } - - pub fn to_string(&self) -> String { - self.url.as_str().to_string() } // pub fn get_scheme(&self) -> String { @@ -234,7 +254,7 @@ impl Endpoints { self.0.as_slice()[start..end].to_vec() } pub fn from_args(args: Vec) -> Result { - let mut ep_type = EndpointType::Undefiend; + let mut ep_type = EndpointType::UnKnow; let mut scheme = String::new(); let mut eps = Vec::new(); let mut uniq_args = new_string_set(); @@ -369,7 +389,7 @@ impl EndpointServerPools { let mut nodes: Vec = node_map.into_iter().map(|(_, n)| n).collect(); - nodes.sort_by(|a, b| a.cmp(b)); + // nodes.sort_by(|a, b| a.cmp(b)); nodes } @@ -433,7 +453,7 @@ pub fn create_pool_endpoints( let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?; endpoint.update_islocal()?; - if endpoint.get_type() != EndpointType::PathEndpointType { + if endpoint.get_type() != EndpointType::Path { return Err(Error::msg("use path style endpoint for single node setup")); } @@ -497,12 +517,12 @@ pub fn create_pool_endpoints( let erasure_type = uniq_host.to_slice().len() == 1; for eps in ret.iter() { - if eps.0[0].get_type() == EndpointType::PathEndpointType { + if eps.0[0].get_type() == EndpointType::Path { setup_type = SetupType::ErasureSetupType; break; } - if eps.0[0].get_type() == EndpointType::URLEndpointType { + if eps.0[0].get_type() == EndpointType::Url { if erasure_type { setup_type = SetupType::ErasureSetupType; } else { @@ -553,6 +573,23 @@ mod test { use super::*; + #[test] + fn test_url() { + let path = "/dir/sss"; + + let u = url::Url::parse(path); + + println!("{:#?}", u) + } + + #[test] + fn test_new_endpont() { + let arg = "./data"; + let ep = Endpoint::new(arg).unwrap(); + + println!("{:?}", ep); + } + #[test] fn test_create_server_endpoints() { let cases = vec![( diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index dd443620..d2899b4c 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -73,8 +73,8 @@ mod test { use super::*; - #[tokio::test] - async fn test_erasure() { + #[test] + fn test_erasure() { let data_shards = 3; let parity_shards = 2; let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 16e43ace..85538161 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -8,23 +8,16 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[[bin]] -name = "rustfs" -# required-features = ["binary"] - - -# [features] -# binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper-util"] - [dependencies] -ecstore = { path = "../ecstore" } -clap = { version = "4.5.7", features = ["derive"] } -s3s = { version = "0.10.0" } -anyhow = { version = "1.0.86" } -tracing = { workspace = true } -tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } +async-trait.workspace = true +tracing.workspace = true +anyhow.workspace = true time = { workspace = true, features = ["parsing", "formatting"] } -async-trait = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "signal"] } + +ecstore = { path = "../ecstore" } + +s3s = "0.10.0" +clap = { version = "4.5.7", features = ["derive"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } hyper-util = { version = "0.1.5", features = ["tokio", "server-auto", "server-graceful"] } diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 4e47a63d..b446339f 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -3,7 +3,6 @@ mod storage; use anyhow::Result; use clap::Parser; -use ecstore::store::ECStore; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder as ConnBuilder, @@ -39,10 +38,9 @@ async fn run(opt: config::Opt) -> Result<()> { debug!("opt: {:?}", &opt); // Setup S3 service let service = { - let mut b = S3ServiceBuilder::new(storage::ecfs::FS::new( - opt.address.clone(), - opt.volumes.clone(), - )?); + let mut b = S3ServiceBuilder::new( + storage::ecfs::FS::new(opt.address.clone(), opt.volumes.clone()).await?, + ); // Enable authentication if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) { diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index d188af8e..e6f8b789 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -15,8 +15,8 @@ pub struct FS { } impl FS { - pub fn new(address: String, endpoints: Vec) -> Result { - let store: ECStore = ECStore::new(address, endpoints)?; + pub async fn new(address: String, endpoints: Vec) -> Result { + let store: ECStore = ECStore::new(address, endpoints).await?; Ok(Self { store }) } } diff --git a/rustfs/src/storage/mod.rs b/rustfs/src/storage/mod.rs index 5b13c1fe..4723cd7d 100644 --- a/rustfs/src/storage/mod.rs +++ b/rustfs/src/storage/mod.rs @@ -1,4 +1 @@ pub mod ecfs; -mod simple_fs; - -pub use simple_fs::SimpleFS; diff --git a/rustfs/src/storage/simple_fs.rs b/rustfs/src/storage/simple_fs.rs deleted file mode 100644 index b8e0a9fd..00000000 --- a/rustfs/src/storage/simple_fs.rs +++ /dev/null @@ -1,6 +0,0 @@ -use s3s::S3; - -pub struct SimpleFS {} - -#[async_trait::async_trait] -impl S3 for SimpleFS {}