From 1234db75b19b1b473782bcaae79a7b9a96fd1e0d Mon Sep 17 00:00:00 2001 From: "shiro.lee" Date: Wed, 3 Jul 2024 16:47:27 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20=E6=9B=BF=E6=8D=A2Error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecstore/src/disks_layout.rs | 39 +++++++++++++++++++++++++------------ ecstore/src/ellipses.rs | 14 ++++++------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index ca0de51f..94ccc750 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -1,5 +1,5 @@ use super::ellipses::*; -use anyhow::{Error, Result}; +use super::error::{Error, Result}; use serde::Deserialize; use std::collections::HashSet; @@ -33,7 +33,7 @@ impl> TryFrom<&[T]> for DisksLayout { fn try_from(args: &[T]) -> Result { if args.is_empty() { - return Err(Error::msg("Invalid argument")); + return Err(Error::from_string("Invalid argument")); } let is_ellipses = args.iter().any(|v| has_ellipses(&[v])); @@ -54,7 +54,9 @@ impl> TryFrom<&[T]> for DisksLayout { let mut layout = Vec::with_capacity(args.len()); for arg in args.iter() { if !has_ellipses(&[arg]) && args.len() > 1 { - return Err(Error::msg("all args must have ellipses for pool expansion (Invalid arguments specified)")); + return Err(Error::from_string( + "all args must have ellipses for pool expansion (Invalid arguments specified)", + )); } let set_args = get_all_sets(is_ellipses, &[arg])?; @@ -69,6 +71,19 @@ impl> TryFrom<&[T]> for DisksLayout { } } +impl DisksLayout { + pub fn is_empty_layout(&self) -> bool { + self.pools.is_empty() + || self.pools[0].layout.is_empty() + || self.pools[0].layout[0].is_empty() + || self.pools[0].layout[0][0].is_empty() + } + + pub fn is_single_drive_layout(&self) -> bool { + self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1 + } +} + /// parses all ellipses input arguments, expands them into /// corresponding list of endpoints chunked evenly in accordance with a /// specific set size. @@ -95,7 +110,7 @@ fn get_all_sets>(is_ellipses: bool, args: &[T]) -> Result>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result>> { if args.is_empty() || total_sizes.is_empty() { - return Err(Error::msg("Invalid argument")); + return Err(Error::from_string("Invalid argument")); } for &size in total_sizes { // Check if total_sizes has minimum range upto set_size if size < SET_SIZES[0] { - return Err(Error::msg(format!("Incorrect number of endpoints provided, size {}", size))); + return Err(Error::from_string(format!("Incorrect number of endpoints provided, size {}", size))); } } let common_size = get_divisible_size(total_sizes); let mut set_counts = possible_set_counts(common_size); if set_counts.is_empty() { - return Err(Error::msg(format!( + return Err(Error::from_string(format!( "Incorrect number of endpoints provided, number of drives {} is not divisible by any supported erasure set sizes {}", common_size, 0 ))); @@ -278,13 +293,13 @@ fn get_set_indexes>(args: &[T], total_sizes: &[usize], arg_pattern // Returns possible set counts with symmetry. set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns); if set_counts.is_empty() { - return Err(Error::msg("No symmetric distribution detected with input endpoints provided")); + return Err(Error::from_string("No symmetric distribution detected with input endpoints provided")); } // Final set size with all the symmetry accounted for. let set_size = common_set_drive_count(common_size, &set_counts); if !is_valid_set_size(set_size) { - return Err(Error::msg("Incorrect number of endpoints provided3")); + return Err(Error::from_string("Incorrect number of endpoints provided3")); } Ok(total_sizes @@ -475,7 +490,7 @@ mod test { arg_patterns.push(patterns); } Err(err) => { - panic!("Test{}: Unexpected failure {}", test_case.num, err); + panic!("Test{}: Unexpected failure {:?}", test_case.num, err); } } } @@ -494,7 +509,7 @@ mod test { } Err(err) => { if test_case.success { - panic!("Test{}: Expected success but failed instead {}", test_case.num, err); + panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err); } } } @@ -754,7 +769,7 @@ mod test { } Err(err) => { if test_case.success { - panic!("Test{}: Expected success but failed instead {}", test_case.num, err); + panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err); } } } diff --git a/ecstore/src/ellipses.rs b/ecstore/src/ellipses.rs index 4a29b98d..170ec0d7 100644 --- a/ecstore/src/ellipses.rs +++ b/ecstore/src/ellipses.rs @@ -1,6 +1,6 @@ use lazy_static::*; -use anyhow::{Error, Result}; +use super::error::{Error, Result}; use regex::Regex; lazy_static! { @@ -88,7 +88,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result { let mut parts = match ELLIPSES_RE.captures(arg) { Some(caps) => caps, None => { - return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg))); + return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg))); } }; @@ -125,7 +125,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result { || p.suffix.contains(OPEN_BRACES) || p.suffix.contains(CLOSE_BRACES) { - return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg))); + return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg))); } } @@ -146,10 +146,10 @@ pub fn has_ellipses>(s: &[T]) -> bool { /// {33...64} pub fn parse_ellipses_range(pattern: &str) -> Result> { if !pattern.contains(OPEN_BRACES) { - return Err(Error::msg("Invalid argument")); + return Err(Error::from_string("Invalid argument")); } if !pattern.contains(OPEN_BRACES) { - return Err(Error::msg("Invalid argument")); + return Err(Error::from_string("Invalid argument")); } let ellipses_range: Vec<&str> = pattern @@ -159,7 +159,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result> { .collect(); if ellipses_range.len() != 2 { - return Err(Error::msg("Invalid argument")); + return Err(Error::from_string("Invalid argument")); } // TODO: Add support for hexadecimals. @@ -167,7 +167,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result> { let end = ellipses_range[1].parse::()?; if start > end { - return Err(Error::msg("Invalid argument:range start cannot be bigger than end")); + return Err(Error::from_string("Invalid argument:range start cannot be bigger than end")); } let mut ret: Vec = Vec::with_capacity(end - start + 1); From b33a788c20a9c786a7ce017036ce7478827ec155 Mon Sep 17 00:00:00 2001 From: "shiro.lee" Date: Wed, 3 Jul 2024 18:56:47 +0800 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecstore/src/endpoint.rs | 262 ++++++++++++++++++++++++++-------------- 1 file changed, 170 insertions(+), 92 deletions(-) diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index 8cfc8b9e..dd9f67f3 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -1,11 +1,17 @@ -use super::disks_layout::PoolDisksLayout; +use super::disks_layout::{DisksLayout, PoolDisksLayout}; +use super::error::{Error, Result}; use super::utils::{ net::{is_local_host, split_host_port}, string::new_string_set, }; -use anyhow::{Error, Result}; +use path_absolutize::Absolutize; use std::fmt::Display; -use std::{collections::HashMap, net::IpAddr, path::Path, usize}; +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + path::Path, + usize, +}; use url::{ParseError, Url}; pub const DEFAULT_PORT: u16 = 9000; @@ -39,14 +45,14 @@ pub struct Node { // } /// any type of endpoint. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct Endpoint { pub url: url::Url, pub is_local: bool, - pub pool_idx: i32, - pub set_idx: i32, - pub disk_idx: i32, + pub pool_idx: Option, + pub set_idx: Option, + pub disk_idx: Option, } impl Display for Endpoint { @@ -61,22 +67,86 @@ impl Display for Endpoint { impl TryFrom<&str> for Endpoint { /// The type returned in the event of a conversion error. - type Error = String; + type Error = Error; /// Performs the conversion. fn try_from(value: &str) -> Result { if is_empty_path(value) { - return Err("empty or root endpoint is not supported".into()); + return Err(Error::from_string("empty or root endpoint is not supported")); } - // match Url::parse(value) { - // Ok(u) => u, - // Err(e) => match e { - // ParseError::EmptyHost => Err("") - // }, - // } + let mut is_local = false; + let url = match Url::parse(value) { + Ok(url) => { + // URL style of endpoint. + // Valid URL style endpoint is + // - Scheme field must contain "http" or "https" + // - All field should be empty except Host and Path. + if !((url.scheme() == "http" || url.scheme() == "https") + && url.username().is_empty() + && url.fragment().is_none() + && url.query().is_none()) + { + return Err(Error::from_string("invalid URL endpoint format")); + } + if is_empty_path(url.path()) { + return Err(Error::from_string("empty or root endpoint is not supported")); + } - unimplemented!() + // On windows having a preceding SlashSeparator will cause problems, if the + // command line already has C:/ match e { + ParseError::InvalidPort => { + return Err(Error::from_string("invalid URL endpoint format: port number must be between 1 to 65535")) + } + ParseError::EmptyHost => return Err(Error::from_string("invalid URL endpoint format: empty host name")), + ParseError::RelativeUrlWithoutBase => { + // Only check if the arg is an ip address and ask for scheme since its absent. + // localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as + // /mnt/export1. So we go ahead and start the rustfs server in FS modes in these cases. + if is_socket_addr(value) { + return Err(Error::from_string("invalid URL endpoint format: missing scheme http or https")); + } + + let file_path = match Path::new(value).absolutize() { + Ok(path) => path, + Err(err) => return Err(Error::from_string(format!("absolute path failed: {}", err))), + }; + + match Url::from_file_path(file_path) { + Ok(url) => { + is_local = true; + url + } + Err(err) => return Err(Error::from_string("Convert a file path into an URL failed")), + } + } + _ => return Err(Error::from_string(format!("invalid URL endpoint format: {}", e))), + }, + }; + + Ok(Endpoint { + url, + is_local, + ..Default::default() + }) } } @@ -85,36 +155,36 @@ fn is_empty_path(path: &str) -> bool { ["", "/", "\\"].iter().any(|&v| v.eq(path)) } -// 检查给定字符串是否是IP地址 -fn is_host_ip(ip_str: &str) -> bool { - ip_str.parse::().is_ok() +// helper for validating if the provided arg is an ip address. +fn is_socket_addr(host: &str) -> bool { + host.parse::().is_ok() || host.parse::().is_ok() } impl Endpoint { pub fn new(arg: &str) -> Result { if is_empty_path(arg) { - return Err(Error::msg("不支持空或根endpoint")); + return Err(Error::from_string("不支持空或根endpoint")); } let url = Url::parse(arg).or_else(|e| match e { - ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")), - ParseError::IdnaError => Err(Error::msg("域名格式不正确")), - ParseError::InvalidPort => Err(Error::msg("端口格式不正确")), - ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")), - ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")), - ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")), + ParseError::EmptyHost => Err(Error::from_string("远程地址,域名不能为空")), + ParseError::IdnaError => Err(Error::from_string("域名格式不正确")), + ParseError::InvalidPort => Err(Error::from_string("端口格式不正确")), + ParseError::InvalidIpv4Address => Err(Error::from_string("IP格式不正确")), + ParseError::InvalidIpv6Address => Err(Error::from_string("IP格式不正确")), + ParseError::InvalidDomainCharacter => Err(Error::from_string("域名字符格式不正确")), // url::ParseError::RelativeUrlWithoutBase => todo!(), // url::ParseError::RelativeUrlWithCannotBeABaseBase => todo!(), // url::ParseError::SetHostOnCannotBeABaseUrl => todo!(), - ParseError::Overflow => Err(Error::msg("长度过长")), + ParseError::Overflow => Err(Error::from_string("长度过长")), _ => { if is_host_ip(arg) { - return Err(Error::msg("无效的URL endpoint格式: 缺少 http 或 https")); + return Err(Error::from_string("无效的URL endpoint格式: 缺少 http 或 https")); } let abs_arg = Path::new(arg).canonicalize()?; - let abs = abs_arg.to_str().ok_or(Error::msg("绝对路径错误"))?; + let abs = abs_arg.to_str().ok_or(Error::from_string("绝对路径错误"))?; let url = Url::from_file_path(abs).unwrap(); Ok(url) } @@ -131,17 +201,17 @@ impl Endpoint { } if url.port().is_none() { - return Err(Error::msg("必须提供端口号")); + return Err(Error::from_string("必须提供端口号")); } if !(url.scheme() == "http" || url.scheme() == "https") { - return Err(Error::msg("URL endpoint格式无效: Scheme字段必须包含'http'或'https'")); + return Err(Error::from_string("URL endpoint格式无效: Scheme字段必须包含'http'或'https'")); } // 检查路径 let path = url.path(); if is_empty_path(path) { - return Err(Error::msg("URL endpoint不支持空或根路径")); + return Err(Error::from_string("URL endpoint不支持空或根路径")); } // TODO: Windows 系统上的路径处理 @@ -258,15 +328,15 @@ impl Endpoints { ep_type = endpoint.get_type(); scheme = endpoint.url.scheme().to_string(); } else if endpoint.get_type() != ep_type { - return Err(Error::msg("不支持多种endpoints风格")); + return Err(Error::from_string("不支持多种endpoints风格")); } else if endpoint.url.scheme().to_string() != scheme { - return Err(Error::msg("不支持多种scheme")); + return Err(Error::from_string("不支持多种scheme")); } let arg_str = endpoint.to_string(); if uniq_args.contains(arg_str.as_str()) { - return Err(Error::msg("发现重复 endpoints")); + return Err(Error::from_string("发现重复 endpoints")); } uniq_args.add(arg_str); @@ -316,7 +386,7 @@ pub struct PoolEndpoints { pub platform: String, } -// EndpointServerPools - list of list of endpoints +/// list of list of endpoints #[derive(Debug)] pub struct EndpointServerPools(Vec); @@ -332,7 +402,7 @@ impl EndpointServerPools { legacy: bool, ) -> Result<(EndpointServerPools, SetupType)> { if pool_args.is_empty() { - return Err(Error::msg("无效参数")); + return Err(Error::from_string("无效参数")); } let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?; @@ -384,7 +454,7 @@ impl EndpointServerPools { for ep in eps.endpoints.0.iter() { if exits.contains(&ep.to_string()) { - return Err(Error::msg("endpoints exists")); + return Err(Error::from_string("endpoints exists")); } } @@ -419,22 +489,23 @@ impl EndpointServerPools { } } +/// enum for setup type. #[derive(Debug)] pub enum SetupType { - // UnknownSetupType - starts with unknown setup type. - UnknownSetupType, + /// starts with unknown setup type. + Unknown, - // FSSetupType - FS setup type enum. - FSSetupType, + /// FS setup type enum. + FS, - // ErasureSDSetupType - Erasure single drive setup enum. - ErasureSDSetupType, + /// Erasure single drive setup enum. + ErasureSD, - // ErasureSetupType - Erasure setup type enum. - ErasureSetupType, + /// Erasure setup type enum. + Erasure, - // DistErasureSetupType - Distributed Erasure setup type enum. - DistErasureSetupType, + /// Distributed Erasure setup type enum. + DistErasure, } fn is_empty_layout(pools_layout: &Vec) -> bool { @@ -457,9 +528,19 @@ fn is_single_drive_layout(pools_layout: &Vec) -> bool { } } +pub fn create_pool_endpoints_v2(server_addr: &str, disks_layout: &DisksLayout) -> Result<(Vec, SetupType)> { + if disks_layout.is_empty_layout() { + return Err(Error::from_string("invalid number of endpoints")); + } + + if disks_layout.is_single_drive_layout() {} + + unimplemented!() +} + pub fn create_pool_endpoints(server_addr: String, pools: &Vec) -> Result<(Vec, SetupType)> { if is_empty_layout(pools) { - return Err(Error::msg("empty layout")); + return Err(Error::from_string("empty layout")); } // TODO: CheckLocalServerAddr @@ -469,7 +550,7 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) endpoint.update_islocal()?; if endpoint.get_type() != EndpointType::Path { - return Err(Error::msg("use path style endpoint for single node setup")); + return Err(Error::from_string("use path style endpoint for single node setup")); } endpoint.set_pool_index(0); @@ -481,7 +562,7 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) // TODO: checkCrossDeviceMounts - return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSDSetupType)); + return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSD)); } let mut ret = Vec::with_capacity(pools.len()); @@ -501,7 +582,7 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) } if endpoints.0.is_empty() { - return Err(Error::msg("invalid number of endpoints")); + return Err(Error::from_string("invalid number of endpoints")); } ret.push(endpoints); @@ -510,7 +591,7 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) // TODO: PoolEndpointList::from_vec(ret.clone()).update_is_local()?; - let mut setup_type = SetupType::UnknownSetupType; + let mut setup_type = SetupType::Unknown; // TODO: parse server port let (_, server_port) = split_host_port(server_addr.as_str())?; @@ -533,15 +614,15 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) for eps in ret.iter() { if eps.0[0].get_type() == EndpointType::Path { - setup_type = SetupType::ErasureSetupType; + setup_type = SetupType::Erasure; break; } if eps.0[0].get_type() == EndpointType::Url { if erasure_type { - setup_type = SetupType::ErasureSetupType; + setup_type = SetupType::Erasure; } else { - setup_type = SetupType::DistErasureSetupType; + setup_type = SetupType::DistErasure; } break; @@ -551,35 +632,32 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) Ok((ret, setup_type)) } -// create_server_endpoints -fn create_server_endpoints( - server_addr: String, - pool_args: &Vec, - legacy: bool, -) -> Result<(EndpointServerPools, SetupType)> { - if pool_args.is_empty() { - return Err(Error::msg("无效参数")); - } +/// validates and creates new endpoints from input args, supports +/// both ellipses and without ellipses transparently. +// fn create_server_endpoints(server_addr: String, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> { +// if disks_layout.is_empty() { +// return Err(Error::from_string("Invalid arguments specified")); +// } - let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?; +// let (pooleps, setup_type) = create_pool_endpoints(server_addr, &disks_layout.pools)?; - let mut ret = EndpointServerPools::new(); +// let mut ret = EndpointServerPools::new(); - for (i, eps) in pooleps.iter().enumerate() { - let ep = PoolEndpoints { - legacy: legacy, - set_count: pool_args[i].layout.len(), - drives_per_set: pool_args[i].layout[0].len(), - endpoints: eps.clone(), - cmd_line: pool_args[i].cmd_line.clone(), - platform: String::new(), - }; +// for (i, eps) in pooleps.iter().enumerate() { +// let ep = PoolEndpoints { +// legacy: disks_layout.legacy, +// set_count: pool_args[i].layout.len(), +// drives_per_set: pool_args[i].layout[0].len(), +// endpoints: eps.clone(), +// cmd_line: pool_args[i].cmd_line.clone(), +// platform: String::new(), +// }; - ret.add(ep)?; - } +// ret.add(ep)?; +// } - Ok((ret, setup_type)) -} +// Ok((ret, setup_type)) +// } #[cfg(test)] mod test { @@ -605,21 +683,21 @@ mod test { println!("{:?}", ep); } - #[test] - fn test_create_server_endpoints() { - let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])]; + // #[test] + // fn test_create_server_endpoints() { + // let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])]; - for (addr, args) in cases { - let layouts = DisksLayout::try_from(args.as_slice()).unwrap(); + // for (addr, args) in cases { + // let layouts = DisksLayout::try_from(args.as_slice()).unwrap(); - println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy); + // println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy); - let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); + // let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); - println!("setup_type -- {:?}", setup_type); - println!("server_pool == {:?}", server_pool); - } + // println!("setup_type -- {:?}", setup_type); + // println!("server_pool == {:?}", server_pool); + // } - // create_server_endpoints(server_addr, pool_args, legacy) - } + // // create_server_endpoints(server_addr, pool_args, legacy) + // } } From f9462162a565f2465774f4e195910ab8bb0ff5ec Mon Sep 17 00:00:00 2001 From: "shiro.lee" Date: Wed, 3 Jul 2024 23:00:32 +0800 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96net?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecstore/src/endpoint.rs | 169 +++++++++------------------------------ ecstore/src/utils/net.rs | 83 +++++++++---------- 2 files changed, 79 insertions(+), 173 deletions(-) diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index dd9f67f3..5796df56 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -1,17 +1,12 @@ use super::disks_layout::{DisksLayout, PoolDisksLayout}; use super::error::{Error, Result}; use super::utils::{ - net::{is_local_host, split_host_port}, + net::{is_local_host, is_socket_addr, split_host_port}, string::new_string_set, }; use path_absolutize::Absolutize; use std::fmt::Display; -use std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - path::Path, - usize, -}; +use std::{collections::HashMap, path::Path, usize}; use url::{ParseError, Url}; pub const DEFAULT_PORT: u16 = 9000; @@ -45,7 +40,7 @@ pub struct Node { // } /// any type of endpoint. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct Endpoint { pub url: url::Url, pub is_local: bool, @@ -57,10 +52,10 @@ pub struct Endpoint { impl Display for Endpoint { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.url.has_host() { - write!(f, "{}", self.url) - } else { + if self.url.scheme() == "file" { write!(f, "{}", self.url.path()) + } else { + write!(f, "{}", self.url) } } } @@ -77,7 +72,7 @@ impl TryFrom<&str> for Endpoint { let mut is_local = false; let url = match Url::parse(value) { - Ok(url) => { + Ok(mut url) => { // URL style of endpoint. // Valid URL style endpoint is // - Scheme field must contain "http" or "https" @@ -89,6 +84,7 @@ impl TryFrom<&str> for Endpoint { { return Err(Error::from_string("invalid URL endpoint format")); } + if is_empty_path(url.path()) { return Err(Error::from_string("empty or root endpoint is not supported")); } @@ -108,7 +104,12 @@ impl TryFrom<&str> for Endpoint { // Another additional benefit here is that this style also // supports providing \\host\share support as well. #[cfg(windows)] - {} + { + let path = url.path().to_owned(); + if Path::new(&path[1..]).has_root() { + url.set_path(&path[1..]); + } + } url } @@ -135,7 +136,7 @@ impl TryFrom<&str> for Endpoint { is_local = true; url } - Err(err) => return Err(Error::from_string("Convert a file path into an URL failed")), + Err(_) => return Err(Error::from_string("Convert a file path into an URL failed")), } } _ => return Err(Error::from_string(format!("invalid URL endpoint format: {}", e))), @@ -145,7 +146,9 @@ impl TryFrom<&str> for Endpoint { Ok(Endpoint { url, is_local, - ..Default::default() + pool_idx: None, + set_idx: None, + disk_idx: None, }) } } @@ -155,128 +158,34 @@ fn is_empty_path(path: &str) -> bool { ["", "/", "\\"].iter().any(|&v| v.eq(path)) } -// helper for validating if the provided arg is an ip address. -fn is_socket_addr(host: &str) -> bool { - host.parse::().is_ok() || host.parse::().is_ok() -} - impl Endpoint { - pub fn new(arg: &str) -> Result { - if is_empty_path(arg) { - return Err(Error::from_string("不支持空或根endpoint")); - } - - let url = Url::parse(arg).or_else(|e| match e { - ParseError::EmptyHost => Err(Error::from_string("远程地址,域名不能为空")), - ParseError::IdnaError => Err(Error::from_string("域名格式不正确")), - ParseError::InvalidPort => Err(Error::from_string("端口格式不正确")), - ParseError::InvalidIpv4Address => Err(Error::from_string("IP格式不正确")), - ParseError::InvalidIpv6Address => Err(Error::from_string("IP格式不正确")), - ParseError::InvalidDomainCharacter => Err(Error::from_string("域名字符格式不正确")), - // url::ParseError::RelativeUrlWithoutBase => todo!(), - // url::ParseError::RelativeUrlWithCannotBeABaseBase => todo!(), - // url::ParseError::SetHostOnCannotBeABaseUrl => todo!(), - ParseError::Overflow => Err(Error::from_string("长度过长")), - _ => { - if is_host_ip(arg) { - return Err(Error::from_string("无效的URL endpoint格式: 缺少 http 或 https")); - } - - let abs_arg = Path::new(arg).canonicalize()?; - - let abs = abs_arg.to_str().ok_or(Error::from_string("绝对路径错误"))?; - let url = Url::from_file_path(abs).unwrap(); - Ok(url) - } - })?; - - if url.scheme() == "file" { - return Ok(Endpoint { - url: url, - is_local: true, - pool_idx: -1, - set_idx: -1, - disk_idx: -1, - }); - } - - if url.port().is_none() { - return Err(Error::from_string("必须提供端口号")); - } - - if !(url.scheme() == "http" || url.scheme() == "https") { - return Err(Error::from_string("URL endpoint格式无效: Scheme字段必须包含'http'或'https'")); - } - - // 检查路径 - let path = url.path(); - if is_empty_path(path) { - return Err(Error::from_string("URL endpoint不支持空或根路径")); - } - - // TODO: Windows 系统上的路径处理 - #[cfg(windows)] - { - use std::env; - if env::consts::OS == "windows" { - // 处理 Windows 路径的特殊逻辑 - } - } - - Ok(Endpoint { - url: url, - is_local: false, - pool_idx: -1, - set_idx: -1, - disk_idx: -1, - }) - } - - // pub fn host_port_str(&self) -> String { - // if self.url.has_host() && self.port() > 0 { - // return format!("{}:{}", self.hostname(), self.port()); - // } else if self.url.has_host() && self.port() == 0 { - // return self.hostname().to_string(); - // } else if !self.url.has_host() && self.port() > 0 { - // return format!(":{}", self.port()); - // } else { - // return String::new(); - // } - // } - - // pub fn port(&self) -> u16 { - // self.url.port().unwrap_or(0) - // } - // pub fn hostname(&self) -> &str { - // self.url.host_str().unwrap_or("") - // } - + /// returns type of endpoint. pub fn get_type(&self) -> EndpointType { - if self.url.has_host() { - EndpointType::Url - } else { + if self.url.scheme() == "file" { EndpointType::Path + } else { + EndpointType::Url } } - // pub fn get_scheme(&self) -> String { - // self.url.scheme().to_string() - // } - - pub fn set_pool_index(&mut self, idx: i32) { - self.pool_idx = idx + /// sets a specific pool number to this node + pub fn set_pool_index(&mut self, idx: usize) { + self.pool_idx = Some(idx) } - pub fn set_set_index(&mut self, idx: i32) { - self.set_idx = idx + /// sets a specific set number to this node + pub fn set_set_index(&mut self, idx: usize) { + self.set_idx = Some(idx) } - pub fn set_disk_index(&mut self, idx: i32) { - self.disk_idx = idx + /// sets a specific disk number to this node + pub fn set_disk_index(&mut self, idx: usize) { + self.disk_idx = Some(idx) } - fn update_islocal(&mut self) -> Result<()> { - if self.url.has_host() { + /// resolves the host and updates if it is local or not. + fn update_is_local(&mut self) -> Result<()> { + if self.url.scheme() != "file" { self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT); } @@ -323,7 +232,7 @@ impl Endpoints { let mut eps = Vec::new(); let mut uniq_args = new_string_set(); for (i, arg) in args.iter().enumerate() { - let endpoint = Endpoint::new(arg)?; + let endpoint = Endpoint::try_from(arg.as_str())?; if i == 0 { ep_type = endpoint.get_type(); scheme = endpoint.url.scheme().to_string(); @@ -365,7 +274,7 @@ impl PoolEndpointList { for eps in self.0.iter_mut() { for ep in eps.iter_mut() { // TODO: - ep.update_islocal()? + ep.update_is_local()? } } @@ -573,9 +482,9 @@ pub fn create_pool_endpoints(server_addr: String, pools: &Vec) let mut eps = Endpoints::from_args(set_layout.to_owned())?; // TODO: checkCrossDeviceMounts for (disk_idx, ep) in eps.0.iter_mut().enumerate() { - ep.set_pool_index(pool_idx as i32); - ep.set_set_index(set_idx as i32); - ep.set_disk_index(disk_idx as i32); + ep.set_pool_index(pool_idx); + ep.set_set_index(set_idx); + ep.set_disk_index(disk_idx); endpoints.0.push(ep.to_owned()); } diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 734e6a3a..6c91c4f7 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -1,12 +1,17 @@ use std::{ - collections::HashMap, - net::{IpAddr, ToSocketAddrs}, + collections::HashSet, + net::{IpAddr, SocketAddr, ToSocketAddrs}, }; use anyhow::{Error, Result}; use netif; use url::Host; +// helper for validating if the provided arg is an ip address. +pub fn is_socket_addr(host: &str) -> bool { + host.parse::().is_ok() || host.parse::().is_ok() +} + pub fn split_host_port(s: &str) -> Result<(String, u16)> { let parts: Vec<&str> = s.split(':').collect(); if parts.len() == 2 { @@ -17,38 +22,23 @@ pub fn split_host_port(s: &str) -> Result<(String, u16)> { Err(Error::msg("Invalid address format or port number")) } -// is_local_host 判断是否是本地ip +/// checks if the given parameter correspond to one of +/// the local IP of the current machine pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { let local_ips = must_get_local_ips(); - let local_map = - local_ips - .iter() - .map(|ip| ip.to_string()) - .fold(HashMap::new(), |mut acc, item| { - *acc.entry(item).or_insert(true) = true; - acc - }); - + let local_set: HashSet = local_ips.into_iter().collect(); let is_local_host = match host { Host::Domain(domain) => { - let ips: Vec = (domain, 0) - .to_socket_addrs() - .unwrap_or(Vec::new().into_iter()) - .map(|addr| addr.ip().to_string()) - .collect(); + let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::>()) { + Ok(ips) => ips, + Err(_) => return false, + }; - let mut isok = false; - for ip in ips.iter() { - if local_map.contains_key(ip) { - isok = true; - break; - } - } - isok + ips.iter().any(|ip| local_set.contains(ip)) } - Host::Ipv4(ip) => local_map.contains_key(&ip.to_string()), - Host::Ipv6(ip) => local_map.contains_key(&ip.to_string()), + Host::Ipv4(ip) => local_set.contains(&IpAddr::V4(ip)), + Host::Ipv6(ip) => local_set.contains(&IpAddr::V6(ip)), }; if port > 0 { @@ -58,37 +48,44 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { is_local_host } +/// returns IPs of local interface pub fn must_get_local_ips() -> Vec { - let mut v: Vec = Vec::new(); - if let Some(up) = netif::up().ok() { - v = up.map(|x| x.address().to_owned()).collect(); + match netif::up() { + Ok(up) => up.map(|x| x.address().to_owned()).collect(), + Err(_) => vec![], } - - v } #[cfg(test)] mod test { - use std::net::Ipv4Addr; + use std::net::{Ipv4Addr, Ipv6Addr}; use super::*; #[test] fn test_must_get_local_ips() { - let ips = must_get_local_ips(); - for ip in ips.iter() { - println!("{:?}", ip) - } + let local_ips = must_get_local_ips(); + let local_set: HashSet = local_ips.into_iter().collect(); + + assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))); } #[test] fn test_is_local_host() { - // let host = Host::Ipv4(Ipv4Addr::new(192, 168, 0, 233)); + let host = Host::Domain("localhost"); + let is = is_local_host(host, 0, 9000); + assert!(is); + + let host = Host::Ipv6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); + let is = is_local_host(host, 0, 9000); + assert!(is); + + let host = Host::Ipv4(Ipv4Addr::new(8, 8, 8, 8)); + let is = is_local_host(host, 0, 9000); + assert!(!is); + let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1)); - // let host = Host::Domain("localhost"); - let port = 0; - let local_port = 9000; - let is = is_local_host(host, port, local_port); - assert!(is) + let is = is_local_host(host, 8000, 9000); + assert!(!is); } } From 18b07128fc26c4e4a43906b29d76afa1a39f6c6a Mon Sep 17 00:00:00 2001 From: "shiro.lee" Date: Thu, 4 Jul 2024 18:31:08 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ecstore/src/disk.rs | 12 +- ecstore/src/disks_layout.rs | 46 ++- ecstore/src/endpoint.rs | 614 +++++++++++++++--------------------- ecstore/src/error.rs | 8 +- ecstore/src/peer.rs | 12 +- ecstore/src/store.rs | 2 +- ecstore/src/utils/net.rs | 85 +++-- 7 files changed, 364 insertions(+), 415 deletions(-) diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 22a42268..04aed70e 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -54,14 +54,14 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { } pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { - let mut futures = Vec::with_capacity(eps.len()); + let mut futures = Vec::with_capacity(eps.as_ref().len()); - for ep in eps.iter() { + for ep in eps.as_ref().iter() { futures.push(new_disk(ep, opt)); } - let mut res = Vec::with_capacity(eps.len()); - let mut errors = Vec::with_capacity(eps.len()); + let mut res = Vec::with_capacity(eps.as_ref().len()); + let mut errors = Vec::with_capacity(eps.as_ref().len()); let results = join_all(futures).await; for result in results { @@ -119,7 +119,7 @@ impl LocalDisk { let fm = FormatV3::try_from(s)?; let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?; - if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx { + if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx { return Err(Error::new(DiskError::InconsistentDisk)); } @@ -606,7 +606,7 @@ mod test { let p = "./testv"; fs::create_dir_all(&p).await.unwrap(); - let ep = match Endpoint::new(&p) { + let ep = match Endpoint::try_from(p) { Ok(e) => e, Err(e) => { println!("{e}"); diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index 94ccc750..261a239c 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -9,8 +9,20 @@ const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, #[derive(Deserialize, Debug, Default)] pub struct PoolDisksLayout { - pub cmd_line: String, - pub layout: Vec>, + cmd_line: String, + layout: Vec>, +} + +impl AsRef>> for PoolDisksLayout { + fn as_ref(&self) -> &Vec> { + &self.layout + } +} + +impl AsMut>> for PoolDisksLayout { + fn as_mut(&mut self) -> &mut Vec> { + &mut self.layout + } } impl PoolDisksLayout { @@ -20,12 +32,32 @@ impl PoolDisksLayout { layout, } } + + pub fn count(&self) -> usize { + self.layout.len() + } + + pub fn as_cmd_line(&self) -> &str { + &self.cmd_line + } } #[derive(Deserialize, Debug, Default)] pub struct DisksLayout { pub legacy: bool, - pub pools: Vec, + pools: Vec, +} + +impl AsRef> for DisksLayout { + fn as_ref(&self) -> &Vec { + &self.pools + } +} + +impl AsMut> for DisksLayout { + fn as_mut(&mut self) -> &mut Vec { + &mut self.pools + } } impl> TryFrom<&[T]> for DisksLayout { @@ -82,6 +114,14 @@ impl DisksLayout { pub fn is_single_drive_layout(&self) -> bool { self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1 } + + pub fn get_single_drive_layout(&self) -> &str { + &self.pools[0].layout[0][0] + } + + pub fn get_layout(&self, idx: usize) -> Option<&PoolDisksLayout> { + self.pools.get(idx) + } } /// parses all ellipses input arguments, expands them into diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index 5796df56..5ee20f3c 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -1,16 +1,12 @@ -use super::disks_layout::{DisksLayout, PoolDisksLayout}; +use super::disks_layout::DisksLayout; use super::error::{Error, Result}; -use super::utils::{ - net::{is_local_host, is_socket_addr, split_host_port}, - string::new_string_set, -}; +use super::utils::net; use path_absolutize::Absolutize; +use std::collections::HashSet; use std::fmt::Display; use std::{collections::HashMap, path::Path, usize}; use url::{ParseError, Url}; -pub const DEFAULT_PORT: u16 = 9000; - /// enum for endpoint type. #[derive(PartialEq, Eq)] pub enum EndpointType { @@ -19,26 +15,33 @@ pub enum EndpointType { /// URL style endpoint type enum. Url, +} - /// Unknown endpoint type enum. - UnKnow, +/// enum for setup type. +#[derive(Debug)] +pub enum SetupType { + /// FS setup type enum. + FS, + + /// Erasure single drive setup enum. + ErasureSD, + + /// Erasure setup type enum. + Erasure, + + /// Distributed Erasure setup type enum. + DistErasure, } /// holds information about a node in this cluster #[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { pub url: url::Url, - pub pools: Vec, + pub pools: Vec, pub is_local: bool, - pub grid_host: String, // TODO "scheme://host:port" + pub grid_host: String, } -// impl PartialOrd for Node { -// fn partial_cmp(&self, other: &Self) -> Option { -// self.grid_host.partial_cmp(&other.grid_host) -// } -// } - /// any type of endpoint. #[derive(Debug, Clone)] pub struct Endpoint { @@ -66,6 +69,11 @@ impl TryFrom<&str> for Endpoint { /// Performs the conversion. fn try_from(value: &str) -> Result { + /// check whether given path is not empty. + fn is_empty_path(path: &str) -> bool { + ["", "/", "\\"].iter().any(|&v| v.eq(path)) + } + if is_empty_path(value) { return Err(Error::from_string("empty or root endpoint is not supported")); } @@ -122,7 +130,7 @@ impl TryFrom<&str> for Endpoint { // Only check if the arg is an ip address and ask for scheme since its absent. // localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as // /mnt/export1. So we go ahead and start the rustfs server in FS modes in these cases. - if is_socket_addr(value) { + if net::is_socket_addr(value) { return Err(Error::from_string("invalid URL endpoint format: missing scheme http or https")); } @@ -153,11 +161,6 @@ impl TryFrom<&str> for Endpoint { } } -/// check whether given path is not empty. -fn is_empty_path(path: &str) -> bool { - ["", "/", "\\"].iter().any(|&v| v.eq(path)) -} - impl Endpoint { /// returns type of endpoint. pub fn get_type(&self) -> EndpointType { @@ -184,107 +187,198 @@ impl Endpoint { } /// resolves the host and updates if it is local or not. - fn update_is_local(&mut self) -> Result<()> { - if self.url.scheme() != "file" { - self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT); + fn update_is_local(&mut self, local_port: u16) -> Result<()> { + match (self.url.scheme(), self.url.host()) { + (v, Some(host)) if v != "file" => { + self.is_local = net::is_local_host(host, self.url.port().unwrap_or_default(), local_port)?; + } + _ => {} } Ok(()) } + /// returns the host to be used for grid connections. fn grid_host(&self) -> String { - let host = self.url.host_str().unwrap_or(""); - let port = self.url.port().unwrap_or(0); - if port > 0 { - format!("{}://{}:{}", self.url.scheme(), host, port) - } else { - format!("{}://{}", self.url.scheme(), host) + match (self.url.host(), self.url.port()) { + (Some(host), Some(port)) => format!("{}://{}:{}", self.url.scheme(), host, port), + (Some(host), None) => format!("{}://{}", self.url.scheme(), host), + _ => String::new(), } } } -#[derive(Debug, Clone)] +/// list of same type of endpoint. +#[derive(Debug, Default)] pub struct Endpoints(Vec); -impl Endpoints { - pub fn new() -> Self { - Self(Vec::new()) - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn iter(&self) -> core::slice::Iter { - self.0.iter() - } - - pub fn iter_mut(&mut self) -> core::slice::IterMut { - self.0.iter_mut() - } - - pub fn slice(&self, start: usize, end: usize) -> Vec { - self.0.as_slice()[start..end].to_vec() - } - pub fn from_args(args: Vec) -> Result { - let mut ep_type = EndpointType::UnKnow; - let mut scheme = String::new(); - let mut eps = Vec::new(); - let mut uniq_args = new_string_set(); - for (i, arg) in args.iter().enumerate() { - let endpoint = Endpoint::try_from(arg.as_str())?; - if i == 0 { - ep_type = endpoint.get_type(); - scheme = endpoint.url.scheme().to_string(); - } else if endpoint.get_type() != ep_type { - return Err(Error::from_string("不支持多种endpoints风格")); - } else if endpoint.url.scheme().to_string() != scheme { - return Err(Error::from_string("不支持多种scheme")); - } - - let arg_str = endpoint.to_string(); - - if uniq_args.contains(arg_str.as_str()) { - return Err(Error::from_string("发现重复 endpoints")); - } - - uniq_args.add(arg_str); - - eps.push(endpoint.clone()); - } - - Ok(Endpoints(eps)) +impl AsRef> for Endpoints { + fn as_ref(&self) -> &Vec { + &self.0 } } -#[warn(dead_code)] -pub struct PoolEndpointList(Vec); +impl AsMut> for Endpoints { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} -impl PoolEndpointList { - fn from_vec(v: Vec) -> Self { +impl From> for Endpoints { + fn from(v: Vec) -> Self { Self(v) } +} - pub fn push(&mut self, es: Endpoints) { - self.0.push(es) - } +impl TryFrom<&[String]> for Endpoints { + type Error = Error; - // TODO: 解析域名,判断哪个是本地地址 - fn update_is_local(&mut self) -> Result<()> { - for eps in self.0.iter_mut() { - for ep in eps.iter_mut() { - // TODO: - ep.update_is_local()? + /// returns new endpoint list based on input args. + fn try_from(args: &[String]) -> Result { + let mut endpoint_type; + let mut schema; + let mut endpoints = Vec::with_capacity(args.len()); + let mut uniq_set = HashSet::with_capacity(args.len()); + + // Loop through args and adds to endpoint list. + for (i, arg) in args.iter().enumerate() { + let endpoint = match Endpoint::try_from(arg.as_str()) { + Ok(ep) => ep, + Err(e) => return Err(Error::from_string(format!("'{}': {}", arg, e))), + }; + + // All endpoints have to be same type and scheme if applicable. + if i == 0 { + endpoint_type = endpoint.get_type(); + schema = endpoint.url.scheme(); + } else if endpoint.get_type() != endpoint_type { + return Err(Error::from_string("mixed style endpoints are not supported")); + } else if endpoint.url.scheme() != schema { + return Err(Error::from_string("mixed scheme is not supported")); } + + // Check for duplicate endpoints. + let endpoint_str = endpoint.to_string(); + if uniq_set.contains(&endpoint_str) { + return Err(Error::from_string("duplicate endpoints found")); + } + + uniq_set.insert(endpoint_str); + endpoints.push(endpoint); } - Ok(()) + Ok(Endpoints(endpoints)) } } -// PoolEndpoints represent endpoints in a given pool -// along with its setCount and setDriveCount. -#[derive(Debug, Clone)] +/// a temporary type to holds the list of endpoints +pub struct PoolEndpointList { + inner: Vec, + setup_type: SetupType, +} + +impl AsRef> for PoolEndpointList { + fn as_ref(&self) -> &Vec { + &self.inner + } +} + +impl AsMut> for PoolEndpointList { + fn as_mut(&mut self) -> &mut Vec { + &mut self.inner + } +} + +impl PoolEndpointList { + /// creates a list of endpoints per pool, resolves their relevant + /// hostnames and discovers those are local or remote. + pub fn create_pool_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result { + if disks_layout.is_empty_layout() { + return Err(Error::from_string("invalid number of endpoints")); + } + + let server_addr = net::check_local_server_addr(server_addr)?; + + // For single arg, return single drive EC setup. + if disks_layout.is_single_drive_layout() { + let mut endpoint = Endpoint::try_from(disks_layout.get_single_drive_layout())?; + endpoint.update_is_local(server_addr.port())?; + + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(0); + + // TODO Check for cross device mounts if any. + + return Ok(Self { + inner: vec![Endpoints::from(vec![endpoint])], + setup_type: SetupType::ErasureSD, + }); + } + + let mut pool_endpoints = Vec::::with_capacity(disks_layout.as_ref().len()); + for (pool_idx, pool) in disks_layout.as_ref().iter().enumerate() { + let mut endpoints = Endpoints::default(); + for (set_idx, set_layout) in pool.as_ref().iter().enumerate() { + // Convert args to endpoints + let mut eps = Endpoints::try_from(set_layout.as_slice())?; + + // TODO Check for cross device mounts if any. + + for (disk_idx, ep) in eps.as_mut().iter_mut().enumerate() { + ep.set_pool_index(pool_idx); + ep.set_set_index(set_idx); + ep.set_disk_index(disk_idx); + } + + endpoints.as_mut().append(eps.as_mut()); + } + + if endpoints.as_ref().is_empty() { + return Err(Error::from_string("invalid number of endpoints")); + } + + pool_endpoints.push(endpoints); + } + + // setup type + let mut unique_args = HashSet::new(); + for pool in pool_endpoints.iter() { + for ep in pool.as_ref() { + if let Some(host) = ep.url.host_str() { + unique_args.insert(host); + } else { + unique_args.insert(format!("localhost:{}", server_addr.port()).as_str()); + } + } + } + + let setup_type = match pool_endpoints[0].as_ref()[0].get_type() { + EndpointType::Path => SetupType::Erasure, + EndpointType::Url => match unique_args.len() { + 1 => SetupType::Erasure, + _ => SetupType::DistErasure, + }, + }; + + let mut pool_endpoint_list = Self { + inner: pool_endpoints, + setup_type, + }; + + pool_endpoint_list.update_is_local()?; + + Ok(pool_endpoint_list) + } + + fn update_is_local(&mut self) -> Result<()> { + unimplemented!() + } +} + +/// represent endpoints in a given pool +/// along with its setCount and setDriveCount. +#[derive(Debug)] pub struct PoolEndpoints { // indicates if endpoints are provided in non-ellipses style pub legacy: bool, @@ -299,314 +393,102 @@ pub struct PoolEndpoints { #[derive(Debug)] pub struct EndpointServerPools(Vec); -impl EndpointServerPools { - pub fn new() -> Self { - Self(Vec::new()) +impl From> for EndpointServerPools { + fn from(v: Vec) -> Self { + Self(v) } +} - // create_server_endpoints - pub fn create_server_endpoints( - server_addr: String, - pool_args: &Vec, - legacy: bool, - ) -> Result<(EndpointServerPools, SetupType)> { - if pool_args.is_empty() { - return Err(Error::from_string("无效参数")); +impl AsRef> for EndpointServerPools { + fn as_ref(&self) -> &Vec { + &self.0 + } +} + +impl AsMut> for EndpointServerPools { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + +impl EndpointServerPools { + /// validates and creates new endpoints from input args, supports + /// both ellipses and without ellipses transparently. + pub fn create_server_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> { + if disks_layout.as_ref().is_empty() { + return Err(Error::from_string("Invalid arguments specified")); } - let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?; + let mut pool_eps = PoolEndpointList::create_pool_endpoints(server_addr, disks_layout)?; - let mut ret = EndpointServerPools::new(); + let mut ret: EndpointServerPools = Vec::with_capacity(pool_eps.as_ref().len()).into(); + for (i, eps) in pool_eps.as_mut().into_iter().enumerate() { + let layout = disks_layout.get_layout(i); + let set_count = layout.map_or(0, |v| v.count()); + let drives_per_set = layout.map_or(0, |v| v.as_ref().get(0).map_or(0, |v| v.len())); + let cmd_line = layout.map_or(String::new(), |v| v.as_cmd_line().to_owned()); - for (i, eps) in pooleps.iter().enumerate() { let ep = PoolEndpoints { - legacy: legacy, - set_count: pool_args[i].layout.len(), - drives_per_set: pool_args[i].layout[0].len(), - endpoints: eps.clone(), - cmd_line: pool_args[i].cmd_line.clone(), + legacy: disks_layout.legacy, + set_count, + drives_per_set, + endpoints: *eps, + cmd_line, platform: String::new(), }; - ret.add(ep)?; + ret.add(ep); } - Ok((ret, setup_type)) - } - - pub fn first_is_local(&self) -> bool { - if self.0.is_empty() { - return false; - } - return self.0[0].endpoints.0[0].is_local; - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn iter(&self) -> core::slice::Iter<'_, PoolEndpoints> { - return self.0.iter(); - } - - pub fn push(&mut self, pes: PoolEndpoints) { - self.0.push(pes) + Ok((ret, pool_eps.setup_type)) } + /// add pool endpoints pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> { - let mut exits = new_string_set(); + let mut exits = HashSet::new(); for peps in self.0.iter() { - for ep in peps.endpoints.0.iter() { - exits.add(ep.to_string()); + for ep in peps.endpoints.as_ref() { + exits.insert(ep.to_string()); } } - for ep in eps.endpoints.0.iter() { + for ep in eps.endpoints.as_ref() { if exits.contains(&ep.to_string()) { - return Err(Error::from_string("endpoints exists")); + return Err(Error::from_string("duplicate endpoints found")); } } self.0.push(eps); + Ok(()) } + /// returns a sorted list of nodes in this cluster pub fn get_nodes(&self) -> Vec { let mut node_map = HashMap::new(); - for pool in self.iter() { - for ep in pool.endpoints.iter() { - let mut node = Node { + for pool in self.0.iter() { + for ep in pool.endpoints.as_ref() { + let host = ep.grid_host(); + let n = node_map.entry(host).or_insert(Node { url: ep.url.clone(), pools: vec![], is_local: ep.is_local, - grid_host: ep.grid_host(), - }; - if !node.pools.contains(&ep.pool_idx) { - node.pools.push(ep.pool_idx) - } + grid_host: host, + }); - node_map.insert(node.grid_host.clone(), node); + if let Some(pool_idx) = ep.pool_idx { + if !n.pools.contains(&pool_idx) { + n.pools.push(pool_idx); + } + } } } let mut nodes: Vec = node_map.into_iter().map(|(_, n)| n).collect(); - // nodes.sort_by(|a, b| a.cmp(b)); + nodes.sort_by(|a, b| a.grid_host.cmp(&b.grid_host)); nodes } } - -/// enum for setup type. -#[derive(Debug)] -pub enum SetupType { - /// starts with unknown setup type. - Unknown, - - /// FS setup type enum. - FS, - - /// Erasure single drive setup enum. - ErasureSD, - - /// Erasure setup type enum. - Erasure, - - /// Distributed Erasure setup type enum. - DistErasure, -} - -fn is_empty_layout(pools_layout: &Vec) -> bool { - if pools_layout.is_empty() { - return true; - } - let first_layout = &pools_layout[0]; - if first_layout.layout.is_empty() || first_layout.layout[0].is_empty() || first_layout.layout[0][0].is_empty() { - return true; - } - false -} - -// 检查是否是单驱动器布局 -fn is_single_drive_layout(pools_layout: &Vec) -> bool { - if pools_layout.len() == 1 && pools_layout[0].layout.len() == 1 && pools_layout[0].layout[0].len() == 1 { - true - } else { - false - } -} - -pub fn create_pool_endpoints_v2(server_addr: &str, disks_layout: &DisksLayout) -> Result<(Vec, SetupType)> { - if disks_layout.is_empty_layout() { - return Err(Error::from_string("invalid number of endpoints")); - } - - if disks_layout.is_single_drive_layout() {} - - unimplemented!() -} - -pub fn create_pool_endpoints(server_addr: String, pools: &Vec) -> Result<(Vec, SetupType)> { - if is_empty_layout(pools) { - return Err(Error::from_string("empty layout")); - } - - // TODO: CheckLocalServerAddr - - if is_single_drive_layout(pools) { - let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?; - endpoint.update_islocal()?; - - if endpoint.get_type() != EndpointType::Path { - return Err(Error::from_string("use path style endpoint for single node setup")); - } - - endpoint.set_pool_index(0); - endpoint.set_set_index(0); - endpoint.set_disk_index(0); - - let mut endpoints = Vec::new(); - endpoints.push(endpoint); - - // TODO: checkCrossDeviceMounts - - return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSD)); - } - - let mut ret = Vec::with_capacity(pools.len()); - - for (pool_idx, pool) in pools.iter().enumerate() { - let mut endpoints = Endpoints::new(); - for (set_idx, set_layout) in pool.layout.iter().enumerate() { - let mut eps = Endpoints::from_args(set_layout.to_owned())?; - // TODO: checkCrossDeviceMounts - for (disk_idx, ep) in eps.0.iter_mut().enumerate() { - ep.set_pool_index(pool_idx); - ep.set_set_index(set_idx); - ep.set_disk_index(disk_idx); - - endpoints.0.push(ep.to_owned()); - } - } - - if endpoints.0.is_empty() { - return Err(Error::from_string("invalid number of endpoints")); - } - - ret.push(endpoints); - } - - // TODO: - PoolEndpointList::from_vec(ret.clone()).update_is_local()?; - - let mut setup_type = SetupType::Unknown; - - // TODO: parse server port - let (_, server_port) = split_host_port(server_addr.as_str())?; - - let mut uniq_host = new_string_set(); - - for (_i, eps) in ret.iter_mut().enumerate() { - // TODO: 一些验证,参考原m - - for ep in eps.0.iter() { - if !ep.url.has_host() { - uniq_host.add(format!("localhost:{}", server_port)); - } else { - // uniq_host.add(ep.url.domain().) - } - } - } - - let erasure_type = uniq_host.to_slice().len() == 1; - - for eps in ret.iter() { - if eps.0[0].get_type() == EndpointType::Path { - setup_type = SetupType::Erasure; - break; - } - - if eps.0[0].get_type() == EndpointType::Url { - if erasure_type { - setup_type = SetupType::Erasure; - } else { - setup_type = SetupType::DistErasure; - } - - break; - } - } - - Ok((ret, setup_type)) -} - -/// validates and creates new endpoints from input args, supports -/// both ellipses and without ellipses transparently. -// fn create_server_endpoints(server_addr: String, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> { -// if disks_layout.is_empty() { -// return Err(Error::from_string("Invalid arguments specified")); -// } - -// let (pooleps, setup_type) = create_pool_endpoints(server_addr, &disks_layout.pools)?; - -// let mut ret = EndpointServerPools::new(); - -// for (i, eps) in pooleps.iter().enumerate() { -// let ep = PoolEndpoints { -// legacy: disks_layout.legacy, -// set_count: pool_args[i].layout.len(), -// drives_per_set: pool_args[i].layout[0].len(), -// endpoints: eps.clone(), -// cmd_line: pool_args[i].cmd_line.clone(), -// platform: String::new(), -// }; - -// ret.add(ep)?; -// } - -// Ok((ret, setup_type)) -// } - -#[cfg(test)] -mod test { - - use crate::disks_layout::DisksLayout; - - use super::*; - - #[test] - fn test_url() { - let path = "/dir/sss"; - - let u = url::Url::parse(path); - - println!("{:#?}", u) - } - - #[test] - fn test_new_endpont() { - let arg = "./data"; - let ep = Endpoint::new(arg).unwrap(); - - println!("{:?}", ep); - } - - // #[test] - // fn test_create_server_endpoints() { - // let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])]; - - // for (addr, args) in cases { - // let layouts = DisksLayout::try_from(args.as_slice()).unwrap(); - - // println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy); - - // let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); - - // println!("setup_type -- {:?}", setup_type); - // println!("server_pool == {:?}", server_pool); - // } - - // // create_server_endpoints(server_addr, pool_args, legacy) - // } -} diff --git a/ecstore/src/error.rs b/ecstore/src/error.rs index 57bc687c..f18a27b2 100644 --- a/ecstore/src/error.rs +++ b/ecstore/src/error.rs @@ -1,7 +1,7 @@ use s3s::S3Error; use s3s::S3ErrorCode; use s3s::StdError; - +use std::fmt::Display; use std::panic::Location; use tracing::error; @@ -44,6 +44,12 @@ impl From for S3Error { } } +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.source.fmt(f) + } +} + #[inline] #[track_caller] pub(crate) fn log(source: &dyn std::error::Error) { diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index ddfbe12e..7e61af87 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -27,7 +27,7 @@ impl S3PeerSys { pub fn new(eps: &EndpointServerPools, local_disks: Vec) -> Self { Self { clients: Self::new_clients(eps, local_disks), - pools_count: eps.len(), + pools_count: eps.as_ref().len(), } } @@ -37,15 +37,11 @@ impl S3PeerSys { .iter() .map(|e| { if e.is_local { - let cli: Box = Box::new(LocalPeerS3Client::new( - local_disks.clone(), - e.clone(), - e.pools.clone(), - )); + let cli: Box = + Box::new(LocalPeerS3Client::new(local_disks.clone(), e.clone(), e.pools.clone())); Arc::new(cli) } else { - let cli: Box = - Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone())); + let cli: Box = Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone())); Arc::new(cli) } }) diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 60d7c28a..dfdeaa0f 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -33,7 +33,7 @@ impl ECStore { let mut deployment_id = None; - let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?; + let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?; let mut pools = Vec::with_capacity(endpoint_pools.len()); let mut disk_map = HashMap::with_capacity(endpoint_pools.len()); diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 6c91c4f7..81441512 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -1,17 +1,43 @@ +use crate::error::{Error, Result}; use std::{ collections::HashSet, net::{IpAddr, SocketAddr, ToSocketAddrs}, }; - -use anyhow::{Error, Result}; -use netif; use url::Host; -// helper for validating if the provided arg is an ip address. +/// helper for validating if the provided arg is an ip address. pub fn is_socket_addr(host: &str) -> bool { host.parse::().is_ok() || host.parse::().is_ok() } +/// checks if server_addr is valid and local host. +pub fn check_local_server_addr(server_addr: &str) -> Result { + let addr: Vec = match server_addr.to_socket_addrs() { + Ok(addr) => addr.collect(), + Err(err) => return Err(Error::new(Box::new(err))), + }; + + // 0.0.0.0 is a wildcard address and refers to local network + // addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port + // 9000 on localhost. + for a in addr { + if a.ip().is_unspecified() { + return Ok(a); + } + + let host = match a { + SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()), + SocketAddr::V6(a) => Host::Ipv6(*a.ip()), + }; + + if is_local_host(host, 0, 0).is_ok() { + return Ok(a); + } + } + + return Err(Error::from_string("host in server address should be this server")); +} + pub fn split_host_port(s: &str) -> Result<(String, u16)> { let parts: Vec<&str> = s.split(':').collect(); if parts.len() == 2 { @@ -19,12 +45,12 @@ pub fn split_host_port(s: &str) -> Result<(String, u16)> { return Ok((parts[0].to_string(), port)); } } - Err(Error::msg("Invalid address format or port number")) + Err(Error::from_string("Invalid address format or port number")) } /// checks if the given parameter correspond to one of /// the local IP of the current machine -pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { +pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result { let local_ips = must_get_local_ips(); let local_set: HashSet = local_ips.into_iter().collect(); @@ -32,7 +58,7 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { Host::Domain(domain) => { let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::>()) { Ok(ips) => ips, - Err(_) => return false, + Err(err) => return Err(Error::new(Box::new(err))), }; ips.iter().any(|ip| local_set.contains(ip)) @@ -42,14 +68,14 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { }; if port > 0 { - return is_local_host && port == local_port; + return Ok(is_local_host && port == local_port); } - is_local_host + Ok(is_local_host) } /// returns IPs of local interface -pub fn must_get_local_ips() -> Vec { +fn must_get_local_ips() -> Vec { match netif::up() { Ok(up) => up.map(|x| x.address().to_owned()).collect(), Err(_) => vec![], @@ -58,10 +84,28 @@ pub fn must_get_local_ips() -> Vec { #[cfg(test)] mod test { - use std::net::{Ipv4Addr, Ipv6Addr}; + use std::net::Ipv4Addr; use super::*; + #[test] + fn test_is_socket_addr() { + let test_cases = [ + ("localhost", false), + ("localhost:9000", false), + ("example.com", false), + ("http://192.168.1.0", false), + ("http://192.168.1.0:9000", false), + ("192.168.1.0", true), + ("[2001:db8::1]:9000", true), + ]; + + for (addr, expected) in test_cases { + let ret = is_socket_addr(addr); + assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret); + } + } + #[test] fn test_must_get_local_ips() { let local_ips = must_get_local_ips(); @@ -69,23 +113,4 @@ mod test { assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))); } - - #[test] - fn test_is_local_host() { - let host = Host::Domain("localhost"); - let is = is_local_host(host, 0, 9000); - assert!(is); - - let host = Host::Ipv6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); - let is = is_local_host(host, 0, 9000); - assert!(is); - - let host = Host::Ipv4(Ipv4Addr::new(8, 8, 8, 8)); - let is = is_local_host(host, 0, 9000); - assert!(!is); - - let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1)); - let is = is_local_host(host, 8000, 9000); - assert!(!is); - } }