add env RUSTFS_ERASURE_SET_DRIVE_COUNT
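Lets operators pin the erasure set drive count explicitly instead of relying only on the symmetric-distribution heuristic. DisksLayout and EndpointSet gain from_volumes constructors (replacing the TryFrom<&[T]> impls); RUSTFS_ERASURE_SET_DRIVE_COUNT is parsed once in DisksLayout::from_volumes and threaded through get_all_sets into get_set_indexes, which rejects any forced count that cannot tile the provided drives. Also removes the unused bucket_meta module and the stub AppendWriter, and bumps the dev script to 16 drives so a forced set size of 8 is satisfiable.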

weisd
2024-10-05 12:33:39 +08:00
parent 10c63effd4
commit 46f7c010ca
14 changed files with 161 additions and 189 deletions

View File

@@ -21,8 +21,6 @@ use crate::error::{Error, Result};
use crate::disk::BUCKET_META_PREFIX;
use crate::store::ECStore;
type TypeConfigFile = &'static str;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
@@ -318,7 +316,7 @@ async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result<BucketMetad
Ok(bm)
}
fn _deserialize_from_str<'de, S, D>(deserializer: D) -> core::result::Result<S, D::Error>
fn _deserialize_from_str<'de, S, D>(_deserializer: D) -> core::result::Result<S, D::Error>
where
S: FromStr,
S::Err: Display,

View File

@@ -14,7 +14,7 @@ use futures::future::join_all;
use lazy_static::lazy_static;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use tracing::{error, warn};
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::tags;
@@ -118,10 +118,6 @@ impl BucketMetadataSys {
}
}
async fn refresh_buckets_metadata_loop(&self, failed_buckets: &HashSet<String>) -> Result<()> {
unimplemented!()
}
pub async fn get(&self, bucket: &str) -> Result<BucketMetadata> {
if is_meta_bucketname(bucket) {
return Err(Error::new(ConfigError::NotFound));
@@ -142,10 +138,10 @@ impl BucketMetadataSys {
}
}
async fn reset(&mut self) {
let mut map = self.metadata_map.write().await;
map.clear();
}
// async fn reset(&mut self) {
// let mut map = self.metadata_map.write().await;
// map.clear();
// }
pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
self.update_and_parse(bucket, config_file, data, true).await

View File

@@ -10,9 +10,9 @@ pub enum ResourceARNType {
ResourceARNKMS,
}
// Define the resource ARN prefixes
const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::";
const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::";
// // Define the resource ARN prefixes
// const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::";
// const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::";
// Define the Resource struct
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]

View File

@@ -1,65 +0,0 @@
use std::collections::HashMap;
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::error::Result;
use crate::disk::BUCKET_META_PREFIX;
pub const BUCKET_METADATA_FILE: &str = ".metadata.bin";
pub const BUCKET_METADATA_FORMAT: u16 = 1;
pub const BUCKET_METADATA_VERSION: u16 = 1;
#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
pub struct BucketMetadata {
pub format: u16,
pub version: u16,
pub name: String,
pub tagging: Option<HashMap<String, String>>,
pub created: Option<OffsetDateTime>,
}
// impl Default for BucketMetadata {
// fn default() -> Self {
// Self {
// format: Default::default(),
// version: Default::default(),
// name: Default::default(),
// created: OffsetDateTime::now_utc(),
// }
// }
// }
impl BucketMetadata {
pub fn new(name: &str) -> Self {
BucketMetadata {
format: BUCKET_METADATA_FORMAT,
version: BUCKET_METADATA_VERSION,
name: name.to_string(),
..Default::default()
}
}
pub fn save_file_path(&self) -> String {
format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE)
// PathBuf::new()
// .join(BUCKET_META_PREFIX)
// .join(self.name.as_str())
// .join(BUCKET_METADATA_FILE)
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut Serializer::new(&mut buf))?;
Ok(buf)
}
pub fn unmarshal_from(buf: &[u8]) -> Result<Self> {
let t: BucketMetadata = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -5,7 +5,6 @@ use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutOb
use http::HeaderMap;
use s3s::dto::StreamingBlob;
use s3s::Body;
use tracing::warn;
use super::error::ConfigError;

View File

@@ -2,10 +2,13 @@ use crate::error::{Error, Result};
use crate::utils::ellipses::*;
use serde::Deserialize;
use std::collections::HashSet;
use std::env;
use tracing::debug;
/// Supported set sizes; this is used to find the optimal
/// single set size.
const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
const ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT: &str = "RUSTFS_ERASURE_SET_DRIVE_COUNT";
#[derive(Deserialize, Debug, Default)]
pub struct PoolDisksLayout {
@@ -40,19 +43,69 @@ pub struct DisksLayout {
pub pools: Vec<PoolDisksLayout>,
}
impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
type Error = Error;
// impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
// type Error = Error;
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
// fn try_from(args: &[T]) -> Result<Self, Self::Error> {
// if args.is_empty() {
// return Err(Error::from_string("Invalid argument"));
// }
// let is_ellipses = args.iter().any(|v| has_ellipses(&[v]));
// // None of the args have ellipses use the old style.
// if !is_ellipses {
// let set_args = get_all_sets(is_ellipses, args)?;
// return Ok(DisksLayout {
// legacy: true,
// pools: vec![PoolDisksLayout::new(
// args.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(" "),
// set_args,
// )],
// });
// }
// let mut layout = Vec::with_capacity(args.len());
// for arg in args.iter() {
// if !has_ellipses(&[arg]) && args.len() > 1 {
// return Err(Error::from_string(
// "all args must have ellipses for pool expansion (Invalid arguments specified)",
// ));
// }
// let set_args = get_all_sets(is_ellipses, &[arg])?;
// layout.push(PoolDisksLayout::new(arg.as_ref(), set_args));
// }
// Ok(DisksLayout {
// legacy: false,
// pools: layout,
// })
// }
// }
impl DisksLayout {
pub fn from_volumes<T: AsRef<str>>(args: &[T]) -> Result<Self> {
if args.is_empty() {
return Err(Error::from_string("Invalid argument"));
}
let is_ellipses = args.iter().any(|v| has_ellipses(&[v]));
let set_drive_count_env = match env::var(ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT) {
Ok(res) => res,
Err(err) => {
debug!("{} not set use default:0, {:?}", ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT, err);
format!("0")
}
};
let set_drive_count: usize = set_drive_count_env.parse()?;
// None of the args have ellipses; use the old style.
if !is_ellipses {
let set_args = get_all_sets(is_ellipses, args)?;
let set_args = get_all_sets(set_drive_count, is_ellipses, args)?;
return Ok(DisksLayout {
legacy: true,
@@ -71,7 +124,7 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
));
}
let set_args = get_all_sets(is_ellipses, &[arg])?;
let set_args = get_all_sets(set_drive_count, is_ellipses, &[arg])?;
layout.push(PoolDisksLayout::new(arg.as_ref(), set_args));
}
@@ -81,9 +134,7 @@ impl<T: AsRef<str>> TryFrom<&[T]> for DisksLayout {
pools: layout,
})
}
}
impl DisksLayout {
pub fn is_empty_layout(&self) -> bool {
self.pools.is_empty()
|| self.pools[0].layout.is_empty()
@@ -121,12 +172,12 @@ impl DisksLayout {
///
/// For example: {1...64} is divided into 4 sets each of size 16.
/// This applies to even distributed setup syntax as well.
fn get_all_sets<T: AsRef<str>>(is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<String>>> {
fn get_all_sets<T: AsRef<str>>(set_drive_count: usize, is_ellipses: bool, args: &[T]) -> Result<Vec<Vec<String>>> {
let endpoint_set = if is_ellipses {
EndpointSet::try_from(args)?
EndpointSet::from_volumes(args, set_drive_count)?
} else {
let set_indexes = if args.len() > 1 {
get_set_indexes(args, &[args.len()], &[])?
get_set_indexes(args, &[args.len()], set_drive_count, &[])?
} else {
vec![vec![args.len()]]
};
@@ -159,17 +210,52 @@ struct EndpointSet {
set_indexes: Vec<Vec<usize>>,
}
impl<T: AsRef<str>> TryFrom<&[T]> for EndpointSet {
type Error = Error;
// impl<T: AsRef<str>> TryFrom<&[T]> for EndpointSet {
// type Error = Error;
fn try_from(args: &[T]) -> Result<Self, Self::Error> {
// fn try_from(args: &[T]) -> Result<Self, Self::Error> {
// let mut arg_patterns = Vec::with_capacity(args.len());
// for arg in args {
// arg_patterns.push(find_ellipses_patterns(arg.as_ref())?);
// }
// let total_sizes = get_total_sizes(&arg_patterns);
// let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?;
// let mut endpoints = Vec::new();
// for ap in arg_patterns.iter() {
// let aps = ap.expand();
// for bs in aps {
// endpoints.push(bs.join(""));
// }
// }
// Ok(EndpointSet {
// set_indexes,
// _arg_patterns: arg_patterns,
// endpoints,
// })
// }
// }
impl EndpointSet {
/// Create a new EndpointSet with the given endpoints and set indexes.
pub fn new(endpoints: Vec<String>, set_indexes: Vec<Vec<usize>>) -> Self {
Self {
endpoints,
set_indexes,
..Default::default()
}
}
pub fn from_volumes<T: AsRef<str>>(args: &[T], set_drive_count: usize) -> Result<Self, Error> {
let mut arg_patterns = Vec::with_capacity(args.len());
for arg in args {
arg_patterns.push(find_ellipses_patterns(arg.as_ref())?);
}
let total_sizes = get_total_sizes(&arg_patterns);
let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?;
let set_indexes = get_set_indexes(args, &total_sizes, set_drive_count, &arg_patterns)?;
let mut endpoints = Vec::new();
for ap in arg_patterns.iter() {
@@ -185,17 +271,6 @@ impl<T: AsRef<str>> TryFrom<&[T]> for EndpointSet {
endpoints,
})
}
}
impl EndpointSet {
/// Create a new EndpointSet with the given endpoints and set indexes.
pub fn new(endpoints: Vec<String>, set_indexes: Vec<Vec<usize>>) -> Self {
Self {
endpoints,
set_indexes,
..Default::default()
}
}
/// returns the sets representation of the endpoints
/// this function also intelligently decides on what will
@@ -298,14 +373,19 @@ fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPa
/// on each index, this function also determines the final set size
/// The final set size has the affinity towards choosing smaller
/// indexes (total sets)
fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result<Vec<Vec<usize>>> {
fn get_set_indexes<T: AsRef<str>>(
args: &[T],
total_sizes: &[usize],
set_drive_count: usize,
arg_patterns: &[ArgPattern],
) -> Result<Vec<Vec<usize>>> {
if args.is_empty() || total_sizes.is_empty() {
return Err(Error::from_string("Invalid argument"));
}
for &size in total_sizes {
// Check if total_sizes has the minimum range up to set_size
if size < SET_SIZES[0] {
if size < SET_SIZES[0] || size < set_drive_count {
return Err(Error::from_string(format!("Incorrect number of endpoints provided, size {}", size)));
}
}
@@ -319,16 +399,37 @@ fn get_set_indexes<T: AsRef<str>>(args: &[T], total_sizes: &[usize], arg_pattern
)));
}
// TODO Add custom set drive count
// Returns possible set counts with symmetry.
set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns);
if set_counts.is_empty() {
return Err(Error::from_string("No symmetric distribution detected with input endpoints provided"));
}
// Final set size with all the symmetry accounted for.
let set_size = common_set_drive_count(common_size, &set_counts);
let set_size = {
if set_drive_count > 0 {
if !set_counts.contains(&set_drive_count) {
return Err(Error::from_string(format!(
"Invalid set drive count. Acceptable values for {:?} number drives are {:?}",
common_size, &set_counts
)));
}
set_drive_count
} else {
set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns);
if set_counts.is_empty() {
return Err(Error::from_string(format!(
"No symmetric distribution detected with input endpoints, drives {} cannot be spread symmetrically by any supported erasure set sizes {:?}",
common_size, SET_SIZES
)));
}
// Final set size with all the symmetry accounted for.
common_set_drive_count(common_size, &set_counts)
}
};
if !is_valid_set_size(set_size) {
return Err(Error::from_string("Incorrect number of endpoints provided3"));
}
@@ -532,7 +633,7 @@ mod test {
}
}
match get_set_indexes(test_case.args.as_slice(), test_case.total_sizes.as_slice(), arg_patterns.as_slice()) {
match get_set_indexes(test_case.args.as_slice(), test_case.total_sizes.as_slice(), 0, arg_patterns.as_slice()) {
Ok(got_indexes) => {
if !test_case.success {
panic!("Test{}: Expected failure but passed instead", test_case.num);
@@ -792,7 +893,7 @@ mod test {
];
for test_case in test_cases {
match EndpointSet::try_from([test_case.arg].as_slice()) {
match EndpointSet::from_volumes([test_case.arg].as_slice(), 0) {
Ok(got_es) => {
if !test_case.success {
panic!("Test{}: Expected failure but passed instead", test_case.num);

View File

@@ -413,7 +413,7 @@ impl EndpointServerPools {
self.0 = eps;
}
pub fn from_volumes(server_addr: &str, endpoints: Vec<String>) -> Result<(EndpointServerPools, SetupType)> {
let layouts = DisksLayout::try_from(endpoints.as_slice())?;
let layouts = DisksLayout::from_volumes(endpoints.as_slice())?;
Self::create_server_endpoints(server_addr, &layouts)
}
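
As a hedged end-to-end sketch (field names taken from the debug output added in main.rs below; the expected values are an assumption based on the set-size math, not something this diff asserts):

// Set the env var before building endpoints, e.g. in a test harness.
std::env::set_var("RUSTFS_ERASURE_SET_DRIVE_COUNT", "8");
let volumes = vec!["./target/volume/test{0...15}".to_string()];
let (pools, _setup_type) = EndpointServerPools::from_volumes("127.0.0.1:9000", volumes)?;
for eps in pools.as_ref().iter() {
    // 16 drives at a forced size of 8 -> set_count 2, drives_per_set 8.
    assert_eq!((eps.set_count, eps.drives_per_set), (2, 8));
}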
@@ -1139,7 +1139,7 @@ mod test {
];
for test_case in test_cases {
let disks_layout = match DisksLayout::try_from(test_case.args.as_slice()) {
let disks_layout = match DisksLayout::from_volumes(test_case.args.as_slice()) {
Ok(v) => v,
Err(e) => {
if test_case.expected_err.is_none() {
@@ -1244,7 +1244,7 @@ mod test {
];
for (i, test_case) in test_cases.iter().enumerate() {
let disks_layout = match DisksLayout::try_from(test_case.1.as_slice()) {
let disks_layout = match DisksLayout::from_volumes(test_case.1.as_slice()) {
Ok(v) => v,
Err(e) => {
if test_case.2 {

View File

@@ -400,9 +400,10 @@ impl ShardReader {
}
// debug!("ec decode read ress {:?}", &ress);
warn!("ec decode read errors {:?}", &errors);
if !self.can_decode(&ress) {
warn!("ec decode read errors {:?}", &errors);
return Err(Error::msg("shard reader read faild"));
}

View File

@@ -1,4 +1,3 @@
pub mod bucket_meta;
mod chunk_stream;
mod config;
pub mod disk;

View File

@@ -47,7 +47,7 @@ pub struct ECStore {
impl ECStore {
#[allow(clippy::new_ret_no_self)]
pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result<Self> {
// let layouts = DisksLayout::try_from(endpoints.as_slice())?;
// let layouts = DisksLayout::from_volumes(endpoints.as_slice())?;
let mut deployment_id = None;

View File

@@ -1,63 +0,0 @@
use std::{io, task::Poll};
use futures::{ready, Future};
use tokio::io::{AsyncWrite, BufWriter};
use tracing::debug;
use uuid::Uuid;
use crate::disk::DiskStore;
pub struct AppendWriter<'a> {
disk: DiskStore,
volume: &'a str,
path: &'a str,
}
impl<'a> AppendWriter<'a> {
pub fn new(disk: DiskStore, volume: &'a str, path: &'a str) -> Self {
debug!("AppendWriter new {}: {}/{}", disk.id(), volume, path);
Self { disk, volume, path }
}
async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> {
debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len());
// self.disk
// .append_file(&self.volume, &self.path, buf)
// .await
// .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(())
}
}
impl<'a> AsyncWrite for AppendWriter<'a> {
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
debug!("AsyncWrite poll_write {}, buf:{}", self.disk.id(), buf.len());
// async_write has no await points today, so re-creating the future on
// every poll and polling it once is enough; it resolves immediately.
let mut fut = Box::pin(self.async_write(buf));
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
}

View File

@@ -22,7 +22,7 @@ use hyper_util::{
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
use s3s::{auth::SimpleAuth, service::S3ServiceBuilder};
use service::hybrid;
use std::{io::IsTerminal, net::SocketAddr, str::FromStr};
use std::{io::IsTerminal, net::SocketAddr, process::exit, str::FromStr};
use tokio::net::TcpListener;
use tonic::{metadata::MetadataValue, Request, Status};
use tracing::{debug, info};
@@ -93,6 +93,14 @@ async fn run(opt: config::Opt) -> Result<()> {
// used for RPC
let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone())
.map_err(|err| Error::from_string(err.to_string()))?;
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
debug!(
"created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}",
i, eps.set_count, eps.drives_per_set, eps.cmd_line
);
}
set_global_endpoints(endpoint_pools.as_ref().clone()).await;
update_erasure_type(setup_type).await;

View File

@@ -1,11 +1,8 @@
use bytes::BufMut;
use bytes::Bytes;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use ecstore::bucket::tags::Tags;
use ecstore::bucket_meta::BucketMetadata;
use ecstore::disk::error::DiskError;
use ecstore::disk::RUSTFS_META_BUCKET;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
@@ -24,7 +21,6 @@ use http::HeaderMap;
use log::warn;
use s3s::dto::*;
use s3s::s3_error;
use s3s::Body;
use s3s::S3Error;
use s3s::S3ErrorCode;
use s3s::S3Result;

View File

@@ -10,7 +10,9 @@ if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug"
fi
DATA_DIR_ARG="./target/volume/test{0...4}"
export RUSTFS_ERASURE_SET_DRIVE_COUNT=8
DATA_DIR_ARG="./target/volume/test{0...15}"
if [ -n "$1" ]; then
DATA_DIR_ARG="$1"