rename ecstore

This commit is contained in:
weisd
2024-06-25 15:45:21 +08:00
parent 7634eea4a2
commit 0a9f06096b
19 changed files with 2844 additions and 57 deletions

938
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["rustfs", "store"]
members = ["rustfs", "ecstore"]
[workspace.package]
edition = "2021"
@@ -18,4 +18,6 @@ http = "1.1.0"
thiserror = "1.0.61"
time = "0.3.36"
async-trait = "0.1.80"
tokio = { version = "1.38.0", features = ["macros", "rt", "rt-multi-thread"] }
# tokio = { version = "1.38.0", features = ["macros", "rt", "rt-multi-thread", "fs", "io-util"] }
tokio = { version = "1.38.0", features = ["full"] }
tokio-util = { version = "0.7.8", features = ["io"] }

View File

@@ -1,5 +1,5 @@
[package]
name = "store"
name = "ecstore"
version = "0.1.0"
edition.workspace = true
license.workspace = true
@@ -17,3 +17,11 @@ bytes.workspace = true
tokio.workspace = true
thiserror.workspace = true
futures.workspace = true
anyhow = "1.0.86"
serde.workspace = true
lazy_static = "1.5.0"
regex = "1.10.5"
netif = "0.1.6"
async-trait = "0.1.80"
s3s = "0.10.0"
tracing.workspace = true

376
ecstore/src/disks_layout.rs Normal file
View File

@@ -0,0 +1,376 @@
use std::collections::{HashMap, HashSet};
use anyhow::Error;
use serde::Deserialize;
use super::ellipses::*;
/// Disk layout for a single pool: the original command-line argument plus
/// the expanded drives, grouped by erasure set.
#[derive(Deserialize, Debug, Default)]
pub struct PoolDisksLayout {
    // Raw argument this pool was expanded from.
    pub cmdline: String,
    // Expanded drive endpoints: layout[set_idx][drive_idx].
    pub layout: Vec<Vec<String>>,
}
/// Top-level disk layout parsed from the server's drive arguments.
#[derive(Deserialize, Debug, Default)]
pub struct DisksLayout {
    // True when no argument used ellipses syntax (a single legacy pool).
    pub legacy: bool,
    pub pools: Vec<PoolDisksLayout>,
}
impl DisksLayout {
pub fn new(args: Vec<String>) -> Result<DisksLayout, Error> {
if args.is_empty() {
return Err(Error::msg("Invalid argument"));
}
let mut ok = true;
for arg in args.iter() {
ok = ok && !has_ellipses(&vec![arg.to_string()])
}
// TODO: from env
let set_drive_count: usize = 0;
if ok {
let set_args = get_all_sets(set_drive_count, &args)?;
return Ok(DisksLayout {
legacy: true,
pools: vec![PoolDisksLayout {
layout: set_args,
cmdline: args.join(" "),
}],
});
}
let mut ret = DisksLayout {
pools: Vec::new(),
..Default::default()
};
for arg in args.iter() {
let varg = vec![arg.to_string()];
if !has_ellipses(&varg) && args.len() > 1 {
return Err(Error::msg("所有参数必须包含省略号以用于池扩展"));
}
let set_args = get_all_sets(set_drive_count, &varg)?;
ret.pools.push(PoolDisksLayout {
layout: set_args,
cmdline: arg.clone(),
})
}
Ok(ret)
}
}
/// Expands `args` into erasure sets of endpoints.
///
/// Plain arguments (no ellipses) are split symmetrically as-is; ellipses
/// arguments are expanded first via `EndpointSet::new`. Returns an error
/// if any endpoint appears more than once across all sets.
fn get_all_sets(set_drive_count: usize, args: &Vec<String>) -> Result<Vec<Vec<String>>, Error> {
    let set_args;
    if !has_ellipses(args) {
        let set_indexes: Vec<Vec<usize>>;
        if args.len() > 1 {
            // Multiple plain endpoints: compute a symmetric set split.
            let totalsizes = vec![args.len()];
            set_indexes = get_set_indexes(args, &totalsizes, set_drive_count, &Vec::new())?;
        } else {
            // A single plain endpoint forms a single one-drive set.
            set_indexes = vec![vec![args.len()]];
        }
        let mut s = EndpointSet {
            endpoints: args.clone(),
            set_indexes,
            ..Default::default()
        };
        set_args = s.get();
    } else {
        let mut s = EndpointSet::new(args, set_drive_count)?;
        set_args = s.get();
    }
    // Reject duplicate endpoints across all sets.
    let mut seen = HashSet::with_capacity(set_args.len());
    for args in set_args.iter() {
        for arg in args {
            if seen.contains(arg) {
                return Err(Error::msg(format!(
                    "Input args {} has duplicate ellipses",
                    arg
                )));
            }
            seen.insert(arg);
        }
    }
    Ok(set_args)
}
/// Intermediate expansion state for one group of arguments: the parsed
/// ellipses patterns, the (lazily) expanded endpoint strings, and the
/// number of endpoints that go into each set.
#[derive(Debug, Default)]
pub struct EndpointSet {
    pub arg_patterns: Vec<ArgPattern>,
    pub endpoints: Vec<String>,
    pub set_indexes: Vec<Vec<usize>>,
}
impl EndpointSet {
    /// Builds an `EndpointSet` by parsing every argument's ellipses
    /// patterns and computing a symmetric set split for them.
    pub fn new(args: &Vec<String>, set_div_count: usize) -> Result<EndpointSet, Error> {
        let mut arg_patterns = Vec::with_capacity(args.len());
        for arg in args.iter() {
            arg_patterns.push(find_ellipses_patterns(arg.as_str())?);
        }
        let totalsizes = get_total_sizes(&arg_patterns);
        let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?;
        Ok(EndpointSet {
            set_indexes,
            arg_patterns,
            ..Default::default()
        })
    }

    /// Splits the flat endpoint list into consecutive chunks, one per set,
    /// sized by `set_indexes`. `start` accumulates across all index groups.
    pub fn get(&mut self) -> Vec<Vec<String>> {
        let mut sets: Vec<Vec<String>> = Vec::new();
        let eps = self.get_endpoints();
        let mut start = 0;
        for sidx in self.set_indexes.iter() {
            for idx in sidx {
                let end = idx + start;
                sets.push(eps[start..end].to_vec());
                start = end;
            }
        }
        sets
    }

    /// Expands the ellipses patterns into concrete endpoint strings on
    /// first use, caching the result in `self.endpoints`.
    fn get_endpoints(&mut self) -> Vec<String> {
        if !self.endpoints.is_empty() {
            return self.endpoints.clone();
        }
        let mut endpoints = Vec::new();
        for ap in self.arg_patterns.iter() {
            let aps = ap.expand();
            for bs in aps {
                endpoints.push(bs.join(""));
            }
        }
        self.endpoints = endpoints;
        self.endpoints.clone()
    }
}
// fn parse_endpoint_set(set_div_count: usize, args: &Vec<String>) -> Result<EndpointSet, Error> {
// let mut arg_patterns = Vec::with_capacity(args.len());
// for arg in args.iter() {
// arg_patterns.push(find_ellipses_patterns(arg.as_str())?);
// }
// let totalsizes = get_total_sizes(&arg_patterns);
// let set_indexes = get_set_indexes(args, &totalsizes, set_div_count, &arg_patterns)?;
// Ok(EndpointSet {
// set_indexes,
// arg_patterns,
// ..Default::default()
// })
// }
// Valid erasure-set sizes: every value in 2..=16 inclusive.
static SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];

// Greatest common divisor, recursive Euclid.
fn gcd(x: usize, y: usize) -> usize {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}

// GCD of every entry; panics on an empty vec (callers guarantee non-empty).
fn get_divisible_size(totalsizes: &Vec<usize>) -> usize {
    totalsizes
        .iter()
        .fold(totalsizes[0], |acc, &size| gcd(acc, size))
}

// All valid set sizes that evenly divide `set_size`, in ascending order.
fn possible_set_counts(set_size: usize) -> Vec<usize> {
    SET_SIZES
        .iter()
        .copied()
        .filter(|s| set_size % s == 0)
        .collect()
}

