diff --git a/ecstore/src/disk.rs b/ecstore/src/disk.rs index 22a42268..04aed70e 100644 --- a/ecstore/src/disk.rs +++ b/ecstore/src/disk.rs @@ -54,14 +54,14 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { } pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec>, Vec>) { - let mut futures = Vec::with_capacity(eps.len()); + let mut futures = Vec::with_capacity(eps.as_ref().len()); - for ep in eps.iter() { + for ep in eps.as_ref().iter() { futures.push(new_disk(ep, opt)); } - let mut res = Vec::with_capacity(eps.len()); - let mut errors = Vec::with_capacity(eps.len()); + let mut res = Vec::with_capacity(eps.as_ref().len()); + let mut errors = Vec::with_capacity(eps.as_ref().len()); let results = join_all(futures).await; for result in results { @@ -119,7 +119,7 @@ impl LocalDisk { let fm = FormatV3::try_from(s)?; let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?; - if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx { + if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx { return Err(Error::new(DiskError::InconsistentDisk)); } @@ -606,7 +606,7 @@ mod test { let p = "./testv"; fs::create_dir_all(&p).await.unwrap(); - let ep = match Endpoint::new(&p) { + let ep = match Endpoint::try_from(p) { Ok(e) => e, Err(e) => { println!("{e}"); diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index 94ccc750..261a239c 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -9,8 +9,20 @@ const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, #[derive(Deserialize, Debug, Default)] pub struct PoolDisksLayout { - pub cmd_line: String, - pub layout: Vec>, + cmd_line: String, + layout: Vec>, +} + +impl AsRef>> for PoolDisksLayout { + fn as_ref(&self) -> &Vec> { + &self.layout + } +} + +impl AsMut>> for PoolDisksLayout { + fn as_mut(&mut self) -> &mut Vec> { + &mut self.layout + } } impl PoolDisksLayout { @@ -20,12 +32,32 @@ impl PoolDisksLayout { layout, } } + + pub fn count(&self) -> usize { + self.layout.len() + } + + pub fn as_cmd_line(&self) -> &str { + &self.cmd_line + } } #[derive(Deserialize, Debug, Default)] pub struct DisksLayout { pub legacy: bool, - pub pools: Vec, + pools: Vec, +} + +impl AsRef> for DisksLayout { + fn as_ref(&self) -> &Vec { + &self.pools + } +} + +impl AsMut> for DisksLayout { + fn as_mut(&mut self) -> &mut Vec { + &mut self.pools + } } impl> TryFrom<&[T]> for DisksLayout { @@ -82,6 +114,14 @@ impl DisksLayout { pub fn is_single_drive_layout(&self) -> bool { self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1 } + + pub fn get_single_drive_layout(&self) -> &str { + &self.pools[0].layout[0][0] + } + + pub fn get_layout(&self, idx: usize) -> Option<&PoolDisksLayout> { + self.pools.get(idx) + } } /// parses all ellipses input arguments, expands them into diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index 5796df56..5ee20f3c 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -1,16 +1,12 @@ -use super::disks_layout::{DisksLayout, PoolDisksLayout}; +use super::disks_layout::DisksLayout; use super::error::{Error, Result}; -use super::utils::{ - net::{is_local_host, is_socket_addr, split_host_port}, - string::new_string_set, -}; +use super::utils::net; use path_absolutize::Absolutize; +use std::collections::HashSet; use std::fmt::Display; use std::{collections::HashMap, path::Path, usize}; use url::{ParseError, Url}; -pub const DEFAULT_PORT: u16 = 9000; - /// enum for endpoint type. #[derive(PartialEq, Eq)] pub enum EndpointType { @@ -19,26 +15,33 @@ pub enum EndpointType { /// URL style endpoint type enum. Url, +} - /// Unknown endpoint type enum. - UnKnow, +/// enum for setup type. +#[derive(Debug)] +pub enum SetupType { + /// FS setup type enum. + FS, + + /// Erasure single drive setup enum. + ErasureSD, + + /// Erasure setup type enum. + Erasure, + + /// Distributed Erasure setup type enum. + DistErasure, } /// holds information about a node in this cluster #[derive(Debug, Clone, PartialEq, Eq)] pub struct Node { pub url: url::Url, - pub pools: Vec, + pub pools: Vec, pub is_local: bool, - pub grid_host: String, // TODO "scheme://host:port" + pub grid_host: String, } -// impl PartialOrd for Node { -// fn partial_cmp(&self, other: &Self) -> Option { -// self.grid_host.partial_cmp(&other.grid_host) -// } -// } - /// any type of endpoint. #[derive(Debug, Clone)] pub struct Endpoint { @@ -66,6 +69,11 @@ impl TryFrom<&str> for Endpoint { /// Performs the conversion. fn try_from(value: &str) -> Result { + /// check whether given path is not empty. + fn is_empty_path(path: &str) -> bool { + ["", "/", "\\"].iter().any(|&v| v.eq(path)) + } + if is_empty_path(value) { return Err(Error::from_string("empty or root endpoint is not supported")); } @@ -122,7 +130,7 @@ impl TryFrom<&str> for Endpoint { // Only check if the arg is an ip address and ask for scheme since its absent. // localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as // /mnt/export1. So we go ahead and start the rustfs server in FS modes in these cases. - if is_socket_addr(value) { + if net::is_socket_addr(value) { return Err(Error::from_string("invalid URL endpoint format: missing scheme http or https")); } @@ -153,11 +161,6 @@ impl TryFrom<&str> for Endpoint { } } -/// check whether given path is not empty. -fn is_empty_path(path: &str) -> bool { - ["", "/", "\\"].iter().any(|&v| v.eq(path)) -} - impl Endpoint { /// returns type of endpoint. pub fn get_type(&self) -> EndpointType { @@ -184,107 +187,198 @@ impl Endpoint { } /// resolves the host and updates if it is local or not. - fn update_is_local(&mut self) -> Result<()> { - if self.url.scheme() != "file" { - self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT); + fn update_is_local(&mut self, local_port: u16) -> Result<()> { + match (self.url.scheme(), self.url.host()) { + (v, Some(host)) if v != "file" => { + self.is_local = net::is_local_host(host, self.url.port().unwrap_or_default(), local_port)?; + } + _ => {} } Ok(()) } + /// returns the host to be used for grid connections. fn grid_host(&self) -> String { - let host = self.url.host_str().unwrap_or(""); - let port = self.url.port().unwrap_or(0); - if port > 0 { - format!("{}://{}:{}", self.url.scheme(), host, port) - } else { - format!("{}://{}", self.url.scheme(), host) + match (self.url.host(), self.url.port()) { + (Some(host), Some(port)) => format!("{}://{}:{}", self.url.scheme(), host, port), + (Some(host), None) => format!("{}://{}", self.url.scheme(), host), + _ => String::new(), } } } -#[derive(Debug, Clone)] +/// list of same type of endpoint. +#[derive(Debug, Default)] pub struct Endpoints(Vec); -impl Endpoints { - pub fn new() -> Self { - Self(Vec::new()) - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn iter(&self) -> core::slice::Iter { - self.0.iter() - } - - pub fn iter_mut(&mut self) -> core::slice::IterMut { - self.0.iter_mut() - } - - pub fn slice(&self, start: usize, end: usize) -> Vec { - self.0.as_slice()[start..end].to_vec() - } - pub fn from_args(args: Vec) -> Result { - let mut ep_type = EndpointType::UnKnow; - let mut scheme = String::new(); - let mut eps = Vec::new(); - let mut uniq_args = new_string_set(); - for (i, arg) in args.iter().enumerate() { - let endpoint = Endpoint::try_from(arg.as_str())?; - if i == 0 { - ep_type = endpoint.get_type(); - scheme = endpoint.url.scheme().to_string(); - } else if endpoint.get_type() != ep_type { - return Err(Error::from_string("不支持多种endpoints风格")); - } else if endpoint.url.scheme().to_string() != scheme { - return Err(Error::from_string("不支持多种scheme")); - } - - let arg_str = endpoint.to_string(); - - if uniq_args.contains(arg_str.as_str()) { - return Err(Error::from_string("发现重复 endpoints")); - } - - uniq_args.add(arg_str); - - eps.push(endpoint.clone()); - } - - Ok(Endpoints(eps)) +impl AsRef> for Endpoints { + fn as_ref(&self) -> &Vec { + &self.0 } } -#[warn(dead_code)] -pub struct PoolEndpointList(Vec); +impl AsMut> for Endpoints { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} -impl PoolEndpointList { - fn from_vec(v: Vec) -> Self { +impl From> for Endpoints { + fn from(v: Vec) -> Self { Self(v) } +} - pub fn push(&mut self, es: Endpoints) { - self.0.push(es) - } +impl TryFrom<&[String]> for Endpoints { + type Error = Error; - // TODO: 解析域名,判断哪个是本地地址 - fn update_is_local(&mut self) -> Result<()> { - for eps in self.0.iter_mut() { - for ep in eps.iter_mut() { - // TODO: - ep.update_is_local()? + /// returns new endpoint list based on input args. + fn try_from(args: &[String]) -> Result { + let mut endpoint_type; + let mut schema; + let mut endpoints = Vec::with_capacity(args.len()); + let mut uniq_set = HashSet::with_capacity(args.len()); + + // Loop through args and adds to endpoint list. + for (i, arg) in args.iter().enumerate() { + let endpoint = match Endpoint::try_from(arg.as_str()) { + Ok(ep) => ep, + Err(e) => return Err(Error::from_string(format!("'{}': {}", arg, e))), + }; + + // All endpoints have to be same type and scheme if applicable. + if i == 0 { + endpoint_type = endpoint.get_type(); + schema = endpoint.url.scheme(); + } else if endpoint.get_type() != endpoint_type { + return Err(Error::from_string("mixed style endpoints are not supported")); + } else if endpoint.url.scheme() != schema { + return Err(Error::from_string("mixed scheme is not supported")); } + + // Check for duplicate endpoints. + let endpoint_str = endpoint.to_string(); + if uniq_set.contains(&endpoint_str) { + return Err(Error::from_string("duplicate endpoints found")); + } + + uniq_set.insert(endpoint_str); + endpoints.push(endpoint); } - Ok(()) + Ok(Endpoints(endpoints)) } } -// PoolEndpoints represent endpoints in a given pool -// along with its setCount and setDriveCount. -#[derive(Debug, Clone)] +/// a temporary type to holds the list of endpoints +pub struct PoolEndpointList { + inner: Vec, + setup_type: SetupType, +} + +impl AsRef> for PoolEndpointList { + fn as_ref(&self) -> &Vec { + &self.inner + } +} + +impl AsMut> for PoolEndpointList { + fn as_mut(&mut self) -> &mut Vec { + &mut self.inner + } +} + +impl PoolEndpointList { + /// creates a list of endpoints per pool, resolves their relevant + /// hostnames and discovers those are local or remote. + pub fn create_pool_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result { + if disks_layout.is_empty_layout() { + return Err(Error::from_string("invalid number of endpoints")); + } + + let server_addr = net::check_local_server_addr(server_addr)?; + + // For single arg, return single drive EC setup. + if disks_layout.is_single_drive_layout() { + let mut endpoint = Endpoint::try_from(disks_layout.get_single_drive_layout())?; + endpoint.update_is_local(server_addr.port())?; + + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(0); + + // TODO Check for cross device mounts if any. + + return Ok(Self { + inner: vec![Endpoints::from(vec![endpoint])], + setup_type: SetupType::ErasureSD, + }); + } + + let mut pool_endpoints = Vec::::with_capacity(disks_layout.as_ref().len()); + for (pool_idx, pool) in disks_layout.as_ref().iter().enumerate() { + let mut endpoints = Endpoints::default(); + for (set_idx, set_layout) in pool.as_ref().iter().enumerate() { + // Convert args to endpoints + let mut eps = Endpoints::try_from(set_layout.as_slice())?; + + // TODO Check for cross device mounts if any. + + for (disk_idx, ep) in eps.as_mut().iter_mut().enumerate() { + ep.set_pool_index(pool_idx); + ep.set_set_index(set_idx); + ep.set_disk_index(disk_idx); + } + + endpoints.as_mut().append(eps.as_mut()); + } + + if endpoints.as_ref().is_empty() { + return Err(Error::from_string("invalid number of endpoints")); + } + + pool_endpoints.push(endpoints); + } + + // setup type + let mut unique_args = HashSet::new(); + for pool in pool_endpoints.iter() { + for ep in pool.as_ref() { + if let Some(host) = ep.url.host_str() { + unique_args.insert(host); + } else { + unique_args.insert(format!("localhost:{}", server_addr.port()).as_str()); + } + } + } + + let setup_type = match pool_endpoints[0].as_ref()[0].get_type() { + EndpointType::Path => SetupType::Erasure, + EndpointType::Url => match unique_args.len() { + 1 => SetupType::Erasure, + _ => SetupType::DistErasure, + }, + }; + + let mut pool_endpoint_list = Self { + inner: pool_endpoints, + setup_type, + }; + + pool_endpoint_list.update_is_local()?; + + Ok(pool_endpoint_list) + } + + fn update_is_local(&mut self) -> Result<()> { + unimplemented!() + } +} + +/// represent endpoints in a given pool +/// along with its setCount and setDriveCount. +#[derive(Debug)] pub struct PoolEndpoints { // indicates if endpoints are provided in non-ellipses style pub legacy: bool, @@ -299,314 +393,102 @@ pub struct PoolEndpoints { #[derive(Debug)] pub struct EndpointServerPools(Vec); -impl EndpointServerPools { - pub fn new() -> Self { - Self(Vec::new()) +impl From> for EndpointServerPools { + fn from(v: Vec) -> Self { + Self(v) } +} - // create_server_endpoints - pub fn create_server_endpoints( - server_addr: String, - pool_args: &Vec, - legacy: bool, - ) -> Result<(EndpointServerPools, SetupType)> { - if pool_args.is_empty() { - return Err(Error::from_string("无效参数")); +impl AsRef> for EndpointServerPools { + fn as_ref(&self) -> &Vec { + &self.0 + } +} + +impl AsMut> for EndpointServerPools { + fn as_mut(&mut self) -> &mut Vec { + &mut self.0 + } +} + +impl EndpointServerPools { + /// validates and creates new endpoints from input args, supports + /// both ellipses and without ellipses transparently. + pub fn create_server_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> { + if disks_layout.as_ref().is_empty() { + return Err(Error::from_string("Invalid arguments specified")); } - let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?; + let mut pool_eps = PoolEndpointList::create_pool_endpoints(server_addr, disks_layout)?; - let mut ret = EndpointServerPools::new(); + let mut ret: EndpointServerPools = Vec::with_capacity(pool_eps.as_ref().len()).into(); + for (i, eps) in pool_eps.as_mut().into_iter().enumerate() { + let layout = disks_layout.get_layout(i); + let set_count = layout.map_or(0, |v| v.count()); + let drives_per_set = layout.map_or(0, |v| v.as_ref().get(0).map_or(0, |v| v.len())); + let cmd_line = layout.map_or(String::new(), |v| v.as_cmd_line().to_owned()); - for (i, eps) in pooleps.iter().enumerate() { let ep = PoolEndpoints { - legacy: legacy, - set_count: pool_args[i].layout.len(), - drives_per_set: pool_args[i].layout[0].len(), - endpoints: eps.clone(), - cmd_line: pool_args[i].cmd_line.clone(), + legacy: disks_layout.legacy, + set_count, + drives_per_set, + endpoints: *eps, + cmd_line, platform: String::new(), }; - ret.add(ep)?; + ret.add(ep); } - Ok((ret, setup_type)) - } - - pub fn first_is_local(&self) -> bool { - if self.0.is_empty() { - return false; - } - return self.0[0].endpoints.0[0].is_local; - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn iter(&self) -> core::slice::Iter<'_, PoolEndpoints> { - return self.0.iter(); - } - - pub fn push(&mut self, pes: PoolEndpoints) { - self.0.push(pes) + Ok((ret, pool_eps.setup_type)) } + /// add pool endpoints pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> { - let mut exits = new_string_set(); + let mut exits = HashSet::new(); for peps in self.0.iter() { - for ep in peps.endpoints.0.iter() { - exits.add(ep.to_string()); + for ep in peps.endpoints.as_ref() { + exits.insert(ep.to_string()); } } - for ep in eps.endpoints.0.iter() { + for ep in eps.endpoints.as_ref() { if exits.contains(&ep.to_string()) { - return Err(Error::from_string("endpoints exists")); + return Err(Error::from_string("duplicate endpoints found")); } } self.0.push(eps); + Ok(()) } + /// returns a sorted list of nodes in this cluster pub fn get_nodes(&self) -> Vec { let mut node_map = HashMap::new(); - for pool in self.iter() { - for ep in pool.endpoints.iter() { - let mut node = Node { + for pool in self.0.iter() { + for ep in pool.endpoints.as_ref() { + let host = ep.grid_host(); + let n = node_map.entry(host).or_insert(Node { url: ep.url.clone(), pools: vec![], is_local: ep.is_local, - grid_host: ep.grid_host(), - }; - if !node.pools.contains(&ep.pool_idx) { - node.pools.push(ep.pool_idx) - } + grid_host: host, + }); - node_map.insert(node.grid_host.clone(), node); + if let Some(pool_idx) = ep.pool_idx { + if !n.pools.contains(&pool_idx) { + n.pools.push(pool_idx); + } + } } } let mut nodes: Vec = node_map.into_iter().map(|(_, n)| n).collect(); - // nodes.sort_by(|a, b| a.cmp(b)); + nodes.sort_by(|a, b| a.grid_host.cmp(&b.grid_host)); nodes } } - -/// enum for setup type. -#[derive(Debug)] -pub enum SetupType { - /// starts with unknown setup type. - Unknown, - - /// FS setup type enum. - FS, - - /// Erasure single drive setup enum. - ErasureSD, - - /// Erasure setup type enum. - Erasure, - - /// Distributed Erasure setup type enum. - DistErasure, -} - -fn is_empty_layout(pools_layout: &Vec) -> bool { - if pools_layout.is_empty() { - return true; - } - let first_layout = &pools_layout[0]; - if first_layout.layout.is_empty() || first_layout.layout[0].is_empty() || first_layout.layout[0][0].is_empty() { - return true; - } - false -} - -// 检查是否是单驱动器布局 -fn is_single_drive_layout(pools_layout: &Vec) -> bool { - if pools_layout.len() == 1 && pools_layout[0].layout.len() == 1 && pools_layout[0].layout[0].len() == 1 { - true - } else { - false - } -} - -pub fn create_pool_endpoints_v2(server_addr: &str, disks_layout: &DisksLayout) -> Result<(Vec, SetupType)> { - if disks_layout.is_empty_layout() { - return Err(Error::from_string("invalid number of endpoints")); - } - - if disks_layout.is_single_drive_layout() {} - - unimplemented!() -} - -pub fn create_pool_endpoints(server_addr: String, pools: &Vec) -> Result<(Vec, SetupType)> { - if is_empty_layout(pools) { - return Err(Error::from_string("empty layout")); - } - - // TODO: CheckLocalServerAddr - - if is_single_drive_layout(pools) { - let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?; - endpoint.update_islocal()?; - - if endpoint.get_type() != EndpointType::Path { - return Err(Error::from_string("use path style endpoint for single node setup")); - } - - endpoint.set_pool_index(0); - endpoint.set_set_index(0); - endpoint.set_disk_index(0); - - let mut endpoints = Vec::new(); - endpoints.push(endpoint); - - // TODO: checkCrossDeviceMounts - - return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSD)); - } - - let mut ret = Vec::with_capacity(pools.len()); - - for (pool_idx, pool) in pools.iter().enumerate() { - let mut endpoints = Endpoints::new(); - for (set_idx, set_layout) in pool.layout.iter().enumerate() { - let mut eps = Endpoints::from_args(set_layout.to_owned())?; - // TODO: checkCrossDeviceMounts - for (disk_idx, ep) in eps.0.iter_mut().enumerate() { - ep.set_pool_index(pool_idx); - ep.set_set_index(set_idx); - ep.set_disk_index(disk_idx); - - endpoints.0.push(ep.to_owned()); - } - } - - if endpoints.0.is_empty() { - return Err(Error::from_string("invalid number of endpoints")); - } - - ret.push(endpoints); - } - - // TODO: - PoolEndpointList::from_vec(ret.clone()).update_is_local()?; - - let mut setup_type = SetupType::Unknown; - - // TODO: parse server port - let (_, server_port) = split_host_port(server_addr.as_str())?; - - let mut uniq_host = new_string_set(); - - for (_i, eps) in ret.iter_mut().enumerate() { - // TODO: 一些验证,参考原m - - for ep in eps.0.iter() { - if !ep.url.has_host() { - uniq_host.add(format!("localhost:{}", server_port)); - } else { - // uniq_host.add(ep.url.domain().) - } - } - } - - let erasure_type = uniq_host.to_slice().len() == 1; - - for eps in ret.iter() { - if eps.0[0].get_type() == EndpointType::Path { - setup_type = SetupType::Erasure; - break; - } - - if eps.0[0].get_type() == EndpointType::Url { - if erasure_type { - setup_type = SetupType::Erasure; - } else { - setup_type = SetupType::DistErasure; - } - - break; - } - } - - Ok((ret, setup_type)) -} - -/// validates and creates new endpoints from input args, supports -/// both ellipses and without ellipses transparently. -// fn create_server_endpoints(server_addr: String, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> { -// if disks_layout.is_empty() { -// return Err(Error::from_string("Invalid arguments specified")); -// } - -// let (pooleps, setup_type) = create_pool_endpoints(server_addr, &disks_layout.pools)?; - -// let mut ret = EndpointServerPools::new(); - -// for (i, eps) in pooleps.iter().enumerate() { -// let ep = PoolEndpoints { -// legacy: disks_layout.legacy, -// set_count: pool_args[i].layout.len(), -// drives_per_set: pool_args[i].layout[0].len(), -// endpoints: eps.clone(), -// cmd_line: pool_args[i].cmd_line.clone(), -// platform: String::new(), -// }; - -// ret.add(ep)?; -// } - -// Ok((ret, setup_type)) -// } - -#[cfg(test)] -mod test { - - use crate::disks_layout::DisksLayout; - - use super::*; - - #[test] - fn test_url() { - let path = "/dir/sss"; - - let u = url::Url::parse(path); - - println!("{:#?}", u) - } - - #[test] - fn test_new_endpont() { - let arg = "./data"; - let ep = Endpoint::new(arg).unwrap(); - - println!("{:?}", ep); - } - - // #[test] - // fn test_create_server_endpoints() { - // let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])]; - - // for (addr, args) in cases { - // let layouts = DisksLayout::try_from(args.as_slice()).unwrap(); - - // println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy); - - // let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); - - // println!("setup_type -- {:?}", setup_type); - // println!("server_pool == {:?}", server_pool); - // } - - // // create_server_endpoints(server_addr, pool_args, legacy) - // } -} diff --git a/ecstore/src/error.rs b/ecstore/src/error.rs index 57bc687c..f18a27b2 100644 --- a/ecstore/src/error.rs +++ b/ecstore/src/error.rs @@ -1,7 +1,7 @@ use s3s::S3Error; use s3s::S3ErrorCode; use s3s::StdError; - +use std::fmt::Display; use std::panic::Location; use tracing::error; @@ -44,6 +44,12 @@ impl From for S3Error { } } +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.source.fmt(f) + } +} + #[inline] #[track_caller] pub(crate) fn log(source: &dyn std::error::Error) { diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index ddfbe12e..7e61af87 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -27,7 +27,7 @@ impl S3PeerSys { pub fn new(eps: &EndpointServerPools, local_disks: Vec) -> Self { Self { clients: Self::new_clients(eps, local_disks), - pools_count: eps.len(), + pools_count: eps.as_ref().len(), } } @@ -37,15 +37,11 @@ impl S3PeerSys { .iter() .map(|e| { if e.is_local { - let cli: Box = Box::new(LocalPeerS3Client::new( - local_disks.clone(), - e.clone(), - e.pools.clone(), - )); + let cli: Box = + Box::new(LocalPeerS3Client::new(local_disks.clone(), e.clone(), e.pools.clone())); Arc::new(cli) } else { - let cli: Box = - Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone())); + let cli: Box = Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone())); Arc::new(cli) } }) diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 60d7c28a..dfdeaa0f 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -33,7 +33,7 @@ impl ECStore { let mut deployment_id = None; - let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?; + let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?; let mut pools = Vec::with_capacity(endpoint_pools.len()); let mut disk_map = HashMap::with_capacity(endpoint_pools.len()); diff --git a/ecstore/src/utils/net.rs b/ecstore/src/utils/net.rs index 6c91c4f7..81441512 100644 --- a/ecstore/src/utils/net.rs +++ b/ecstore/src/utils/net.rs @@ -1,17 +1,43 @@ +use crate::error::{Error, Result}; use std::{ collections::HashSet, net::{IpAddr, SocketAddr, ToSocketAddrs}, }; - -use anyhow::{Error, Result}; -use netif; use url::Host; -// helper for validating if the provided arg is an ip address. +/// helper for validating if the provided arg is an ip address. pub fn is_socket_addr(host: &str) -> bool { host.parse::().is_ok() || host.parse::().is_ok() } +/// checks if server_addr is valid and local host. +pub fn check_local_server_addr(server_addr: &str) -> Result { + let addr: Vec = match server_addr.to_socket_addrs() { + Ok(addr) => addr.collect(), + Err(err) => return Err(Error::new(Box::new(err))), + }; + + // 0.0.0.0 is a wildcard address and refers to local network + // addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port + // 9000 on localhost. + for a in addr { + if a.ip().is_unspecified() { + return Ok(a); + } + + let host = match a { + SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()), + SocketAddr::V6(a) => Host::Ipv6(*a.ip()), + }; + + if is_local_host(host, 0, 0).is_ok() { + return Ok(a); + } + } + + return Err(Error::from_string("host in server address should be this server")); +} + pub fn split_host_port(s: &str) -> Result<(String, u16)> { let parts: Vec<&str> = s.split(':').collect(); if parts.len() == 2 { @@ -19,12 +45,12 @@ pub fn split_host_port(s: &str) -> Result<(String, u16)> { return Ok((parts[0].to_string(), port)); } } - Err(Error::msg("Invalid address format or port number")) + Err(Error::from_string("Invalid address format or port number")) } /// checks if the given parameter correspond to one of /// the local IP of the current machine -pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { +pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result { let local_ips = must_get_local_ips(); let local_set: HashSet = local_ips.into_iter().collect(); @@ -32,7 +58,7 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { Host::Domain(domain) => { let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::>()) { Ok(ips) => ips, - Err(_) => return false, + Err(err) => return Err(Error::new(Box::new(err))), }; ips.iter().any(|ip| local_set.contains(ip)) @@ -42,14 +68,14 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool { }; if port > 0 { - return is_local_host && port == local_port; + return Ok(is_local_host && port == local_port); } - is_local_host + Ok(is_local_host) } /// returns IPs of local interface -pub fn must_get_local_ips() -> Vec { +fn must_get_local_ips() -> Vec { match netif::up() { Ok(up) => up.map(|x| x.address().to_owned()).collect(), Err(_) => vec![], @@ -58,10 +84,28 @@ pub fn must_get_local_ips() -> Vec { #[cfg(test)] mod test { - use std::net::{Ipv4Addr, Ipv6Addr}; + use std::net::Ipv4Addr; use super::*; + #[test] + fn test_is_socket_addr() { + let test_cases = [ + ("localhost", false), + ("localhost:9000", false), + ("example.com", false), + ("http://192.168.1.0", false), + ("http://192.168.1.0:9000", false), + ("192.168.1.0", true), + ("[2001:db8::1]:9000", true), + ]; + + for (addr, expected) in test_cases { + let ret = is_socket_addr(addr); + assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret); + } + } + #[test] fn test_must_get_local_ips() { let local_ips = must_get_local_ips(); @@ -69,23 +113,4 @@ mod test { assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))); } - - #[test] - fn test_is_local_host() { - let host = Host::Domain("localhost"); - let is = is_local_host(host, 0, 9000); - assert!(is); - - let host = Host::Ipv6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); - let is = is_local_host(host, 0, 9000); - assert!(is); - - let host = Host::Ipv4(Ipv4Addr::new(8, 8, 8, 8)); - let is = is_local_host(host, 0, 9000); - assert!(!is); - - let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1)); - let is = is_local_host(host, 8000, 9000); - assert!(!is); - } }