mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
merge erasure
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -321,6 +321,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"netif",
|
||||
"path-absolutize",
|
||||
"path-clean",
|
||||
"reed-solomon-erasure",
|
||||
"regex",
|
||||
"rmp-serde",
|
||||
@@ -885,6 +886,12 @@ dependencies = [
|
||||
"path-dedot",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "path-clean"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "17359afc20d7ab31fdb42bb844c8b3bb1dabd7dcf7e68428492da7f16966fcef"
|
||||
|
||||
[[package]]
|
||||
name = "path-dedot"
|
||||
version = "3.1.1"
|
||||
|
||||
@@ -17,6 +17,8 @@ async-trait.workspace = true
|
||||
tracing.workspace = true
|
||||
serde.workspace = true
|
||||
anyhow.workspace = true
|
||||
time.workspace = true
|
||||
serde_json.workspace = true
|
||||
url = "2.5.2"
|
||||
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
|
||||
reed-solomon-erasure = "6.0.0"
|
||||
@@ -25,9 +27,7 @@ lazy_static = "1.5.0"
|
||||
regex = "1.10.5"
|
||||
netif = "0.1.6"
|
||||
tracing-error = "0.2.0"
|
||||
serde_json.workspace = true
|
||||
path-absolutize = "3.1.1"
|
||||
time.workspace = true
|
||||
rmp-serde = "1.3.0"
|
||||
tokio-util = { version = "0.7.11", features = ["io"] }
|
||||
s3s = "0.10.0"
|
||||
@@ -36,7 +36,7 @@ siphasher = "1.0.1"
|
||||
base64-simd = "0.8.0"
|
||||
sha2 = "0.10.8"
|
||||
hex-simd = "0.8.0"
|
||||
|
||||
path-clean = "1.0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
|
||||
@@ -54,14 +54,14 @@ pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result<DiskStore> {
|
||||
}
|
||||
|
||||
pub async fn init_disks(eps: &Endpoints, opt: &DiskOption) -> (Vec<Option<DiskStore>>, Vec<Option<Error>>) {
|
||||
let mut futures = Vec::with_capacity(eps.len());
|
||||
let mut futures = Vec::with_capacity(eps.as_ref().len());
|
||||
|
||||
for ep in eps.iter() {
|
||||
for ep in eps.as_ref().iter() {
|
||||
futures.push(new_disk(ep, opt));
|
||||
}
|
||||
|
||||
let mut res = Vec::with_capacity(eps.len());
|
||||
let mut errors = Vec::with_capacity(eps.len());
|
||||
let mut res = Vec::with_capacity(eps.as_ref().len());
|
||||
let mut errors = Vec::with_capacity(eps.as_ref().len());
|
||||
|
||||
let results = join_all(futures).await;
|
||||
for result in results {
|
||||
@@ -119,7 +119,7 @@ impl LocalDisk {
|
||||
let fm = FormatV3::try_from(s)?;
|
||||
let (set_idx, disk_idx) = fm.find_disk_index_by_disk_id(fm.erasure.this)?;
|
||||
|
||||
if set_idx as i32 != ep.set_idx || disk_idx as i32 != ep.disk_idx {
|
||||
if Some(set_idx) != ep.set_idx || Some(disk_idx) != ep.disk_idx {
|
||||
return Err(Error::new(DiskError::InconsistentDisk));
|
||||
}
|
||||
|
||||
@@ -685,7 +685,7 @@ mod test {
|
||||
let p = "./testv";
|
||||
fs::create_dir_all(&p).await.unwrap();
|
||||
|
||||
let ep = match Endpoint::new(&p) {
|
||||
let ep = match Endpoint::try_from(p) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
println!("{e}");
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::ellipses::*;
|
||||
use anyhow::{Error, Result};
|
||||
use super::error::{Error, Result};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashSet;
|
||||
|
||||
@@ -9,8 +9,20 @@ const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
|
||||
|
||||
#[derive(Deserialize, Debug, Default)]
|
||||
pub struct PoolDisksLayout {
|
||||
pub cmd_line: String,
|
||||
pub layout: Vec<Vec<String>>,
|
||||
cmd_line: String,
|
||||
layout: Vec<Vec<String>>,
|
||||
}
|
||||
|
||||
impl AsRef<Vec<Vec<String>>> for PoolDisksLayout {
|
||||
fn as_ref(&self) -> &Vec<Vec<String>> {
|
||||
&self.layout
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<Vec<Vec<String>>> for PoolDisksLayout {
|
||||
fn as_mut(&mut self) -> &mut Vec<Vec<String>> {
|
||||
&mut self.layout
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolDisksLayout {
|
||||
@@ -20,12 +32,32 @@ impl PoolDisksLayout {
|
||||
layout,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn count(&self) -> usize {
|
||||
self.layout.len()
|
||||
}
|
||||
|
||||
pub fn get_cmd_line(&self) -> &str {
|
||||
&self.cmd_line
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Default)]
|
||||
pub struct DisksLayout {
|
||||
pub legacy: bool,
|
||||
pub pools: Vec<PoolDisksLayout>,
|
||||
pools: Vec<PoolDisksLayout>,
|
||||
}
|
||||
|
||||
impl AsRef<Vec<PoolDisksLayout>> for DisksLayout {
|
||||
fn as_ref(&self) -> &Vec<PoolDisksLayout> {
|
||||
&self.pools
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<Vec<PoolDisksLayout>> for DisksLayout {
|
||||
fn as_mut(&mut self) -> &mut Vec<PoolDisksLayout> {
|
||||
&mut self.pools
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
|
||||
@@ -33,7 +65,7 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
|
||||
|
||||
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
|
||||
if args.is_empty() {
|
||||
return Err(Error::msg("Invalid argument"));
|
||||
return Err(Error::from_string("Invalid argument"));
|
||||
}
|
||||
|
||||
let is_ellipses = args.iter().any(|v| has_ellipses(&[v]));
|
||||
@@ -54,7 +86,9 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
|
||||
let mut layout = Vec::with_capacity(args.len());
|
||||
for arg in args.iter() {
|
||||
if !has_ellipses(&[arg]) && args.len() > 1 {
|
||||
return Err(Error::msg("all args must have ellipses for pool expansion (Invalid arguments specified)"));
|
||||
return Err(Error::from_string(
|
||||
"all args must have ellipses for pool expansion (Invalid arguments specified)",
|
||||
));
|
||||
}
|
||||
|
||||
let set_args = get_all_sets(is_ellipses, &[arg])?;
|
||||
@@ -69,6 +103,27 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
|
||||
}
|
||||
}
|
||||
|
||||
impl DisksLayout {
|
||||
pub fn is_empty_layout(&self) -> bool {
|
||||
self.pools.is_empty()
|
||||
|| self.pools[0].layout.is_empty()
|
||||
|| self.pools[0].layout[0].is_empty()
|
||||
|| self.pools[0].layout[0][0].is_empty()
|
||||
}
|
||||
|
||||
pub fn is_single_drive_layout(&self) -> bool {
|
||||
self.pools.len() == 1 && self.pools[0].layout.len() == 1 && self.pools[0].layout[0].len() == 1
|
||||
}
|
||||
|
||||
pub fn get_single_drive_layout(&self) -> &str {
|
||||
&self.pools[0].layout[0][0]
|
||||
}
|
||||
|
||||
pub fn get_layout(&self, idx: usize) -> Option<&PoolDisksLayout> {
|
||||
self.pools.get(idx)
|
||||
}
|
||||
}
|
||||
|
||||
/// parses all ellipses input arguments, expands them into
|
||||
/// corresponding list of endpoints chunked evenly in accordance with a
|
||||
/// specific set size.
|
||||
@@ -95,7 +150,7 @@ fn get_all_sets<T: AsRef<str>>(is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<
|
||||
for args in set_args.iter() {
|
||||
for arg in args {
|
||||
if unique_args.contains(arg) {
|
||||
return Err(Error::msg(format!("Input args {} has duplicate ellipses", arg)));
|
||||
return Err(Error::from_string(format!("Input args {} has duplicate ellipses", arg)));
|
||||
}
|
||||
unique_args.insert(arg);
|
||||
}
|
||||
@@ -228,11 +283,11 @@ fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPa
|
||||
for &ss in set_counts {
|
||||
let mut symmetry = false;
|
||||
for arg_pattern in arg_patterns {
|
||||
for p in arg_pattern.inner.iter() {
|
||||
if p.seq.len() > ss {
|
||||
symmetry = (p.seq.len() % ss) == 0;
|
||||
for p in arg_pattern.as_ref().iter() {
|
||||
if p.len() > ss {
|
||||
symmetry = (p.len() % ss) == 0;
|
||||
} else {
|
||||
symmetry = (ss % p.seq.len()) == 0;
|
||||
symmetry = (ss % p.len()) == 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -254,20 +309,20 @@ fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPa
|
||||
/// indexes (total sets)
|
||||
fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result<Vec<Vec<usize>>> {
|
||||
if args.is_empty() || total_sizes.is_empty() {
|
||||
return Err(Error::msg("Invalid argument"));
|
||||
return Err(Error::from_string("Invalid argument"));
|
||||
}
|
||||
|
||||
for &size in total_sizes {
|
||||
// Check if total_sizes has minimum range upto set_size
|
||||
if size < SET_SIZES[0] {
|
||||
return Err(Error::msg(format!("Incorrect number of endpoints provided, size {}", size)));
|
||||
return Err(Error::from_string(format!("Incorrect number of endpoints provided, size {}", size)));
|
||||
}
|
||||
}
|
||||
|
||||
let common_size = get_divisible_size(total_sizes);
|
||||
let mut set_counts = possible_set_counts(common_size);
|
||||
if set_counts.is_empty() {
|
||||
return Err(Error::msg(format!(
|
||||
return Err(Error::from_string(format!(
|
||||
"Incorrect number of endpoints provided, number of drives {} is not divisible by any supported erasure set sizes {}",
|
||||
common_size, 0
|
||||
)));
|
||||
@@ -278,13 +333,13 @@ fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_pattern
|
||||
// Returns possible set counts with symmetry.
|
||||
set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns);
|
||||
if set_counts.is_empty() {
|
||||
return Err(Error::msg("No symmetric distribution detected with input endpoints provided"));
|
||||
return Err(Error::from_string("No symmetric distribution detected with input endpoints provided"));
|
||||
}
|
||||
|
||||
// Final set size with all the symmetry accounted for.
|
||||
let set_size = common_set_drive_count(common_size, &set_counts);
|
||||
if !is_valid_set_size(set_size) {
|
||||
return Err(Error::msg("Incorrect number of endpoints provided3"));
|
||||
return Err(Error::from_string("Incorrect number of endpoints provided3"));
|
||||
}
|
||||
|
||||
Ok(total_sizes
|
||||
@@ -465,6 +520,13 @@ mod test {
|
||||
]],
|
||||
success: true,
|
||||
},
|
||||
TestCase {
|
||||
num: 15,
|
||||
args: vec!["https://node{1...3}.example.net/mnt/drive{1...8}"],
|
||||
total_sizes: vec![24],
|
||||
indexes: vec![vec![12, 12]],
|
||||
success: true,
|
||||
},
|
||||
];
|
||||
|
||||
for test_case in test_cases {
|
||||
@@ -475,7 +537,7 @@ mod test {
|
||||
arg_patterns.push(patterns);
|
||||
}
|
||||
Err(err) => {
|
||||
panic!("Test{}: Unexpected failure {}", test_case.num, err);
|
||||
panic!("Test{}: Unexpected failure {:?}", test_case.num, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -494,7 +556,7 @@ mod test {
|
||||
}
|
||||
Err(err) => {
|
||||
if test_case.success {
|
||||
panic!("Test{}: Expected success but failed instead {}", test_case.num, err);
|
||||
panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -754,7 +816,7 @@ mod test {
|
||||
}
|
||||
Err(err) => {
|
||||
if test_case.success {
|
||||
panic!("Test{}: Expected success but failed instead {}", test_case.num, err);
|
||||
panic!("Test{}: Expected success but failed instead {:?}", test_case.num, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use lazy_static::*;
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use super::error::{Error, Result};
|
||||
use regex::Regex;
|
||||
|
||||
lazy_static! {
|
||||
@@ -16,9 +16,9 @@ const ELLIPSES: &str = "...";
|
||||
/// associated prefix and suffixes.
|
||||
#[derive(Debug, Default, PartialEq, Eq)]
|
||||
pub struct Pattern {
|
||||
pub prefix: String,
|
||||
pub suffix: String,
|
||||
pub seq: Vec<String>,
|
||||
pub(crate) prefix: String,
|
||||
pub(crate) suffix: String,
|
||||
pub(crate) seq: Vec<String>,
|
||||
}
|
||||
|
||||
impl Pattern {
|
||||
@@ -37,12 +37,28 @@ impl Pattern {
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.seq.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// contains a list of patterns provided in the input.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct ArgPattern {
|
||||
pub inner: Vec<Pattern>,
|
||||
inner: Vec<Pattern>,
|
||||
}
|
||||
|
||||
impl AsRef<Vec<Pattern>> for ArgPattern {
|
||||
fn as_ref(&self) -> &Vec<Pattern> {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<Vec<Pattern>> for ArgPattern {
|
||||
fn as_mut(&mut self) -> &mut Vec<Pattern> {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl ArgPattern {
|
||||
@@ -88,7 +104,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result<ArgPattern> {
|
||||
let mut parts = match ELLIPSES_RE.captures(arg) {
|
||||
Some(caps) => caps,
|
||||
None => {
|
||||
return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
|
||||
return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -125,7 +141,7 @@ pub fn find_ellipses_patterns(arg: &str) -> Result<ArgPattern> {
|
||||
|| p.suffix.contains(OPEN_BRACES)
|
||||
|| p.suffix.contains(CLOSE_BRACES)
|
||||
{
|
||||
return Err(Error::msg(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
|
||||
return Err(Error::from_string(format!("Invalid ellipsis format in ({}), Ellipsis range must be provided in format {{N...M}} where N and M are positive integers, M must be greater than N, with an allowed minimum range of 4", arg)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,10 +162,10 @@ pub fn has_ellipses<T: AsRef<str>>(s: &[T]) -> bool {
|
||||
/// {33...64}
|
||||
pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
|
||||
if !pattern.contains(OPEN_BRACES) {
|
||||
return Err(Error::msg("Invalid argument"));
|
||||
return Err(Error::from_string("Invalid argument"));
|
||||
}
|
||||
if !pattern.contains(OPEN_BRACES) {
|
||||
return Err(Error::msg("Invalid argument"));
|
||||
return Err(Error::from_string("Invalid argument"));
|
||||
}
|
||||
|
||||
let ellipses_range: Vec<&str> = pattern
|
||||
@@ -159,7 +175,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
|
||||
.collect();
|
||||
|
||||
if ellipses_range.len() != 2 {
|
||||
return Err(Error::msg("Invalid argument"));
|
||||
return Err(Error::from_string("Invalid argument"));
|
||||
}
|
||||
|
||||
// TODO: Add support for hexadecimals.
|
||||
@@ -167,7 +183,7 @@ pub fn parse_ellipses_range(pattern: &str) -> Result<Vec<String>> {
|
||||
let end = ellipses_range[1].parse::<usize>()?;
|
||||
|
||||
if start > end {
|
||||
return Err(Error::msg("Invalid argument:range start cannot be bigger than end"));
|
||||
return Err(Error::from_string("Invalid argument:range start cannot be bigger than end"));
|
||||
}
|
||||
|
||||
let mut ret: Vec<String> = Vec::with_capacity(end - start + 1);
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,7 +1,7 @@
|
||||
use s3s::S3Error;
|
||||
use s3s::S3ErrorCode;
|
||||
use s3s::StdError;
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::panic::Location;
|
||||
|
||||
use tracing::error;
|
||||
@@ -44,6 +44,12 @@ impl From<Error> for S3Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.source.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[track_caller]
|
||||
pub(crate) fn log(source: &dyn std::error::Error) {
|
||||
|
||||
@@ -17,7 +17,7 @@ type Client = Arc<Box<dyn PeerS3Client>>;
|
||||
pub trait PeerS3Client: Debug + Sync + Send + 'static {
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()>;
|
||||
async fn get_bucket_info(&self, bucket: &str, opts: &BucketOptions) -> Result<BucketInfo>;
|
||||
fn get_pools(&self) -> Vec<i32>;
|
||||
fn get_pools(&self) -> Vec<usize>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -30,7 +30,7 @@ impl S3PeerSys {
|
||||
pub fn new(eps: &EndpointServerPools, local_disks: Vec<DiskStore>) -> Self {
|
||||
Self {
|
||||
clients: Self::new_clients(eps, local_disks),
|
||||
pools_count: eps.len(),
|
||||
pools_count: eps.as_ref().len(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ impl S3PeerSys {
|
||||
|
||||
#[async_trait]
|
||||
impl PeerS3Client for S3PeerSys {
|
||||
fn get_pools(&self) -> Vec<i32> {
|
||||
fn get_pools(&self) -> Vec<usize> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
|
||||
@@ -83,7 +83,7 @@ impl PeerS3Client for S3PeerSys {
|
||||
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
|
||||
for (j, cli) in self.clients.iter().enumerate() {
|
||||
let pools = cli.get_pools();
|
||||
let idx = i as i32;
|
||||
let idx = i;
|
||||
if pools.contains(&idx) {
|
||||
per_pool_errs.push(errors[j].as_ref());
|
||||
}
|
||||
@@ -123,7 +123,7 @@ impl PeerS3Client for S3PeerSys {
|
||||
let mut per_pool_errs = Vec::with_capacity(self.clients.len());
|
||||
for (j, cli) in self.clients.iter().enumerate() {
|
||||
let pools = cli.get_pools();
|
||||
let idx = i as i32;
|
||||
let idx = i;
|
||||
if pools.contains(&idx) {
|
||||
per_pool_errs.push(errors[j].as_ref());
|
||||
}
|
||||
@@ -142,11 +142,11 @@ impl PeerS3Client for S3PeerSys {
|
||||
pub struct LocalPeerS3Client {
|
||||
pub local_disks: Vec<DiskStore>,
|
||||
pub node: Node,
|
||||
pub pools: Vec<i32>,
|
||||
pub pools: Vec<usize>,
|
||||
}
|
||||
|
||||
impl LocalPeerS3Client {
|
||||
fn new(local_disks: Vec<DiskStore>, node: Node, pools: Vec<i32>) -> Self {
|
||||
fn new(local_disks: Vec<DiskStore>, node: Node, pools: Vec<usize>) -> Self {
|
||||
Self {
|
||||
local_disks,
|
||||
node,
|
||||
@@ -157,7 +157,7 @@ impl LocalPeerS3Client {
|
||||
|
||||
#[async_trait]
|
||||
impl PeerS3Client for LocalPeerS3Client {
|
||||
fn get_pools(&self) -> Vec<i32> {
|
||||
fn get_pools(&self) -> Vec<usize> {
|
||||
self.pools.clone()
|
||||
}
|
||||
async fn make_bucket(&self, bucket: &str, opts: &MakeBucketOptions) -> Result<()> {
|
||||
@@ -234,18 +234,18 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
#[derive(Debug)]
|
||||
pub struct RemotePeerS3Client {
|
||||
pub node: Node,
|
||||
pub pools: Vec<i32>,
|
||||
pub pools: Vec<usize>,
|
||||
}
|
||||
|
||||
impl RemotePeerS3Client {
|
||||
fn new(node: Node, pools: Vec<i32>) -> Self {
|
||||
fn new(node: Node, pools: Vec<usize>) -> Self {
|
||||
Self { node, pools }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PeerS3Client for RemotePeerS3Client {
|
||||
fn get_pools(&self) -> Vec<i32> {
|
||||
fn get_pools(&self) -> Vec<usize> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn make_bucket(&self, _bucket: &str, _opts: &MakeBucketOptions) -> Result<()> {
|
||||
|
||||
@@ -32,20 +32,21 @@ pub struct ECStore {
|
||||
|
||||
impl ECStore {
|
||||
pub async fn new(address: String, endpoints: Vec<String>) -> Result<Self> {
|
||||
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
|
||||
let layouts = DisksLayout::try_from(endpoints.as_slice()).map_err(|v| Error::msg(v))?;
|
||||
|
||||
let mut deployment_id = None;
|
||||
|
||||
let (endpoint_pools, _) = EndpointServerPools::create_server_endpoints(address, &layouts.pools, layouts.legacy)?;
|
||||
let (endpoint_pools, _) =
|
||||
EndpointServerPools::create_server_endpoints(address.as_str(), &layouts).map_err(|v| Error::msg(v))?;
|
||||
|
||||
let mut pools = Vec::with_capacity(endpoint_pools.len());
|
||||
let mut disk_map = HashMap::with_capacity(endpoint_pools.len());
|
||||
let mut pools = Vec::with_capacity(endpoint_pools.as_ref().len());
|
||||
let mut disk_map = HashMap::with_capacity(endpoint_pools.as_ref().len());
|
||||
|
||||
let first_is_local = endpoint_pools.first_is_local();
|
||||
let first_is_local = endpoint_pools.first_local();
|
||||
|
||||
let mut local_disks = Vec::new();
|
||||
|
||||
for (i, pool_eps) in endpoint_pools.iter().enumerate() {
|
||||
for (i, pool_eps) in endpoint_pools.as_ref().iter().enumerate() {
|
||||
// TODO: read from config parseStorageClass
|
||||
let partiy_count = store_init::default_partiy_count(pool_eps.drives_per_set);
|
||||
|
||||
|
||||
@@ -1,70 +1,103 @@
|
||||
use crate::error::{Error, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, ToSocketAddrs},
|
||||
collections::HashSet,
|
||||
net::{IpAddr, SocketAddr, ToSocketAddrs},
|
||||
};
|
||||
|
||||
use anyhow::{Error, Result};
|
||||
use netif;
|
||||
use url::Host;
|
||||
|
||||
pub fn split_host_port(s: &str) -> Result<(String, u16)> {
|
||||
let parts: Vec<&str> = s.split(':').collect();
|
||||
if parts.len() == 2 {
|
||||
if let Ok(port) = parts[1].parse::<u16>() {
|
||||
return Ok((parts[0].to_string(), port));
|
||||
}
|
||||
}
|
||||
Err(Error::msg("Invalid address format or port number"))
|
||||
lazy_static! {
|
||||
static ref LOCAL_IPS: Vec<IpAddr> = must_get_local_ips().unwrap();
|
||||
}
|
||||
|
||||
// is_local_host 判断是否是本地ip
|
||||
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
|
||||
let local_ips = must_get_local_ips();
|
||||
/// helper for validating if the provided arg is an ip address.
|
||||
pub fn is_socket_addr(addr: &str) -> bool {
|
||||
// TODO IPv6 zone information?
|
||||
|
||||
let local_map =
|
||||
local_ips
|
||||
.iter()
|
||||
.map(|ip| ip.to_string())
|
||||
.fold(HashMap::new(), |mut acc, item| {
|
||||
*acc.entry(item).or_insert(true) = true;
|
||||
acc
|
||||
});
|
||||
addr.parse::<SocketAddr>().is_ok() || addr.parse::<IpAddr>().is_ok()
|
||||
}
|
||||
|
||||
/// checks if server_addr is valid and local host.
|
||||
pub fn check_local_server_addr(server_addr: &str) -> Result<SocketAddr> {
|
||||
let addr: Vec<SocketAddr> = match server_addr.to_socket_addrs() {
|
||||
Ok(addr) => addr.collect(),
|
||||
Err(err) => return Err(Error::new(Box::new(err))),
|
||||
};
|
||||
|
||||
// 0.0.0.0 is a wildcard address and refers to local network
|
||||
// addresses. I.e, 0.0.0.0:9000 like ":9000" refers to port
|
||||
// 9000 on localhost.
|
||||
for a in addr {
|
||||
if a.ip().is_unspecified() {
|
||||
return Ok(a);
|
||||
}
|
||||
|
||||
let host = match a {
|
||||
SocketAddr::V4(a) => Host::<&str>::Ipv4(*a.ip()),
|
||||
SocketAddr::V6(a) => Host::Ipv6(*a.ip()),
|
||||
};
|
||||
|
||||
if is_local_host(host, 0, 0)? {
|
||||
return Ok(a);
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::from_string("host in server address should be this server"))
|
||||
}
|
||||
|
||||
/// checks if the given parameter correspond to one of
|
||||
/// the local IP of the current machine
|
||||
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> Result<bool> {
|
||||
let local_set: HashSet<IpAddr> = LOCAL_IPS.iter().copied().collect();
|
||||
let is_local_host = match host {
|
||||
Host::Domain(domain) => {
|
||||
let ips: Vec<String> = (domain, 0)
|
||||
.to_socket_addrs()
|
||||
.unwrap_or(Vec::new().into_iter())
|
||||
.map(|addr| addr.ip().to_string())
|
||||
.collect();
|
||||
let ips = match (domain, 0).to_socket_addrs().map(|v| v.map(|v| v.ip()).collect::<Vec<_>>()) {
|
||||
Ok(ips) => ips,
|
||||
Err(err) => return Err(Error::new(Box::new(err))),
|
||||
};
|
||||
|
||||
let mut isok = false;
|
||||
for ip in ips.iter() {
|
||||
if local_map.contains_key(ip) {
|
||||
isok = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
isok
|
||||
ips.iter().any(|ip| local_set.contains(ip))
|
||||
}
|
||||
Host::Ipv4(ip) => local_map.contains_key(&ip.to_string()),
|
||||
Host::Ipv6(ip) => local_map.contains_key(&ip.to_string()),
|
||||
Host::Ipv4(ip) => local_set.contains(&IpAddr::V4(ip)),
|
||||
Host::Ipv6(ip) => local_set.contains(&IpAddr::V6(ip)),
|
||||
};
|
||||
|
||||
if port > 0 {
|
||||
return is_local_host && port == local_port;
|
||||
return Ok(is_local_host && port == local_port);
|
||||
}
|
||||
|
||||
is_local_host
|
||||
Ok(is_local_host)
|
||||
}
|
||||
|
||||
pub fn must_get_local_ips() -> Vec<IpAddr> {
|
||||
let mut v: Vec<IpAddr> = Vec::new();
|
||||
if let Some(up) = netif::up().ok() {
|
||||
v = up.map(|x| x.address().to_owned()).collect();
|
||||
/// returns IP address of given host.
|
||||
pub fn get_host_ip(host: Host<&str>) -> Result<HashSet<IpAddr>> {
|
||||
match host {
|
||||
Host::Domain(domain) => match (domain, 0)
|
||||
.to_socket_addrs()
|
||||
.map(|v| v.map(|v| v.ip()).collect::<HashSet<_>>())
|
||||
{
|
||||
Ok(ips) => Ok(ips),
|
||||
Err(err) => Err(Error::new(Box::new(err))),
|
||||
},
|
||||
Host::Ipv4(ip) => {
|
||||
let mut set = HashSet::with_capacity(1);
|
||||
set.insert(IpAddr::V4(ip));
|
||||
Ok(set)
|
||||
}
|
||||
Host::Ipv6(ip) => {
|
||||
let mut set = HashSet::with_capacity(1);
|
||||
set.insert(IpAddr::V6(ip));
|
||||
Ok(set)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
v
|
||||
/// returns IPs of local interface
|
||||
pub(crate) fn must_get_local_ips() -> Result<Vec<IpAddr>> {
|
||||
match netif::up() {
|
||||
Ok(up) => Ok(up.map(|x| x.address().to_owned()).collect()),
|
||||
Err(err) => Err(Error::from_string(format!("Unable to get IP addresses of this host: {}", err))),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -74,21 +107,55 @@ mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_must_get_local_ips() {
|
||||
let ips = must_get_local_ips();
|
||||
for ip in ips.iter() {
|
||||
println!("{:?}", ip)
|
||||
fn test_is_socket_addr() {
|
||||
let test_cases = [
|
||||
("localhost", false),
|
||||
("localhost:9000", false),
|
||||
("example.com", false),
|
||||
("http://192.168.1.0", false),
|
||||
("http://192.168.1.0:9000", false),
|
||||
("192.168.1.0", true),
|
||||
("[2001:db8::1]:9000", true),
|
||||
];
|
||||
|
||||
for (addr, expected) in test_cases {
|
||||
let ret = is_socket_addr(addr);
|
||||
assert_eq!(expected, ret, "addr: {}, expected: {}, got: {}", addr, expected, ret);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_local_host() {
|
||||
// let host = Host::Ipv4(Ipv4Addr::new(192, 168, 0, 233));
|
||||
let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1));
|
||||
// let host = Host::Domain("localhost");
|
||||
let port = 0;
|
||||
let local_port = 9000;
|
||||
let is = is_local_host(host, port, local_port);
|
||||
assert!(is)
|
||||
fn test_check_local_server_addr() {
|
||||
let test_cases = [
|
||||
(":54321", Ok(())),
|
||||
("localhost:54321", Ok(())),
|
||||
("0.0.0.0:9000", Ok(())),
|
||||
(":0", Ok(())),
|
||||
("localhost", Err(Error::from_string("invalid socket address"))),
|
||||
("", Err(Error::from_string("invalid socket address"))),
|
||||
(
|
||||
"example.org:54321",
|
||||
Err(Error::from_string("host in server address should be this server")),
|
||||
),
|
||||
(":-10", Err(Error::from_string("invalid port value"))),
|
||||
];
|
||||
|
||||
for test_case in test_cases {
|
||||
let ret = check_local_server_addr(test_case.0);
|
||||
if test_case.1.is_ok() && ret.is_err() {
|
||||
panic!("{}: error: expected = <nil>, got = {:?}", test_case.0, ret);
|
||||
}
|
||||
if test_case.1.is_err() && ret.is_ok() {
|
||||
panic!("{}: error: expected = {:?}, got = <nil>", test_case.0, test_case.1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_must_get_local_ips() {
|
||||
let local_ips = must_get_local_ips().unwrap();
|
||||
let local_set: HashSet<IpAddr> = local_ips.into_iter().collect();
|
||||
|
||||
assert!(local_set.contains(&IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user