// A set size is valid when it lies within the SET_SIZES range (2..=16).
fn is_valid_set_size(count: usize) -> bool {
    (*SET_SIZES.first().unwrap()..=*SET_SIZES.last().unwrap()).contains(&count)
}
// Picks the set drive count yielding the fewest sets: the largest valid
// divisor of `divisible_size`. When `divisible_size` is smaller than the
// largest candidate it is returned unchanged.
fn common_set_drive_count(divisible_size: usize, set_counts: Vec<usize>) -> usize {
    // prefers set_counts to be sorted for optimal behavior.
    let largest = *set_counts.last().unwrap_or(&0);
    if divisible_size < largest {
        return divisible_size;
    }
    let mut best_divisor = divisible_size / set_counts[0];
    let mut chosen = 0;
    for count in set_counts {
        if divisible_size % count != 0 {
            continue;
        }
        let divisor = divisible_size / count;
        if divisor <= best_divisor {
            best_divisor = divisor;
            chosen = count;
        }
    }
    chosen
}
fn possible_set_counts_with_symmetry(
set_counts: Vec<usize>,
arg_patterns: &Vec<ArgPattern>,
) -> Vec<usize> {
let mut new_set_counts: HashMap<usize, ()> = HashMap::new();
for ss in set_counts {
let mut symmetry = false;
for arg_pattern in arg_patterns {
for p in arg_pattern.inner.iter() {
if p.seq.len() > ss {
symmetry = p.seq.len() % ss == 0;
} else {
symmetry = ss % p.seq.len() == 0;
}
}
}
if !new_set_counts.contains_key(&ss) && (symmetry || arg_patterns.is_empty()) {
new_set_counts.insert(ss, ());
}
}
let mut set_counts: Vec<usize> = Vec::from_iter(new_set_counts.keys().cloned());
set_counts.sort_unstable();
set_counts
}
/// Computes per-argument set sizes.
///
/// With an explicit `set_div_count` it must divide the common size;
/// otherwise a symmetric drive count is derived from the ellipses
/// patterns. Each returned inner Vec repeats the chosen set size
/// `totalsize / set_size` times.
fn get_set_indexes(
    args: &Vec<String>,
    totalsizes: &Vec<usize>,
    set_div_count: usize,
    arg_patterns: &Vec<ArgPattern>,
) -> Result<Vec<Vec<usize>>, Error> {
    if args.is_empty() || totalsizes.is_empty() {
        return Err(Error::msg("Invalid argument"));
    }
    for size in totalsizes.iter() {
        // Each argument must expand to at least the minimum set size (2)
        // and to at least the requested drives-per-set.
        if size.lt(&SET_SIZES[0]) || size < &set_div_count {
            return Err(Error::msg(format!(
                "Incorrect number of endpoints provided,size {}",
                size
            )));
        }
    }
    let common_size = get_divisible_size(totalsizes);
    let mut set_counts = possible_set_counts(common_size);
    if set_counts.is_empty() {
        return Err(Error::msg("Incorrect number of endpoints provided2"));
    }
    let set_size;
    if set_div_count > 0 {
        // An explicit drive count was requested; it must be a valid divisor.
        let mut found = false;
        for ss in set_counts {
            if ss == set_div_count {
                found = true
            }
        }
        if !found {
            return Err(Error::msg("Invalid set drive count."));
        }
        set_size = set_div_count
        // TODO globalCustomErasureDriveCount = true
    } else {
        // No explicit count: keep only symmetric divisors and pick the one
        // producing the fewest sets.
        set_counts = possible_set_counts_with_symmetry(set_counts, arg_patterns);
        if set_counts.is_empty() {
            return Err(Error::msg(
                "No symmetric distribution detected with input endpoints provided",
            ));
        }
        set_size = common_set_drive_count(common_size, set_counts);
    }
    if !is_valid_set_size(set_size) {
        return Err(Error::msg("Incorrect number of endpoints provided3"));
    }
    let mut set_indexs = Vec::with_capacity(totalsizes.len());
    for size in totalsizes.iter() {
        let mut sizes = Vec::with_capacity(size / set_size);
        for _ in 0..size / set_size {
            sizes.push(set_size);
        }
        set_indexs.push(sizes)
    }
    Ok(set_indexs)
}
// Number of endpoints each argument expands to: the product of its
// pattern sequence lengths (an empty pattern list yields 1).
fn get_total_sizes(arg_patterns: &Vec<ArgPattern>) -> Vec<usize> {
    arg_patterns
        .iter()
        .map(|ap| ap.inner.iter().map(|p| p.seq.len()).product())
        .collect()
}
#[cfg(test)]
mod test {
    use super::*;

    // Smoke test: expands an ellipses argument and prints the resulting
    // layout. It makes no assertions and never fails.
    #[test]
    fn test_parse_disks_layout_from_env_args() {
        // let pattern = String::from("http://[2001:3984:3989::{001...002}]/disk{1...4}");
        // let pattern = String::from("/export{1...10}/disk{1...10}");
        let pattern = String::from("http://rustfs{1...2}:9000/mnt/disk{1...16}");
        let mut args = Vec::new();
        args.push(pattern);
        match DisksLayout::new(args) {
            Ok(set) => {
                for pool in set.pools {
                    println!("cmd: {:?}", pool.cmdline);
                    for (i, set) in pool.layout.iter().enumerate() {
                        for (j, v) in set.iter().enumerate() {
                            println!("{:?}.{}: {:?}", i, j, v);
                        }
                    }
                }
            }
            Err(err) => println!("{err:?}"),
        }
    }
}

253
ecstore/src/ellipses.rs Normal file
View File

@@ -0,0 +1,253 @@
use lazy_static::*;
use anyhow::Error;
use regex::Regex;
lazy_static! {
static ref ELLIPSES_RE: Regex = Regex::new(r"(.*)(\{[0-9a-z]*\.\.\.[0-9a-z]*\})(.*)").unwrap();
}
// Ellipses constants
const OPEN_BRACES: &str = "{";
const CLOSE_BRACES: &str = "}";
const ELLIPSES: &str = "...";
/// One parsed `{a...b}` ellipses: the literal text before and after it,
/// plus the generated sequence of values.
#[derive(Debug, Default)]
pub struct Pattern {
    pub prefix: String,
    pub suffix: String,
    pub seq: Vec<String>,
}
impl Pattern {
    /// Expands the sequence into full strings by attaching this pattern's
    /// prefix and suffix to every element of `seq`.
    #[allow(dead_code)]
    pub fn expand(&self) -> Vec<String> {
        // Fix: reserve one slot per sequence element — the previous code
        // sized the Vec from the suffix length by mistake.
        let mut ret = Vec::with_capacity(self.seq.len());
        for v in self.seq.iter() {
            match (self.prefix.is_empty(), self.suffix.is_empty()) {
                (false, true) => ret.push(format!("{}{}", self.prefix, v)),
                (true, false) => ret.push(format!("{}{}", v, self.suffix)),
                (true, true) => ret.push(v.to_string()),
                (false, false) => ret.push(format!("{}{}{}", self.prefix, v, self.suffix)),
            }
        }
        ret
    }
}
/// All `Pattern`s parsed from a single command-line argument, one entry
/// per `{a...b}` ellipses found in it.
#[derive(Debug)]
pub struct ArgPattern {
    pub inner: Vec<Pattern>,
}
impl ArgPattern {
    #[allow(dead_code)]
    pub fn new(inner: Vec<Pattern>) -> Self {
        Self { inner }
    }

    /// Expands every pattern and returns the cartesian product of the
    /// per-pattern expansions, one Vec of string parts per combination.
    #[allow(dead_code)]
    pub fn expand(&self) -> Vec<Vec<String>> {
        let mut ret = Vec::new();
        for v in self.inner.iter() {
            ret.push(v.expand());
        }
        Self::arg_expander(&ret)
    }

