pool select idx

fixs:#346, #339, #338, #337, #336, #334

test healbucket

test get_available_pool_idx

fix
This commit is contained in:
weisd
2025-04-19 01:52:27 +08:00
committed by weisd
parent cf70e12e14
commit fff7e5f827
15 changed files with 427 additions and 157 deletions

View File

@@ -2,3 +2,6 @@
rustflags = [
"-C", "link-arg=-fuse-ld=bfd"
]
[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"

View File

@@ -1,17 +1,17 @@
[workspace]
members = [
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"crates/obs", # Observability utilities
"s3select/api",
"s3select/query",
"appauth",
@@ -67,7 +67,11 @@ http = "1.3.1"
http-body = "1.0.1"
humantime = "2.2.0"
jsonwebtoken = "9.3.1"
keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] }
keyring = { version = "3.6.2", features = [
"apple-native",
"windows-native",
"sync-secret-service",
] }
lock = { path = "./common/lock" }
lazy_static = "1.5.0"
libsystemd = { version = "0.7" }
@@ -77,12 +81,17 @@ md-5 = "0.10.6"
mime = "0.3.17"
netif = "0.1.6"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry-appender-tracing = { version = "0.29.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
] }
opentelemetry_sdk = { version = "0.29" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29" }
opentelemetry-prometheus = { version = "0.29.1" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
opentelemetry-semantic-conventions = { version = "0.29.0", features = [
"semconv_experimental",
] }
pin-project-lite = "0.2"
prometheus = "0.14.0"
# pin-utils = "0.1.0"
@@ -93,8 +102,19 @@ protobuf = "3.7"
protos = { path = "./common/protos" }
rand = "0.8.5"
rdkafka = { version = "0.37", features = ["tokio"] }
reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] }
rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] }
reqwest = { version = "0.12.15", default-features = false, features = [
"rustls-tls",
"charset",
"http2",
"macos-system-configuration",
"stream",
"json",
"blocking",
] }
rfd = { version = "0.15.3", default-features = false, features = [
"xdg-portal",
"tokio",
] }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rustfs-obs = { path = "crates/obs", version = "0.0.1" }
@@ -154,6 +174,10 @@ inherits = "dev"
inherits = "dev"
[profile.release]
opt-level = 3 # Optimization Level (0-3)
lto = true # Optimize when linking
codegen-units = 1 # Reduce code generation units to improve optimization
opt-level = 3
lto = "thin"
[profile.production]
inherits = "release"
lto = "fat"
codegen-units = 1

View File

@@ -66,6 +66,10 @@ impl Error {
self.downcast_ref::<std::io::Error>()
.map(|e| std::io::Error::new(e.kind(), e.to_string()))
}
pub fn inner_string(&self) -> String {
self.inner.to_string()
}
}
impl<T: std::error::Error + Send + Sync + 'static> From<T> for Error {

View File

