diff --git a/ecstore/src/bucket/metadata.rs b/ecstore/src/bucket/metadata.rs index 5f6dd388..0097f23e 100644 --- a/ecstore/src/bucket/metadata.rs +++ b/ecstore/src/bucket/metadata.rs @@ -21,8 +21,6 @@ use crate::error::{Error, Result}; use crate::disk::BUCKET_META_PREFIX; use crate::store::ECStore; -type TypeConfigFile = &'static str; - pub const BUCKET_METADATA_FILE: &str = ".metadata.bin"; pub const BUCKET_METADATA_FORMAT: u16 = 1; pub const BUCKET_METADATA_VERSION: u16 = 1; @@ -318,7 +316,7 @@ async fn read_bucket_metadata(api: &ECStore, bucket: &str) -> Result(deserializer: D) -> core::result::Result +fn _deserialize_from_str<'de, S, D>(_deserializer: D) -> core::result::Result where S: FromStr, S::Err: Display, diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index 2e0711ce..af0316c7 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -14,7 +14,7 @@ use futures::future::join_all; use lazy_static::lazy_static; use time::OffsetDateTime; use tokio::sync::RwLock; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use super::metadata::{load_bucket_metadata, BucketMetadata}; use super::tags; @@ -118,10 +118,6 @@ impl BucketMetadataSys { } } - async fn refresh_buckets_metadata_loop(&self, failed_buckets: &HashSet) -> Result<()> { - unimplemented!() - } - pub async fn get(&self, bucket: &str) -> Result { if is_meta_bucketname(bucket) { return Err(Error::new(ConfigError::NotFound)); @@ -142,10 +138,10 @@ impl BucketMetadataSys { } } - async fn reset(&mut self) { - let mut map = self.metadata_map.write().await; - map.clear(); - } + // async fn reset(&mut self) { + // let mut map = self.metadata_map.write().await; + // map.clear(); + // } pub async fn update(&mut self, bucket: &str, config_file: &str, data: Vec) -> Result { self.update_and_parse(bucket, config_file, data, true).await diff --git a/ecstore/src/bucket/policy/resource.rs b/ecstore/src/bucket/policy/resource.rs index 32ebc5a3..81576b26 100644 --- a/ecstore/src/bucket/policy/resource.rs +++ b/ecstore/src/bucket/policy/resource.rs @@ -10,9 +10,9 @@ pub enum ResourceARNType { ResourceARNKMS, } -// 定义资源ARN前缀 -const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::"; -const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::"; +// // 定义资源ARN前缀 +// const RESOURCE_ARN_PREFIX: &str = "arn:aws:s3:::"; +// const RESOURCE_ARN_KMS_PREFIX: &str = "arn:rustfs:kms::::"; // 定义Resource结构体 #[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)] diff --git a/ecstore/src/bucket_meta.rs b/ecstore/src/bucket_meta.rs deleted file mode 100644 index 4a559b75..00000000 --- a/ecstore/src/bucket_meta.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::collections::HashMap; - -use rmp_serde::Serializer; -use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; - -use crate::error::Result; - -use crate::disk::BUCKET_META_PREFIX; - -pub const BUCKET_METADATA_FILE: &str = ".metadata.bin"; -pub const BUCKET_METADATA_FORMAT: u16 = 1; -pub const BUCKET_METADATA_VERSION: u16 = 1; - -#[derive(Debug, PartialEq, Deserialize, Serialize, Default)] -pub struct BucketMetadata { - pub format: u16, - pub version: u16, - pub name: String, - pub tagging: Option>, - pub created: Option, -} - -// impl Default for BucketMetadata { -// fn default() -> Self { -// Self { -// format: Default::default(), -// version: Default::default(), -// name: Default::default(), -// created: OffsetDateTime::now_utc(), -// } -// } -// } - -impl BucketMetadata { - pub fn new(name: &str) -> Self { - BucketMetadata { - format: BUCKET_METADATA_FORMAT, - version: BUCKET_METADATA_VERSION, - name: name.to_string(), - ..Default::default() - } - } - - pub fn save_file_path(&self) -> String { - format!("{}/{}/{}", BUCKET_META_PREFIX, self.name.as_str(), BUCKET_METADATA_FILE) - // PathBuf::new() - // .join(BUCKET_META_PREFIX) - // .join(self.name.as_str()) - // .join(BUCKET_METADATA_FILE) - } - - pub fn marshal_msg(&self) -> Result> { - let mut buf = Vec::new(); - - self.serialize(&mut Serializer::new(&mut buf))?; - - Ok(buf) - } - - pub fn unmarshal_from(buf: &[u8]) -> Result { - let t: BucketMetadata = rmp_serde::from_slice(buf)?; - Ok(t) - } -} diff --git a/ecstore/src/config/common.rs b/ecstore/src/config/common.rs index 62e53252..6dedfbed 100644 --- a/ecstore/src/config/common.rs +++ b/ecstore/src/config/common.rs @@ -5,7 +5,6 @@ use crate::store_api::{HTTPRangeSpec, ObjectIO, ObjectInfo, ObjectOptions, PutOb use http::HeaderMap; use s3s::dto::StreamingBlob; use s3s::Body; -use tracing::warn; use super::error::ConfigError; diff --git a/ecstore/src/disks_layout.rs b/ecstore/src/disks_layout.rs index b83c41e8..d0296188 100644 --- a/ecstore/src/disks_layout.rs +++ b/ecstore/src/disks_layout.rs @@ -2,10 +2,13 @@ use crate::error::{Error, Result}; use crate::utils::ellipses::*; use serde::Deserialize; use std::collections::HashSet; +use std::env; +use tracing::debug; /// Supported set sizes this is used to find the optimal /// single set size. const SET_SIZES: [usize; 15] = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; +const ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT: &str = "RUSTFS_ERASURE_SET_DRIVE_COUNT"; #[derive(Deserialize, Debug, Default)] pub struct PoolDisksLayout { @@ -40,19 +43,69 @@ pub struct DisksLayout { pub pools: Vec, } -impl> TryFrom<&[T]> for DisksLayout { - type Error = Error; +// impl> TryFrom<&[T]> for DisksLayout { +// type Error = Error; - fn try_from(args: &[T]) -> Result { +// fn try_from(args: &[T]) -> Result { +// if args.is_empty() { +// return Err(Error::from_string("Invalid argument")); +// } + +// let is_ellipses = args.iter().any(|v| has_ellipses(&[v])); + +// // None of the args have ellipses use the old style. +// if !is_ellipses { +// let set_args = get_all_sets(is_ellipses, args)?; + +// return Ok(DisksLayout { +// legacy: true, +// pools: vec![PoolDisksLayout::new( +// args.iter().map(AsRef::as_ref).collect::>().join(" "), +// set_args, +// )], +// }); +// } + +// let mut layout = Vec::with_capacity(args.len()); +// for arg in args.iter() { +// if !has_ellipses(&[arg]) && args.len() > 1 { +// return Err(Error::from_string( +// "all args must have ellipses for pool expansion (Invalid arguments specified)", +// )); +// } + +// let set_args = get_all_sets(is_ellipses, &[arg])?; + +// layout.push(PoolDisksLayout::new(arg.as_ref(), set_args)); +// } + +// Ok(DisksLayout { +// legacy: false, +// pools: layout, +// }) +// } +// } + +impl DisksLayout { + pub fn from_volumes>(args: &[T]) -> Result { if args.is_empty() { return Err(Error::from_string("Invalid argument")); } let is_ellipses = args.iter().any(|v| has_ellipses(&[v])); + let set_drive_count_env = match env::var(ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT) { + Ok(res) => res, + Err(err) => { + debug!("{} not set use default:0, {:?}", ENV_RUSTFS_ERASURE_SET_DRIVE_COUNT, err); + format!("0") + } + }; + let set_drive_count: usize = set_drive_count_env.parse()?; + // None of the args have ellipses use the old style. if !is_ellipses { - let set_args = get_all_sets(is_ellipses, args)?; + let set_args = get_all_sets(set_drive_count, is_ellipses, args)?; return Ok(DisksLayout { legacy: true, @@ -71,7 +124,7 @@ impl> TryFrom<&[T]> for DisksLayout { )); } - let set_args = get_all_sets(is_ellipses, &[arg])?; + let set_args = get_all_sets(set_drive_count, is_ellipses, &[arg])?; layout.push(PoolDisksLayout::new(arg.as_ref(), set_args)); } @@ -81,9 +134,7 @@ impl> TryFrom<&[T]> for DisksLayout { pools: layout, }) } -} -impl DisksLayout { pub fn is_empty_layout(&self) -> bool { self.pools.is_empty() || self.pools[0].layout.is_empty() @@ -121,12 +172,12 @@ impl DisksLayout { /// /// For example: {1...64} is divided into 4 sets each of size 16. /// This applies to even distributed setup syntax as well. -fn get_all_sets>(is_ellipses: bool, args: &[T]) -> Result>> { +fn get_all_sets>(set_drive_count: usize, is_ellipses: bool, args: &[T]) -> Result>> { let endpoint_set = if is_ellipses { - EndpointSet::try_from(args)? + EndpointSet::from_volumes(args, set_drive_count)? } else { let set_indexes = if args.len() > 1 { - get_set_indexes(args, &[args.len()], &[])? + get_set_indexes(args, &[args.len()], set_drive_count, &[])? } else { vec![vec![args.len()]] }; @@ -159,17 +210,52 @@ struct EndpointSet { set_indexes: Vec>, } -impl> TryFrom<&[T]> for EndpointSet { - type Error = Error; +// impl> TryFrom<&[T]> for EndpointSet { +// type Error = Error; - fn try_from(args: &[T]) -> Result { +// fn try_from(args: &[T]) -> Result { +// let mut arg_patterns = Vec::with_capacity(args.len()); +// for arg in args { +// arg_patterns.push(find_ellipses_patterns(arg.as_ref())?); +// } + +// let total_sizes = get_total_sizes(&arg_patterns); +// let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?; + +// let mut endpoints = Vec::new(); +// for ap in arg_patterns.iter() { +// let aps = ap.expand(); +// for bs in aps { +// endpoints.push(bs.join("")); +// } +// } + +// Ok(EndpointSet { +// set_indexes, +// _arg_patterns: arg_patterns, +// endpoints, +// }) +// } +// } + +impl EndpointSet { + /// Create a new EndpointSet with the given endpoints and set indexes. + pub fn new(endpoints: Vec, set_indexes: Vec>) -> Self { + Self { + endpoints, + set_indexes, + ..Default::default() + } + } + + pub fn from_volumes>(args: &[T], set_drive_count: usize) -> Result { let mut arg_patterns = Vec::with_capacity(args.len()); for arg in args { arg_patterns.push(find_ellipses_patterns(arg.as_ref())?); } let total_sizes = get_total_sizes(&arg_patterns); - let set_indexes = get_set_indexes(args, &total_sizes, &arg_patterns)?; + let set_indexes = get_set_indexes(args, &total_sizes, set_drive_count, &arg_patterns)?; let mut endpoints = Vec::new(); for ap in arg_patterns.iter() { @@ -185,17 +271,6 @@ impl> TryFrom<&[T]> for EndpointSet { endpoints, }) } -} - -impl EndpointSet { - /// Create a new EndpointSet with the given endpoints and set indexes. - pub fn new(endpoints: Vec, set_indexes: Vec>) -> Self { - Self { - endpoints, - set_indexes, - ..Default::default() - } - } /// returns the sets representation of the endpoints /// this function also intelligently decides on what will @@ -298,14 +373,19 @@ fn possible_set_counts_with_symmetry(set_counts: &[usize], arg_patterns: &[ArgPa /// on each index, this function also determines the final set size /// The final set size has the affinity towards choosing smaller /// indexes (total sets) -fn get_set_indexes>(args: &[T], total_sizes: &[usize], arg_patterns: &[ArgPattern]) -> Result>> { +fn get_set_indexes>( + args: &[T], + total_sizes: &[usize], + set_drive_count: usize, + arg_patterns: &[ArgPattern], +) -> Result>> { if args.is_empty() || total_sizes.is_empty() { return Err(Error::from_string("Invalid argument")); } for &size in total_sizes { // Check if total_sizes has minimum range upto set_size - if size < SET_SIZES[0] { + if size < SET_SIZES[0] || size < set_drive_count { return Err(Error::from_string(format!("Incorrect number of endpoints provided, size {}", size))); } } @@ -319,16 +399,37 @@ fn get_set_indexes>(args: &[T], total_sizes: &[usize], arg_pattern ))); } - // TODO Add custom set drive count - // Returns possible set counts with symmetry. set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns); if set_counts.is_empty() { return Err(Error::from_string("No symmetric distribution detected with input endpoints provided")); } - // Final set size with all the symmetry accounted for. - let set_size = common_set_drive_count(common_size, &set_counts); + let set_size = { + if set_drive_count > 0 { + let has_set_drive_count = set_counts.contains(&set_drive_count); + + if !has_set_drive_count { + return Err(Error::from_string(format!( + "Invalid set drive count. Acceptable values for {:?} number drives are {:?}", + common_size, &set_counts + ))); + } + set_drive_count + } else { + set_counts = possible_set_counts_with_symmetry(&set_counts, arg_patterns); + if set_counts.is_empty() { + return Err(Error::from_string(format!( + "No symmetric distribution detected with input endpoints , drives {} cannot be spread symmetrically by any supported erasure set sizes {:?}", + common_size, &set_counts + ))); + } + // Final set size with all the symmetry accounted for. + let set_size = common_set_drive_count(common_size, &set_counts); + set_size + } + }; + if !is_valid_set_size(set_size) { return Err(Error::from_string("Incorrect number of endpoints provided3")); } @@ -532,7 +633,7 @@ mod test { } } - match get_set_indexes(test_case.args.as_slice(), test_case.total_sizes.as_slice(), arg_patterns.as_slice()) { + match get_set_indexes(test_case.args.as_slice(), test_case.total_sizes.as_slice(), 0, arg_patterns.as_slice()) { Ok(got_indexes) => { if !test_case.success { panic!("Test{}: Expected failure but passed instead", test_case.num); @@ -792,7 +893,7 @@ mod test { ]; for test_case in test_cases { - match EndpointSet::try_from([test_case.arg].as_slice()) { + match EndpointSet::from_volumes([test_case.arg].as_slice(), 0) { Ok(got_es) => { if !test_case.success { panic!("Test{}: Expected failure but passed instead", test_case.num); diff --git a/ecstore/src/endpoints.rs b/ecstore/src/endpoints.rs index 14547bbf..f8bec4f1 100644 --- a/ecstore/src/endpoints.rs +++ b/ecstore/src/endpoints.rs @@ -413,7 +413,7 @@ impl EndpointServerPools { self.0 = eps; } pub fn from_volumes(server_addr: &str, endpoints: Vec) -> Result<(EndpointServerPools, SetupType)> { - let layouts = DisksLayout::try_from(endpoints.as_slice())?; + let layouts = DisksLayout::from_volumes(endpoints.as_slice())?; Self::create_server_endpoints(server_addr, &layouts) } @@ -1139,7 +1139,7 @@ mod test { ]; for test_case in test_cases { - let disks_layout = match DisksLayout::try_from(test_case.args.as_slice()) { + let disks_layout = match DisksLayout::from_volumes(test_case.args.as_slice()) { Ok(v) => v, Err(e) => { if test_case.expected_err.is_none() { @@ -1244,7 +1244,7 @@ mod test { ]; for (i, test_case) in test_cases.iter().enumerate() { - let disks_layout = match DisksLayout::try_from(test_case.1.as_slice()) { + let disks_layout = match DisksLayout::from_volumes(test_case.1.as_slice()) { Ok(v) => v, Err(e) => { if test_case.2 { diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index c6ff3c19..aae5ea12 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -400,9 +400,10 @@ impl ShardReader { } // debug!("ec decode read ress {:?}", &ress); - warn!("ec decode read errors {:?}", &errors); if !self.can_decode(&ress) { + warn!("ec decode read errors {:?}", &errors); + return Err(Error::msg("shard reader read faild")); } diff --git a/ecstore/src/lib.rs b/ecstore/src/lib.rs index bd8bced9..542213cb 100644 --- a/ecstore/src/lib.rs +++ b/ecstore/src/lib.rs @@ -1,4 +1,3 @@ -pub mod bucket_meta; mod chunk_stream; mod config; pub mod disk; diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 41bcba04..ede8631b 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -47,7 +47,7 @@ pub struct ECStore { impl ECStore { #[allow(clippy::new_ret_no_self)] pub async fn new(_address: String, endpoint_pools: EndpointServerPools) -> Result { - // let layouts = DisksLayout::try_from(endpoints.as_slice())?; + // let layouts = DisksLayout::from_volumes(endpoints.as_slice())?; let mut deployment_id = None; diff --git a/ecstore/src/writer.rs b/ecstore/src/writer.rs deleted file mode 100644 index a10b55cb..00000000 --- a/ecstore/src/writer.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{io, task::Poll}; - -use futures::{ready, Future}; -use tokio::io::{AsyncWrite, BufWriter}; -use tracing::debug; -use uuid::Uuid; - -use crate::disk::DiskStore; - -pub struct AppendWriter<'a> { - disk: DiskStore, - volume: &'a str, - path: &'a str, -} - -impl<'a> AppendWriter<'a> { - pub fn new(disk: DiskStore, volume: &'a str, path: &'a str) -> Self { - debug!("AppendWriter new {}: {}/{}", disk.id(), volume, path); - Self { disk, volume, path } - } - - async fn async_write(&self, buf: &[u8]) -> Result<(), std::io::Error> { - debug!("async_write {}: {}: {}", self.disk.id(), &self.path, buf.len()); - - // self.disk - // .append_file(&self.volume, &self.path, buf) - // .await - // .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - Ok(()) - } -} - -impl<'a> AsyncWrite for AppendWriter<'a> { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - let mut fut = Box::pin(self.async_write(buf)); - debug!("AsyncWrite poll_write {}, buf:{}", self.disk.id(), buf.len()); - - let mut fut = self.get_mut().async_write(buf); - match futures::future::poll_fn(|cx| fut.as_mut().poll(cx)).start(cx) { - Ready(Ok(n)) => Ready(Ok(n)), - Ready(Err(e)) => Ready(Err(e)), - Pending => Pending, - } - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Poll::Ready(Ok(())) - } -} diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index c23e04a6..c19d8357 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -22,7 +22,7 @@ use hyper_util::{ use protos::proto_gen::node_service::node_service_server::NodeServiceServer; use s3s::{auth::SimpleAuth, service::S3ServiceBuilder}; use service::hybrid; -use std::{io::IsTerminal, net::SocketAddr, str::FromStr}; +use std::{io::IsTerminal, net::SocketAddr, process::exit, str::FromStr}; use tokio::net::TcpListener; use tonic::{metadata::MetadataValue, Request, Status}; use tracing::{debug, info}; @@ -93,6 +93,14 @@ async fn run(opt: config::Opt) -> Result<()> { // 用于rpc let (endpoint_pools, setup_type) = EndpointServerPools::from_volumes(opt.address.clone().as_str(), opt.volumes.clone()) .map_err(|err| Error::from_string(err.to_string()))?; + + for (i, eps) in endpoint_pools.as_ref().iter().enumerate() { + debug!( + "created endpoints {}, set_count:{}, drives_per_set: {}, cmd: {:?}", + i, eps.set_count, eps.drives_per_set, eps.cmd_line + ); + } + set_global_endpoints(endpoint_pools.as_ref().clone()).await; update_erasure_type(setup_type).await; diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 2cbb3dcb..eaa66fc2 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -1,11 +1,8 @@ -use bytes::BufMut; use bytes::Bytes; use ecstore::bucket::get_bucket_metadata_sys; use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG; use ecstore::bucket::tags::Tags; -use ecstore::bucket_meta::BucketMetadata; use ecstore::disk::error::DiskError; -use ecstore::disk::RUSTFS_META_BUCKET; use ecstore::new_object_layer_fn; use ecstore::store_api::BucketOptions; use ecstore::store_api::CompletePart; @@ -24,7 +21,6 @@ use http::HeaderMap; use log::warn; use s3s::dto::*; use s3s::s3_error; -use s3s::Body; use s3s::S3Error; use s3s::S3ErrorCode; use s3s::S3Result; diff --git a/scripts/run.sh b/scripts/run.sh index 1a7b2073..1385973a 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -10,7 +10,9 @@ if [ -z "$RUST_LOG" ]; then export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug" fi -DATA_DIR_ARG="./target/volume/test{0...4}" +export RUSTFS_ERASURE_SET_DRIVE_COUNT=8 + +DATA_DIR_ARG="./target/volume/test{0...15}" if [ -n "$1" ]; then DATA_DIR_ARG="$1"