    // Recursive cartesian product over `lbs`. Note the current list's
    // element is pushed AFTER the recursively-expanded tail, so each
    // combination carries its parts in reverse list order.
    fn arg_expander(lbs: &Vec<Vec<String>>) -> Vec<Vec<String>> {
        let mut ret = Vec::new();
        if lbs.len() == 1 {
            // Base case: each element becomes its own single-part combination.
            let arr = lbs.get(0).unwrap();
            for bs in arr {
                ret.push(vec![bs.to_string()])
            }
            return ret;
        }
        let first = &lbs[0];
        let (_, other) = lbs.split_at(1);
        let others = Vec::from(other);
        // let other = lbs[1..lbs.len()];
        for bs in first {
            let ots = Self::arg_expander(&others);
            for obs in ots {
                let mut v = obs;
                v.push(bs.to_string());
                ret.push(v);
            }
        }
        ret
    }
}
/// Splits `arg` into a list of `Pattern`s, one per `{a...b}` ellipses.
///
/// The regex peels the rightmost ellipses off first: group 1 (the text
/// before it) is re-matched in a loop until no more ellipses remain, so
/// patterns are collected right-to-left and only the final (leftmost)
/// pattern carries a prefix.
#[allow(dead_code)]
pub fn find_ellipses_patterns(arg: &str) -> Result<ArgPattern, Error> {
    let mut caps = match ELLIPSES_RE.captures(arg) {
        Some(caps) => caps,
        None => return Err(Error::msg("Invalid argument")),
    };
    if caps.len() == 0 {
        return Err(Error::msg("Invalid format"));
    }
    let mut pattens = Vec::new();
    loop {
        // Try to match another ellipses inside the leading text (group 1).
        let m = match caps.get(1) {
            Some(m) => m,
            None => break,
        };
        let cs = match ELLIPSES_RE.captures(m.into()) {
            Some(cs) => cs,
            None => {
                break;
            }
        };
        // Group 2 is the `{a...b}` token itself, group 3 the trailing text.
        let seq = caps
            .get(2)
            .map(|m| parse_ellipses_range(m.into()).unwrap_or(Vec::new()))
            .unwrap();
        let suffix = caps
            .get(3)
            .map(|m| m.as_str().to_string())
            .unwrap_or(String::new());
        pattens.push(Pattern {
            suffix,
            seq,
            ..Default::default()
        });
        if cs.len() > 0 {
            caps = cs;
            continue;
        }
        break;
    }
    // Emit the final pattern, which also keeps the remaining prefix text.
    if caps.len() > 0 {
        let seq = caps
            .get(2)
            .map(|m| parse_ellipses_range(m.into()).unwrap_or(Vec::new()))
            .unwrap();
        let suffix = caps
            .get(3)
            .map(|m| m.as_str().to_string())
            .unwrap_or(String::new());
        let prefix = caps
            .get(1)
            .map(|m| m.as_str().to_string())
            .unwrap_or(String::new());
        pattens.push(Pattern {
            prefix,
            suffix,
            seq,
            ..Default::default()
        });
    }
    Ok(ArgPattern::new(pattens))
}
// has_ellipses returns true when every entry contains either the "..."
// ellipses token or a `{`/`}` brace pair (vacuously true for an empty list).
#[allow(dead_code)]
pub fn has_ellipses(s: &Vec<String>) -> bool {
    s.iter().all(|v| {
        v.contains(ELLIPSES) || (v.contains(OPEN_BRACES) && v.contains(CLOSE_BRACES))
    })
}
// Parses an ellipses range pattern of following style
// `{1...64}`
// `{33...64}`
#[allow(dead_code)]
pub fn parse_ellipses_range(partten: &str) -> Result<Vec<String>, Error> {
if !partten.contains(OPEN_BRACES) {
return Err(Error::msg("Invalid argument"));
}
if !partten.contains(OPEN_BRACES) {
return Err(Error::msg("Invalid argument"));
}
let v: Vec<&str> = partten
.trim_start_matches(OPEN_BRACES)
.trim_end_matches(CLOSE_BRACES)
.split(ELLIPSES)
.collect();
if v.len() != 2 {
return Err(Error::msg("Invalid argument"));
}
// let start = usize::from_str_radix(v[0], 16)?;
// let end = usize::from_str_radix(v[1], 16)?;
let start = v[0].parse::<usize>()?;
let end = v[1].parse::<usize>()?;
if start > end {
return Err(Error::msg(
"Invalid argument:range start cannot be bigger than end",
));
}
let mut ret: Vec<String> = Vec::with_capacity(end + 1);
for i in start..end + 1 {
if v[0].starts_with('0') && v[0].len() > 1 {
ret.push(format!("{:0witdth$}", i, witdth = v[0].len()));
} else {
ret.push(format!("{}", i));
}
}
Ok(ret)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_has_ellipses() {
        assert_eq!(has_ellipses(vec!["/sdf".to_string()].as_ref()), false);
        assert_eq!(has_ellipses(vec!["{1...3}".to_string()].as_ref()), true);
    }

    // Prints the expansion of a simple range; makes no assertions.
    #[test]
    fn test_parse_ellipses_range() {
        let s = "{1...16}";
        match parse_ellipses_range(s) {
            Ok(res) => {
                println!("{:?}", res)
            }
            Err(err) => println!("{err:?}"),
        };
    }

    // Prints the parsed patterns of a two-ellipses URL; makes no assertions.
    #[test]
    fn test_find_ellipses_patterns() {
        use std::result::Result::Ok;
        let pattern = "http://rustfs{1...2}:9000/mnt/disk{1...16}";
        // let pattern = "http://[2001:3984:3989::{01...f}]/disk{1...10}";
        match find_ellipses_patterns(pattern) {
            Ok(caps) => println!("caps{caps:?}"),
            Err(err) => println!("{err:?}"),
        }
    }
}

576
ecstore/src/endpoint.rs Normal file
View File

@@ -0,0 +1,576 @@
use std::{collections::HashMap, net::IpAddr, path::Path, usize};
use super::disks_layout::PoolDisksLayout;
use super::utils::{
net::{is_local_host, split_host_port},
string::new_string_set,
};
use anyhow::Error;
use url::Url;
pub const DEFAULT_PORT: u16 = 9000;
// #[derive(Debug, Clone)]
// struct Node {
// url: url::Url,
// pools: Vec<usize>,
// is_local: bool,
// grid_host: String,
// }
/// Style of an endpoint argument: local filesystem path or full URL.
#[derive(PartialEq, Eq)]
pub enum EndpointType {
    // NOTE(review): variant name keeps the existing "Undefiend" typo
    // because other code in this file references it by name.
    Undefiend,
    PathEndpointType,
    URLEndpointType,
}
/// A server node, identified by its grid host ("scheme://host:port");
/// `pools` lists the pool indexes it serves.
///
/// Ordering is by `grid_host` only. The previous code derived `Ord`
/// (whole-struct, url-first) while hand-writing `PartialOrd` over
/// `grid_host`, so the two orderings disagreed (clippy:
/// derive_ord_xor_partial_ord); both now agree on `grid_host`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node {
    pub url: url::Url,
    pub pools: Vec<i32>,
    pub is_local: bool,
    pub grid_host: String, // TODO "scheme://host:port"
}

