fix: 优化disks_layout

This commit is contained in:
shiro.lee
2024-07-02 18:51:50 +08:00
parent d1a62f6697
commit a225663049
4 changed files with 208 additions and 238 deletions

View File

@@ -9,7 +9,7 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio.workspace = true
tokio = { workspace = true, features = ["io-util"] }
bytes.workspace = true
thiserror.workspace = true
futures.workspace = true

View File

@@ -1,111 +1,111 @@
use std::collections::{HashMap, HashSet};
use super::ellipses::*;
use anyhow::{Error, Result};
use serde::Deserialize;
use std::collections::HashSet;
use super::ellipses::*;
/// Supported set sizes this is used to find the optimal
/// single set size.
const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
#[derive(Deserialize, Debug, Default)]
pub struct PoolDisksLayout {
pub cmdline: String,
pub cmd_line: String,
pub layout: Vec<Vec<String>>,
}
impl PoolDisksLayout {
pub fn new(args: impl Into<String>, layout: Vec<Vec<String>>) -> Self {
PoolDisksLayout {
cmd_line: args.into(),
layout,
}
}
}
#[derive(Deserialize, Debug, Default)]
pub struct DisksLayout {
pub legacy: bool,
pub pools: Vec<PoolDisksLayout>,
}
impl DisksLayout {
pub fn new(args: &Vec<String>) -> Result<DisksLayout> {
impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
type Error = Error;
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
if args.is_empty() {
return Err(Error::msg("Invalid argument"));
}
let mut ok = true;
for arg in args.iter() {
ok = ok && !has_ellipses(&vec![arg.to_string()])
}
let is_ellipses = args.iter().any(|v| has_ellipses(&[v]));
// TODO: from env
let set_drive_count: usize = 0;
if ok {
let set_args = get_all_sets(set_drive_count, &args)?;
// None of the args have ellipses use the old style.
if !is_ellipses {
let set_args = get_all_sets(is_ellipses, args)?;
return Ok(DisksLayout {
legacy: true,
pools: vec![PoolDisksLayout {
layout: set_args,
cmdline: args.join(" "),
}],
pools: vec![PoolDisksLayout::new(
args.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(" "),
set_args,
)],
});
}
let mut ret = DisksLayout {
pools: Vec::new(),
..Default::default()
};
let mut layout = Vec::with_capacity(args.len());
for arg in args.iter() {
let varg = vec![arg.to_string()];
if !has_ellipses(&varg) && args.len() > 1 {
return Err(Error::msg("所有参数必须包含省略号以用于池扩展"));
if !has_ellipses(&[arg]) && args.len() > 1 {
return Err(Error::msg("all args must have ellipses for pool expansion (Invalid arguments specified)"));
}
let set_args = get_all_sets(set_drive_count, &varg)?;
let set_args = get_all_sets(is_ellipses, &[arg])?;
ret.pools.push(PoolDisksLayout {
layout: set_args,
cmdline: arg.clone(),
})
layout.push(PoolDisksLayout::new(arg.as_ref(), set_args));
}
Ok(ret)
Ok(DisksLayout {
legacy: false,
pools: layout,
})
}
}
fn get_all_sets(set_drive_count: usize, args: &Vec<String>) -> Result<Vec<Vec<String>>> {
let set_args;
if !has_ellipses(args) {
let set_indexes: Vec<Vec<usize>>;
if args.len() > 1 {
let totalsizes = vec![args.len()];
set_indexes = get_set_indexes(args, &totalsizes, set_drive_count, &Vec::new())?;
} else {
set_indexes = vec![vec![args.len()]];
}
let mut s = EndpointSet {
endpoints: args.clone(),
set_indexes,
..Default::default()
};
set_args = s.get();
/// parses all ellipses input arguments, expands them into
/// corresponding list of endpoints chunked evenly in accordance with a
/// specific set size.
///
/// For example: {1...64} is divided into 4 sets each of size 16.
/// This applies to even distributed setup syntax as well.
fn get_all_sets<T: AsRef<str>>(is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<String>>> {
let endpoint_set = if is_ellipses {
EndpointSet::try_from(args)?
} else {
let mut s = EndpointSet::new(args, set_drive_count)?;
set_args = s.get();
}
let set_indexes = if args.len() > 1 {
get_set_indexes(args, &[args.len()], &[])?
} else {
vec![vec![args.len()]]
};
let endpoints = args.iter().map(|v| v.as_ref().to_string()).collect();
let mut seen = HashSet::with_capacity(set_args.len());
EndpointSet::new(endpoints, set_indexes)
};
let set_args = endpoint_set.get();
let mut unique_args = HashSet::with_capacity(set_args.len());
for args in set_args.iter() {
for arg in args {
if seen.contains(arg) {
return Err(Error::msg(format!(
"Input args {} has duplicate ellipses",
arg
)));
if unique_args.contains(arg) {
return Err(Error::msg(format!("Input args {} has duplicate ellipses", arg)));
}
seen.insert(arg);
unique_args.insert(arg);
}
}
Ok(set_args)
}
/// represents parsed ellipses values, also provides
/// methods to get the sets of endpoints.
#[derive(Debug, Default)]
pub struct EndpointSet {
pub arg_patterns: Vec<ArgPattern>,
@@ -113,90 +113,74 @@ pub struct EndpointSet {
pub set_indexes: Vec<Vec<usize>>,
}
impl EndpointSet {
pub fn new(args: &Vec<String>, set_div_count: usize) -> Result<EndpointSet> {
impl<T: AsRef<str>> TryFrom<&[T]> for EndpointSet {
type Error = Error;
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
let mut arg_patterns = Vec::with_capacity(args.len());
for arg in args.iter() {
arg_patterns.push(find_ellipses_patterns(arg.as_str())?);
for arg in args {
arg_patterns.push(find_ellipses_patterns(arg.as_ref())?);
}
let totalsizes = get_total_sizes(&arg_patterns);
let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?;
Ok(EndpointSet {
set_indexes,
arg_patterns,
..Default::default()
})
}
pub fn get(&mut self) -> Vec<Vec<String>> {
let mut sets: Vec<Vec<String>> = Vec::new();
let eps = self.get_endpoints();
let mut start = 0;
for sidx in self.set_indexes.iter() {
for idx in sidx {
let end = idx + start;
sets.push(eps[start..end].to_vec());
start = end;
}
}
sets
}
fn get_endpoints(&mut self) -> Vec<String> {
if !self.endpoints.is_empty() {
return self.endpoints.clone();
}
let total_sizes = get_total_sizes(&arg_patterns);
let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?;
let mut endpoints = Vec::new();
for ap in self.arg_patterns.iter() {
for ap in arg_patterns.iter() {
let aps = ap.expand();
for bs in aps {
endpoints.push(bs.join(""));
}
}
self.endpoints = endpoints;
self.endpoints.clone()
Ok(EndpointSet {
set_indexes,
arg_patterns,
endpoints,
})
}
}
// fn parse_endpoint_set(set_div_count: usize, args: &Vec<String>) -> Result<EndpointSet> {
// let mut arg_patterns = Vec::with_capacity(args.len());
// for arg in args.iter() {
// arg_patterns.push(find_ellipses_patterns(arg.as_str())?);
// }
// let totalsizes = get_total_sizes(&arg_patterns);
// let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?;
// Ok(EndpointSet {
// set_indexes,
// arg_patterns,
// ..Default::default()
// })
// }
static SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
fn gcd(mut x: usize, mut y: usize) -> usize {
while y != 0 {
let t = y;
y = x % y;
x = t;
impl EndpointSet {
/// Create a new EndpointSet with the given endpoints and set indexes.
pub fn new(endpoints: Vec<String>, set_indexes: Vec<Vec<usize>>) -> Self {
Self {
endpoints,
set_indexes,
..Default::default()
}
}
/// returns the sets representation of the endpoints
/// this function also intelligently decides on what will
/// be the right set size etc.
pub fn get(&self) -> Vec<Vec<String>> {
let mut sets: Vec<Vec<String>> = Vec::new();
let mut start = 0;
for set_idx in self.set_indexes.iter() {
for idx in set_idx {
let end = idx + start;
sets.push(self.endpoints[start..end].to_vec());
start = end;
}
}
sets
}
x
}
fn get_divisible_size(totalsizes: &Vec<usize>) -> usize {
let mut ret = totalsizes[0];
for s in totalsizes.iter() {
ret = gcd(ret, *s)
/// returns a greatest common divisor of all the ellipses sizes.
fn get_divisible_size(total_sizes: &[usize]) -> usize {
fn gcd(mut x: usize, mut y: usize) -> usize {
while y != 0 {
// be equivalent to: x, y = y, x%y
std::mem::swap(&mut x, &mut y);
y %= x;
}
x
}
ret
total_sizes.iter().skip(1).fold(total_sizes[0], |acc, &y| gcd(acc, y))
}
fn possible_set_counts(set_size: usize) -> Vec<usize> {
@@ -209,19 +193,21 @@ fn possible_set_counts(set_size: usize) -> Vec<usize> {
ss
}
/// checks whether given count is a valid set size for erasure coding.
fn is_valid_set_size(count: usize) -> bool {
&count >= SET_SIZES.first().unwrap() && &count <= SET_SIZES.last().unwrap()
count >= SET_SIZES[0] && count <= SET_SIZES[SET_SIZES.len() - 1]
}
fn common_set_drive_count(divisible_size: usize, set_counts: Vec<usize>) -> usize {
/// Final set size with all the symmetry accounted for.
fn common_set_drive_count(divisible_size: usize, set_counts: &[usize]) -> usize {
// prefers set_counts to be sorted for optimal behavior.
if &divisible_size < set_counts.last().unwrap_or(&0) {
if divisible_size < set_counts[set_counts.len() - 1] {
return divisible_size;
}
let mut prev_d = divisible_size / set_counts[0];
let mut set_size = 0;
for cnt in set_counts {
for &cnt in set_counts {
if divisible_size % cnt == 0 {
let d = divisible_size / cnt;
if d <= prev_d {
@@ -233,123 +219,121 @@ fn common_set_drive_count(divisible_size: usize, set_counts: Vec<usize>) -> usiz
set_size
}
fn possible_set_counts_with_symmetry(
set_counts: Vec<usize>,
arg_patterns: &Vec<ArgPattern>,
) -> Vec<usize> {
let mut new_set_counts: HashMap<usize, ()> = HashMap::new();
/// returns symmetrical setCounts based on the input argument patterns,
/// the symmetry calculation is to ensure that we also use uniform number
/// of drives common across all ellipses patterns.
fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPattern]) -> Vec<usize> {
let mut new_set_counts: HashSet<usize> = HashSet::new();
for ss in set_counts {
for &ss in set_counts {
let mut symmetry = false;
for arg_pattern in arg_patterns {
for p in arg_pattern.inner.iter() {
if p.seq.len() > ss {
symmetry = p.seq.len() % ss == 0;
symmetry = (p.seq.len() % ss) == 0;
} else {
symmetry = ss % p.seq.len() == 0;
symmetry = (ss % p.seq.len()) == 0;
}
}
}
if !new_set_counts.contains_key(&ss) && (symmetry || arg_patterns.is_empty()) {
new_set_counts.insert(ss, ());
if !new_set_counts.contains(&ss) && (symmetry || arg_patterns.is_empty()) {
new_set_counts.insert(ss);
}
}
let mut set_counts: Vec<usize> = Vec::from_iter(new_set_counts.keys().cloned());
let mut set_counts: Vec<usize> = new_set_counts.into_iter().collect();
set_counts.sort_unstable();
set_counts
}
fn get_set_indexes(
args: &Vec<String>,
totalsizes: &Vec<usize>,
set_div_count: usize,
arg_patterns: &Vec<ArgPattern>,
) -> Result<Vec<Vec<usize>>> {
if args.is_empty() || totalsizes.is_empty() {
/// returns list of indexes which provides the set size
/// on each index, this function also determines the final set size
/// The final set size has the affinity towards choosing smaller
/// indexes (total sets)
fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result<Vec<Vec<usize>>> {
if args.is_empty() || total_sizes.is_empty() {
return Err(Error::msg("Invalid argument"));
}
for size in totalsizes.iter() {
if size.lt(&SET_SIZES[0]) || size < &set_div_count {
return Err(Error::msg(format!(
"Incorrect number of endpoints provided,size {}",
size
)));
for &size in total_sizes {
// Check if total_sizes has minimum range upto set_size
if size < SET_SIZES[0] {
return Err(Error::msg(format!("Incorrect number of endpoints provided, size {}", size)));
}
}
let common_size = get_divisible_size(totalsizes);
let common_size = get_divisible_size(total_sizes);
let mut set_counts = possible_set_counts(common_size);
if set_counts.is_empty() {
return Err(Error::msg("Incorrect number of endpoints provided2"));
return Err(Error::msg(format!(
"Incorrect number of endpoints provided, number of drives {} is not divisible by any supported erasure set sizes {}",
common_size, 0
)));
}
let set_size;
// TODO Add custom set drive count
if set_div_count > 0 {
let mut found = false;
for ss in set_counts {
if ss == set_div_count {
found = true
}
}
if !found {
return Err(Error::msg("Invalid set drive count."));
}
set_size = set_div_count
// TODO globalCustomErasureDriveCount = true
} else {
set_counts = possible_set_counts_with_symmetry(set_counts, arg_patterns);
if set_counts.is_empty() {
return Err(Error::msg(
"No symmetric distribution detected with input endpoints provided",
));
}
set_size = common_set_drive_count(common_size, set_counts);
// Returns possible set counts with symmetry.
set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns);
if set_counts.is_empty() {
return Err(Error::msg("No symmetric distribution detected with input endpoints provided"));
}
// Final set size with all the symmetry accounted for.
let set_size = common_set_drive_count(common_size, &set_counts);
if !is_valid_set_size(set_size) {
return Err(Error::msg("Incorrect number of endpoints provided3"));
}
let mut set_indexs = Vec::with_capacity(totalsizes.len());
for size in totalsizes.iter() {
let mut sizes = Vec::with_capacity(size / set_size);
for _ in 0..size / set_size {
sizes.push(set_size);
}
set_indexs.push(sizes)
}
Ok(set_indexs)
Ok(total_sizes
.iter()
.map(|&size| (0..(size / set_size)).map(|_| set_size).collect())
.collect())
}
fn get_total_sizes(arg_patterns: &Vec<ArgPattern>) -> Vec<usize> {
let mut sizes = Vec::with_capacity(arg_patterns.len());
for ap in arg_patterns {
let mut size = 1;
for p in ap.inner.iter() {
size *= p.seq.len()
}
sizes.push(size)
}
sizes
/// Return the total size for each argument patterns.
fn get_total_sizes(arg_patterns: &[ArgPattern]) -> Vec<usize> {
arg_patterns.iter().map(|v| v.total_sizes()).collect()
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_get_divisible_size() {
struct TestCase {
total_sizes: Vec<usize>,
result: usize,
}
let test_cases = [
TestCase {
total_sizes: vec![24, 32, 16],
result: 8,
},
TestCase {
total_sizes: vec![32, 8, 4],
result: 4,
},
TestCase {
total_sizes: vec![8, 8, 8],
result: 8,
},
TestCase {
total_sizes: vec![24],
result: 24,
},
];
for (i, test_case) in test_cases.iter().enumerate() {
let ret = get_divisible_size(&test_case.total_sizes);
assert_eq!(ret, test_case.result, "Test{}: Expected {}, got {}", i + 1, test_case.result, ret);
}
}
#[test]
fn test_parse_disks_layout_from_env_args() {
// let pattern = String::from("http://[2001:3984:3989::{001...002}]/disk{1...4}");
@@ -358,10 +342,10 @@ mod test {
let mut args = Vec::new();
args.push(pattern);
match DisksLayout::new(&args) {
match DisksLayout::try_from(args.as_slice()) {
Ok(set) => {
for pool in set.pools {
println!("cmd: {:?}", pool.cmdline);
println!("cmd: {:?}", pool.cmd_line);
for (i, set) in pool.layout.iter().enumerate() {
for (j, v) in set.iter().enumerate() {

View File

@@ -76,6 +76,11 @@ impl ArgPattern {
ret
}
/// returns the total number of sizes in the given patterns.
pub fn total_sizes(&self) -> usize {
self.inner.iter().map(|v| v.seq.len()).sum()
}
}
/// finds all ellipses patterns, recursively and parses the ranges numerically.

View File

@@ -135,9 +135,7 @@ impl Endpoint {
}
if !(url.scheme() == "http" || url.scheme() == "https") {
return Err(Error::msg(
"URL endpoint格式无效: Scheme字段必须包含'http'或'https'",
));
return Err(Error::msg("URL endpoint格式无效: Scheme字段必须包含'http'或'https'"));
}
// 检查路径
@@ -209,11 +207,7 @@ impl Endpoint {
fn update_islocal(&mut self) -> Result<()> {
if self.url.has_host() {
self.is_local = is_local_host(
self.url.host().unwrap(),
self.url.port().unwrap(),
DEFAULT_PORT,
);
self.is_local = is_local_host(self.url.host().unwrap(), self.url.port().unwrap(), DEFAULT_PORT);
}
Ok(())
@@ -351,7 +345,7 @@ impl EndpointServerPools {
set_count: pool_args[i].layout.len(),
drives_per_set: pool_args[i].layout[0].len(),
endpoints: eps.clone(),
cmd_line: pool_args[i].cmdline.clone(),
cmd_line: pool_args[i].cmd_line.clone(),
platform: String::new(),
};
@@ -448,10 +442,7 @@ fn is_empty_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
return true;
}
let first_layout = &pools_layout[0];
if first_layout.layout.is_empty()
|| first_layout.layout[0].is_empty()
|| first_layout.layout[0][0].is_empty()
{
if first_layout.layout.is_empty() || first_layout.layout[0].is_empty() || first_layout.layout[0][0].is_empty() {
return true;
}
false
@@ -459,20 +450,14 @@ fn is_empty_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
// 检查是否是单驱动器布局
fn is_single_drive_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
if pools_layout.len() == 1
&& pools_layout[0].layout.len() == 1
&& pools_layout[0].layout[0].len() == 1
{
if pools_layout.len() == 1 && pools_layout[0].layout.len() == 1 && pools_layout[0].layout[0].len() == 1 {
true
} else {
false
}
}
pub fn create_pool_endpoints(
server_addr: String,
pools: &Vec<PoolDisksLayout>,
) -> Result<(Vec<Endpoints>, SetupType)> {
pub fn create_pool_endpoints(server_addr: String, pools: &Vec<PoolDisksLayout>) -> Result<(Vec<Endpoints>, SetupType)> {
if is_empty_layout(pools) {
return Err(Error::msg("empty layout"));
}
@@ -586,7 +571,7 @@ fn create_server_endpoints(
set_count: pool_args[i].layout.len(),
drives_per_set: pool_args[i].layout[0].len(),
endpoints: eps.clone(),
cmd_line: pool_args[i].cmdline.clone(),
cmd_line: pool_args[i].cmd_line.clone(),
platform: String::new(),
};
@@ -622,18 +607,14 @@ mod test {
#[test]
fn test_create_server_endpoints() {
let cases = vec![(
":9000",
vec!["http://localhost:900{1...2}/export{1...64}".to_string()],
)];
let cases = vec![(":9000", vec!["http://localhost:900{1...2}/export{1...64}".to_string()])];
for (addr, args) in cases {
let layouts = DisksLayout::new(&args).unwrap();
let layouts = DisksLayout::try_from(args.as_slice()).unwrap();
println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy);
let (server_pool, setup_type) =
create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap();
let (server_pool, setup_type) = create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap();
println!("setup_type -- {:?}", setup_type);
println!("server_pool == {:?}", server_pool);