fix: 优化endpoint

This commit is contained in:
shiro.lee
2024-07-04 18:31:08 +08:00
parent f9462162a5
commit 18b07128fc
7 changed files with 364 additions and 415 deletions

View File

@@ -54,14 +54,14 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
}
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.len());
let mut futures = Vec::with_capacity(eps.as_ref().len());
for ep in eps.iter() {
for ep in eps.as_ref().iter() {
futures.push(new_disk(ep, opt));
}
let mut res = Vec::with_capacity(eps.len());
let mut errors = Vec::with_capacity(eps.len());
let mut res = Vec::with_capacity(eps.as_ref().len());
let mut errors = Vec::with_capacity(eps.as_ref().len());
let results = join_all(futures).await;
for result in results {
@@ -119,7 +119,7 @@ impl LocalDisk {
let fm = FormatV3::try_from(s)?;
let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx {
if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx {
return Err(Error::new(DiskError::InconsistentDisk));
}
@@ -606,7 +606,7 @@ mod test {
let p = "./testv";
fs::create_dir_all(&p).await.unwrap();
let ep = match Endpoint::new(&p) {
let ep = match Endpoint::try_from(p) {
Ok(e) => e,
Err(e) => {
println!("{e}");

View File

@@ -9,8 +9,20 @@ const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
#[derive(Deserialize, Debug, Default)]
pub struct PoolDisksLayout {
pub cmd_line: String,
pub layout: Vec<Vec<String>>,
cmd_line: String,
layout: Vec<Vec<String>>,
}
impl AsRef<Vec<Vec<String>>> for PoolDisksLayout {
fn as_ref(&self) -> &Vec<Vec<String>> {
&self.layout
}
}
impl AsMut<Vec<Vec<String>>> for PoolDisksLayout {
fn as_mut(&mut self) -> &mut Vec<Vec<String>> {
&mut self.layout
}
}
impl PoolDisksLayout {
@@ -20,12 +32,32 @@ impl PoolDisksLayout {
layout,
}
}
pub fn count(&self) -> usize {
self.layout.len()
}
pub fn as_cmd_line(&self) -> &str {
&self.cmd_line
}
}
#[derive(Deserialize, Debug, Default)]
pub struct DisksLayout {
pub legacy: bool,
pub pools: Vec<PoolDisksLayout>,
pools: Vec<PoolDisksLayout>,
}
impl AsRef<Vec<PoolDisksLayout>> for DisksLayout {
fn as_ref(&self) -> &Vec<PoolDisksLayout> {
&self.pools
}
}
impl AsMut<Vec<PoolDisksLayout>> for DisksLayout {
fn as_mut(&mut self) -> &mut Vec<PoolDisksLayout> {
&mut self.pools
}
}
impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
@@ -82,6 +114,14 @@ impl DisksLayout {
pub fn is_single_drive_layout(&self) -> bool {
self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1
}
pub fn get_single_drive_layout(&self) -> &str {
&self.pools[0].layout[0][0]
}
pub fn get_layout(&self, idx: usize) -> Option<&PoolDisksLayout> {
self.pools.get(idx)
}
}
/// parses all ellipses input arguments, expands them into

View File

@@ -1,16 +1,12 @@
use super::disks_layout::{DisksLayout, PoolDisksLayout};
use super::disks_layout::DisksLayout;
use super::error::{Error, Result};
use super::utils::{
net::{is_local_host, is_socket_addr, split_host_port},
string::new_string_set,
};
use super::utils::net;
use path_absolutize::Absolutize;
use std::collections::HashSet;
use std::fmt::Display;
use std::{collections::HashMap, path::Path, usize};
use url::{ParseError, Url};
pub const DEFAULT_PORT: u16 = 9000;
/// enum for endpoint type.
#[derive(PartialEq, Eq)]
pub enum EndpointType {
@@ -19,26 +15,33 @@ pub enum EndpointType {
/// URL style endpoint type enum.
Url,
}
/// Unknown endpoint type enum.
UnKnow,
/// enum for setup type.
#[derive(Debug)]
pub enum SetupType {
/// FS setup type enum.
FS,
/// Erasure single drive setup enum.
ErasureSD,
/// Erasure setup type enum.
Erasure,
/// Distributed Erasure setup type enum.
DistErasure,
}
/// holds information about a node in this cluster
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
pub url: url::Url,
pub pools: Vec<i32>,
pub pools: Vec<usize>,
pub is_local: bool,
pub grid_host: String, // TODO "scheme://host:port"
pub grid_host: String,
}
// impl PartialOrd for Node {
// fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// self.grid_host.partial_cmp(&other.grid_host)
// }
// }
/// any type of endpoint.
#[derive(Debug, Clone)]
pub struct Endpoint {
@@ -66,6 +69,11 @@ impl TryFrom<&str> for Endpoint {
/// Performs the conversion.
fn try_from(value: &str) -> Result<Self, Self::Error> {
/// check whether given path is not empty.
fn is_empty_path(path: &str) -> bool {
["", "/", "\\"].iter().any(|&v| v.eq(path))
}
if is_empty_path(value) {
return Err(Error::from_string("empty or root endpoint is not supported"));
}
@@ -122,7 +130,7 @@ impl TryFrom<&str> for Endpoint {
// Only check if the arg is an ip address and ask for scheme since its absent.
// localhost, example.com, any FQDN cannot be disambiguated from a regular file path such as
// /mnt/export1. So we go ahead and start the rustfs server in FS modes in these cases.
if is_socket_addr(value) {
if net::is_socket_addr(value) {
return Err(Error::from_string("invalid URL endpoint format: missing scheme http or https"));
}
@@ -153,11 +161,6 @@ impl TryFrom<&str> for Endpoint {
}
}
/// check whether given path is not empty.
fn is_empty_path(path: &str) -> bool {
["", "/", "\\"].iter().any(|&v| v.eq(path))
}
impl Endpoint {
/// returns type of endpoint.
pub fn get_type(&self) -> EndpointType {
@@ -184,107 +187,198 @@ impl Endpoint {
}
/// resolves the host and updates if it is local or not.
fn update_is_local(&mut self) -> Result<()> {
if self.url.scheme() != "file" {
self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT);
fn update_is_local(&mut self, local_port: u16) -> Result<()> {
match (self.url.scheme(), self.url.host()) {
(v, Some(host)) if v != "file" => {
self.is_local = net::is_local_host(host, self.url.port().unwrap_or_default(), local_port)?;
}
_ => {}
}
Ok(())
}
/// returns the host to be used for grid connections.
fn grid_host(&self) -> String {
let host = self.url.host_str().unwrap_or("");
let port = self.url.port().unwrap_or(0);
if port > 0 {
format!("{}://{}:{}", self.url.scheme(), host, port)
} else {
format!("{}://{}", self.url.scheme(), host)
match (self.url.host(), self.url.port()) {
(Some(host), Some(port)) => format!("{}://{}:{}", self.url.scheme(), host, port),
(Some(host), None) => format!("{}://{}", self.url.scheme(), host),
_ => String::new(),
}
}
}
#[derive(Debug, Clone)]
/// list of same type of endpoint.
#[derive(Debug, Default)]
pub struct Endpoints(Vec<Endpoint>);
impl Endpoints {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> core::slice::Iter<Endpoint> {
self.0.iter()
}
pub fn iter_mut(&mut self) -> core::slice::IterMut<Endpoint> {
self.0.iter_mut()
}
pub fn slice(&self, start: usize, end: usize) -> Vec<Endpoint> {
self.0.as_slice()[start..end].to_vec()
}
pub fn from_args(args: Vec<String>) -> Result<Self> {
let mut ep_type = EndpointType::UnKnow;
let mut scheme = String::new();
let mut eps = Vec::new();
let mut uniq_args = new_string_set();
for (i, arg) in args.iter().enumerate() {
let endpoint = Endpoint::try_from(arg.as_str())?;
if i == 0 {
ep_type = endpoint.get_type();
scheme = endpoint.url.scheme().to_string();
} else if endpoint.get_type() != ep_type {
return Err(Error::from_string("不支持多种endpoints风格"));
} else if endpoint.url.scheme().to_string() != scheme {
return Err(Error::from_string("不支持多种scheme"));
}
let arg_str = endpoint.to_string();
if uniq_args.contains(arg_str.as_str()) {
return Err(Error::from_string("发现重复 endpoints"));
}
uniq_args.add(arg_str);
eps.push(endpoint.clone());
}
Ok(Endpoints(eps))
impl AsRef<Vec<Endpoint>> for Endpoints {
fn as_ref(&self) -> &Vec<Endpoint> {
&self.0
}
}
#[warn(dead_code)]
pub struct PoolEndpointList(Vec<Endpoints>);
impl AsMut<Vec<Endpoint>> for Endpoints {
fn as_mut(&mut self) -> &mut Vec<Endpoint> {
&mut self.0
}
}
impl PoolEndpointList {
fn from_vec(v: Vec<Endpoints>) -> Self {
impl From<Vec<Endpoint>> for Endpoints {
fn from(v: Vec<Endpoint>) -> Self {
Self(v)
}
}
pub fn push(&mut self, es: Endpoints) {
self.0.push(es)
}
impl TryFrom<&[String]> for Endpoints {
type Error = Error;
// TODO: 解析域名,判断哪个是本地地址
fn update_is_local(&mut self) -> Result<()> {
for eps in self.0.iter_mut() {
for ep in eps.iter_mut() {
// TODO:
ep.update_is_local()?
/// returns new endpoint list based on input args.
fn try_from(args: &[String]) -> Result<Self> {
let mut endpoint_type;
let mut schema;
let mut endpoints = Vec::with_capacity(args.len());
let mut uniq_set = HashSet::with_capacity(args.len());
// Loop through args and adds to endpoint list.
for (i, arg) in args.iter().enumerate() {
let endpoint = match Endpoint::try_from(arg.as_str()) {
Ok(ep) => ep,
Err(e) => return Err(Error::from_string(format!("'{}': {}", arg, e))),
};
// All endpoints have to be same type and scheme if applicable.
if i == 0 {
endpoint_type = endpoint.get_type();
schema = endpoint.url.scheme();
} else if endpoint.get_type() != endpoint_type {
return Err(Error::from_string("mixed style endpoints are not supported"));
} else if endpoint.url.scheme() != schema {
return Err(Error::from_string("mixed scheme is not supported"));
}
// Check for duplicate endpoints.
let endpoint_str = endpoint.to_string();
if uniq_set.contains(&endpoint_str) {
return Err(Error::from_string("duplicate endpoints found"));
}
uniq_set.insert(endpoint_str);
endpoints.push(endpoint);
}
Ok(())
Ok(Endpoints(endpoints))
}
}
// PoolEndpoints represent endpoints in a given pool
// along with its setCount and setDriveCount.
#[derive(Debug, Clone)]
/// a temporary type to holds the list of endpoints
pub struct PoolEndpointList {
inner: Vec<Endpoints>,
setup_type: SetupType,
}
impl AsRef<Vec<Endpoints>> for PoolEndpointList {
fn as_ref(&self) -> &Vec<Endpoints> {
&self.inner
}
}
impl AsMut<Vec<Endpoints>> for PoolEndpointList {
fn as_mut(&mut self) -> &mut Vec<Endpoints> {
&mut self.inner
}
}
impl PoolEndpointList {
/// creates a list of endpoints per pool, resolves their relevant
/// hostnames and discovers those are local or remote.
pub fn create_pool_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result<Self> {
if disks_layout.is_empty_layout() {
return Err(Error::from_string("invalid number of endpoints"));
}
let server_addr = net::check_local_server_addr(server_addr)?;
// For single arg, return single drive EC setup.
if disks_layout.is_single_drive_layout() {
let mut endpoint = Endpoint::try_from(disks_layout.get_single_drive_layout())?;
endpoint.update_is_local(server_addr.port())?;
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(0);
// TODO Check for cross device mounts if any.
return Ok(Self {
inner: vec![Endpoints::from(vec![endpoint])],
setup_type: SetupType::ErasureSD,
});
}
let mut pool_endpoints = Vec::<Endpoints>::with_capacity(disks_layout.as_ref().len());
for (pool_idx, pool) in disks_layout.as_ref().iter().enumerate() {
let mut endpoints = Endpoints::default();
for (set_idx, set_layout) in pool.as_ref().iter().enumerate() {
// Convert args to endpoints
let mut eps = Endpoints::try_from(set_layout.as_slice())?;
// TODO Check for cross device mounts if any.
for (disk_idx, ep) in eps.as_mut().iter_mut().enumerate() {
ep.set_pool_index(pool_idx);
ep.set_set_index(set_idx);
ep.set_disk_index(disk_idx);
}
endpoints.as_mut().append(eps.as_mut());
}
if endpoints.as_ref().is_empty() {
return Err(Error::from_string("invalid number of endpoints"));
}
pool_endpoints.push(endpoints);
}
// setup type
let mut unique_args = HashSet::new();
for pool in pool_endpoints.iter() {
for ep in pool.as_ref() {
if let Some(host) = ep.url.host_str() {
unique_args.insert(host);
} else {
unique_args.insert(format!("localhost:{}", server_addr.port()).as_str());
}
}
}
let setup_type = match pool_endpoints[0].as_ref()[0].get_type() {
EndpointType::Path => SetupType::Erasure,
EndpointType::Url => match unique_args.len() {
1 => SetupType::Erasure,
_ => SetupType::DistErasure,
},
};
let mut pool_endpoint_list = Self {
inner: pool_endpoints,
setup_type,
};
pool_endpoint_list.update_is_local()?;
Ok(pool_endpoint_list)
}
fn update_is_local(&mut self) -> Result<()> {
unimplemented!()
}
}
/// represent endpoints in a given pool
/// along with its setCount and setDriveCount.
#[derive(Debug)]
pub struct PoolEndpoints {
// indicates if endpoints are provided in non-ellipses style
pub legacy: bool,
@@ -299,314 +393,102 @@ pub struct PoolEndpoints {
#[derive(Debug)]
pub struct EndpointServerPools(Vec<PoolEndpoints>);
impl EndpointServerPools {
pub fn new() -> Self {
Self(Vec::new())
impl From<Vec<PoolEndpoints>> for EndpointServerPools {
fn from(v: Vec<PoolEndpoints>) -> Self {
Self(v)
}
}
// create_server_endpoints
pub fn create_server_endpoints(
server_addr: String,
pool_args: &Vec<PoolDisksLayout>,
legacy: bool,
) -> Result<(EndpointServerPools, SetupType)> {
if pool_args.is_empty() {
return Err(Error::from_string("无效参数"));
impl AsRef<Vec<PoolEndpoints>> for EndpointServerPools {
fn as_ref(&self) -> &Vec<PoolEndpoints> {
&self.0
}
}
impl AsMut<Vec<PoolEndpoints>> for EndpointServerPools {
fn as_mut(&mut self) -> &mut Vec<PoolEndpoints> {
&mut self.0
}
}
impl EndpointServerPools {
/// validates and creates new endpoints from input args, supports
/// both ellipses and without ellipses transparently.
pub fn create_server_endpoints(server_addr: &str, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> {
if disks_layout.as_ref().is_empty() {
return Err(Error::from_string("Invalid arguments specified"));
}
let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?;
let mut pool_eps = PoolEndpointList::create_pool_endpoints(server_addr, disks_layout)?;
let mut ret = EndpointServerPools::new();
let mut ret: EndpointServerPools = Vec::with_capacity(pool_eps.as_ref().len()).into();
for (i, eps) in pool_eps.as_mut().into_iter().enumerate() {
let layout = disks_layout.get_layout(i);
let set_count = layout.map_or(0, |v| v.count());
let drives_per_set = layout.map_or(0, |v| v.as_ref().get(0).map_or(0, |v| v.len()));
let cmd_line = layout.map_or(String::new(), |v| v.as_cmd_line().to_owned());
for (i, eps) in pooleps.iter().enumerate() {
let ep = PoolEndpoints {
legacy: legacy,
set_count: pool_args[i].layout.len(),
drives_per_set: pool_args[i].layout[0].len(),
endpoints: eps.clone(),
cmd_line: pool_args[i].cmd_line.clone(),
legacy: disks_layout.legacy,
set_count,
drives_per_set,
endpoints: *eps,
cmd_line,
platform: String::new(),
};
ret.add(ep)?;
ret.add(ep);
}
Ok((ret, setup_type))
}
pub fn first_is_local(&self) -> bool {
if self.0.is_empty() {
return false;
}
return self.0[0].endpoints.0[0].is_local;
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn iter(&self) -> core::slice::Iter<'_, PoolEndpoints> {
return self.0.iter();
}
pub fn push(&mut self, pes: PoolEndpoints) {
self.0.push(pes)
Ok((ret, pool_eps.setup_type))
}
/// add pool endpoints
pub fn add(&mut self, eps: PoolEndpoints) -> Result<()> {
let mut exits = new_string_set();
let mut exits = HashSet::new();
for peps in self.0.iter() {
for ep in peps.endpoints.0.iter() {
exits.add(ep.to_string());
for ep in peps.endpoints.as_ref() {
exits.insert(ep.to_string());
}
}
for ep in eps.endpoints.0.iter() {
for ep in eps.endpoints.as_ref() {
if exits.contains(&ep.to_string()) {
return Err(Error::from_string("endpoints exists"));
return Err(Error::from_string("duplicate endpoints found"));
}
}
self.0.push(eps);
Ok(())
}
/// returns a sorted list of nodes in this cluster
pub fn get_nodes(&self) -> Vec<Node> {
let mut node_map = HashMap::new();
for pool in self.iter() {
for ep in pool.endpoints.iter() {
let mut node = Node {
for pool in self.0.iter() {
for ep in pool.endpoints.as_ref() {
let host = ep.grid_host();
let n = node_map.entry(host).or_insert(Node {
url: ep.url.clone(),
pools: vec![],
is_local: ep.is_local,
grid_host: ep.grid_host(),
};
if !node.pools.contains(&ep.pool_idx) {
node.pools.push(ep.pool_idx)
}
grid_host: host,
});
node_map.insert(node.grid_host.clone(), node);
if let Some(pool_idx) = ep.pool_idx {
if !n.pools.contains(&pool_idx) {
n.pools.push(pool_idx);
}
}
}
}
let mut nodes: Vec<Node> = node_map.into_iter().map(|(_, n)| n).collect();
// nodes.sort_by(|a, b| a.cmp(b));
nodes.sort_by(|a, b| a.grid_host.cmp(&b.grid_host));
nodes
}
}
/// enum for setup type.
#[derive(Debug)]
pub enum SetupType {
/// starts with unknown setup type.
Unknown,
/// FS setup type enum.
FS,
/// Erasure single drive setup enum.
ErasureSD,
/// Erasure setup type enum.
Erasure,
/// Distributed Erasure setup type enum.
DistErasure,
}
fn is_empty_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
if pools_layout.is_empty() {
return true;
}
let first_layout = &pools_layout[0];
if first_layout.layout.is_empty() || first_layout.layout[0].is_empty() || first_layout.layout[0][0].is_empty() {
return true;
}
false
}
// 检查是否是单驱动器布局
fn is_single_drive_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
if pools_layout.len() == 1 && pools_layout[0].layout.len() == 1 && pools_layout[0].layout[0].len() == 1 {
true
} else {
false
}
}
pub fn create_pool_endpoints_v2(server_addr: &str, disks_layout: &DisksLayout) -> Result<(Vec<Endpoints>, SetupType)> {
if disks_layout.is_empty_layout() {
return Err(Error::from_string("invalid number of endpoints"));
}
if disks_layout.is_single_drive_layout() {}
unimplemented!()
}
pub fn create_pool_endpoints(server_addr: String, pools: &Vec<PoolDisksLayout>) -> Result<(Vec<Endpoints>, SetupType)> {
if is_empty_layout(pools) {
return Err(Error::from_string("empty layout"));
}
// TODO: CheckLocalServerAddr
if is_single_drive_layout(pools) {
let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?;
endpoint.update_islocal()?;
if endpoint.get_type() != EndpointType::Path {
return Err(Error::from_string("use path style endpoint for single node setup"));
}
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(0);
let mut endpoints = Vec::new();
endpoints.push(endpoint);
// TODO: checkCrossDeviceMounts
return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSD));
}
let mut ret = Vec::with_capacity(pools.len());
for (pool_idx, pool) in pools.iter().enumerate() {
let mut endpoints = Endpoints::new();
for (set_idx, set_layout) in pool.layout.iter().enumerate() {
let mut eps = Endpoints::from_args(set_layout.to_owned())?;
// TODO: checkCrossDeviceMounts
for (disk_idx, ep) in eps.0.iter_mut().enumerate() {
ep.set_pool_index(pool_idx);
ep.set_set_index(set_idx);
ep.set_disk_index(disk_idx);
endpoints.0.push(ep.to_owned());
}
}
if endpoints.0.is_empty() {
return Err(Error::from_string("invalid number of endpoints"));
}
ret.push(endpoints);
}
// TODO:
PoolEndpointList::from_vec(ret.clone()).update_is_local()?;
let mut setup_type = SetupType::Unknown;
// TODO: parse server port
let (_, server_port) = split_host_port(server_addr.as_str())?;
let mut uniq_host = new_string_set();
for (_i, eps) in ret.iter_mut().enumerate() {
// TODO: 一些验证参考原m
for ep in eps.0.iter() {
if !ep.url.has_host() {
uniq_host.add(format!("localhost:{}", server_port));
} else {
// uniq_host.add(ep.url.domain().)
}
}
}
let erasure_type = uniq_host.to_slice().len() == 1;
for eps in ret.iter() {
if eps.0[0].get_type() == EndpointType::Path {
setup_type = SetupType::Erasure;
break;
}
if eps.0[0].get_type() == EndpointType::Url {
if erasure_type {
setup_type = SetupType::Erasure;
} else {
setup_type = SetupType::DistErasure;
}
break;
}
}
Ok((ret, setup_type))
}
/// validates and creates new endpoints from input args, supports
/// both ellipses and without ellipses transparently.
// fn create_server_endpoints(server_addr: String, disks_layout: &DisksLayout) -> Result<(EndpointServerPools, SetupType)> {
// if disks_layout.is_empty() {
// return Err(Error::from_string("Invalid arguments specified"));
// }
// let (pooleps, setup_type) = create_pool_endpoints(server_addr, &disks_layout.pools)?;
// let mut ret = EndpointServerPools::new();
// for (i, eps) in pooleps.iter().enumerate() {
// let ep = PoolEndpoints {
// legacy: disks_layout.legacy,
// set_count: pool_args[i].layout.len(),
// drives_per_set: pool_args[i].layout[0].len(),
// endpoints: eps.clone(),
// cmd_line: pool_args[i].cmd_line.clone(),
// platform: String::new(),
// };
// ret.add(ep)?;
// }
// Ok((ret, setup_type))
// }
#[cfg(test)]
mod test {
use crate::disks_layout::DisksLayout;
use super::*;
#[test]
fn test_url() {
let path = "/dir/sss";
let u = url::Url::parse(path);
println!("{:#?}", u)
}
#[test]
fn test_new_endpont() {
let arg = "./data";
let ep = Endpoint::new(arg).unwrap();
println!("{:?}", ep);
}
// #[test]
// fn test_create_server_endpoints() {
// let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])];
// for (addr, args) in cases {
// let layouts = DisksLayout::try_from(args.as_slice()).unwrap();
// println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy);
// let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap();
// println!("setup_type -- {:?}", setup_type);
// println!("server_pool == {:?}", server_pool);
// }
// // create_server_endpoints(server_addr, pool_args, legacy)
// }
}

View File

@@ -1,7 +1,7 @@
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::StdError;
use std::fmt::Display;
use std::panic::Location;
use tracing::error;
@@ -44,6 +44,12 @@ impl From<Error> for S3Error {
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.source.fmt(f)
}
}
#[inline]
#[track_caller]
pub(crate) fn log(source: &dyn std::error::Error) {

View File

@@ -27,7 +27,7 @@ impl S3PeerSys {
pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
Self {
clients: Self::new_clients(eps, local_disks),
pools_count: eps.len(),
pools_count: eps.as_ref().len(),
}
}
@@ -37,15 +37,11 @@ impl S3PeerSys {
.iter()
.map(|e| {
if e.is_local {
let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(
local_disks.clone(),
e.clone(),
e.pools.clone(),
));
let cli: Box<dyn PeerS3Client> =
Box::new(LocalPeerS3Client::new(local_disks.clone(), e.clone(), e.pools.clone()));
Arc::new(cli)
} else {
let cli: Box<dyn PeerS3Client> =
Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
let cli: Box<dyn PeerS3Client> = Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
Arc::new(cli)
}
})

View File

@@ -33,7 +33,7 @@ impl ECStore {
let mut deployment_id = None;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?;
let mut pools = Vec::with_capacity(endpoint_pools.len());
let mut disk_map = HashMap::with_capacity(endpoint_pools.len());

View File

@@ -1,17 +1,43 @@
use crate::error::{Error, Result};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr, ToSocketAddrs},
};
use anyhow::{Error, Result};
use netif;
use url::Host;
// helper for validating if the provided arg is an ip address.
/// helper for validating if the provided arg is an ip address.
pub fn is_socket_addr(host: &str) -> bool {
host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok()
}
/// checks if server_addr is valid and local host.
pub fn check_local_server_addr(server_addr: &str) -> Result<SocketAddr> {
let addr: Vec<SocketAddr> = match server_addr.to_socket_addrs() {
Ok(addr) => addr.collect(),
Err(err) => return Err(Error::new(Box::new(err))),
};
// 0.0.0.0 is a wildcard address and refers to local network
// addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port
// 9000 on localhost.
for a in addr {
if a.ip().is_unspecified() {
return Ok(a);
}
let host = match a {
SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()),
SocketAddr::V6(a) => Host::Ipv6(*a.ip()),
};
if is_local_host(host, 0, 0).is_ok() {
return Ok(a);
}
}
return Err(Error::from_string("host in server address should be this server"));
}
pub fn split_host_port(s: &str) -> Result<(String, u16)> {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() == 2 {
@@ -19,12 +45,12 @@ pub fn split_host_port(s: &str) -> Result<(String, u16)> {
return Ok((parts[0].to_string(), port));
}
}
Err(Error::msg("Invalid address format or port number"))
Err(Error::from_string("Invalid address format or port number"))
}
/// checks if the given parameter correspond to one of
/// the local IP of the current machine
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result<bool> {
let local_ips = must_get_local_ips();
let local_set: HashSet<IpAddr> = local_ips.into_iter().collect();
@@ -32,7 +58,7 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
Host::Domain(domain) => {
let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::<Vec<_>>()) {
Ok(ips) => ips,
Err(_) => return false,
Err(err) => return Err(Error::new(Box::new(err))),
};
ips.iter().any(|ip| local_set.contains(ip))
@@ -42,14 +68,14 @@ pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
};
if port > 0 {
return is_local_host && port == local_port;
return Ok(is_local_host && port == local_port);
}
is_local_host
Ok(is_local_host)
}
/// returns IPs of local interface
pub fn must_get_local_ips() -> Vec<IpAddr> {
fn must_get_local_ips() -> Vec<IpAddr> {
match netif::up() {
Ok(up) => up.map(|x| x.address().to_owned()).collect(),
Err(_) => vec![],
@@ -58,10 +84,28 @@ pub fn must_get_local_ips() -> Vec<IpAddr> {
#[cfg(test)]
mod test {
use std::net::{Ipv4Addr, Ipv6Addr};
use std::net::Ipv4Addr;
use super::*;
#[test]
fn test_is_socket_addr() {
let test_cases = [
("localhost", false),
("localhost:9000", false),
("example.com", false),
("http://192.168.1.0", false),
("http://192.168.1.0:9000", false),
("192.168.1.0", true),
("[2001:db8::1]:9000", true),
];
for (addr, expected) in test_cases {
let ret = is_socket_addr(addr);
assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret);
}
}
#[test]
fn test_must_get_local_ips() {
let local_ips = must_get_local_ips();
@@ -69,23 +113,4 @@ mod test {
assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))));
}
#[test]
fn test_is_local_host() {
let host = Host::Domain("localhost");
let is = is_local_host(host, 0, 9000);
assert!(is);
let host = Host::Ipv6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
let is = is_local_host(host, 0, 9000);
assert!(is);
let host = Host::Ipv4(Ipv4Addr::new(8, 8, 8, 8));
let is = is_local_host(host, 0, 9000);
assert!(!is);
let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1));
let is = is_local_host(host, 8000, 9000);
assert!(!is);
}
}