impl Ord for Node {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.grid_host.cmp(&other.grid_host)
    }
}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// A single drive location plus its position in the topology; the three
/// indexes stay at -1 until assigned by `create_pool_endpoints`.
#[derive(Debug, Clone)]
pub struct Endpoint {
    pub url: url::Url,
    pub is_local: bool,
    pub pool_idx: i32,
    pub set_idx: i32,
    pub disk_idx: i32,
}
// Returns true for paths that denote "nothing": the empty string, or a
// bare root separator on Unix ("/") or Windows ("\\").
fn is_empty_path(path: &str) -> bool {
    matches!(path, "" | "/" | "\\")
}
// Returns true when the string parses as an IPv4 or IPv6 address.
fn is_host_ip(ip_str: &str) -> bool {
    matches!(ip_str.parse::<IpAddr>(), Ok(_))
}
// NOTE(review): this tokio test sits outside a #[cfg(test)] module, so it
// is compiled into regular builds too; consider moving it into the test
// module at the bottom of this file.
#[tokio::test]
async fn test_new_endpont() {
    let arg = "./data";
    let ep = Endpoint::new(arg).unwrap();
    println!("{:?}", ep);
}
impl Endpoint {
    /// Parses one drive argument into an `Endpoint`.
    ///
    /// A plain filesystem path is canonicalized into a `file://` URL and
    /// marked local; an `http(s)://host:port/path` argument becomes a
    /// remote URL endpoint. Pool/set/disk indexes start at -1 (unassigned).
    fn new(arg: &str) -> Result<Self, Error> {
        if is_empty_path(arg) {
            return Err(Error::msg("不支持空或根endpoint"));
        }

        // Try URL parsing first; specific parse errors map to user-facing
        // messages, and any other failure is treated as a local path.
        let url = Url::parse(arg).or_else(|e| match e {
            url::ParseError::EmptyHost => Err(Error::msg("远程地址,域名不能为空")),
            url::ParseError::IdnaError => Err(Error::msg("域名格式不正确")),
            url::ParseError::InvalidPort => Err(Error::msg("端口格式不正确")),
            url::ParseError::InvalidIpv4Address => Err(Error::msg("IP格式不正确")),
            url::ParseError::InvalidIpv6Address => Err(Error::msg("IP格式不正确")),
            url::ParseError::InvalidDomainCharacter => Err(Error::msg("域名字符格式不正确")),
            // url::ParseError::RelativeUrlWithoutBase => todo!(),
            // url::ParseError::RelativeUrlWithCannotBeABaseBase => todo!(),
            // url::ParseError::SetHostOnCannotBeABaseUrl => todo!(),
            url::ParseError::Overflow => Err(Error::msg("长度过长")),
            _ => {
                // A bare IP without a scheme is rejected; anything else is
                // resolved to an absolute path and wrapped as file://.
                if is_host_ip(arg) {
                    return Err(Error::msg("无效的URL endpoint格式: 缺少 http 或 https"));
                }
                let abs_arg = Path::new(arg).canonicalize()?;
                let abs = abs_arg.to_str().ok_or(Error::msg("绝对路径错误"))?;
                let url = Url::from_file_path(abs).unwrap();
                Ok(url)
            }
        })?;

        // file:// URLs are always local path endpoints.
        if url.scheme() == "file" {
            return Ok(Endpoint {
                url: url,
                is_local: true,
                pool_idx: -1,
                set_idx: -1,
                disk_idx: -1,
            });
        }

        // NOTE(review): Url::port() returns None when the port equals the
        // scheme default (80/443), so e.g. "http://host:80/disk" is
        // rejected here — confirm whether that is intended.
        if url.port().is_none() {
            return Err(Error::msg("必须提供端口号"));
        }

        if !(url.scheme() == "http" || url.scheme() == "https") {
            return Err(Error::msg(
                "URL endpoint格式无效: Scheme字段必须包含'http'或'https'",
            ));
        }

        // Remote endpoints need a non-empty, non-root URL path.
        let path = url.path();
        if is_empty_path(path) {
            return Err(Error::msg("URL endpoint不支持空或根路径"));
        }

        // TODO: special-case path handling on Windows.
        #[cfg(windows)]
        {
            use std::env;
            if env::consts::OS == "windows" {
                // Windows-specific path logic goes here.
            }
        }

        Ok(Endpoint {
            url: url,
            is_local: false,
            pool_idx: -1,
            set_idx: -1,
            disk_idx: -1,
        })
    }

    // pub fn host_port_str(&self) -> String {
    //     if self.url.has_host() && self.port() > 0 {
    //         return format!("{}:{}", self.hostname(), self.port());
    //     } else if self.url.has_host() && self.port() == 0 {
    //         return self.hostname().to_string();
    //     } else if !self.url.has_host() && self.port() > 0 {
    //         return format!(":{}", self.port());
    //     } else {
    //         return String::new();
    //     }
    // }

    // pub fn port(&self) -> u16 {
    //     self.url.port().unwrap_or(0)
    // }

    // pub fn hostname(&self) -> &str {
    //     self.url.host_str().unwrap_or("")
    // }

    /// Path endpoints are `file://` URLs; everything else is a URL endpoint.
    pub fn get_type(&self) -> EndpointType {
        if self.url.scheme() == "file" {
            return EndpointType::PathEndpointType;
        }
        EndpointType::URLEndpointType
    }

    pub fn to_string(&self) -> String {
        self.url.as_str().to_string()
    }

    // pub fn get_scheme(&self) -> String {
    //     self.url.scheme().to_string()
    // }

    pub fn set_pool_index(&mut self, idx: i32) {
        self.pool_idx = idx
    }

    pub fn set_set_index(&mut self, idx: i32) {
        self.set_idx = idx
    }

    pub fn set_disk_index(&mut self, idx: i32) {
        self.disk_idx = idx
    }

    // Re-evaluates `is_local` by matching the URL host against this
    // machine's interface addresses.
    // NOTE(review): `self.url.port().unwrap()` panics for host-bearing
    // URLs without an explicit non-default port — confirm callers only
    // reach this with a port set.
    fn update_islocal(&mut self) -> Result<(), Error> {
        if self.url.has_host() {
            self.is_local = is_local_host(
                self.url.host().unwrap(),
                self.url.port().unwrap(),
                DEFAULT_PORT,
            );
        }
        Ok(())
    }

    // "scheme://host[:port]" identifier used to group endpoints by node.
    fn grid_host(&self) -> String {
        let host = self.url.host_str().unwrap_or("");
        let port = self.url.port().unwrap_or(0);
        if port > 0 {
            format!("{}://{}:{}", self.url.scheme(), host, port)
        } else {
            format!("{}://{}", self.url.scheme(), host)
        }
    }
}
/// A flat, ordered collection of `Endpoint`s.
#[derive(Debug, Clone)]
pub struct Endpoints(Vec<Endpoint>);

impl Endpoints {
    pub fn new() -> Self {
        Self(Vec::new())
    }

    pub fn iter(&self) -> core::slice::Iter<Endpoint> {
        self.0.iter()
    }

    pub fn iter_mut(&mut self) -> core::slice::IterMut<Endpoint> {
        self.0.iter_mut()
    }

    /// Returns a copy of the endpoints in `[start, end)`.
    pub fn slice(&self, start: usize, end: usize) -> Vec<Endpoint> {
        self.0.as_slice()[start..end].to_vec()
    }

    /// Parses each argument into an `Endpoint`, enforcing that all share
    /// one style (path vs URL), one scheme, and are unique.
    pub fn from_args(args: Vec<String>) -> Result<Self, Error> {
        let mut ep_type = EndpointType::Undefiend;
        let mut scheme = String::new();
        let mut eps = Vec::new();
        let mut uniq_args = new_string_set();
        for (i, arg) in args.iter().enumerate() {
            let endpoint = Endpoint::new(arg)?;
            // The first endpoint fixes the expected style and scheme.
            if i == 0 {
                ep_type = endpoint.get_type();
                scheme = endpoint.url.scheme().to_string();
            } else if endpoint.get_type() != ep_type {
                return Err(Error::msg("不支持多种endpoints风格"));
            } else if endpoint.url.scheme().to_string() != scheme {
                return Err(Error::msg("不支持多种scheme"));
            }
            let arg_str = endpoint.to_string();
            if uniq_args.contains(arg_str.as_str()) {
                return Err(Error::msg("发现重复 endpoints"));
            }
            uniq_args.add(arg_str);
            eps.push(endpoint.clone());
        }
        Ok(Endpoints(eps))
    }
}
// Endpoints grouped per pool, used while deciding which addresses are
// local. Fix: the attribute was `#[warn(dead_code)]`, which is a no-op
// (warn is the default lint level); `allow` is what was clearly intended.
#[allow(dead_code)]
pub struct PoolEndpointList(Vec<Endpoints>);

impl PoolEndpointList {
    fn from_vec(v: Vec<Endpoints>) -> Self {
        Self(v)
    }

    pub fn push(&mut self, es: Endpoints) {
        self.0.push(es)
    }

