merge fix/pools

weisd
2025-04-23 16:48:57 +08:00
20 changed files with 384 additions and 63 deletions

View File

@@ -2,3 +2,6 @@
rustflags = [
"-C", "link-arg=-fuse-ld=bfd"
]
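# note: the musl target below assumes the x86_64-linux-musl-gcc cross toolchain is installed and on PATH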
[target.x86_64-unknown-linux-musl]
linker = "x86_64-linux-musl-gcc"

Cargo.lock generated (+1)
View File

@@ -3111,6 +3111,7 @@ dependencies = [
"serde",
"serde_json",
"sha2 0.11.0-pre.5",
"shadow-rs",
"siphasher 1.0.1",
"smallvec",
"tempfile",

View File

@@ -1,21 +1,21 @@
[workspace]
members = [
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"madmin", # Management dashboard and admin API interface
"rustfs", # Core file system implementation
"ecstore", # Erasure coding storage implementation
"e2e_test", # End-to-end test suite
"common/common", # Shared utilities and data structures
"common/lock", # Distributed locking implementation
"common/protos", # Protocol buffer definitions
"common/workers", # Worker thread pools and task scheduling
"iam", # Identity and Access Management
"crypto", # Cryptography and security features
"cli/rustfs-gui", # Graphical user interface client
"crates/obs", # Observability utilities
"crates/event-notifier", # Event notification system
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
"s3select/api", # S3 Select API interface
"s3select/query", # S3 Select query engine
"appauth", # Application authentication and authorization
]
resolver = "2"
@@ -80,7 +80,11 @@ http = "1.3.1"
http-body = "1.0.1"
humantime = "2.2.0"
jsonwebtoken = "9.3.1"
keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] }
keyring = { version = "3.6.2", features = [
"apple-native",
"windows-native",
"sync-secret-service",
] }
lazy_static = "1.5.0"
libsystemd = { version = "0.7.1" }
local-ip-address = "0.6.3"
@@ -91,12 +95,17 @@ mime_guess = "2.0.5"
netif = "0.1.6"
object_store = "0.11.2"
opentelemetry = { version = "0.29.1" }
opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry-appender-tracing = { version = "0.29.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
] }
opentelemetry_sdk = { version = "0.29.0" }
opentelemetry-stdout = { version = "0.29.0" }
opentelemetry-otlp = { version = "0.29.0" }
opentelemetry-prometheus = { version = "0.29.1" }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
opentelemetry-semantic-conventions = { version = "0.29.0", features = [
"semconv_experimental",
] }
parking_lot = "0.12.3"
pin-project-lite = "0.2.16"
prometheus = "0.14.0"
@@ -107,8 +116,19 @@ prost-types = "0.13.5"
protobuf = "3.7"
rand = "0.8.5"
rdkafka = { version = "0.37.0", features = ["tokio"] }
reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] }
rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] }
reqwest = { version = "0.12.15", default-features = false, features = [
"rustls-tls",
"charset",
"http2",
"macos-system-configuration",
"stream",
"json",
"blocking",
] }
rfd = { version = "0.15.3", default-features = false, features = [
"xdg-portal",
"tokio",
] }
rmp = "0.8.14"
rmp-serde = "1.3.0"
rumqttc = { version = "0.24" }

View File

@@ -66,6 +66,10 @@ impl Error {
self.downcast_ref::<std::io::Error>()
.map(|e| std::io::Error::new(e.kind(), e.to_string()))
}
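/// Render the wrapped error as a plain string (used to group identical errors, e.g. in quorum counting).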
pub fn inner_string(&self) -> String {
self.inner.to_string()
}
}
impl<T: std::error::Error + Send + Sync + 'static> From<T> for Error {

View File

@@ -254,7 +254,11 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
let filter_otel = match logger_level {
"trace" | "debug" => {
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
EnvFilter::new(logger_level)
let mut filter = EnvFilter::new(logger_level);
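// mute noisy transport crates so app-level trace/debug output stays readable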
for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] {
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
}
filter
}
_ => {
let mut filter = EnvFilter::new(logger_level);

View File

@@ -69,6 +69,7 @@ workers.workspace = true
reqwest = { workspace = true }
urlencoding = "2.1.3"
smallvec = "1.15.0"
shadow-rs.workspace = true
[target.'cfg(not(windows))'.dependencies]
@@ -81,3 +82,6 @@ winapi = "0.3.9"
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[build-dependencies]
shadow-rs.workspace = true

ecstore/build.rs Normal file (+4)
View File

@@ -0,0 +1,4 @@
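// shadow-rs emits build-time metadata (tag, commit, package version), consumed elsewhere via shadow!(build)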
fn main() -> shadow_rs::SdResult<()> {
shadow_rs::ShadowBuilder::builder().build()?;
Ok(())
}

View File

@@ -1,6 +1,6 @@
use crate::{
disk::endpoint::Endpoint,
global::GLOBAL_Endpoints,
global::{GLOBAL_Endpoints, GLOBAL_BOOT_TIME},
heal::{
data_usage::{load_data_usage_from_backend, DATA_USAGE_CACHE_NAME, DATA_USAGE_ROOT},
data_usage_cache::DataUsageCache,
@@ -22,12 +22,16 @@ use protos::{
};
use std::{
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH},
time::SystemTime,
};
use time::OffsetDateTime;
use tonic::Request;
use tracing::warn;
use shadow_rs::shadow;
shadow!(build);
// pub const ITEM_OFFLINE: &str = "offline";
// pub const ITEM_INITIALIZING: &str = "initializing";
// pub const ITEM_ONLINE: &str = "online";
@@ -140,8 +144,12 @@ pub async fn get_local_server_property() -> ServerProperties {
let mut props = ServerProperties {
endpoint: addr,
uptime: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
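// uptime is now measured from process boot (GLOBAL_BOOT_TIME) rather than from the UNIX epoch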
uptime: SystemTime::now()
.duration_since(*GLOBAL_BOOT_TIME.get().unwrap())
.unwrap_or_default()
.as_secs(),
network,
version: get_commit_id(),
..Default::default()
};
@@ -356,3 +364,14 @@ async fn get_pools_info(all_disks: &[Disk]) -> Result<HashMap<i32, HashMap<i32,
}
Ok(pools_info)
}
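// Version string fallback: the release tag if set, else "@<short commit>", else the crate version.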
#[allow(clippy::const_is_empty)]
pub fn get_commit_id() -> String {
if !build::TAG.is_empty() {
build::TAG.to_string()
} else if !build::SHORT_COMMIT.is_empty() {
format!("@{}", build::SHORT_COMMIT)
} else {
build::PKG_VERSION.to_string()
}
}
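For reference, a sketch of the fallback behavior with hypothetical build constants:
// TAG = "1.2.0"                       -> "1.2.0"
// TAG = "", SHORT_COMMIT = "ab12cd3"  -> "@ab12cd3"
// both empty                          -> PKG_VERSION, e.g. "0.1.0"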

View File

@@ -1,16 +1,18 @@
use std::collections::HashSet;
use std::sync::OnceLock;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::{load_bucket_metadata_parse, BUCKET_LIFECYCLE_CONFIG};
use crate::bucket::utils::is_meta_bucketname;
use crate::config;
use crate::config::error::ConfigError;
use crate::disk::error::DiskError;
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
use crate::heal::heal_commands::HealOpts;
use crate::store::ECStore;
use crate::utils::xml::deserialize;
use crate::{config, StorageAPI};
use common::error::{Error, Result};
use futures::future::join_all;
use policy::policy::BucketPolicy;
@@ -20,6 +22,7 @@ use s3s::dto::{
};
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, warn};
use super::metadata::{load_bucket_metadata, BucketMetadata};
@@ -196,7 +199,22 @@ impl BucketMetadataSys {
let mut futures = Vec::new();
for bucket in buckets.iter() {
futures.push(load_bucket_metadata(self.api.clone(), bucket.as_str()));
// TODO: HealBucket
let api = self.api.clone();
let bucket = bucket.clone();
futures.push(async move {
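// brief fixed delay before the heal attempt (assumed intent: ease disk load at startup)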
sleep(Duration::from_millis(30)).await;
let _ = api
.heal_bucket(
&bucket,
&HealOpts {
recreate: true,
..Default::default()
},
)
.await;
load_bucket_metadata(self.api.clone(), bucket.as_str()).await
});
}
let results = join_all(futures).await;
@@ -205,6 +223,7 @@ impl BucketMetadataSys {
let mut mp = self.metadata_map.write().await;
// TODO:EventNotifier,BucketTargetSys
for res in results {
match res {
Ok(res) => {

View File

@@ -455,6 +455,7 @@ impl LocalDisk {
{
if let Err(aerr) = access(volume_dir.as_ref()).await {
if os_is_not_exist(&aerr) {
warn!("read_metadata_with_dmtime os err {:?}", &aerr);
return Err(Error::new(DiskError::VolumeNotFound));
}
}
@@ -534,6 +535,7 @@ impl LocalDisk {
if !skip_access_checks(volume) {
if let Err(er) = utils::fs::access(volume_dir.as_ref()).await {
if os_is_not_exist(&er) {
warn!("read_all_data_with_dmtime os err {:?}", &er);
return Err(Error::new(DiskError::VolumeNotFound));
}
}
@@ -2090,6 +2092,8 @@ impl DiskAPI for LocalDisk {
Ok(fi)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
let file_path = self.get_object_path(volume, path)?;
let file_dir = self.get_bucket_path(volume)?;

View File

@@ -285,6 +285,7 @@ impl DiskAPI for Disk {
}
}
#[tracing::instrument]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
match self {
Disk::Local(local_disk) => local_disk.read_xl(volume, path, read_data).await,

View File

@@ -680,6 +680,7 @@ impl DiskAPI for RemoteDisk {
Ok(file_info)
}
#[tracing::instrument(level = "debug", skip(self))]
async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result<RawFileInfo> {
info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path);
let mut client = node_service_time_out_client(&self.addr)

View File

@@ -2,8 +2,9 @@ use lazy_static::lazy_static;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
time::SystemTime,
};
use tokio::sync::RwLock;
use tokio::sync::{OnceCell, RwLock};
use uuid::Uuid;
use crate::heal::mrf::MRFState;
@@ -37,6 +38,7 @@ lazy_static! {
pub static ref GLOBAL_ALlHealState: Arc<AllHealState> = AllHealState::new(false);
pub static ref GLOBAL_MRFState: Arc<MRFState> = Arc::new(MRFState::new());
static ref globalDeploymentIDPtr: OnceLock<Uuid> = OnceLock::new();
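// process boot time, set once in ECStore::init and read wherever server uptime is reported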
pub static ref GLOBAL_BOOT_TIME: OnceCell<SystemTime> = OnceCell::new();
}
pub fn global_rustfs_port() -> u16 {

View File

@@ -1073,10 +1073,9 @@ pub fn lc_has_active_rules(config: &BucketLifecycleConfiguration, prefix: &str)
continue;
}
let rule_prefix = lc_get_prefix(rule);
if !prefix.is_empty() && !rule_prefix.is_empty() {
if !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix) {
continue;
}
if !prefix.is_empty() && !rule_prefix.is_empty() && !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix)
{
continue;
}
if let Some(e) = &rule.noncurrent_version_expiration {

View File

@@ -1,4 +1,5 @@
use crate::global::get_global_endpoints;
use crate::admin_server_info::get_commit_id;
use crate::global::{get_global_endpoints, GLOBAL_BOOT_TIME};
use crate::peer_rest_client::PeerRestClient;
use crate::StorageAPI;
use crate::{endpoints::EndpointServerPools, new_object_layer_fn};
@@ -7,6 +8,7 @@ use futures::future::join_all;
use lazy_static::lazy_static;
use madmin::{ItemState, ServerProperties};
use std::sync::OnceLock;
use std::time::SystemTime;
use tracing::error;
lazy_static! {
@@ -104,6 +106,11 @@ impl NotificationSys {
match client.server_info().await {
Ok(info) => info,
Err(_) => ServerProperties {
uptime: SystemTime::now()
.duration_since(*GLOBAL_BOOT_TIME.get().unwrap())
.unwrap_or_default()
.as_secs(),
version: get_commit_id(),
endpoint: client.host.to_string(),
state: ItemState::Offline.to_string().to_owned(),
disks: get_offline_disks(&client.host.to_string(), &get_global_endpoints()),

View File

@@ -78,22 +78,27 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
let mut error_map: HashMap<String, usize> = HashMap::new(); // stores the first index at which each err appeared
let nil = "nil".to_string();
for (i, operr) in errs.iter().enumerate() {
if operr.is_none() {
if let Some(err) = operr {
if is_err_ignored(err, ignored_errs) {
continue;
}
let errstr = err.inner_string();
let _ = *error_map.entry(errstr.clone()).or_insert(i);
*error_counts.entry(errstr.clone()).or_insert(0) += 1;
} else {
*error_counts.entry(nil.clone()).or_insert(0) += 1;
let _ = *error_map.entry(nil.clone()).or_insert(i);
continue;
}
let err = operr.as_ref().unwrap();
// let err = operr.as_ref().unwrap();
if is_err_ignored(err, ignored_errs) {
continue;
}
// let errstr = err.to_string();
let errstr = err.to_string();
let _ = *error_map.entry(errstr.clone()).or_insert(i);
*error_counts.entry(errstr.clone()).or_insert(0) += 1;
// let _ = *error_map.entry(errstr.clone()).or_insert(i);
// *error_counts.entry(errstr.clone()).or_insert(0) += 1;
}
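// pick the error string (possibly "nil") with the highest occurrence count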
let mut max = 0;
@@ -105,17 +110,14 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
}
}
if let Some(&c) = error_counts.get(&max_err) {
if let Some(&err_idx) = error_map.get(&max_err) {
let err = errs[err_idx].as_ref().map(clone_err);
return (c, err);
}
return (c, None);
if let Some(&err_idx) = error_map.get(&max_err) {
let err = errs[err_idx].as_ref().map(clone_err);
(max, err)
} else if max_err == nil {
(max, None)
} else {
(0, None)
}
(0, None)
}
// Validate the number of matching errors against the quorum
@@ -152,3 +154,114 @@ pub fn reduce_write_quorum_errs(
) -> Option<Error> {
reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write)
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug)]
struct MockErrorChecker {
target_error: String,
}
impl CheckErrorFn for MockErrorChecker {
fn is(&self, e: &Error) -> bool {
e.inner_string() == self.target_error
}
}
fn mock_error(message: &str) -> Error {
Error::msg(message.to_string())
}
#[test]
fn test_reduce_errs_with_no_errors() {
let errs: Vec<Option<Error>> = vec![];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 0);
assert!(err.is_none());
}
#[test]
fn test_reduce_errs_with_ignored_errors() {
let errs = vec![Some(mock_error("ignored_error")), Some(mock_error("ignored_error"))];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
target_error: "ignored_error".to_string(),
})];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 0);
assert!(err.is_none());
}
#[test]
fn test_reduce_errs_with_mixed_errors() {
let errs = vec![
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
Some(Error::new(DiskError::FileNotFound)),
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![Box::new(MockErrorChecker {
target_error: "error2".to_string(),
})];
let (count, err) = reduce_errs(&errs, &ignored_errs);
println!("count: {}, err: {:?}", count, err);
assert_eq!(count, 9);
assert_eq!(err.unwrap().to_string(), DiskError::FileNotFound.to_string());
}
#[test]
fn test_reduce_errs_with_nil_errors() {
let errs = vec![None, Some(mock_error("error1")), None];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let (count, err) = reduce_errs(&errs, &ignored_errs);
assert_eq!(count, 2);
assert!(err.is_none());
}
#[test]
fn test_reduce_read_quorum_errs() {
let errs = vec![
Some(mock_error("error1")),
Some(mock_error("error1")),
Some(mock_error("error2")),
None,
None,
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let read_quorum = 2;
let result = reduce_read_quorum_errs(&errs, &ignored_errs, read_quorum);
assert!(result.is_none());
}
#[test]
fn test_reduce_write_quorum_errs_with_quorum_error() {
let errs = vec![
Some(mock_error("error1")),
Some(mock_error("error2")),
Some(mock_error("error2")),
];
let ignored_errs: Vec<Box<dyn CheckErrorFn>> = vec![];
let write_quorum = 3;
let result = reduce_write_quorum_errs(&errs, &ignored_errs, write_quorum);
assert!(result.is_some());
assert_eq!(result.unwrap().to_string(), QuorumError::Write.to_string());
}
}

View File

@@ -402,6 +402,7 @@ impl SetDisks {
}
#[allow(dead_code)]
#[tracing::instrument(level = "debug", skip(self, disks))]
async fn commit_rename_data_dir(
&self,
disks: &[Option<DiskStore>],
@@ -847,6 +848,7 @@ impl SetDisks {
// Returns per object readQuorum and writeQuorum
// readQuorum is the min required disks to read data.
// writeQuorum is the min required disks to write data.
#[tracing::instrument(level = "debug", skip(parts_metadata))]
fn object_quorum_from_meta(
parts_metadata: &[FileInfo],
errs: &[Option<Error>],
@@ -859,7 +861,7 @@ impl SetDisks {
};
if let Some(err) = reduce_read_quorum_errs(errs, object_op_ignored_errs().as_ref(), expected_rquorum) {
// warn!("object_quorum_from_meta err {:?}", &err);
warn!("object_quorum_from_meta err {:?}", &err);
return Err(err);
}
@@ -1581,7 +1583,7 @@ impl SetDisks {
// Ok(())
// }
// #[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self))]
pub async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> {
let disks = self.disks.read().await;
@@ -1744,7 +1746,7 @@ impl SetDisks {
// TODO: optimize concurrency; stop early once enough disks have responded
let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await;
// warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata);
// warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs);
let _min_disks = self.set_drive_count - self.default_parity_count;
@@ -4187,7 +4189,11 @@ impl StorageAPI for SetDisks {
// _ns = Some(ns_lock);
// }
let (fi, _, _) = self.get_object_fileinfo(bucket, object, opts, false).await?;
let (fi, _, _) = self
.get_object_fileinfo(bucket, object, opts, false)
.await
.map_err(|e| to_object_err(e, vec![bucket, object]))?;
// warn!("get object_info fi {:?}", &fi);

View File

@@ -9,7 +9,8 @@ use crate::disk::{DiskAPI, DiskInfo, DiskInfoOptions, MetaCacheEntry};
use crate::error::clone_err;
use crate::global::{
get_global_endpoints, is_dist_erasure, is_erasure_sd, set_global_deployment_id, set_object_layer, DISK_ASSUME_UNKNOWN_SIZE,
DISK_FILL_FRACTION, DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES,
DISK_FILL_FRACTION, DISK_MIN_INODES, DISK_RESERVE_FRACTION, GLOBAL_BOOT_TIME, GLOBAL_LOCAL_DISK_MAP,
GLOBAL_LOCAL_DISK_SET_DRIVES,
};
use crate::heal::data_usage::{DataUsageInfo, DATA_USAGE_ROOT};
use crate::heal::data_usage_cache::{DataUsageCache, DataUsageCacheInfo};
@@ -232,11 +233,19 @@ impl ECStore {
}
let wait_sec = 5;
let mut exit_count = 0;
loop {
if let Err(err) = ec.init().await {
error!("init err: {}", err);
error!("retry after {} second", wait_sec);
sleep(Duration::from_secs(wait_sec)).await;
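// give up after roughly 10 retries, 5 seconds apart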
if exit_count > 10 {
return Err(Error::msg("ec init failed"));
}
exit_count += 1;
continue;
}
@@ -249,6 +258,8 @@ impl ECStore {
}
pub async fn init(self: &Arc<Self>) -> Result<()> {
GLOBAL_BOOT_TIME.get_or_init(|| async { SystemTime::now() }).await;
if self.load_rebalance_meta().await.is_ok() {
self.start_rebalance().await;
}
@@ -481,6 +492,8 @@ impl ECStore {
}
async fn get_available_pool_idx(&self, bucket: &str, object: &str, size: i64) -> Option<usize> {
// // for now, return a random one
let mut server_pools = self.get_server_pools_available_space(bucket, object, size).await;
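// drop pools whose most-filled disk exceeds the reserve threshold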
server_pools.filter_max_used(100 - (100_f64 * DISK_RESERVE_FRACTION) as u64);
let total = server_pools.total_available();
@@ -2513,7 +2526,7 @@ fn check_put_object_args(bucket: &str, object: &str) -> Result<()> {
Ok(())
}
async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
pub async fn get_disk_infos(disks: &[Option<DiskStore>]) -> Vec<Option<DiskInfo>> {
let opts = &DiskInfoOptions::default();
let mut res = vec![None; disks.len()];
for (idx, disk_op) in disks.iter().enumerate() {
@@ -2534,14 +2547,15 @@ pub struct PoolAvailableSpace {
pub max_used_pct: u64, // Used disk percentage of most filled disk, rounded down.
}
#[derive(Debug, Default, Clone)]
pub struct ServerPoolsAvailableSpace(Vec<PoolAvailableSpace>);
impl ServerPoolsAvailableSpace {
fn iter(&self) -> Iter<'_, PoolAvailableSpace> {
pub fn iter(&self) -> Iter<'_, PoolAvailableSpace> {
self.0.iter()
}
// TotalAvailable - total available space
fn total_available(&self) -> u64 {
pub fn total_available(&self) -> u64 {
let mut total = 0;
for pool in &self.0 {
total += pool.available;
@@ -2551,7 +2565,7 @@ impl ServerPoolsAvailableSpace {
// FilterMaxUsed filters out any pool whose used percentage exceeds max,
// unless all pools exceed it, in which case all are preserved.
fn filter_max_used(&mut self, max: u64) {
pub fn filter_max_used(&mut self, max: u64) {
if self.0.len() <= 1 {
// Nothing to do.
return;
@@ -2572,13 +2586,14 @@ impl ServerPoolsAvailableSpace {
// Remove entries that are above.
for pool in self.0.iter_mut() {
if pool.available > 0 && pool.max_used_pct < max {
pool.available = 0
continue;
}
pool.available = 0
}
}
}
async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool> {
pub async fn has_space_for(dis: &[Option<DiskInfo>], size: i64) -> Result<bool> {
let size = {
if size < 0 {
DISK_ASSUME_UNKNOWN_SIZE

View File

@@ -14,7 +14,7 @@ pub use unix::{get_drive_stats, get_info, same_disk};
#[cfg(target_os = "windows")]
pub use windows::{get_drive_stats, get_info, same_disk};
#[derive(Debug, Default)]
#[derive(Debug, Default, PartialEq)]
pub struct IOStats {
pub read_ios: u64,
pub read_merges: u64,
@@ -34,3 +34,59 @@ pub struct IOStats {
pub flush_ios: u64,
pub flush_ticks: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[test]
fn test_get_info_valid_path() {
let temp_dir = tempfile::tempdir().unwrap();
let info = get_info(temp_dir.path()).unwrap();
println!("Disk Info: {:?}", info);
assert!(info.total > 0);
assert!(info.free > 0);
assert!(info.used > 0);
assert!(info.files > 0);
assert!(info.ffree > 0);
assert!(!info.fstype.is_empty());
}
#[test]
fn test_get_info_invalid_path() {
let invalid_path = PathBuf::from("/invalid/path");
let result = get_info(&invalid_path);
assert!(result.is_err());
}
#[test]
fn test_same_disk_same_path() {
let temp_dir = tempfile::tempdir().unwrap();
let path = temp_dir.path().to_str().unwrap();
let result = same_disk(path, path).unwrap();
assert!(result);
}
#[test]
fn test_same_disk_different_paths() {
let temp_dir1 = tempfile::tempdir().unwrap();
let temp_dir2 = tempfile::tempdir().unwrap();
let path1 = temp_dir1.path().to_str().unwrap();
let path2 = temp_dir2.path().to_str().unwrap();
let result = same_disk(path1, path2).unwrap();
assert!(!result);
}
#[test]
fn test_get_drive_stats_default() {
let stats = get_drive_stats(0, 0).unwrap();
assert_eq!(stats, IOStats::default());
}
}

scripts/dev.sh Executable file (+39)
View File

@@ -0,0 +1,39 @@
#!/bin/bash
# Script name: scp_to_servers.sh
rm -f ./target/x86_64-unknown-linux-musl/release/rustfs.zip
# Compress ./target/x86_64-unknown-linux-musl/release/rustfs
zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs
# Local file path
LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip"
REMOTE_PATH="~"
# Define the server list array
# Format: server IP, username, target path
SERVER_LIST=(
"root@121.89.80.13"
"root@121.89.80.198"
"root@8.130.78.237"
"root@8.130.189.236"
"root@121.89.80.230"
"root@121.89.80.45"
"root@8.130.191.95"
"root@121.89.80.91"
)
# Iterate over the server list
for SERVER in "${SERVER_LIST[@]}"; do
echo "正在将文件复制到服务器: $SERVER 目标路径: $REMOTE_PATH"
scp "$LOCAL_FILE" "${SERVER}:${REMOTE_PATH}"
if [ $? -eq 0 ]; then
echo "成功复制到 $SERVER"
else
echo "复制到 $SERVER 失败"
fi
done
# ps -ef | grep rustfs | awk '{print $2}' | xargs kill -9