Merge branch 'endpoint' into erasure

This commit is contained in:
shiro.lee
2024-07-04 18:37:13 +08:00
8 changed files with 559 additions and 611 deletions

View File

@@ -54,14 +54,14 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
}
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
let mut futures = Vec::with_capacity(eps.len());
let mut futures = Vec::with_capacity(eps.as_ref().len());
for ep in eps.iter() {
for ep in eps.as_ref().iter() {
futures.push(new_disk(ep, opt));
}
let mut res = Vec::with_capacity(eps.len());
let mut errors = Vec::with_capacity(eps.len());
let mut res = Vec::with_capacity(eps.as_ref().len());
let mut errors = Vec::with_capacity(eps.as_ref().len());
let results = join_all(futures).await;
for result in results {
@@ -119,7 +119,7 @@ impl LocalDisk {
let fm = FormatV3::try_from(s)?;
let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx {
if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx {
return Err(Error::new(DiskError::InconsistentDisk));
}
@@ -606,7 +606,7 @@ mod test {
let p = "./testv";
fs::create_dir_all(&p).await.unwrap();
let ep = match Endpoint::new(&p) {
let ep = match Endpoint::try_from(p) {
Ok(e) => e,
Err(e) => {
println!("{e}");

View File

@@ -1,5 +1,5 @@
use super::ellipses::*;
use anyhow::{Error, Result};
use super::error::{Error, Result};
use serde::Deserialize;
use std::collections::HashSet;
@@ -9,8 +9,20 @@ const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
#[derive(Deserialize, Debug, Default)]
pub struct PoolDisksLayout {
pub cmd_line: String,
pub layout: Vec<Vec<String>>,
cmd_line: String,
layout: Vec<Vec<String>>,
}
impl AsRef<Vec<Vec<String>>> for PoolDisksLayout {
fn as_ref(&self) -> &Vec<Vec<String>> {
&self.layout
}
}
impl AsMut<Vec<Vec<String>>> for PoolDisksLayout {
fn as_mut(&mut self) -> &mut Vec<Vec<String>> {
&mut self.layout
}
}
impl PoolDisksLayout {
@@ -20,12 +32,32 @@ impl PoolDisksLayout {
layout,
}
}
pub fn count(&self) -> usize {
self.layout.len()
}
pub fn as_cmd_line(&self) -> &str {
&self.cmd_line
}
}
#[derive(Deserialize, Debug, Default)]
pub struct DisksLayout {
pub legacy: bool,
pub pools: Vec<PoolDisksLayout>,
pools: Vec<PoolDisksLayout>,
}
impl AsRef<Vec<PoolDisksLayout>> for DisksLayout {
fn as_ref(&self) -> &Vec<PoolDisksLayout> {
&self.pools
}
}
impl AsMut<Vec<PoolDisksLayout>> for DisksLayout {
fn as_mut(&mut self) -> &mut Vec<PoolDisksLayout> {
&mut self.pools
}
}
impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
@@ -33,7 +65,7 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
if args.is_empty() {
return Err(Error::msg("Invalid argument"));
return Err(Error::from_string("Invalid argument"));
}
let is_ellipses = args.iter().any(|v| has_ellipses(&[v]));
@@ -54,7 +86,9 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
let mut layout = Vec::with_capacity(args.len());
for arg in args.iter() {
if !has_ellipses(&[arg]) && args.len() > 1 {
return Err(Error::msg("all args must have ellipses for pool expansion (Invalid arguments specified)"));
return Err(Error::from_string(
"all args must have ellipses for pool expansion (Invalid arguments specified)",
));
}
let set_args = get_all_sets(is_ellipses, &[arg])?;
@@ -69,6 +103,27 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
}
}
impl DisksLayout {
pub fn is_empty_layout(&self) -> bool {
self.pools.is_empty()
|| self.pools[0].layout.is_empty()
|| self.pools[0].layout[0].is_empty()
|| self.pools[0].layout[0][0].is_empty()
}
pub fn is_single_drive_layout(&self) -> bool {
self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1
}
pub fn get_single_drive_layout(&self) -> &str {
&self.pools[0].layout[0][0]
}
pub fn get_layout(&self, idx: usize) -> Option<&PoolDisksLayout> {
self.pools.get(idx)
}
}
/// parses all ellipses input arguments, expands them into
/// corresponding list of endpoints chunked evenly in accordance with a
/// specific set size.
@@ -95,7 +150,7 @@ fn get_all_sets<T: AsRef<str>>(is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<
for args in set_args.iter() {
for arg in args {
if unique_args.contains(arg) {
return Err(Error::msg(format!("Input args {} has duplicate ellipses", arg)));
return Err(Error::from_string(format!("Input args {} has duplicate ellipses", arg)));
}
unique_args.insert(arg);
}
@@ -254,20 +309,20 @@ fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPa
/// indexes (total sets)
fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result<Vec<Vec<usize>>> {
if args.is_empty() || total_sizes.is_empty() {
return Err(Error::msg("Invalid argument"));
return Err(Error::from_string("Invalid argument"));
}
for &size in total_sizes {
// Check if total_sizes has minimum range upto set_size
if size < SET_SIZES[0] {
return Err(Error::msg(format!("Incorrect number of endpoints provided, size {}", size)));
return Err(Error::from_string(format!("Incorrect number of endpoints provided, size {}", size)));
}
}
let common_size = get_divisible_size(total_sizes);
let mut set_counts = possible_set_counts(common_size);
if set_counts.is_empty() {
return Err(Error::msg(format!(
return Err(Error::from_string(format!(
"Incorrect number of endpoints provided, number of drives {} is not divisible by any supported erasure set sizes {}",
common_size, 0
)));
@@ -278,13 +333,13 @@ fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_pattern
// Returns possible set counts with symmetry.
set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns);
if set_counts.is_empty() {
return Err(Error::msg("No symmetric distribution detected with input endpoints provided"));
return Err(Error::from_string("No symmetric distribution detected with input endpoints provided"));
}
// Final set size with all the symmetry accounted for.
let set_size = common_set_drive_count(common_size, &set_counts);
if !is_valid_set_size(set_size) {
return Err(Error::msg("Incorrect number of endpoints provided3"));
return Err(Error::from_string("Incorrect number of endpoints provided3"));
}
Ok(total_sizes
@@ -475,7 +530,7 @@ mod test {
arg_patterns.push(patterns);
}
Err(err) => {
panic!("Test{}: Unexpected failure {}", test_case.num, err);
panic!("Test{}: Unexpected failure {:?}", test_case.num, err);
}
}
}
@@ -494,7 +549,7 @@ mod test {
}
Err(err) => {
if test_case.success {
panic!("Test{}: Expected success but failed instead {}", test_case.num, err);
panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err);
}
}
}
@@ -754,7 +809,7 @@ mod test {
}
Err(err) => {
if test_case.success {
panic!("Test{}: Expected success but failed instead {}", test_case.num, err);
panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err);
}
}
}

View File

@@ -1,6 +1,6 @@
use lazy_static::*;
use anyhow::{Error, Result};
use super::error::{Error, Result};
use regex::Regex;
lazy_static! {
@@ -88,7 +88,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result<ArgPattern> {
let mut parts = match ELLIPSES_RE.captures(arg) {
Some(caps) => caps,
None => {
return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
}
};
@@ -125,7 +125,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result<ArgPattern> {
|| p.suffix.contains(OPEN_BRACES)
|| p.suffix.contains(CLOSE_BRACES)
{
return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
}
}
@@ -146,10 +146,10 @@ pub fn has_ellipses<T: AsRef<str>>(s: &[T]) -> bool {
/// {33...64}
pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
if !pattern.contains(OPEN_BRACES) {
return Err(Error::msg("Invalid argument"));
return Err(Error::from_string("Invalid argument"));
}
if !pattern.contains(OPEN_BRACES) {
return Err(Error::msg("Invalid argument"));
return Err(Error::from_string("Invalid argument"));
}
let ellipses_range: Vec<&str> = pattern
@@ -159,7 +159,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
.collect();
if ellipses_range.len() != 2 {
return Err(Error::msg("Invalid argument"));
return Err(Error::from_string("Invalid argument"));
}
// TODO: Add support for hexadecimals.
@@ -167,7 +167,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
let end = ellipses_range[1].parse::<usize>()?;
if start > end {
return Err(Error::msg("Invalid argument:range start cannot be bigger than end"));
return Err(Error::from_string("Invalid argument:range start cannot be bigger than end"));
}
let mut ret: Vec<String> = Vec::with_capacity(end - start + 1);

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::StdError;
use std::fmt::Display;
use std::panic::Location;
use tracing::error;
@@ -44,6 +44,12 @@ impl From<Error> for S3Error {
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.source.fmt(f)
}
}
#[inline]
#[track_caller]
pub(crate) fn log(source: &dyn std::error::Error) {

View File

@@ -27,7 +27,7 @@ impl S3PeerSys {
pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
Self {
clients: Self::new_clients(eps, local_disks),
pools_count: eps.len(),
pools_count: eps.as_ref().len(),
}
}
@@ -37,15 +37,11 @@ impl S3PeerSys {
.iter()
.map(|e| {
if e.is_local {
let cli: Box<dyn PeerS3Client> = Box::new(LocalPeerS3Client::new(
local_disks.clone(),
e.clone(),
e.pools.clone(),
));
let cli: Box<dyn PeerS3Client> =
Box::new(LocalPeerS3Client::new(local_disks.clone(), e.clone(), e.pools.clone()));
Arc::new(cli)
} else {
let cli: Box<dyn PeerS3Client> =
Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
let cli: Box<dyn PeerS3Client> = Box::new(RemotePeerS3Client::new(e.clone(), e.pools.clone()));
Arc::new(cli)
}
})

View File

@@ -33,7 +33,7 @@ impl ECStore {
let mut deployment_id = None;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address.as_str(), &layouts)?;
let mut pools = Vec::with_capacity(endpoint_pools.len());
let mut disk_map = HashMap::with_capacity(endpoint_pools.len());

View File

@@ -1,12 +1,43 @@
use crate::error::{Error, Result};
use std::{
collections::HashMap,
net::{IpAddr, ToSocketAddrs},
collections::HashSet,
net::{IpAddr, SocketAddr, ToSocketAddrs},
};
use anyhow::{Error, Result};
use netif;
use url::Host;
/// helper for validating if the provided arg is an ip address.
pub fn is_socket_addr(host: &str) -> bool {
host.parse::<SocketAddr>().is_ok() || host.parse::<IpAddr>().is_ok()
}
/// checks if server_addr is valid and local host.
pub fn check_local_server_addr(server_addr: &str) -> Result<SocketAddr> {
let addr: Vec<SocketAddr> = match server_addr.to_socket_addrs() {
Ok(addr) => addr.collect(),
Err(err) => return Err(Error::new(Box::new(err))),
};
// 0.0.0.0 is a wildcard address and refers to local network
// addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port
// 9000 on localhost.
for a in addr {
if a.ip().is_unspecified() {
return Ok(a);
}
let host = match a {
SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()),
SocketAddr::V6(a) => Host::Ipv6(*a.ip()),
};
if is_local_host(host, 0, 0).is_ok() {
return Ok(a);
}
}
return Err(Error::from_string("host in server address should be this server"));
}
pub fn split_host_port(s: &str) -> Result<(String, u16)> {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() == 2 {
@@ -14,57 +45,41 @@ pub fn split_host_port(s: &str) -> Result<(String, u16)> {
return Ok((parts[0].to_string(), port));
}
}
Err(Error::msg("Invalid address format or port number"))
Err(Error::from_string("Invalid address format or port number"))
}
// is_local_host 判断是否是本地ip
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
/// checks if the given parameter correspond to one of
/// the local IP of the current machine
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result<bool> {
let local_ips = must_get_local_ips();
let local_map =
local_ips
.iter()
.map(|ip| ip.to_string())
.fold(HashMap::new(), |mut acc, item| {
*acc.entry(item).or_insert(true) = true;
acc
});
let local_set: HashSet<IpAddr> = local_ips.into_iter().collect();
let is_local_host = match host {
Host::Domain(domain) => {
let ips: Vec<String> = (domain, 0)
.to_socket_addrs()
.unwrap_or(Vec::new().into_iter())
.map(|addr| addr.ip().to_string())
.collect();
let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::<Vec<_>>()) {
Ok(ips) => ips,
Err(err) => return Err(Error::new(Box::new(err))),
};
let mut isok = false;
for ip in ips.iter() {
if local_map.contains_key(ip) {
isok = true;
break;
}
}
isok
ips.iter().any(|ip| local_set.contains(ip))
}
Host::Ipv4(ip) => local_map.contains_key(&ip.to_string()),
Host::Ipv6(ip) => local_map.contains_key(&ip.to_string()),
Host::Ipv4(ip) => local_set.contains(&IpAddr::V4(ip)),
Host::Ipv6(ip) => local_set.contains(&IpAddr::V6(ip)),
};
if port > 0 {
return is_local_host && port == local_port;
return Ok(is_local_host && port == local_port);
}
is_local_host
Ok(is_local_host)
}
pub fn must_get_local_ips() -> Vec<IpAddr> {
let mut v: Vec<IpAddr> = Vec::new();
if let Some(up) = netif::up().ok() {
v = up.map(|x| x.address().to_owned()).collect();
/// returns IPs of local interface
fn must_get_local_ips() -> Vec<IpAddr> {
match netif::up() {
Ok(up) => up.map(|x| x.address().to_owned()).collect(),
Err(_) => vec![],
}
v
}
#[cfg(test)]
@@ -74,21 +89,28 @@ mod test {
use super::*;
#[test]
fn test_must_get_local_ips() {
let ips = must_get_local_ips();
for ip in ips.iter() {
println!("{:?}", ip)
fn test_is_socket_addr() {
let test_cases = [
("localhost", false),
("localhost:9000", false),
("example.com", false),
("http://192.168.1.0", false),
("http://192.168.1.0:9000", false),
("192.168.1.0", true),
("[2001:db8::1]:9000", true),
];
for (addr, expected) in test_cases {
let ret = is_socket_addr(addr);
assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret);
}
}
#[test]
fn test_is_local_host() {
// let host = Host::Ipv4(Ipv4Addr::new(192, 168, 0, 233));
let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1));
// let host = Host::Domain("localhost");
let port = 0;
let local_port = 9000;
let is = is_local_host(host, port, local_port);
assert!(is)
fn test_must_get_local_ips() {
let local_ips = must_get_local_ips();
let local_set: HashSet<IpAddr> = local_ips.into_iter().collect();
assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))));
}
}