    // TODO: resolve host names to decide which addresses are local.
    fn update_is_local(&mut self) -> Result<(), Error> {
        for eps in self.0.iter_mut() {
            for ep in eps.iter_mut() {
                // TODO:
                ep.update_islocal()?
            }
        }
        Ok(())
    }
}
// PoolEndpoints represent endpoints in a given pool
// along with its setCount and setDriveCount.
#[derive(Debug)]
pub struct PoolEndpoints {
    // indicates if endpoints are provided in non-ellipses style
    pub legacy: bool,
    // Number of erasure sets in this pool.
    pub set_count: usize,
    // Number of drives per erasure set.
    pub drives_per_set: usize,
    pub endpoints: Endpoints,
    // The original command-line argument this pool came from.
    pub cmd_line: String,
    pub platform: String,
}
// EndpointServerPools - list of list of endpoints
#[derive(Debug)]
pub struct EndpointServerPools(Vec<PoolEndpoints>);

impl EndpointServerPools {
    pub fn new() -> Self {
        Self(Vec::new())
    }

    /// True when the first endpoint of the first pool is local;
    /// false when no pools are configured.
    pub fn first_is_local(&self) -> bool {
        if self.0.is_empty() {
            return false;
        }
        self.0[0].endpoints.0[0].is_local
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }

    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    pub fn iter(&self) -> core::slice::Iter<'_, PoolEndpoints> {
        self.0.iter()
    }

    pub fn push(&mut self, pes: PoolEndpoints) {
        self.0.push(pes)
    }

    /// Appends a pool after verifying none of its endpoints is already
    /// registered in an existing pool.
    pub fn add(&mut self, eps: PoolEndpoints) -> Result<(), Error> {
        let mut exists = new_string_set();
        for peps in self.0.iter() {
            for ep in peps.endpoints.0.iter() {
                exists.add(ep.to_string());
            }
        }
        for ep in eps.endpoints.0.iter() {
            if exists.contains(&ep.to_string()) {
                return Err(Error::msg("endpoints exists"));
            }
        }
        self.0.push(eps);
        Ok(())
    }

    /// Collects the distinct nodes across all pools, keyed by grid host
    /// and sorted by it.
    ///
    /// Fix: a node serving several pools now accumulates every pool index;
    /// the previous code re-inserted (overwrote) the map entry for each
    /// endpoint, so only the last pool index survived.
    pub fn get_nodes(&self) -> Vec<Node> {
        let mut node_map: HashMap<String, Node> = HashMap::new();
        for pool in self.iter() {
            for ep in pool.endpoints.iter() {
                let node = node_map.entry(ep.grid_host()).or_insert_with(|| Node {
                    url: ep.url.clone(),
                    pools: vec![],
                    is_local: ep.is_local,
                    grid_host: ep.grid_host(),
                });
                if !node.pools.contains(&ep.pool_idx) {
                    node.pools.push(ep.pool_idx)
                }
            }
        }
        let mut nodes: Vec<Node> = node_map.into_values().collect();
        nodes.sort();
        nodes
    }
}
/// How the server is deployed, derived from the endpoint layout.
#[derive(Debug)]
pub enum SetupType {
    // UnknownSetupType - starts with unknown setup type.
    UnknownSetupType,
    // FSSetupType - FS setup type enum.
    FSSetupType,
    // ErasureSDSetupType - Erasure single drive setup enum.
    ErasureSDSetupType,
    // ErasureSetupType - Erasure setup type enum.
    ErasureSetupType,
    // DistErasureSetupType - Distributed Erasure setup type enum.
    DistErasureSetupType,
}
// True when the layout carries no drives at all: no pools, no sets in the
// first pool, no drives in its first set, or an empty first drive string.
fn is_empty_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
    match pools_layout.first() {
        None => true,
        Some(first_layout) => {
            first_layout.layout.is_empty()
                || first_layout.layout[0].is_empty()
                || first_layout.layout[0][0].is_empty()
        }
    }
}
// True for a single-drive deployment: exactly one pool holding exactly
// one set with exactly one drive. (Returns the condition directly instead
// of the previous `if cond { true } else { false }`.)
fn is_single_drive_layout(pools_layout: &Vec<PoolDisksLayout>) -> bool {
    pools_layout.len() == 1
        && pools_layout[0].layout.len() == 1
        && pools_layout[0].layout[0].len() == 1
}
/// Builds per-pool endpoint lists from the parsed disk layout and decides
/// the setup type (single-drive, erasure, or distributed erasure).
pub fn create_pool_endpoints(
    server_addr: String,
    pools: &Vec<PoolDisksLayout>,
) -> Result<(Vec<Endpoints>, SetupType), Error> {
    if is_empty_layout(pools) {
        return Err(Error::msg("empty layout"));
    }
    // TODO: CheckLocalServerAddr
    // Single drive: must be a path-style endpoint; all indexes are 0.
    if is_single_drive_layout(pools) {
        let mut endpoint = Endpoint::new(pools[0].layout[0][0].as_str())?;
        endpoint.update_islocal()?;
        if endpoint.get_type() != EndpointType::PathEndpointType {
            return Err(Error::msg("use path style endpoint for single node setup"));
        }
        endpoint.set_pool_index(0);
        endpoint.set_set_index(0);
        endpoint.set_disk_index(0);
        let mut endpoints = Vec::new();
        endpoints.push(endpoint);
        // TODO: checkCrossDeviceMounts
        return Ok((vec![Endpoints(endpoints)], SetupType::ErasureSDSetupType));
    }
    // Multi-drive: tag every endpoint with its pool/set/disk position.
    let mut ret = Vec::with_capacity(pools.len());
    for (pool_idx, pool) in pools.iter().enumerate() {
        let mut endpoints = Endpoints::new();
        for (set_idx, set_layout) in pool.layout.iter().enumerate() {
            let mut eps = Endpoints::from_args(set_layout.to_owned())?;
            // TODO: checkCrossDeviceMounts
            for (disk_idx, ep) in eps.0.iter_mut().enumerate() {
                ep.set_pool_index(pool_idx as i32);
                ep.set_set_index(set_idx as i32);
                ep.set_disk_index(disk_idx as i32);
                endpoints.0.push(ep.to_owned());
            }
        }
        if endpoints.0.is_empty() {
            return Err(Error::msg("invalid number of endpoints"));
        }
        ret.push(endpoints);
    }
    // TODO:
    // NOTE(review): update_is_local runs on a CLONE of `ret`, so the
    // computed is_local flags are discarded — likely should mutate `ret`
    // itself; confirm intended behavior.
    PoolEndpointList::from_vec(ret.clone()).update_is_local()?;
    let mut setup_type = SetupType::UnknownSetupType;
    // TODO: parse server port
    let (_, server_port) = split_host_port(server_addr.as_str())?;
    // Count distinct hosts to tell single-host erasure from distributed.
    let mut uniq_host = new_string_set();
    for (_i, eps) in ret.iter_mut().enumerate() {
        // TODO: port the remaining validations from the reference implementation
        for ep in eps.0.iter() {
            if !ep.url.has_host() {
                uniq_host.add(format!("localhost:{}", server_port));
            } else {
                // uniq_host.add(ep.url.domain().)
            }
        }
    }
    let erasure_type = uniq_host.to_slice().len() == 1;
    for eps in ret.iter() {
        if eps.0[0].get_type() == EndpointType::PathEndpointType {
            setup_type = SetupType::ErasureSetupType;
            break;
        }
        if eps.0[0].get_type() == EndpointType::URLEndpointType {
            if erasure_type {
                setup_type = SetupType::ErasureSetupType;
            } else {
                setup_type = SetupType::DistErasureSetupType;
            }
            break;
        }
    }
    Ok((ret, setup_type))
}
// create_server_endpoints
/// Builds the full `EndpointServerPools` from the disk layout: one
/// `PoolEndpoints` per pool, carrying its set count, drives-per-set and
/// original command line.
pub fn create_server_endpoints(
    server_addr: String,
    pool_args: &Vec<PoolDisksLayout>,
    legacy: bool,
) -> Result<(EndpointServerPools, SetupType), Error> {
    if pool_args.is_empty() {
        return Err(Error::msg("无效参数"));
    }
    let (pooleps, setup_type) = create_pool_endpoints(server_addr, pool_args)?;
    let mut ret = EndpointServerPools::new();
    for (i, eps) in pooleps.iter().enumerate() {
        let ep = PoolEndpoints {
            legacy: legacy,
            set_count: pool_args[i].layout.len(),
            drives_per_set: pool_args[i].layout[0].len(),
            endpoints: eps.clone(),
            cmd_line: pool_args[i].cmdline.clone(),
            platform: String::new(),
        };
        // `add` rejects endpoints already present in an earlier pool.
        ret.add(ep)?;
    }
    Ok((ret, setup_type))
}
#[cfg(test)]
mod test {
    use crate::disks_layout::DisksLayout;