@@ -254,7 +254,11 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
EnvFilter::new(logger_level)
let mut filter = EnvFilter::new(logger_level);
for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
}
filter
}
_ => {
let mut filter = EnvFilter::new(logger_level);

View File

@@ -1,16 +1,18 @@
use std::collections::HashSet;
use std::sync::OnceLock;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::{load_bucket_metadata_parse, BUCKET_LIFECYCLE_CONFIG};
use crate::bucket::utils::is_meta_bucketname;
use crate::config;
use crate::config::error::ConfigError;
use crate::disk::error::DiskError;
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
use crate::heal::heal_commands::HealOpts;
use crate::store::ECStore;
use crate::utils::xml::deserialize;
use crate::{config, StorageAPI};
use common::error::{Error, Result};
use futures::future::join_all;
use policy::policy::BucketPolicy;
@@ -20,6 +22,7 @@ use s3s::dto::{
};
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, warn};
use super::metadata::{load_bucket_metadata, BucketMetadata};
@@ -196,7 +199,22 @@ impl BucketMetadataSys {
let mut futures = Vec::new();
for bucket in buckets.iter() {
futures.push(load_bucket_metadata(self.api.clone(), bucket.as_str()));
// TODO: HealBucket
let api = self.api.clone();
let bucket = bucket.clone();
futures.push(async move {
sleep(Duration::from_millis(30)).await;
let _ = api
.heal_bucket(
&bucket,
&HealOpts {
recreate: true,
..Default::default()
},
)
.await;
load_bucket_metadata(self.api.clone(), bucket.as_str()).await
});
}
let results = join_all(futures).await;
@@ -205,6 +223,7 @@ impl BucketMetadataSys {
let mut mp = self.metadata_map.write().await;
// TODO:EventNotifier,BucketTargetSys
for res in results {
match res {
Ok(res) => {

View File

@@ -455,6 +455,7 @@ impl LocalDisk {
{
if let Err(aerr) = access(volume_dir.as_ref()).await {
if os_is_not_exist(&aerr) {
warn!("read_metadata_with_dmtime os err {:?}", &aerr);
return Err(Error::new(DiskError::VolumeNotFound));
}
}
@@ -534,6 +535,7 @@ impl LocalDisk {
if !skip_access_checks(volume) {
if let Err(er) = utils::fs::access(volume_dir.as_ref()).await {
if os_is_not_exist(&er) {
warn!("read_all_data_with_dmtime os err {:?}", &er);
return Err(Error::new(DiskError::VolumeNotFound));
}
}
@@ -2090,6 +2092,8 @@ impl DiskAPI for LocalDisk {
Ok(fi)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
let file_path = self.get_object_path(volume, path)?;
let file_dir = self.get_bucket_path(volume)?;
@@ -2328,10 +2332,7 @@ impl DiskAPI for LocalDisk {
}
}
let vcfg = match BucketVersioningSys::get(&cache.info.name).await {
Ok(vcfg) => Some(vcfg),
Err(_) => None,
};
let vcfg = (BucketVersioningSys::get(&cache.info.name).await).ok();
let loc = self.get_disk_location();
let disks = store.get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap()).await?;

View File

@@ -285,6 +285,7 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
match self {
Disk::Local(local_disk) => local_disk.read_xl(volume, path, read_data).await,

View File

@@ -680,6 +680,7 @@ impl DiskAPI for RemoteDisk {
Ok(file_info)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path);
let mut client = node_service_time_out_client(&self.addr)

View File

@@ -99,7 +99,7 @@ impl FileMeta {
Ok((bin_len, &buf[5..]))
}
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let i = buf.len() as u64;
@@ -711,7 +711,6 @@ impl FileMetaVersion {
Ok(data_dir)
}
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let mut cur = Cursor::new(buf);
@@ -998,7 +997,7 @@ impl FileMetaVersionHeader {
Ok(wr)
}
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let mut cur = Cursor::new(buf);
let alen = rmp::decode::read_array_len(&mut cur)?;
@@ -1144,7 +1143,6 @@ pub struct MetaObject {
}
impl MetaObject {
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let mut cur = Cursor::new(buf);

View File

@@ -344,7 +344,6 @@ impl CurrentScannerCycle {
Ok(result)
}
#[tracing::instrument]
pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result<u64> {
let mut cur = Cursor::new(buf);
@@ -1076,10 +1075,9 @@ pub fn lc_has_active_rules(config: &BucketLifecycleConfiguration, prefix: &str)
continue;
}
let rule_prefix = lc_get_prefix(rule);
if !prefix.is_empty() && !rule_prefix.is_empty() {
if !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix) {
continue;
}
if !prefix.is_empty() && !rule_prefix.is_empty() && !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix)
{
continue;
}
if let Some(e) = &rule.noncurrent_version_expiration {
@@ -1102,7 +1100,7 @@ pub fn lc_has_active_rules(config: &BucketLifecycleConfiguration, prefix: &str)
return true;
}
if let Some(Some(true)) = rule.expiration.as_ref().map(|e| e.expired_object_delete_marker.map(|m| m)) {
if let Some(Some(true)) = rule.expiration.as_ref().map(|e| e.expired_object_delete_marker) {
return true;
}

View File

@@ -78,22 +78,27 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
let mut error_map: HashMap<String, usize> = HashMap::new(); // 存err位置
let nil = "nil".to_string();
for (i, operr) in errs.iter().enumerate() {
if operr.is_none() {
if let Some(err) = operr {
if is_err_ignored(err, ignored_errs) {
continue;
}
let errstr = err.inner_string();
let _ = *error_map.entry(errstr.clone()).or_insert(i);
*error_counts.entry(errstr.clone()).or_insert(0) += 1;
} else {
*error_counts.entry(nil.clone()).or_insert(0) += 1;
let _ = *error_map.entry(nil.clone()).or_insert(i);
continue;
}
let err = operr.as_ref().unwrap();
// let err = operr.as_ref().unwrap();
if is_err_ignored(err, ignored_errs) {
continue;
}
// let errstr = err.to_string();
let errstr = err.to_string();
let _ = *error_map.entry(errstr.clone()).or_insert(i);
*error_counts.entry(errstr.clone()).or_insert(0) += 1;
// let _ = *error_map.entry(errstr.clone()).or_insert(i);
// *error_counts.entry(errstr.clone()).or_insert(0) += 1;
}
let mut max = 0;
@@ -105,17 +110,14 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
}
}
if let Some(&c) = error_counts.get(&max_err) {
if let Some(&err_idx) = error_map.get(&max_err) {
let err = errs[err_idx].as_ref().map(clone_err);
return (c, err);
}
return (c, None);
if let Some(&err_idx) = error_map.get(&max_err) {
let err = errs[err_idx].as_ref().map(clone_err);
(max, err)
} else if max_err == nil {
(max, None)
} else {
(0, None)
}
(0, None)
}
// 根据quorum验证错误数量
@@ -152,3 +154,114 @@ pub fn reduce_write_quorum_errs(
) -> Option<Error> {
reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write)
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug)]
struct MockErrorChecker {
target_error: String,
}
impl CheckErrorFn for MockErrorChecker {
fn is(&self, e: &Error) -> bool {
e.inner_string() == self.target_error
}
}
fn mock_error(message: &str) -> Error {
Error::msg(message.to_string())
}
#[test]
fn test_reduce_errs_with_no_errors() {
let errs: Vec<Option<Error>> = vec![];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 0);
assert!(err.is_none());
}
#[test]
fn test_reduce_errs_with_ignored_errors() {
let errs = vec![Some(mock_error("ignored_error")), Some(mock_error("ignored_error"))];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
target_error: "ignored_error".to_string(),
})];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 0);
assert!(err.is_none());
}
#[test]
fn test_reduce_errs_with_mixed_errors() {
let errs = vec![
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
target_error: "error2".to_string(),
})];
let (count, err) = reduce_errs(&errs, &ignored_errs);
println!("count: {}, err: {:?}", count, err);
assert_eq!(count, 9);
assert_eq!(err.unwrap().to_string(), DiskError::FileNotFound.to_string());
}
#[test]
fn test_reduce_errs_with_nil_errors() {
let errs = vec![None, Some(mock_error("error1")), None];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 2);
assert!(err.is_none());
}
#[test]
fn test_reduce_read_quorum_errs() {
let errs = vec![
Some(mock_error("error1")),
Some(mock_error("error1")),
Some(mock_error("error2")),
None,
None,
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let read_quorum = 2;
let result = reduce_read_quorum_errs(&errs, &ignored_errs, read_quorum);
assert!(result.is_none());
}
#[test]
fn test_reduce_write_quorum_errs_with_quorum_error() {
let errs = vec![
Some(mock_error("error1")),
Some(mock_error("error2")),
Some(mock_error("error2")),
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let write_quorum = 3;
let result = reduce_write_quorum_errs(&errs, &ignored_errs, write_quorum);
assert!(result.is_some());
assert_eq!(result.unwrap().to_string(), QuorumError::Write.to_string());
}
}

View File

@@ -59,12 +59,7 @@ use common::error::{Error, Result};
use futures::future::join_all;
use glob::Pattern;
use http::HeaderMap;
use lock::{
// drwmutex::Options,
drwmutex::Options,
namespace_lock::{new_nslock, NsLockMap},
LockApi,
};
use lock::{namespace_lock::NsLockMap, LockApi};
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use md5::{Digest as Md5Digest, Md5};
use rand::{
@@ -407,6 +402,7 @@ impl SetDisks {
}
#[allow(dead_code)]
#[tracing::instrument(level = "debug", skip(self, disks))]
async fn commit_rename_data_dir(
&self,
disks: &[Option<DiskStore>],
@@ -852,6 +848,7 @@ impl SetDisks {
// Returns per object readQuorum and writeQuorum
// readQuorum is the min required disks to read data.
// writeQuorum is the min required disks to write data.
#[tracing::instrument(level = "debug", skip(parts_metadata))]
fn object_quorum_from_meta(
parts_metadata: &[FileInfo],
errs: &[Option<Error>],
@@ -864,7 +861,7 @@ impl SetDisks {
};
if let Some(err) = reduce_read_quorum_errs(errs, object_op_ignored_errs().as_ref(), expected_rquorum) {
// warn!("object_quorum_from_meta err {:?}", &err);
warn!("object_quorum_from_meta err {:?}", &err);
return Err(err);
}
@@ -1586,7 +1583,7 @@ impl SetDisks {
// Ok(())
// }
// #[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self))]
pub async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> {
let disks = self.disks.read().await;
@@ -1749,7 +1746,7 @@ impl SetDisks {
// TODO: 优化并发 可用数量中断
let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await;
// warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata);
// warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
let _min_disks = self.set_drive_count - self.default_parity_count;
@@ -3672,33 +3669,33 @@ impl ObjectIO for SetDisks {
async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result<ObjectInfo> {
let disks = self.disks.read().await;
let mut _ns = None;
if !opts.no_lock {
let paths = vec![object.to_string()];
let ns_lock = new_nslock(
Arc::clone(&self.ns_mutex),
self.locker_owner.clone(),
bucket.to_string(),
paths,
self.lockers.clone(),
)
.await;
if !ns_lock
.0
.write()
.await
.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.map_err(|err| Error::from_string(err.to_string()))?
{
return Err(Error::from_string("can not get lock. please retry".to_string()));
}
// let mut _ns = None;
// if !opts.no_lock {
// let paths = vec![object.to_string()];
// let ns_lock = new_nslock(
// Arc::clone(&self.ns_mutex),
// self.locker_owner.clone(),
// bucket.to_string(),
// paths,
// self.lockers.clone(),
// )
// .await;
// if !ns_lock
// .0
// .write()
// .await
// .get_lock(&Options {
// timeout: Duration::from_secs(5),
// retry_interval: Duration::from_secs(1),
// })
// .await
// .map_err(|err| Error::from_string(err.to_string()))?
// {
// return Err(Error::from_string("can not get lock. please retry".to_string()));
// }
_ns = Some(ns_lock);
}
// _ns = Some(ns_lock);
// }
let mut user_defined = opts.user_defined.clone().unwrap_or_default();
@@ -4165,34 +4162,37 @@ impl StorageAPI for SetDisks {
unimplemented!()
}
async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<ObjectInfo> {
let mut _ns = None;
if !opts.no_lock {
let paths = vec![object.to_string()];
let ns_lock = new_nslock(
Arc::clone(&self.ns_mutex),
self.locker_owner.clone(),
bucket.to_string(),
paths,
self.lockers.clone(),
)
.await;
if !ns_lock
.0
.write()
.await
.get_lock(&Options {
timeout: Duration::from_secs(5),
retry_interval: Duration::from_secs(1),
})
.await
.map_err(|err| Error::from_string(err.to_string()))?
{
return Err(Error::from_string("can not get lock. please retry".to_string()));
}
// let mut _ns = None;
// if !opts.no_lock {
// let paths = vec![object.to_string()];
// let ns_lock = new_nslock(
// Arc::clone(&self.ns_mutex),
// self.locker_owner.clone(),
// bucket.to_string(),
// paths,
// self.lockers.clone(),
// )
// .await;
// if !ns_lock
// .0
// .write()
// .await
// .get_lock(&Options {
// timeout: Duration::from_secs(5),
// retry_interval: Duration::from_secs(1),
// })
// .await
// .map_err(|err| Error::from_string(err.to_string()))?
// {
// return Err(Error::from_string("can not get lock. please retry".to_string()));
// }
_ns = Some(ns_lock);
}
let (fi, _, _) = self.get_object_fileinfo(bucket, object, opts, false).await?;
// _ns = Some(ns_lock);
// }
let (fi, _, _) = self
.get_object_fileinfo(bucket, object, opts, false)
.await
.map_err(|e| to_object_err(e, vec![bucket, object]))?;
// warn!("get object_info fi {:?}", &fi);

View File

@@ -232,11 +232,19 @@ impl ECStore {
}
let wait_sec = 5;
let mut exit_count = 0;
loop {
if let Err(err) = ec.init().await {
error!("init err: {}", err);
error!("retry after {} second", wait_sec);
sleep(Duration::from_secs(wait_sec)).await;
if exit_count > 10 {
return Err(Error::msg("ec init faild"));
}
exit_count += 1;
continue;
}
@@ -481,6 +489,8 @@ impl ECStore {
}
async fn get_available_pool_idx(&self, bucket: &str, object: &str, size: i64) -> Option<usize> {
// // 先随机返回一个
let mut server_pools = self.get_server_pools_available_space(bucket, object, size).await;
server_pools.filter_max_used(100 - (100_f64 * DISK_RESERVE_FRACTION) as u64);
let total = server_pools.total_available();
@@ -521,28 +531,24 @@ impl ECStore {
}
}
let mut server_pools = Vec::new();
let mut server_pools = vec![PoolAvailableSpace::default(); self.pools.len()];
for (i, zinfo) in infos.iter().enumerate() {
if zinfo.is_empty() {
server_pools.push(PoolAvailableSpace {
server_pools[i] = PoolAvailableSpace {
index: i,
..Default::default()
});
};
continue;
}
if !is_meta_bucketname(bucket) {
let avail = has_space_for(zinfo, size).await.unwrap_or_default();
if !is_meta_bucketname(bucket) && !has_space_for(zinfo, size).await.unwrap_or_default() {
server_pools[i] = PoolAvailableSpace {
index: i,
..Default::default()
};
if !avail {
server_pools.push(PoolAvailableSpace {
index: i,
..Default::default()
});
continue;
}
continue;
}
let mut available = 0;
@@ -699,7 +705,7 @@ impl ECStore {
let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH);
at.cmp(&bt)
bt.cmp(&at)
});
let mut def_pool = PoolObjInfo::default();
@@ -915,29 +921,30 @@ impl ECStore {
// TODO: test order
idx_res.sort_by(|a, b| {
if let Some(obj1) = &a.res {
if let Some(obj2) = &b.res {
let cmp = obj1.mod_time.cmp(&obj2.mod_time);
match cmp {
// eq use lowest
Ordering::Equal => {
if a.idx < b.idx {
Ordering::Greater
} else {
Ordering::Less
}
}
_ => cmp,
}
} else {
Ordering::Greater
}
let a_mod = if let Some(o1) = &a.res {
o1.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH)
} else {
Ordering::Less
OffsetDateTime::UNIX_EPOCH
};
let b_mod = if let Some(o2) = &b.res {
o2.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH)
} else {
OffsetDateTime::UNIX_EPOCH
};
if a_mod == b_mod {
if a.idx < b.idx {
return Ordering::Greater;
} else {
return Ordering::Less;
}
}
b_mod.cmp(&a_mod)
});
for res in idx_res {
for res in idx_res.into_iter() {
if let Some(obj) = res.res {
return Ok((obj, res.idx));
}
@@ -2516,7 +2523,7 @@ fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
Ok(())
}
async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
pub async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
let opts = &DiskInfoOptions::default();
let mut res = vec![None; disks.len()];
for (idx, disk_op) in disks.iter().enumerate() {
@@ -2530,21 +2537,22 @@ async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
res
}
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct PoolAvailableSpace {
pub index: usize,
pub available: u64, // in bytes
pub max_used_pct: u64, // Used disk percentage of most filled disk, rounded down.
}
#[derive(Debug, Default, Clone)]
pub struct ServerPoolsAvailableSpace(Vec<PoolAvailableSpace>);
impl ServerPoolsAvailableSpace {
fn iter(&self) -> Iter<'_, PoolAvailableSpace> {
pub fn iter(&self) -> Iter<'_, PoolAvailableSpace> {
self.0.iter()
}
// TotalAvailable - total available space
fn total_available(&self) -> u64 {
pub fn total_available(&self) -> u64 {
let mut total = 0;
for pool in &self.0 {
total += pool.available;
@@ -2554,7 +2562,7 @@ impl ServerPoolsAvailableSpace {
// FilterMaxUsed will filter out any pools that has used percent bigger than max,
// unless all have that, in which case all are preserved.
fn filter_max_used(&mut self, max: u64) {
pub fn filter_max_used(&mut self, max: u64) {
if self.0.len() <= 1 {
// Nothing to do.
return;
@@ -2575,13 +2583,14 @@ impl ServerPoolsAvailableSpace {
// Remove entries that are above.
for pool in self.0.iter_mut() {
if pool.available > 0 && pool.max_used_pct < max {
pool.available = 0
continue;
}
pool.available = 0
}
}
}
async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool> {
pub async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool> {
let size = {
if size < 0 {
DISK_ASSUME_UNKNOWN_SIZE

View File

@@ -14,7 +14,7 @@ pub use unix::{get_drive_stats, get_info, same_disk};
#[cfg(target_os = "windows")]
pub use windows::{get_drive_stats, get_info, same_disk};
#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq)]
pub struct IOStats {
pub read_ios: u64,
pub read_merges: u64,
@@ -34,3 +34,59 @@ pub struct IOStats {
pub flush_ios: u64,
pub flush_ticks: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[test]
fn test_get_info_valid_path() {
let temp_dir = tempfile::tempdir().unwrap();
let info = get_info(temp_dir.path()).unwrap();
println!("Disk Info: {:?}", info);
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
#[test]
fn test_get_info_invalid_path() {
let invalid_path = PathBuf::from("/invalid/path");
let result = get_info(&invalid_path);
assert!(result.is_err());
}
#[test]
fn test_same_disk_same_path() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().to_str().unwrap();
let result = same_disk(path, path).unwrap();
assert!(result);
}
#[test]
fn test_same_disk_different_paths() {
let temp_dir1 = tempfile::tempdir().unwrap();
let temp_dir2 = tempfile::tempdir().unwrap();
let path1 = temp_dir1.path().to_str().unwrap();
let path2 = temp_dir2.path().to_str().unwrap();
let result = same_disk(path1, path2).unwrap();
assert!(!result);
}
#[test]
fn test_get_drive_stats_default() {
let stats = get_drive_stats(0, 0).unwrap();
assert_eq!(stats, IOStats::default());
}
}

39
scripts/dev.sh Executable file
View File

@@ -0,0 +1,39 @@
#!/bin/bash
# 脚本名称: scp_to_servers.sh
rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip
# 压缩./target/x86_64-unknown-linux-musl/release/rustfs
zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs
# 本地文件路径
LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
REMOTE_PATH="~"
# 定义服务器列表数组
# 格式服务器IP 用户名 目标路径
SERVER_LIST=(
"root@121.89.80.13"
"root@121.89.80.198"
"root@8.130.78.237"
"root@8.130.189.236"
"root@121.89.80.230"
"root@121.89.80.45"
"root@8.130.191.95"
"root@121.89.80.91"
)
# 遍历服务器列表
for SERVER in "${SERVER_LIST[@]}"; do
echo "正在将文件复制到服务器: $SERVER 目标路径: $REMOTE_PATH"
scp "$LOCAL_FILE" "${SERVER}:${REMOTE_PATH}"
if [ $? -eq 0 ]; then
echo "成功复制到 $SERVER"
else
echo "复制到 $SERVER 失败"
fi
done
# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9