diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index 792dfd48..8b7a5d17 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -9,7 +9,7 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio.workspace = true +tokio = { workspace = true, features = ["io-util"] } bytes.workspace = true thiserror.workspace = true futures.workspace = true diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index 40809bf2..fd2c3e91 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -1,111 +1,111 @@ -use std::collections::{HashMap, HashSet}; - +use super::ellipses::*; use anyhow::{Error, Result}; use serde::Deserialize; +use std::collections::HashSet; -use super::ellipses::*; +/// Supported set sizes this is used to find the optimal +/// single set size. +const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; #[derive(Deserialize, Debug, Default)] pub struct PoolDisksLayout { - pub cmdline: String, + pub cmd_line: String, pub layout: Vec>, } +impl PoolDisksLayout { + pub fn new(args: impl Into, layout: Vec>) -> Self { + PoolDisksLayout { + cmd_line: args.into(), + layout, + } + } +} + #[derive(Deserialize, Debug, Default)] pub struct DisksLayout { pub legacy: bool, pub pools: Vec, } -impl DisksLayout { - pub fn new(args: &Vec) -> Result { +impl> TryFrom<&[T]> for DisksLayout { + type Error = Error; + + fn try_from(args: &[T]) -> Result { if args.is_empty() { return Err(Error::msg("Invalid argument")); } - let mut ok = true; - for arg in args.iter() { - ok = ok && !has_ellipses(&vec![arg.to_string()]) - } + let is_ellipses = args.iter().any(|v| has_ellipses(&[v])); - // TODO: from env - let set_drive_count: usize = 0; - - if ok { - let set_args = get_all_sets(set_drive_count, &args)?; + // None of the args have ellipses use the old style. + if !is_ellipses { + let set_args = get_all_sets(is_ellipses, args)?; return Ok(DisksLayout { legacy: true, - pools: vec![PoolDisksLayout { - layout: set_args, - cmdline: args.join(" "), - }], + pools: vec![PoolDisksLayout::new( + args.iter().map(AsRef::as_ref).collect::>().join(" "), + set_args, + )], }); } - let mut ret = DisksLayout { - pools: Vec::new(), - ..Default::default() - }; - + let mut layout = Vec::with_capacity(args.len()); for arg in args.iter() { - let varg = vec![arg.to_string()]; - - if !has_ellipses(&varg) && args.len() > 1 { - return Err(Error::msg("所有参数必须包含省略号以用于池扩展")); + if !has_ellipses(&[arg]) && args.len() > 1 { + return Err(Error::msg("all args must have ellipses for pool expansion (Invalid arguments specified)")); } - let set_args = get_all_sets(set_drive_count, &varg)?; + let set_args = get_all_sets(is_ellipses, &[arg])?; - ret.pools.push(PoolDisksLayout { - layout: set_args, - cmdline: arg.clone(), - }) + layout.push(PoolDisksLayout::new(arg.as_ref(), set_args)); } - Ok(ret) + Ok(DisksLayout { + legacy: false, + pools: layout, + }) } } -fn get_all_sets(set_drive_count: usize, args: &Vec) -> Result>> { - let set_args; - if !has_ellipses(args) { - let set_indexes: Vec>; - if args.len() > 1 { - let totalsizes = vec![args.len()]; - set_indexes = get_set_indexes(args, &totalsizes, set_drive_count, &Vec::new())?; - } else { - set_indexes = vec![vec![args.len()]]; - } - - let mut s = EndpointSet { - endpoints: args.clone(), - set_indexes, - ..Default::default() - }; - - set_args = s.get(); +/// parses all ellipses input arguments, expands them into +/// corresponding list of endpoints chunked evenly in accordance with a +/// specific set size. +/// +/// For example: {1...64} is divided into 4 sets each of size 16. +/// This applies to even distributed setup syntax as well. +fn get_all_sets>(is_ellipses: bool, args: &[T]) -> Result>> { + let endpoint_set = if is_ellipses { + EndpointSet::try_from(args)? } else { - let mut s = EndpointSet::new(args, set_drive_count)?; - set_args = s.get(); - } + let set_indexes = if args.len() > 1 { + get_set_indexes(args, &[args.len()], &[])? + } else { + vec![vec![args.len()]] + }; + let endpoints = args.iter().map(|v| v.as_ref().to_string()).collect(); - let mut seen = HashSet::with_capacity(set_args.len()); + EndpointSet::new(endpoints, set_indexes) + }; + + let set_args = endpoint_set.get(); + + let mut unique_args = HashSet::with_capacity(set_args.len()); for args in set_args.iter() { for arg in args { - if seen.contains(arg) { - return Err(Error::msg(format!( - "Input args {} has duplicate ellipses", - arg - ))); + if unique_args.contains(arg) { + return Err(Error::msg(format!("Input args {} has duplicate ellipses", arg))); } - seen.insert(arg); + unique_args.insert(arg); } } Ok(set_args) } +/// represents parsed ellipses values, also provides +/// methods to get the sets of endpoints. #[derive(Debug, Default)] pub struct EndpointSet { pub arg_patterns: Vec, @@ -113,90 +113,74 @@ pub struct EndpointSet { pub set_indexes: Vec>, } -impl EndpointSet { - pub fn new(args: &Vec, set_div_count: usize) -> Result { +impl> TryFrom<&[T]> for EndpointSet { + type Error = Error; + + fn try_from(args: &[T]) -> Result { let mut arg_patterns = Vec::with_capacity(args.len()); - for arg in args.iter() { - arg_patterns.push(find_ellipses_patterns(arg.as_str())?); + for arg in args { + arg_patterns.push(find_ellipses_patterns(arg.as_ref())?); } - let totalsizes = get_total_sizes(&arg_patterns); - let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?; - - Ok(EndpointSet { - set_indexes, - arg_patterns, - ..Default::default() - }) - } - - pub fn get(&mut self) -> Vec> { - let mut sets: Vec> = Vec::new(); - let eps = self.get_endpoints(); - - let mut start = 0; - for sidx in self.set_indexes.iter() { - for idx in sidx { - let end = idx + start; - sets.push(eps[start..end].to_vec()); - start = end; - } - } - sets - } - - fn get_endpoints(&mut self) -> Vec { - if !self.endpoints.is_empty() { - return self.endpoints.clone(); - } + let total_sizes = get_total_sizes(&arg_patterns); + let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?; let mut endpoints = Vec::new(); - for ap in self.arg_patterns.iter() { + for ap in arg_patterns.iter() { let aps = ap.expand(); for bs in aps { endpoints.push(bs.join("")); } } - self.endpoints = endpoints; - - self.endpoints.clone() + Ok(EndpointSet { + set_indexes, + arg_patterns, + endpoints, + }) } } -// fn parse_endpoint_set(set_div_count: usize, args: &Vec) -> Result { -// let mut arg_patterns = Vec::with_capacity(args.len()); -// for arg in args.iter() { -// arg_patterns.push(find_ellipses_patterns(arg.as_str())?); -// } - -// let totalsizes = get_total_sizes(&arg_patterns); -// let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?; - -// Ok(EndpointSet { -// set_indexes, -// arg_patterns, -// ..Default::default() -// }) -// } - -static SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; - -fn gcd(mut x: usize, mut y: usize) -> usize { - while y != 0 { - let t = y; - y = x % y; - x = t; +impl EndpointSet { + /// Create a new EndpointSet with the given endpoints and set indexes. + pub fn new(endpoints: Vec, set_indexes: Vec>) -> Self { + Self { + endpoints, + set_indexes, + ..Default::default() + } + } + + /// returns the sets representation of the endpoints + /// this function also intelligently decides on what will + /// be the right set size etc. + pub fn get(&self) -> Vec> { + let mut sets: Vec> = Vec::new(); + + let mut start = 0; + for set_idx in self.set_indexes.iter() { + for idx in set_idx { + let end = idx + start; + sets.push(self.endpoints[start..end].to_vec()); + start = end; + } + } + sets } - x } -fn get_divisible_size(totalsizes: &Vec) -> usize { - let mut ret = totalsizes[0]; - for s in totalsizes.iter() { - ret = gcd(ret, *s) +/// returns a greatest common divisor of all the ellipses sizes. +fn get_divisible_size(total_sizes: &[usize]) -> usize { + fn gcd(mut x: usize, mut y: usize) -> usize { + while y != 0 { + // be equivalent to: x, y = y, x%y + std::mem::swap(&mut x, &mut y); + y %= x; + } + x } - ret + + total_sizes.iter().skip(1).fold(total_sizes[0], |acc, &y| gcd(acc, y)) } fn possible_set_counts(set_size: usize) -> Vec { @@ -209,19 +193,21 @@ fn possible_set_counts(set_size: usize) -> Vec { ss } +/// checks whether given count is a valid set size for erasure coding. fn is_valid_set_size(count: usize) -> bool { - &count >= SET_SIZES.first().unwrap() && &count <= SET_SIZES.last().unwrap() + count >= SET_SIZES[0] && count <= SET_SIZES[SET_SIZES.len() - 1] } -fn common_set_drive_count(divisible_size: usize, set_counts: Vec) -> usize { +/// Final set size with all the symmetry accounted for. +fn common_set_drive_count(divisible_size: usize, set_counts: &[usize]) -> usize { // prefers set_counts to be sorted for optimal behavior. - if &divisible_size < set_counts.last().unwrap_or(&0) { + if divisible_size < set_counts[set_counts.len() - 1] { return divisible_size; } let mut prev_d = divisible_size / set_counts[0]; let mut set_size = 0; - for cnt in set_counts { + for &cnt in set_counts { if divisible_size % cnt == 0 { let d = divisible_size / cnt; if d <= prev_d { @@ -233,123 +219,121 @@ fn common_set_drive_count(divisible_size: usize, set_counts: Vec) -> usiz set_size } -fn possible_set_counts_with_symmetry( - set_counts: Vec, - arg_patterns: &Vec, -) -> Vec { - let mut new_set_counts: HashMap = HashMap::new(); +/// returns symmetrical setCounts based on the input argument patterns, +/// the symmetry calculation is to ensure that we also use uniform number +/// of drives common across all ellipses patterns. +fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPattern]) -> Vec { + let mut new_set_counts: HashSet = HashSet::new(); - for ss in set_counts { + for &ss in set_counts { let mut symmetry = false; for arg_pattern in arg_patterns { for p in arg_pattern.inner.iter() { if p.seq.len() > ss { - symmetry = p.seq.len() % ss == 0; + symmetry = (p.seq.len() % ss) == 0; } else { - symmetry = ss % p.seq.len() == 0; + symmetry = (ss % p.seq.len()) == 0; } } } - if !new_set_counts.contains_key(&ss) && (symmetry || arg_patterns.is_empty()) { - new_set_counts.insert(ss, ()); + if !new_set_counts.contains(&ss) && (symmetry || arg_patterns.is_empty()) { + new_set_counts.insert(ss); } } - let mut set_counts: Vec = Vec::from_iter(new_set_counts.keys().cloned()); + let mut set_counts: Vec = new_set_counts.into_iter().collect(); set_counts.sort_unstable(); set_counts } -fn get_set_indexes( - args: &Vec, - totalsizes: &Vec, - set_div_count: usize, - arg_patterns: &Vec, -) -> Result>> { - if args.is_empty() || totalsizes.is_empty() { +/// returns list of indexes which provides the set size +/// on each index, this function also determines the final set size +/// The final set size has the affinity towards choosing smaller +/// indexes (total sets) +fn get_set_indexes>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result>> { + if args.is_empty() || total_sizes.is_empty() { return Err(Error::msg("Invalid argument")); } - for size in totalsizes.iter() { - if size.lt(&SET_SIZES[0]) || size < &set_div_count { - return Err(Error::msg(format!( - "Incorrect number of endpoints provided,size {}", - size - ))); + for &size in total_sizes { + // Check if total_sizes has minimum range upto set_size + if size < SET_SIZES[0] { + return Err(Error::msg(format!("Incorrect number of endpoints provided, size {}", size))); } } - let common_size = get_divisible_size(totalsizes); + let common_size = get_divisible_size(total_sizes); let mut set_counts = possible_set_counts(common_size); if set_counts.is_empty() { - return Err(Error::msg("Incorrect number of endpoints provided2")); + return Err(Error::msg(format!( + "Incorrect number of endpoints provided, number of drives {} is not divisible by any supported erasure set sizes {}", + common_size, 0 + ))); } - let set_size; + // TODO Add custom set drive count - if set_div_count > 0 { - let mut found = false; - for ss in set_counts { - if ss == set_div_count { - found = true - } - } - - if !found { - return Err(Error::msg("Invalid set drive count.")); - } - - set_size = set_div_count - // TODO globalCustomErasureDriveCount = true - } else { - set_counts = possible_set_counts_with_symmetry(set_counts, arg_patterns); - - if set_counts.is_empty() { - return Err(Error::msg( - "No symmetric distribution detected with input endpoints provided", - )); - } - - set_size = common_set_drive_count(common_size, set_counts); + // Returns possible set counts with symmetry. + set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns); + if set_counts.is_empty() { + return Err(Error::msg("No symmetric distribution detected with input endpoints provided")); } + // Final set size with all the symmetry accounted for. + let set_size = common_set_drive_count(common_size, &set_counts); if !is_valid_set_size(set_size) { return Err(Error::msg("Incorrect number of endpoints provided3")); } - let mut set_indexs = Vec::with_capacity(totalsizes.len()); - - for size in totalsizes.iter() { - let mut sizes = Vec::with_capacity(size / set_size); - for _ in 0..size / set_size { - sizes.push(set_size); - } - - set_indexs.push(sizes) - } - - Ok(set_indexs) + Ok(total_sizes + .iter() + .map(|&size| (0..(size / set_size)).map(|_| set_size).collect()) + .collect()) } -fn get_total_sizes(arg_patterns: &Vec) -> Vec { - let mut sizes = Vec::with_capacity(arg_patterns.len()); - for ap in arg_patterns { - let mut size = 1; - for p in ap.inner.iter() { - size *= p.seq.len() - } - - sizes.push(size) - } - sizes +/// Return the total size for each argument patterns. +fn get_total_sizes(arg_patterns: &[ArgPattern]) -> Vec { + arg_patterns.iter().map(|v| v.total_sizes()).collect() } #[cfg(test)] mod test { use super::*; + #[test] + fn test_get_divisible_size() { + struct TestCase { + total_sizes: Vec, + result: usize, + } + + let test_cases = [ + TestCase { + total_sizes: vec![24, 32, 16], + result: 8, + }, + TestCase { + total_sizes: vec![32, 8, 4], + result: 4, + }, + TestCase { + total_sizes: vec![8, 8, 8], + result: 8, + }, + TestCase { + total_sizes: vec![24], + result: 24, + }, + ]; + + for (i, test_case) in test_cases.iter().enumerate() { + let ret = get_divisible_size(&test_case.total_sizes); + assert_eq!(ret, test_case.result, "Test{}: Expected {}, got {}", i + 1, test_case.result, ret); + } + } + #[test] fn test_parse_disks_layout_from_env_args() { // let pattern = String::from("http://[2001:3984:3989::{001...002}]/disk{1...4}"); @@ -358,10 +342,10 @@ mod test { let mut args = Vec::new(); args.push(pattern); - match DisksLayout::new(&args) { + match DisksLayout::try_from(args.as_slice()) { Ok(set) => { for pool in set.pools { - println!("cmd: {:?}", pool.cmdline); + println!("cmd: {:?}", pool.cmd_line); for (i, set) in pool.layout.iter().enumerate() { for (j, v) in set.iter().enumerate() { diff --git a/ecstore/src/ellipses.rs b/ecstore/src/ellipses.rs index 51122f49..75a0b95a 100644 --- a/ecstore/src/ellipses.rs +++ b/ecstore/src/ellipses.rs @@ -76,6 +76,11 @@ impl ArgPattern { ret } + + /// returns the total number of sizes in the given patterns. + pub fn total_sizes(&self) -> usize { + self.inner.iter().map(|v| v.seq.len()).sum() + } } /// finds all ellipses patterns, recursively and parses the ranges numerically. diff --git a/ecstore/src/endpoint.rs b/ecstore/src/endpoint.rs index ca0ab7f9..8cfc8b9e 100644 --- a/ecstore/src/endpoint.rs +++ b/ecstore/src/endpoint.rs @@ -135,9 +135,7 @@ impl Endpoint { } if !(url.scheme() == "http" || url.scheme() == "https") { - return Err(Error::msg( - "URL endpoint格式无效: Scheme字段必须包含'http'或'https'", - )); + return Err(Error::msg("URL endpoint格式无效: Scheme字段必须包含'http'或'https'")); } // 检查路径 @@ -209,11 +207,7 @@ impl Endpoint { fn update_islocal(&mut self) -> Result<()> { if self.url.has_host() { - self.is_local = is_local_host( - self.url.host().unwrap(), - self.url.port().unwrap(), - DEFAULT_PORT, - ); + self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT); } Ok(()) @@ -351,7 +345,7 @@ impl EndpointServerPools { set_count: pool_args[i].layout.len(), drives_per_set: pool_args[i].layout[0].len(), endpoints: eps.clone(), - cmd_line: pool_args[i].cmdline.clone(), + cmd_line: pool_args[i].cmd_line.clone(), platform: String::new(), }; @@ -448,10 +442,7 @@ fn is_empty_layout(pools_layout: &Vec) -> bool { return true; } let first_layout = &pools_layout[0]; - if first_layout.layout.is_empty() - || first_layout.layout[0].is_empty() - || first_layout.layout[0][0].is_empty() - { + if first_layout.layout.is_empty() || first_layout.layout[0].is_empty() || first_layout.layout[0][0].is_empty() { return true; } false @@ -459,20 +450,14 @@ fn is_empty_layout(pools_layout: &Vec) -> bool { // 检查是否是单驱动器布局 fn is_single_drive_layout(pools_layout: &Vec) -> bool { - if pools_layout.len() == 1 - && pools_layout[0].layout.len() == 1 - && pools_layout[0].layout[0].len() == 1 - { + if pools_layout.len() == 1 && pools_layout[0].layout.len() == 1 && pools_layout[0].layout[0].len() == 1 { true } else { false } } -pub fn create_pool_endpoints( - server_addr: String, - pools: &Vec, -) -> Result<(Vec, SetupType)> { +pub fn create_pool_endpoints(server_addr: String, pools: &Vec) -> Result<(Vec, SetupType)> { if is_empty_layout(pools) { return Err(Error::msg("empty layout")); } @@ -586,7 +571,7 @@ fn create_server_endpoints( set_count: pool_args[i].layout.len(), drives_per_set: pool_args[i].layout[0].len(), endpoints: eps.clone(), - cmd_line: pool_args[i].cmdline.clone(), + cmd_line: pool_args[i].cmd_line.clone(), platform: String::new(), }; @@ -622,18 +607,14 @@ mod test { #[test] fn test_create_server_endpoints() { - let cases = vec![( - ":9000", - vec!["http://localhost:900{1...2}/export{1...64}".to_string()], - )]; + let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])]; for (addr, args) in cases { - let layouts = DisksLayout::new(&args).unwrap(); + let layouts = DisksLayout::try_from(args.as_slice()).unwrap(); println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy); - let (server_pool, setup_type) = - create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); + let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap(); println!("setup_type -- {:?}", setup_type); println!("server_pool == {:?}", server_pool);