    use super::*;

    // Smoke test: builds server endpoints from an ellipses layout and
    // prints the result; panics on parse failure but asserts nothing else.
    #[test]
    fn test_create_server_endpoints() {
        let cases = vec![(
            ":9000",
            vec![
                // "/Users/weisd/fs".to_string(),
                "http://localhost:900{1...2}/export{1...64}".to_string(),
            ],
        )];

        for (addr, args) in cases {
            let layouts = DisksLayout::new(args).unwrap();
            println!("layouts:{:?},{}", &layouts.pools, &layouts.legacy);
            let (server_pool, setup_type) =
                create_server_endpoints(addr.to_string(), &layouts.pools, layouts.legacy).unwrap();
            println!("setup_type -- {:?}", setup_type);
            println!("server_pool == {:?}", server_pool);
        }

        // create_server_endpoints(server_addr, pool_args, legacy)
    }
}

9
ecstore/src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
mod disks_layout;
mod ellipses;
mod endpoint;
mod erasure;
mod error;
pub mod s3;
pub mod store;
mod stream;
mod utils;

276
ecstore/src/s3.rs Normal file
View File

@@ -0,0 +1,276 @@
use s3s::dto::*;
use s3s::s3_error;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use crate::store::ECStore;
// S3 API surface for ECStore.
//
// NOTE(review): every handler below is currently a stub that ignores its
// input and returns a default (empty) response; the `let input = req.input;`
// bindings are placeholders for the real storage logic to come.
#[async_trait::async_trait]
impl S3 for ECStore {
    #[tracing::instrument]
    async fn create_bucket(
        &self,
        req: S3Request<CreateBucketInput>,
    ) -> S3Result<S3Response<CreateBucketOutput>> {
        let input = req.input;
        let output = CreateBucketOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn copy_object(
        &self,
        req: S3Request<CopyObjectInput>,
    ) -> S3Result<S3Response<CopyObjectOutput>> {
        let input = req.input;
        // Only bucket-style copy sources are accepted; access-point
        // sources are rejected as not implemented.
        let (bucket, key) = match input.copy_source {
            CopySource::AccessPoint { .. } => return Err(s3_error!(NotImplemented)),
            CopySource::Bucket {
                ref bucket,
                ref key,
                ..
            } => (bucket, key),
        };
        let output = CopyObjectOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_bucket(
        &self,
        req: S3Request<DeleteBucketInput>,
    ) -> S3Result<S3Response<DeleteBucketOutput>> {
        let input = req.input;
        Ok(S3Response::new(DeleteBucketOutput {}))
    }

    #[tracing::instrument]
    async fn delete_object(
        &self,
        req: S3Request<DeleteObjectInput>,
    ) -> S3Result<S3Response<DeleteObjectOutput>> {
        let input = req.input;
        let output = DeleteObjectOutput::default(); // TODO: handle other fields
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn delete_objects(
        &self,
        req: S3Request<DeleteObjectsInput>,
    ) -> S3Result<S3Response<DeleteObjectsOutput>> {
        let input = req.input;
        let output = DeleteObjectsOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn get_bucket_location(
        &self,
        req: S3Request<GetBucketLocationInput>,
    ) -> S3Result<S3Response<GetBucketLocationOutput>> {
        let input = req.input;
        let output = GetBucketLocationOutput::default();
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn get_object(
        &self,
        req: S3Request<GetObjectInput>,
    ) -> S3Result<S3Response<GetObjectOutput>> {
        let input = req.input;
        let output = GetObjectOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn head_bucket(
        &self,
        req: S3Request<HeadBucketInput>,
    ) -> S3Result<S3Response<HeadBucketOutput>> {
        let input = req.input;
        Ok(S3Response::new(HeadBucketOutput::default()))
    }

    #[tracing::instrument]
    async fn head_object(
        &self,
        req: S3Request<HeadObjectInput>,
    ) -> S3Result<S3Response<HeadObjectOutput>> {
        let input = req.input;
        let output = HeadObjectOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn list_buckets(
        &self,
        _: S3Request<ListBucketsInput>,
    ) -> S3Result<S3Response<ListBucketsOutput>> {
        let output = ListBucketsOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    // v1 listing delegates to the v2 implementation and converts the
    // response shape back.
    #[tracing::instrument]
    async fn list_objects(
        &self,
        req: S3Request<ListObjectsInput>,
    ) -> S3Result<S3Response<ListObjectsOutput>> {
        let v2_resp = self.list_objects_v2(req.map_input(Into::into)).await?;

        Ok(v2_resp.map_output(|v2| ListObjectsOutput {
            contents: v2.contents,
            delimiter: v2.delimiter,
            encoding_type: v2.encoding_type,
            name: v2.name,
            prefix: v2.prefix,
            max_keys: v2.max_keys,
            ..Default::default()
        }))
    }

    #[tracing::instrument]
    async fn list_objects_v2(
        &self,
        req: S3Request<ListObjectsV2Input>,
    ) -> S3Result<S3Response<ListObjectsV2Output>> {
        let input = req.input;
        let output = ListObjectsV2Output {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn put_object(
        &self,
        req: S3Request<PutObjectInput>,
    ) -> S3Result<S3Response<PutObjectOutput>> {
        let input = req.input;
        let output = PutObjectOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn create_multipart_upload(
        &self,
        req: S3Request<CreateMultipartUploadInput>,
    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
        let input = req.input;
        let output = CreateMultipartUploadOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn upload_part(
        &self,
        req: S3Request<UploadPartInput>,
    ) -> S3Result<S3Response<UploadPartOutput>> {
        let UploadPartInput {
            body,
            upload_id,
            part_number,
            ..
        } = req.input;
        let output = UploadPartOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn upload_part_copy(
        &self,
        req: S3Request<UploadPartCopyInput>,
    ) -> S3Result<S3Response<UploadPartCopyOutput>> {
        let input = req.input;
        let output = UploadPartCopyOutput {
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    // Echoes back the request identifiers; the part list itself is empty.
    #[tracing::instrument]
    async fn list_parts(
        &self,
        req: S3Request<ListPartsInput>,
    ) -> S3Result<S3Response<ListPartsOutput>> {
        let ListPartsInput {
            bucket,
            key,
            upload_id,
            ..
        } = req.input;
        let output = ListPartsOutput {
            bucket: Some(bucket),
            key: Some(key),
            upload_id: Some(upload_id),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn complete_multipart_upload(
        &self,
        req: S3Request<CompleteMultipartUploadInput>,
    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
        let CompleteMultipartUploadInput {
            multipart_upload,
            bucket,
            key,
            upload_id,
            ..
        } = req.input;
        let output = CompleteMultipartUploadOutput {
            bucket: Some(bucket),
            key: Some(key),
            ..Default::default()
        };
        Ok(S3Response::new(output))
    }

    #[tracing::instrument]
    async fn abort_multipart_upload(
        &self,
        req: S3Request<AbortMultipartUploadInput>,
    ) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
        Ok(S3Response::new(AbortMultipartUploadOutput {
            ..Default::default()
        }))
    }
}

View File

@@ -1,18 +1,28 @@
use super::endpoint::Endpoint;
use crate::endpoint::EndpointServerPools;
pub struct Store {
use std::fmt::Debug;
#[derive(Debug)]
pub struct ECStore {
pub id: uuid::Uuid,
pub disks: Vec<Box<dyn DiskAPI>>,
pub pools: Vec<Sets>,
pub peer: Vec<String>,
}
impl Store {}
impl ECStore {
pub fn new(endpoints: EndpointServerPools) {
unimplemented!()
}
}
#[derive(Debug)]
pub struct Sets {
pub sets: Vec<Objects>,
}
#[derive(Debug)]
pub struct Objects {
pub endpoints: Vec<Endpoint>,
pub disks: Vec<usize>,
@@ -22,6 +32,7 @@ pub struct Objects {
pub default_parity_count: usize,
}
trait DiskAPI {}
#[async_trait::async_trait]
trait DiskAPI: Debug + Send + Sync + 'static {}
pub trait StorageAPI {}

2
ecstore/src/utils/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod net;
pub mod string;

94
ecstore/src/utils/net.rs Normal file
View File

@@ -0,0 +1,94 @@
use std::{
collections::HashMap,
net::{IpAddr, ToSocketAddrs},
};
use anyhow::Error;
use netif;
use url::Host;
/// Splits a "host:port" string into its host and numeric port parts.
///
/// Returns an error when there is no colon, more than one colon, or the
/// port does not parse as a `u16`.
///
/// NOTE(review): bracketed IPv6 literals such as "[::1]:9000" are still
/// rejected (same as the original) — confirm whether they must be
/// supported here.
pub fn split_host_port(s: &str) -> Result<(String, u16), Error> {
    // `split_once` performs the single two-way split without collecting
    // into an intermediate Vec. Inputs with extra colons fail the port
    // parse, matching the original `len() == 2` check.
    if let Some((host, port_str)) = s.split_once(':') {
        if let Ok(port) = port_str.parse::<u16>() {
            return Ok((host.to_string(), port));
        }
    }

    Err(Error::msg("Invalid address format or port number"))
}
// is_local_host reports whether `host` resolves to one of this
// machine's interface addresses. When `port` is non-zero, it must also
// equal `local_port` for the result to be true.
pub fn is_local_host(host: Host<&str>, port: u16, local_port: u16) -> bool {
    // Collect the local interface IPs into a set keyed by their string
    // form. (The original built a HashMap<String, bool> whose values
    // were never read — a HashSet expresses the intent directly.)
    let local_ips: std::collections::HashSet<String> = must_get_local_ips()
        .iter()
        .map(|ip| ip.to_string())
        .collect();

    let host_is_local = match host {
        // Resolve the domain and check whether any resulting address is
        // local. A resolution failure is treated as "not local":
        // `Result::into_iter().flatten()` yields nothing on Err.
        Host::Domain(domain) => (domain, 0)
            .to_socket_addrs()
            .into_iter()
            .flatten()
            .any(|addr| local_ips.contains(&addr.ip().to_string())),
        Host::Ipv4(ip) => local_ips.contains(&ip.to_string()),
        Host::Ipv6(ip) => local_ips.contains(&ip.to_string()),
    };

    // port == 0 means "any port": only the address has to match.
    if port > 0 {
        return host_is_local && port == local_port;
    }
    host_is_local
}
// must_get_local_ips returns the addresses of all network interfaces
// that are currently up, or an empty list if they cannot be enumerated.
pub fn must_get_local_ips() -> Vec<IpAddr> {
    // Match on the Result directly instead of the original
    // `if let Some(x) = netif::up().ok()` (clippy: match_result_ok).
    // The error value is deliberately ignored — "must" get, best effort.
    match netif::up() {
        Ok(interfaces) => interfaces.map(|i| i.address().to_owned()).collect(),
        Err(_) => Vec::new(),
    }
}
#[cfg(test)]
mod test {
    use std::net::Ipv4Addr;

    use super::*;

    // Smoke test: enumerating the local interface IPs must not panic.
    // The actual addresses are environment-dependent, so they are only
    // printed for manual inspection.
    #[test]
    fn test_must_get_local_ips() {
        let ips = must_get_local_ips();
        for ip in ips.iter() {
            println!("{:?}", ip)
        }
    }

    // Checks that the loopback address is reported as local.
    // NOTE(review): assumes the loopback interface is up and returned by
    // netif — confirm this holds on CI environments.
    #[test]
    fn test_is_local_host() {
        // let host = Host::Ipv4(Ipv4Addr::new(192, 168, 0, 233));
        let host = Host::Ipv4(Ipv4Addr::new(127, 0, 0, 1));
        // let host = Host::Domain("localhost");
        let port = 0;
        let local_port = 9000;
        // port == 0 skips the port comparison entirely, so local_port
        // has no effect on this assertion.
        let is = is_local_host(host, port, local_port);
        assert!(is)
    }
}

133
ecstore/src/utils/string.rs Normal file
View File

@@ -0,0 +1,133 @@
use std::collections::HashMap;
use std::fmt;
// StringSet - a set of strings, backed by a HashMap with unit values.
// (The inner map representation is kept as-is so the sibling trait
// impls on the tuple field continue to work unchanged.)
#[derive(Debug, Clone)]
pub struct StringSet(HashMap<String, ()>);

impl StringSet {
    // ToSlice - returns the members as a sorted vector of strings.
    pub fn to_slice(&self) -> Vec<String> {
        let mut keys: Vec<String> = self.0.keys().cloned().collect();
        keys.sort();
        keys
    }

    // IsEmpty - returns whether the set is empty or not.
    pub fn is_empty(&self) -> bool {
        // `is_empty()` instead of `len() == 0` (clippy: len_zero).
        self.0.is_empty()
    }

    // Add - adds a string to the set.
    pub fn add(&mut self, s: String) {
        self.0.insert(s, ());
    }

    // Remove - removes a string from the set. It does nothing if the
    // string does not exist in the set.
    pub fn remove(&mut self, s: &str) {
        self.0.remove(s);
    }

    // Contains - checks if a string is in the set.
    pub fn contains(&self, s: &str) -> bool {
        self.0.contains_key(s)
    }

    // FuncMatch - returns a new set containing each value that passes
    // the match function.
    pub fn func_match<F>(&self, match_fn: F, match_string: &str) -> StringSet
    where
        F: Fn(&str, &str) -> bool,
    {
        // Iterate `keys()` directly; the unit values carry no data.
        StringSet(
            self.0
                .keys()
                .filter(|k| match_fn(k, match_string))
                .map(|k| (k.clone(), ()))
                .collect(),
        )
    }

    // ApplyFunc - returns a new set containing each value processed by
    // 'apply_fn'. Values mapping to the same output are deduplicated.
    pub fn apply_func<F>(&self, apply_fn: F) -> StringSet
    where
        F: Fn(&str) -> String,
    {
        StringSet(self.0.keys().map(|k| (apply_fn(k), ())).collect())
    }

    // Equals - checks whether the given set has exactly the same members.
    pub fn equals(&self, other: &StringSet) -> bool {
        // Equal lengths plus full containment implies set equality.
        self.0.len() == other.0.len() && self.0.keys().all(|k| other.0.contains_key(k))
    }

    // Intersection - returns the members present in both sets as a new set.
    pub fn intersection(&self, other: &StringSet) -> StringSet {
        // `as_str()` lets the lookup borrow as &str, avoiding the
        // original's explicit `contains_key::<String>` turbofish.
        StringSet(
            self.0
                .keys()
                .filter(|k| other.0.contains_key(k.as_str()))
                .map(|k| (k.clone(), ()))
                .collect(),
        )
    }

    // Difference - returns the members of self not in `other` as a new set.
    pub fn difference(&self, other: &StringSet) -> StringSet {
        StringSet(
            self.0
                .keys()
                .filter(|k| !other.0.contains_key(k.as_str()))
                .map(|k| (k.clone(), ()))
                .collect(),
        )
    }

    // Union - returns the members present in either set as a new set.
    pub fn union(&self, other: &StringSet) -> StringSet {
        let mut new_set = self.clone();
        // `extend` reserves capacity from the iterator's size hint
        // instead of inserting one key at a time.
        new_set.0.extend(other.0.keys().map(|k| (k.clone(), ())));
        new_set
    }
}
// JSON serialization/deserialization is not implemented yet; it would require the serde crate.
// Display, PartialEq, and Eq are implemented below for more idiomatic Rust.
// Display - renders the set as its sorted members joined by ", ".
impl fmt::Display for StringSet {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.to_slice().join(", "))
    }
}
// Implementing PartialEq and Eq traits to allow comparison of StringSet instances.
impl PartialEq for StringSet {
fn eq(&self, other: &StringSet) -> bool {
self.equals(other)
}
}
impl Eq for StringSet {}
// NewStringSet - creates a new, empty string set.
pub fn new_string_set() -> StringSet {
    StringSet(HashMap::default())
}
// CreateStringSet - creates a new string set seeded with the given
// string values (duplicates collapse into a single member).
pub fn create_string_set(sl: Vec<String>) -> StringSet {
    let mut set = new_string_set();
    sl.into_iter().for_each(|value| set.add(value));
    set
}

View File

@@ -8,7 +8,24 @@ rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "rustfs"
required-features = ["binary"]
[features]
binary = ["tokio/full", "dep:clap", "dep:tracing-subscriber", "dep:hyper-util"]
[dependencies]
store = { path = "../store" }
tracing-subscriber = { version = "0.3.18" }
time = { workspace = true, features = ["parsing", "formatting"] }
ecstore = { path = "../ecstore" }
# time = { workspace = true, features = ["parsing", "formatting"] }
# s3s = "0.10.0"
hyper-util = { version = "0.1.5", optional = true, features = ["server-auto", "server-graceful", "http1", "http2", "tokio"] }
tokio.workspace = true
clap = { version = "4.5.7", optional = true, features = ["derive"] }
# tracing.workspace = true
# tokio-util.workspace = true
tracing-subscriber = { version = "0.3.18", optional = true, features = ["env-filter", "time"] }
s3s = "0.10.0"
# async-trait = "0.1.80"

View File

@@ -1,3 +1,160 @@
fn main() {
println!("Hello, world!");
#![forbid(unsafe_code)]
#![deny(clippy::all, clippy::pedantic)]
use s3s_fs::FileSystem;
use s3s_fs::Result;
use s3s::auth::SimpleAuth;
use s3s::service::S3ServiceBuilder;
use std::io::IsTerminal;
use std::path::PathBuf;
use tokio::net::TcpListener;
use clap::{CommandFactory, Parser};
use tracing::info;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as ConnBuilder;
// Command-line options for the server binary.
// NOTE: the `///` doc comments below double as clap's --help text, so
// they are left untouched; reviewer notes use plain `//` comments.
#[derive(Debug, Parser)]
#[command(version)]
struct Opt {
    /// Host name to listen on.
    #[arg(long, default_value = "localhost")]
    host: String,

    /// Port number to listen on.
    #[arg(long, default_value = "8014")] // The original design was finished on 2020-08-14.
    port: u16,

    /// Access key used for authentication.
    // Must be given together with --secret-key; the pairing is enforced
    // in check_cli_args, not by clap.
    #[arg(long)]
    access_key: Option<String>,

    /// Secret key used for authentication.
    // Must be given together with --access-key; see check_cli_args.
    #[arg(long)]
    secret_key: Option<String>,

    /// Domain name used for virtual-hosted-style requests.
    // Rejected by check_cli_args if it looks like a URL (contains '/').
    #[arg(long)]
    domain_name: Option<String>,

    /// Root directory of stored data.
    // Positional argument (no long flag).
    root: PathBuf,
}
// Installs the global tracing subscriber: pretty output, RUST_LOG-style
// filtering, and ANSI colours only when stdout is a terminal.
fn setup_tracing() {
    use tracing_subscriber::EnvFilter;

    let filter = EnvFilter::from_default_env();
    let color = std::io::stdout().is_terminal();

    tracing_subscriber::fmt()
        .pretty()
        .with_env_filter(filter)
        .with_ansi(color)
        .init();
}
fn check_cli_args(opt: &Opt) {
use clap::error::ErrorKind;
let mut cmd = Opt::command();
// TODO: how to specify the requirements with clap derive API?
if let (Some(_), None) | (None, Some(_)) = (&opt.access_key, &opt.secret_key) {
let msg = "access key and secret key must be specified together";
cmd.error(ErrorKind::MissingRequiredArgument, msg).exit();
}
if let Some(ref s) = opt.domain_name {
if s.contains('/') {
let msg = format!("expected domain name, found URL-like string: {s:?}");
cmd.error(ErrorKind::InvalidValue, msg).exit();
}
}
}
// Entry point: parse and validate CLI flags, install tracing, then
// hand off to the async runtime in `run`. Validation runs before
// tracing setup so clap errors print plainly.
fn main() -> Result {
    let opt = Opt::parse();
    check_cli_args(&opt);
    setup_tracing();
    run(opt)
}
// Builds the S3 service over the filesystem backend and serves it until
// Ctrl-C, then drains open connections with a 10-second grace period.
#[tokio::main]
async fn run(opt: Opt) -> Result {
    // Setup S3 provider
    let fs = FileSystem::new(opt.root)?;

    // Setup S3 service
    let service = {
        let mut b = S3ServiceBuilder::new(fs);

        // Enable authentication (both keys present — enforced earlier
        // by check_cli_args).
        if let (Some(ak), Some(sk)) = (opt.access_key, opt.secret_key) {
            b.set_auth(SimpleAuth::from_single(ak, sk));
            info!("authentication is enabled");
        }

        // Enable parsing virtual-hosted-style requests
        if let Some(domain_name) = opt.domain_name {
            b.set_base_domain(domain_name);
            info!("virtual-hosted-style requests are enabled");
        }

        b.build()
    };

    // Run server
    let listener = TcpListener::bind((opt.host.as_str(), opt.port)).await?;
    let local_addr = listener.local_addr()?;

    let hyper_service = service.into_shared();

    let http_server = ConnBuilder::new(TokioExecutor::new());
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();

    // Pinned once so the same Ctrl-C future can be polled across loop
    // iterations inside select!.
    let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

    info!("server is running at http://{local_addr}");

    // Accept loop: each connection is served on its own detached task
    // and registered with the graceful-shutdown watcher. Ctrl-C breaks
    // the loop without aborting in-flight connections; accept errors
    // are logged and the loop continues.
    loop {
        let (socket, _) = tokio::select! {
            res = listener.accept() => {
                match res {
                    Ok(conn) => conn,
                    Err(err) => {
                        tracing::error!("error accepting connection: {err}");
                        continue;
                    }
                }
            }
            _ = ctrl_c.as_mut() => {
                break;
            }
        };
        let conn = http_server.serve_connection(TokioIo::new(socket), hyper_service.clone());
        let conn = graceful.watch(conn.into_owned());
        // Detached task: per-connection errors are intentionally ignored.
        tokio::spawn(async move {
            let _ = conn.await;
        });
    }

    // Drain phase: wait for watched connections to finish, but no
    // longer than 10 seconds.
    tokio::select! {
        () = graceful.shutdown() => {
            tracing::debug!("Gracefully shutdown!");
        },
        () = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
            tracing::debug!("Waited 10 seconds for graceful shutdown, aborting...");
        }
    }

    info!("server is stopped");
    Ok(())
}

View File

@@ -1,22 +0,0 @@
// EndpointServerPools - the ordered list of endpoint pools configured
// for this server.
pub struct EndpointServerPools(Vec<PoolEndpoints>);

// PoolEndpoints - one pool of endpoints together with its layout
// parameters.
pub struct PoolEndpoints {
    // indicates if endpoints are provided in non-ellipses style
    pub legacy: bool,
    // number of sets in this pool — presumably erasure sets; TODO confirm
    pub set_count: usize,
    // number of drives per set
    pub drives_per_set: usize,
    pub endpoints: Endpoints,
    // command-line fragment this pool was parsed from
    pub cmd_line: String,
    pub platform: String,
}

// Endpoints - a list of endpoints.
pub struct Endpoints(Vec<Endpoint>);

// Endpoint - a single endpoint and its position in the topology.
pub struct Endpoint {
    pub url: url::Url,
    // whether this endpoint is served by the local host
    pub is_local: bool,
    // indices into pool/set/disk — i32 rather than usize, presumably so
    // a negative value can mean "unassigned"; TODO confirm
    pub pool_idx: i32,
    pub set_idx: i32,
    pub disk_idx: i32,
}

View File

@@ -1,5 +0,0 @@
mod endpoint;
mod erasure;
mod error;
mod store;
mod stream;