diff --git a/.cargo/config.toml b/.cargo/config.toml index 52d967b7..a1c92ecf 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -2,3 +2,6 @@ rustflags = [ "-C", "link-arg=-fuse-ld=bfd" ] + +[target.x86_64-unknown-linux-musl] +linker = "x86_64-linux-musl-gcc" \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 381305c4..e2ef3b56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,17 @@ [workspace] members = [ - "madmin", # Management dashboard and admin API interface - "rustfs", # Core file system implementation - "ecstore", # Erasure coding storage implementation - "e2e_test", # End-to-end test suite - "common/common", # Shared utilities and data structures - "common/lock", # Distributed locking implementation - "common/protos", # Protocol buffer definitions + "madmin", # Management dashboard and admin API interface + "rustfs", # Core file system implementation + "ecstore", # Erasure coding storage implementation + "e2e_test", # End-to-end test suite + "common/common", # Shared utilities and data structures + "common/lock", # Distributed locking implementation + "common/protos", # Protocol buffer definitions "common/workers", # Worker thread pools and task scheduling - "iam", # Identity and Access Management - "crypto", # Cryptography and security features + "iam", # Identity and Access Management + "crypto", # Cryptography and security features "cli/rustfs-gui", # Graphical user interface client - "crates/obs", # Observability utilities + "crates/obs", # Observability utilities "s3select/api", "s3select/query", "appauth", @@ -67,7 +67,11 @@ http = "1.3.1" http-body = "1.0.1" humantime = "2.2.0" jsonwebtoken = "9.3.1" -keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service"] } +keyring = { version = "3.6.2", features = [ + "apple-native", + "windows-native", + "sync-secret-service", +] } lock = { path = "./common/lock" } lazy_static = "1.5.0" libsystemd = { version = "0.7" } @@ -77,12 +81,17 @@ md-5 = "0.10.6" mime = "0.3.17" netif = "0.1.6" opentelemetry = { version = "0.29.1" } -opentelemetry-appender-tracing = { version = "0.29.1", features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] } +opentelemetry-appender-tracing = { version = "0.29.1", features = [ + "experimental_use_tracing_span_context", + "experimental_metadata_attributes", +] } opentelemetry_sdk = { version = "0.29" } opentelemetry-stdout = { version = "0.29.0" } opentelemetry-otlp = { version = "0.29" } opentelemetry-prometheus = { version = "0.29.1" } -opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } +opentelemetry-semantic-conventions = { version = "0.29.0", features = [ + "semconv_experimental", +] } pin-project-lite = "0.2" prometheus = "0.14.0" # pin-utils = "0.1.0" @@ -93,8 +102,19 @@ protobuf = "3.7" protos = { path = "./common/protos" } rand = "0.8.5" rdkafka = { version = "0.37", features = ["tokio"] } -reqwest = { version = "0.12.15", default-features = false, features = ["rustls-tls", "charset", "http2", "macos-system-configuration", "stream", "json", "blocking"] } -rfd = { version = "0.15.3", default-features = false, features = ["xdg-portal", "tokio"] } +reqwest = { version = "0.12.15", default-features = false, features = [ + "rustls-tls", + "charset", + "http2", + "macos-system-configuration", + "stream", + "json", + "blocking", +] } +rfd = { version = "0.15.3", default-features = false, features = [ + "xdg-portal", + "tokio", +] } rmp = "0.8.14" rmp-serde = "1.3.0" rustfs-obs = { path = "crates/obs", version = "0.0.1" } @@ -154,6 +174,10 @@ inherits = "dev" inherits = "dev" [profile.release] -opt-level = 3 # Optimization Level (0-3) -lto = true # Optimize when linking -codegen-units = 1 # Reduce code generation units to improve optimization +opt-level = 3 +lto = "thin" + +[profile.production] +inherits = "release" +lto = "fat" +codegen-units = 1 diff --git a/common/common/src/error.rs b/common/common/src/error.rs index 2c889053..21af4dba 100644 --- a/common/common/src/error.rs +++ b/common/common/src/error.rs @@ -66,6 +66,10 @@ impl Error { self.downcast_ref::() .map(|e| std::io::Error::new(e.kind(), e.to_string())) } + + pub fn inner_string(&self) -> String { + self.inner.to_string() + } } impl From for Error { diff --git a/crates/obs/src/telemetry.rs b/crates/obs/src/telemetry.rs index 82c93de7..8efa869d 100644 --- a/crates/obs/src/telemetry.rs +++ b/crates/obs/src/telemetry.rs @@ -254,7 +254,11 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard { let filter_otel = match logger_level { "trace" | "debug" => { info!("OpenTelemetry tracing initialized with level: {}", logger_level); - EnvFilter::new(logger_level) + let mut filter = EnvFilter::new(logger_level); + for directive in ["hyper", "tonic", "h2", "reqwest", "tower"] { + filter = filter.add_directive(format!("{}=off", directive).parse().unwrap()); + } + filter } _ => { let mut filter = EnvFilter::new(logger_level); diff --git a/ecstore/src/bucket/metadata_sys.rs b/ecstore/src/bucket/metadata_sys.rs index b3a7be99..62dc0dcf 100644 --- a/ecstore/src/bucket/metadata_sys.rs +++ b/ecstore/src/bucket/metadata_sys.rs @@ -1,16 +1,18 @@ use std::collections::HashSet; use std::sync::OnceLock; +use std::time::Duration; use std::{collections::HashMap, sync::Arc}; use crate::bucket::error::BucketMetadataError; use crate::bucket::metadata::{load_bucket_metadata_parse, BUCKET_LIFECYCLE_CONFIG}; use crate::bucket::utils::is_meta_bucketname; -use crate::config; use crate::config::error::ConfigError; use crate::disk::error::DiskError; use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints}; +use crate::heal::heal_commands::HealOpts; use crate::store::ECStore; use crate::utils::xml::deserialize; +use crate::{config, StorageAPI}; use common::error::{Error, Result}; use futures::future::join_all; use policy::policy::BucketPolicy; @@ -20,6 +22,7 @@ use s3s::dto::{ }; use time::OffsetDateTime; use tokio::sync::RwLock; +use tokio::time::sleep; use tracing::{error, warn}; use super::metadata::{load_bucket_metadata, BucketMetadata}; @@ -196,7 +199,22 @@ impl BucketMetadataSys { let mut futures = Vec::new(); for bucket in buckets.iter() { - futures.push(load_bucket_metadata(self.api.clone(), bucket.as_str())); + // TODO: HealBucket + let api = self.api.clone(); + let bucket = bucket.clone(); + futures.push(async move { + sleep(Duration::from_millis(30)).await; + let _ = api + .heal_bucket( + &bucket, + &HealOpts { + recreate: true, + ..Default::default() + }, + ) + .await; + load_bucket_metadata(self.api.clone(), bucket.as_str()).await + }); } let results = join_all(futures).await; @@ -205,6 +223,7 @@ impl BucketMetadataSys { let mut mp = self.metadata_map.write().await; + // TODO:EventNotifier,BucketTargetSys for res in results { match res { Ok(res) => { diff --git a/ecstore/src/disk/local.rs b/ecstore/src/disk/local.rs index 24e5782f..10c711ad 100644 --- a/ecstore/src/disk/local.rs +++ b/ecstore/src/disk/local.rs @@ -455,6 +455,7 @@ impl LocalDisk { { if let Err(aerr) = access(volume_dir.as_ref()).await { if os_is_not_exist(&aerr) { + warn!("read_metadata_with_dmtime os err {:?}", &aerr); return Err(Error::new(DiskError::VolumeNotFound)); } } @@ -534,6 +535,7 @@ impl LocalDisk { if !skip_access_checks(volume) { if let Err(er) = utils::fs::access(volume_dir.as_ref()).await { if os_is_not_exist(&er) { + warn!("read_all_data_with_dmtime os err {:?}", &er); return Err(Error::new(DiskError::VolumeNotFound)); } } @@ -2090,6 +2092,8 @@ impl DiskAPI for LocalDisk { Ok(fi) } + + #[tracing::instrument(level = "debug", skip(self))] async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { let file_path = self.get_object_path(volume, path)?; let file_dir = self.get_bucket_path(volume)?; @@ -2328,10 +2332,7 @@ impl DiskAPI for LocalDisk { } } - let vcfg = match BucketVersioningSys::get(&cache.info.name).await { - Ok(vcfg) => Some(vcfg), - Err(_) => None, - }; + let vcfg = (BucketVersioningSys::get(&cache.info.name).await).ok(); let loc = self.get_disk_location(); let disks = store.get_disks(loc.pool_idx.unwrap(), loc.disk_idx.unwrap()).await?; diff --git a/ecstore/src/disk/mod.rs b/ecstore/src/disk/mod.rs index fd0fb89e..e26eebb8 100644 --- a/ecstore/src/disk/mod.rs +++ b/ecstore/src/disk/mod.rs @@ -285,6 +285,7 @@ impl DiskAPI for Disk { } } + #[tracing::instrument] async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { match self { Disk::Local(local_disk) => local_disk.read_xl(volume, path, read_data).await, diff --git a/ecstore/src/disk/remote.rs b/ecstore/src/disk/remote.rs index bac27b0d..c635d744 100644 --- a/ecstore/src/disk/remote.rs +++ b/ecstore/src/disk/remote.rs @@ -680,6 +680,7 @@ impl DiskAPI for RemoteDisk { Ok(file_info) } + #[tracing::instrument(level = "debug", skip(self))] async fn read_xl(&self, volume: &str, path: &str, read_data: bool) -> Result { info!("read_xl {}/{}/{}", self.endpoint.to_string(), volume, path); let mut client = node_service_time_out_client(&self.addr) diff --git a/ecstore/src/file_meta.rs b/ecstore/src/file_meta.rs index c9558578..e4da3882 100644 --- a/ecstore/src/file_meta.rs +++ b/ecstore/src/file_meta.rs @@ -99,7 +99,7 @@ impl FileMeta { Ok((bin_len, &buf[5..])) } - #[tracing::instrument] + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { let i = buf.len() as u64; @@ -711,7 +711,6 @@ impl FileMetaVersion { Ok(data_dir) } - #[tracing::instrument] pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { let mut cur = Cursor::new(buf); @@ -998,7 +997,7 @@ impl FileMetaVersionHeader { Ok(wr) } - #[tracing::instrument] + pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { let mut cur = Cursor::new(buf); let alen = rmp::decode::read_array_len(&mut cur)?; @@ -1144,7 +1143,6 @@ pub struct MetaObject { } impl MetaObject { - #[tracing::instrument] pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { let mut cur = Cursor::new(buf); diff --git a/ecstore/src/heal/data_scanner.rs b/ecstore/src/heal/data_scanner.rs index 315b21fe..ae0b39d0 100644 --- a/ecstore/src/heal/data_scanner.rs +++ b/ecstore/src/heal/data_scanner.rs @@ -344,7 +344,6 @@ impl CurrentScannerCycle { Ok(result) } - #[tracing::instrument] pub fn unmarshal_msg(&mut self, buf: &[u8]) -> Result { let mut cur = Cursor::new(buf); @@ -1076,10 +1075,9 @@ pub fn lc_has_active_rules(config: &BucketLifecycleConfiguration, prefix: &str) continue; } let rule_prefix = lc_get_prefix(rule); - if !prefix.is_empty() && !rule_prefix.is_empty() { - if !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix) { - continue; - } + if !prefix.is_empty() && !rule_prefix.is_empty() && !prefix.starts_with(&rule_prefix) && !rule_prefix.starts_with(prefix) + { + continue; } if let Some(e) = &rule.noncurrent_version_expiration { @@ -1102,7 +1100,7 @@ pub fn lc_has_active_rules(config: &BucketLifecycleConfiguration, prefix: &str) return true; } - if let Some(Some(true)) = rule.expiration.as_ref().map(|e| e.expired_object_delete_marker.map(|m| m)) { + if let Some(Some(true)) = rule.expiration.as_ref().map(|e| e.expired_object_delete_marker) { return true; } diff --git a/ecstore/src/quorum.rs b/ecstore/src/quorum.rs index 09b652d9..d89fa523 100644 --- a/ecstore/src/quorum.rs +++ b/ecstore/src/quorum.rs @@ -78,22 +78,27 @@ fn reduce_errs(errs: &[Option], ignored_errs: &[Box]) - let mut error_map: HashMap = HashMap::new(); // 存err位置 let nil = "nil".to_string(); for (i, operr) in errs.iter().enumerate() { - if operr.is_none() { + if let Some(err) = operr { + if is_err_ignored(err, ignored_errs) { + continue; + } + + let errstr = err.inner_string(); + + let _ = *error_map.entry(errstr.clone()).or_insert(i); + *error_counts.entry(errstr.clone()).or_insert(0) += 1; + } else { *error_counts.entry(nil.clone()).or_insert(0) += 1; let _ = *error_map.entry(nil.clone()).or_insert(i); continue; } - let err = operr.as_ref().unwrap(); + // let err = operr.as_ref().unwrap(); - if is_err_ignored(err, ignored_errs) { - continue; - } + // let errstr = err.to_string(); - let errstr = err.to_string(); - - let _ = *error_map.entry(errstr.clone()).or_insert(i); - *error_counts.entry(errstr.clone()).or_insert(0) += 1; + // let _ = *error_map.entry(errstr.clone()).or_insert(i); + // *error_counts.entry(errstr.clone()).or_insert(0) += 1; } let mut max = 0; @@ -105,17 +110,14 @@ fn reduce_errs(errs: &[Option], ignored_errs: &[Box]) - } } - if let Some(&c) = error_counts.get(&max_err) { - if let Some(&err_idx) = error_map.get(&max_err) { - let err = errs[err_idx].as_ref().map(clone_err); - - return (c, err); - } - - return (c, None); + if let Some(&err_idx) = error_map.get(&max_err) { + let err = errs[err_idx].as_ref().map(clone_err); + (max, err) + } else if max_err == nil { + (max, None) + } else { + (0, None) } - - (0, None) } // 根据quorum验证错误数量 @@ -152,3 +154,114 @@ pub fn reduce_write_quorum_errs( ) -> Option { reduce_quorum_errs(errs, ignored_errs, write_quorum, QuorumError::Write) } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug)] + struct MockErrorChecker { + target_error: String, + } + + impl CheckErrorFn for MockErrorChecker { + fn is(&self, e: &Error) -> bool { + e.inner_string() == self.target_error + } + } + + fn mock_error(message: &str) -> Error { + Error::msg(message.to_string()) + } + + #[test] + fn test_reduce_errs_with_no_errors() { + let errs: Vec> = vec![]; + let ignored_errs: Vec> = vec![]; + + let (count, err) = reduce_errs(&errs, &ignored_errs); + + assert_eq!(count, 0); + assert!(err.is_none()); + } + + #[test] + fn test_reduce_errs_with_ignored_errors() { + let errs = vec![Some(mock_error("ignored_error")), Some(mock_error("ignored_error"))]; + let ignored_errs: Vec> = vec![Box::new(MockErrorChecker { + target_error: "ignored_error".to_string(), + })]; + + let (count, err) = reduce_errs(&errs, &ignored_errs); + + assert_eq!(count, 0); + assert!(err.is_none()); + } + + #[test] + fn test_reduce_errs_with_mixed_errors() { + let errs = vec![ + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + Some(Error::new(DiskError::FileNotFound)), + ]; + let ignored_errs: Vec> = vec![Box::new(MockErrorChecker { + target_error: "error2".to_string(), + })]; + + let (count, err) = reduce_errs(&errs, &ignored_errs); + println!("count: {}, err: {:?}", count, err); + assert_eq!(count, 9); + assert_eq!(err.unwrap().to_string(), DiskError::FileNotFound.to_string()); + } + + #[test] + fn test_reduce_errs_with_nil_errors() { + let errs = vec![None, Some(mock_error("error1")), None]; + let ignored_errs: Vec> = vec![]; + + let (count, err) = reduce_errs(&errs, &ignored_errs); + + assert_eq!(count, 2); + assert!(err.is_none()); + } + + #[test] + fn test_reduce_read_quorum_errs() { + let errs = vec![ + Some(mock_error("error1")), + Some(mock_error("error1")), + Some(mock_error("error2")), + None, + None, + ]; + let ignored_errs: Vec> = vec![]; + let read_quorum = 2; + + let result = reduce_read_quorum_errs(&errs, &ignored_errs, read_quorum); + + assert!(result.is_none()); + } + + #[test] + fn test_reduce_write_quorum_errs_with_quorum_error() { + let errs = vec![ + Some(mock_error("error1")), + Some(mock_error("error2")), + Some(mock_error("error2")), + ]; + let ignored_errs: Vec> = vec![]; + let write_quorum = 3; + + let result = reduce_write_quorum_errs(&errs, &ignored_errs, write_quorum); + + assert!(result.is_some()); + assert_eq!(result.unwrap().to_string(), QuorumError::Write.to_string()); + } +} diff --git a/ecstore/src/set_disk.rs b/ecstore/src/set_disk.rs index 0148935e..39b421c0 100644 --- a/ecstore/src/set_disk.rs +++ b/ecstore/src/set_disk.rs @@ -59,12 +59,7 @@ use common::error::{Error, Result}; use futures::future::join_all; use glob::Pattern; use http::HeaderMap; -use lock::{ - // drwmutex::Options, - drwmutex::Options, - namespace_lock::{new_nslock, NsLockMap}, - LockApi, -}; +use lock::{namespace_lock::NsLockMap, LockApi}; use madmin::heal_commands::{HealDriveInfo, HealResultItem}; use md5::{Digest as Md5Digest, Md5}; use rand::{ @@ -407,6 +402,7 @@ impl SetDisks { } #[allow(dead_code)] + #[tracing::instrument(level = "debug", skip(self, disks))] async fn commit_rename_data_dir( &self, disks: &[Option], @@ -852,6 +848,7 @@ impl SetDisks { // Returns per object readQuorum and writeQuorum // readQuorum is the min required disks to read data. // writeQuorum is the min required disks to write data. + #[tracing::instrument(level = "debug", skip(parts_metadata))] fn object_quorum_from_meta( parts_metadata: &[FileInfo], errs: &[Option], @@ -864,7 +861,7 @@ impl SetDisks { }; if let Some(err) = reduce_read_quorum_errs(errs, object_op_ignored_errs().as_ref(), expected_rquorum) { - // warn!("object_quorum_from_meta err {:?}", &err); + warn!("object_quorum_from_meta err {:?}", &err); return Err(err); } @@ -1586,7 +1583,7 @@ impl SetDisks { // Ok(()) // } - // #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self))] pub async fn delete_all(&self, bucket: &str, prefix: &str) -> Result<()> { let disks = self.disks.read().await; @@ -1749,7 +1746,7 @@ impl SetDisks { // TODO: 优化并发 可用数量中断 let (parts_metadata, errs) = Self::read_all_fileinfo(&disks, "", bucket, object, vid.as_str(), read_data, false).await; // warn!("get_object_fileinfo parts_metadata {:?}", &parts_metadata); - // warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs); + warn!("get_object_fileinfo {}/{} errs {:?}", bucket, object, &errs); let _min_disks = self.set_drive_count - self.default_parity_count; @@ -3672,33 +3669,33 @@ impl ObjectIO for SetDisks { async fn put_object(&self, bucket: &str, object: &str, data: &mut PutObjReader, opts: &ObjectOptions) -> Result { let disks = self.disks.read().await; - let mut _ns = None; - if !opts.no_lock { - let paths = vec![object.to_string()]; - let ns_lock = new_nslock( - Arc::clone(&self.ns_mutex), - self.locker_owner.clone(), - bucket.to_string(), - paths, - self.lockers.clone(), - ) - .await; - if !ns_lock - .0 - .write() - .await - .get_lock(&Options { - timeout: Duration::from_secs(5), - retry_interval: Duration::from_secs(1), - }) - .await - .map_err(|err| Error::from_string(err.to_string()))? - { - return Err(Error::from_string("can not get lock. please retry".to_string())); - } + // let mut _ns = None; + // if !opts.no_lock { + // let paths = vec![object.to_string()]; + // let ns_lock = new_nslock( + // Arc::clone(&self.ns_mutex), + // self.locker_owner.clone(), + // bucket.to_string(), + // paths, + // self.lockers.clone(), + // ) + // .await; + // if !ns_lock + // .0 + // .write() + // .await + // .get_lock(&Options { + // timeout: Duration::from_secs(5), + // retry_interval: Duration::from_secs(1), + // }) + // .await + // .map_err(|err| Error::from_string(err.to_string()))? + // { + // return Err(Error::from_string("can not get lock. please retry".to_string())); + // } - _ns = Some(ns_lock); - } + // _ns = Some(ns_lock); + // } let mut user_defined = opts.user_defined.clone().unwrap_or_default(); @@ -4165,34 +4162,37 @@ impl StorageAPI for SetDisks { unimplemented!() } async fn get_object_info(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result { - let mut _ns = None; - if !opts.no_lock { - let paths = vec![object.to_string()]; - let ns_lock = new_nslock( - Arc::clone(&self.ns_mutex), - self.locker_owner.clone(), - bucket.to_string(), - paths, - self.lockers.clone(), - ) - .await; - if !ns_lock - .0 - .write() - .await - .get_lock(&Options { - timeout: Duration::from_secs(5), - retry_interval: Duration::from_secs(1), - }) - .await - .map_err(|err| Error::from_string(err.to_string()))? - { - return Err(Error::from_string("can not get lock. please retry".to_string())); - } + // let mut _ns = None; + // if !opts.no_lock { + // let paths = vec![object.to_string()]; + // let ns_lock = new_nslock( + // Arc::clone(&self.ns_mutex), + // self.locker_owner.clone(), + // bucket.to_string(), + // paths, + // self.lockers.clone(), + // ) + // .await; + // if !ns_lock + // .0 + // .write() + // .await + // .get_lock(&Options { + // timeout: Duration::from_secs(5), + // retry_interval: Duration::from_secs(1), + // }) + // .await + // .map_err(|err| Error::from_string(err.to_string()))? + // { + // return Err(Error::from_string("can not get lock. please retry".to_string())); + // } - _ns = Some(ns_lock); - } - let (fi, _, _) = self.get_object_fileinfo(bucket, object, opts, false).await?; + // _ns = Some(ns_lock); + // } + let (fi, _, _) = self + .get_object_fileinfo(bucket, object, opts, false) + .await + .map_err(|e| to_object_err(e, vec![bucket, object]))?; // warn!("get object_info fi {:?}", &fi); diff --git a/ecstore/src/store.rs b/ecstore/src/store.rs index 037f82c9..5be291b1 100644 --- a/ecstore/src/store.rs +++ b/ecstore/src/store.rs @@ -232,11 +232,19 @@ impl ECStore { } let wait_sec = 5; + let mut exit_count = 0; loop { if let Err(err) = ec.init().await { error!("init err: {}", err); error!("retry after {} second", wait_sec); sleep(Duration::from_secs(wait_sec)).await; + + if exit_count > 10 { + return Err(Error::msg("ec init faild")); + } + + exit_count += 1; + continue; } @@ -481,6 +489,8 @@ impl ECStore { } async fn get_available_pool_idx(&self, bucket: &str, object: &str, size: i64) -> Option { + // // 先随机返回一个 + let mut server_pools = self.get_server_pools_available_space(bucket, object, size).await; server_pools.filter_max_used(100 - (100_f64 * DISK_RESERVE_FRACTION) as u64); let total = server_pools.total_available(); @@ -521,28 +531,24 @@ impl ECStore { } } - let mut server_pools = Vec::new(); + let mut server_pools = vec![PoolAvailableSpace::default(); self.pools.len()]; for (i, zinfo) in infos.iter().enumerate() { if zinfo.is_empty() { - server_pools.push(PoolAvailableSpace { + server_pools[i] = PoolAvailableSpace { index: i, ..Default::default() - }); + }; continue; } - if !is_meta_bucketname(bucket) { - let avail = has_space_for(zinfo, size).await.unwrap_or_default(); + if !is_meta_bucketname(bucket) && !has_space_for(zinfo, size).await.unwrap_or_default() { + server_pools[i] = PoolAvailableSpace { + index: i, + ..Default::default() + }; - if !avail { - server_pools.push(PoolAvailableSpace { - index: i, - ..Default::default() - }); - - continue; - } + continue; } let mut available = 0; @@ -699,7 +705,7 @@ impl ECStore { let at = a.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); let bt = b.object_info.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH); - at.cmp(&bt) + bt.cmp(&at) }); let mut def_pool = PoolObjInfo::default(); @@ -915,29 +921,30 @@ impl ECStore { // TODO: test order idx_res.sort_by(|a, b| { - if let Some(obj1) = &a.res { - if let Some(obj2) = &b.res { - let cmp = obj1.mod_time.cmp(&obj2.mod_time); - match cmp { - // eq use lowest - Ordering::Equal => { - if a.idx < b.idx { - Ordering::Greater - } else { - Ordering::Less - } - } - _ => cmp, - } - } else { - Ordering::Greater - } + let a_mod = if let Some(o1) = &a.res { + o1.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH) } else { - Ordering::Less + OffsetDateTime::UNIX_EPOCH + }; + + let b_mod = if let Some(o2) = &b.res { + o2.mod_time.unwrap_or(OffsetDateTime::UNIX_EPOCH) + } else { + OffsetDateTime::UNIX_EPOCH + }; + + if a_mod == b_mod { + if a.idx < b.idx { + return Ordering::Greater; + } else { + return Ordering::Less; + } } + + b_mod.cmp(&a_mod) }); - for res in idx_res { + for res in idx_res.into_iter() { if let Some(obj) = res.res { return Ok((obj, res.idx)); } @@ -2516,7 +2523,7 @@ fn check_put_object_args(bucket: &str, object: &str) -> Result<()> { Ok(()) } -async fn get_disk_infos(disks: &[Option]) -> Vec> { +pub async fn get_disk_infos(disks: &[Option]) -> Vec> { let opts = &DiskInfoOptions::default(); let mut res = vec![None; disks.len()]; for (idx, disk_op) in disks.iter().enumerate() { @@ -2530,21 +2537,22 @@ async fn get_disk_infos(disks: &[Option]) -> Vec> { res } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct PoolAvailableSpace { pub index: usize, pub available: u64, // in bytes pub max_used_pct: u64, // Used disk percentage of most filled disk, rounded down. } +#[derive(Debug, Default, Clone)] pub struct ServerPoolsAvailableSpace(Vec); impl ServerPoolsAvailableSpace { - fn iter(&self) -> Iter<'_, PoolAvailableSpace> { + pub fn iter(&self) -> Iter<'_, PoolAvailableSpace> { self.0.iter() } // TotalAvailable - total available space - fn total_available(&self) -> u64 { + pub fn total_available(&self) -> u64 { let mut total = 0; for pool in &self.0 { total += pool.available; @@ -2554,7 +2562,7 @@ impl ServerPoolsAvailableSpace { // FilterMaxUsed will filter out any pools that has used percent bigger than max, // unless all have that, in which case all are preserved. - fn filter_max_used(&mut self, max: u64) { + pub fn filter_max_used(&mut self, max: u64) { if self.0.len() <= 1 { // Nothing to do. return; @@ -2575,13 +2583,14 @@ impl ServerPoolsAvailableSpace { // Remove entries that are above. for pool in self.0.iter_mut() { if pool.available > 0 && pool.max_used_pct < max { - pool.available = 0 + continue; } + pool.available = 0 } } } -async fn has_space_for(dis: &[Option], size: i64) -> Result { +pub async fn has_space_for(dis: &[Option], size: i64) -> Result { let size = { if size < 0 { DISK_ASSUME_UNKNOWN_SIZE diff --git a/ecstore/src/utils/os/mod.rs b/ecstore/src/utils/os/mod.rs index 73fda2f9..ac479dbb 100644 --- a/ecstore/src/utils/os/mod.rs +++ b/ecstore/src/utils/os/mod.rs @@ -14,7 +14,7 @@ pub use unix::{get_drive_stats, get_info, same_disk}; #[cfg(target_os = "windows")] pub use windows::{get_drive_stats, get_info, same_disk}; -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct IOStats { pub read_ios: u64, pub read_merges: u64, @@ -34,3 +34,59 @@ pub struct IOStats { pub flush_ios: u64, pub flush_ticks: u64, } + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + #[test] + fn test_get_info_valid_path() { + let temp_dir = tempfile::tempdir().unwrap(); + let info = get_info(temp_dir.path()).unwrap(); + + println!("Disk Info: {:?}", info); + + assert!(info.total > 0); + assert!(info.free > 0); + assert!(info.used > 0); + assert!(info.files > 0); + assert!(info.ffree > 0); + assert!(!info.fstype.is_empty()); + } + + #[test] + fn test_get_info_invalid_path() { + let invalid_path = PathBuf::from("/invalid/path"); + let result = get_info(&invalid_path); + + assert!(result.is_err()); + } + + #[test] + fn test_same_disk_same_path() { + let temp_dir = tempfile::tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + + let result = same_disk(path, path).unwrap(); + assert!(result); + } + + #[test] + fn test_same_disk_different_paths() { + let temp_dir1 = tempfile::tempdir().unwrap(); + let temp_dir2 = tempfile::tempdir().unwrap(); + + let path1 = temp_dir1.path().to_str().unwrap(); + let path2 = temp_dir2.path().to_str().unwrap(); + + let result = same_disk(path1, path2).unwrap(); + assert!(!result); + } + + #[test] + fn test_get_drive_stats_default() { + let stats = get_drive_stats(0, 0).unwrap(); + assert_eq!(stats, IOStats::default()); + } +} diff --git a/scripts/dev.sh b/scripts/dev.sh new file mode 100755 index 00000000..8dc19c88 --- /dev/null +++ b/scripts/dev.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# 脚本名称: scp_to_servers.sh + +rm ./target/x86_64-unknown-linux-musl/release/rustfs.zip +# 压缩./target/x86_64-unknown-linux-musl/release/rustfs +zip ./target/x86_64-unknown-linux-musl/release/rustfs.zip ./target/x86_64-unknown-linux-musl/release/rustfs + + +# 本地文件路径 +LOCAL_FILE="./target/x86_64-unknown-linux-musl/release/rustfs.zip" +REMOTE_PATH="~" + +# 定义服务器列表数组 +# 格式:服务器IP 用户名 目标路径 +SERVER_LIST=( + "root@121.89.80.13" + "root@121.89.80.198" + "root@8.130.78.237" + "root@8.130.189.236" + "root@121.89.80.230" + "root@121.89.80.45" + "root@8.130.191.95" + "root@121.89.80.91" +) + +# 遍历服务器列表 +for SERVER in "${SERVER_LIST[@]}"; do + echo "正在将文件复制到服务器: $SERVER 目标路径: $REMOTE_PATH" + scp "$LOCAL_FILE" "${SERVER}:${REMOTE_PATH}" + if [ $? -eq 0 ]; then + echo "成功复制到 $SERVER" + else + echo "复制到 $SERVER 失败" + fi +done + + +# ps -ef | grep rustfs | awk '{print $2}'| xargs kill -9 \ No newline at end of file