From 2fec6d161cf579506ce17fb72ec5a2c65bb088ab Mon Sep 17 00:00:00 2001 From: houseme Date: Fri, 6 Jun 2025 15:13:55 +0800 Subject: [PATCH] fix: remove dep crate openssl relation --- Cargo.lock | 236 +------------------------- Cargo.toml | 3 +- common/common/src/bucket_stats.rs | 4 +- common/common/src/last_minute.rs | 52 +++--- ecstore/Cargo.toml | 7 +- ecstore/src/cmd/bucket_replication.rs | 93 +++++----- ecstore/src/cmd/bucket_targets.rs | 86 +++++----- ecstore/src/peer.rs | 8 +- s3select/query/Cargo.toml | 2 +- 9 files changed, 140 insertions(+), 351 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c6688da1..00f9570b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1709,7 +1709,7 @@ dependencies = [ "cocoa-foundation 0.1.2", "core-foundation 0.9.4", "core-graphics 0.23.2", - "foreign-types 0.5.0", + "foreign-types", "libc", "objc", ] @@ -1725,7 +1725,7 @@ dependencies = [ "cocoa-foundation 0.2.1", "core-foundation 0.10.1", "core-graphics 0.24.0", - "foreign-types 0.5.0", + "foreign-types", "libc", "objc", ] @@ -1982,7 +1982,7 @@ dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.4", "core-graphics-types 0.1.3", - "foreign-types 0.5.0", + "foreign-types", "libc", ] @@ -1995,7 +1995,7 @@ dependencies = [ "bitflags 2.9.1", "core-foundation 0.10.1", "core-graphics-types 0.2.0", - "foreign-types 0.5.0", + "foreign-types", "libc", ] @@ -3523,7 +3523,6 @@ version = "0.0.1" dependencies = [ "async-trait", "aws-sdk-s3", - "aws-smithy-types", "backon", "base64-simd", "blake2", @@ -3539,14 +3538,12 @@ dependencies = [ "hex-simd", "highway", "http 1.3.1", - "hyper 1.6.0", "lazy_static", "lock", "madmin", "md-5", "netif", "nix 0.30.1", - "num", "num_cpus", "once_cell", "path-absolutize", @@ -3563,7 +3560,6 @@ dependencies = [ "rustfs-config", "rustfs-rsc", "s3s", - "s3s-policy", "serde", "serde_json", "sha2 0.11.0-pre.5", @@ -3577,7 +3573,6 @@ dependencies = [ "tokio-stream", "tokio-util", "tonic 0.13.1", - "tower 0.5.2", "tracing", "tracing-error", "transform-stream", @@ -3630,18 +3625,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3d8a32ae18130a3c84dd492d4215c3d913c3b07c6b63c2eb3eb7ff1101ab7bf" -[[package]] -name = "enum-as-inner" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" -dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "enumflags2" version = "0.7.11" @@ -3868,15 +3851,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared 0.1.1", -] - [[package]] name = "foreign-types" version = "0.5.0" @@ -3884,7 +3858,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared 0.3.1", + "foreign-types-shared", ] [[package]] @@ -3898,12 +3872,6 @@ dependencies = [ "syn 2.0.101", ] -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "foreign-types-shared" version = "0.3.1" @@ -4582,51 +4550,6 @@ dependencies = [ "vsimd", ] -[[package]] -name = "hickory-proto" -version = "0.24.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner", - "futures-channel", - "futures-io", - "futures-util", - "idna", - "ipnet", - "once_cell", - "rand 0.8.5", - "thiserror 1.0.69", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "hickory-resolver" -version = "0.24.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e" -dependencies = [ - "cfg-if", - "futures-util", - "hickory-proto", - "ipconfig", - "lru-cache", - "once_cell", - "parking_lot 0.12.4", - "rand 0.8.5", - "resolv-conf", - "smallvec", - "thiserror 1.0.69", - "tokio", - "tracing", -] - [[package]] name = "highway" version = "1.3.0" @@ -4849,22 +4772,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.14" @@ -5151,18 +5058,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "ipconfig" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" -dependencies = [ - "socket2", - "widestring", - "windows-sys 0.48.0", - "winreg", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -5598,12 +5493,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5698,15 +5587,6 @@ dependencies = [ "hashbrown 0.15.3", ] -[[package]] -name = "lru-cache" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "lru-slab" version = "0.1.2" @@ -5963,23 +5843,6 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" -[[package]] -name = "native-tls" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework 2.11.1", - "security-framework-sys", - "tempfile", -] - [[package]] name = "ndk" version = "0.9.0" @@ -6593,50 +6456,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl" -version = "0.10.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" -dependencies = [ - "bitflags 2.9.1", - "cfg-if", - "foreign-types 0.3.2", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" -[[package]] -name = "openssl-sys" -version = "0.9.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "opentelemetry" version = "0.29.1" @@ -7969,20 +7794,17 @@ dependencies = [ "futures-core", "futures-util", "h2 0.4.10", - "hickory-resolver", "http 1.3.1", "http-body 1.0.1", "http-body-util", "hyper 1.6.0", "hyper-rustls 0.27.6", - "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", "mime_guess", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -7994,7 +7816,6 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-native-tls", "tokio-rustls 0.26.2", "tokio-util", "tower 0.5.2", @@ -8008,12 +7829,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "resolv-conf" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3" - [[package]] name = "rfc6979" version = "0.3.1" @@ -8399,9 +8214,9 @@ dependencies = [ [[package]] name = "rustfs-rsc" -version = "2025.220.3" +version = "2025.506.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dedb827b094751bfc35c6f27b9332ea35eaacd564f8b985a135649fc69dfebdb" +checksum = "8229535cdd7a9d1f5757bd7b588f342e6ea984d66c6b1f6350f9de85d3ce9c25" dependencies = [ "async-stream", "base64 0.22.1", @@ -8678,17 +8493,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "s3s-policy" -version = "0.12.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10" -dependencies = [ - "indexmap 2.9.0", - "serde", - "serde_json", - "thiserror 2.0.12", -] - [[package]] name = "same-file" version = "1.0.6" @@ -9916,16 +9720,6 @@ dependencies = [ "syn 2.0.101", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -10914,12 +10708,6 @@ dependencies = [ "rustix 0.38.44", ] -[[package]] -name = "widestring" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" - [[package]] name = "winapi" version = "0.3.9" @@ -11442,16 +11230,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winreg" -version = "0.50.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "wit-bindgen-rt" version = "0.39.0" diff --git a/Cargo.toml b/Cargo.toml index 1e955723..0157efac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,7 +110,6 @@ mime_guess = "2.0.5" netif = "0.1.6" nix = { version = "0.30.1", features = ["fs"] } nu-ansi-term = "0.50.1" -num = "0.4.3" num_cpus = { version = "1.17.0" } nvml-wrapper = "0.10.0" object_store = "0.11.2" @@ -132,7 +131,6 @@ pin-project-lite = "0.2.16" # pin-utils = "0.1.0" prost = "0.13.5" prost-build = "0.13.5" -prost-types = "0.13.5" protobuf = "3.7" rand = "0.8.5" rdkafka = { version = "0.37.0", features = ["tokio"] } @@ -155,6 +153,7 @@ rmp = "0.8.14" rmp-serde = "1.3.0" rumqttc = { version = "0.24" } rust-embed = { version = "8.7.2" } +rustfs-rsc = "2025.506.1" rustls = { version = "0.23.27" } rustls-pki-types = "1.12.0" rustls-pemfile = "2.2.0" diff --git a/common/common/src/bucket_stats.rs b/common/common/src/bucket_stats.rs index 4e0be53e..43970e8c 100644 --- a/common/common/src/bucket_stats.rs +++ b/common/common/src/bucket_stats.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, i64, u64}; +use std::collections::HashMap; use crate::last_minute::{self}; pub struct ReplicationLatency { @@ -10,7 +10,7 @@ impl ReplicationLatency { // 合并两个 ReplicationLatency pub fn merge(&mut self, other: &mut ReplicationLatency) -> &ReplicationLatency { self.upload_histogram.merge(&other.upload_histogram); - return self; + self } // 获取上传延迟(按对象大小区间分类) diff --git a/common/common/src/last_minute.rs b/common/common/src/last_minute.rs index 2b0c6f83..71d2648a 100644 --- a/common/common/src/last_minute.rs +++ b/common/common/src/last_minute.rs @@ -1,4 +1,4 @@ -use std::{time::{Duration, SystemTime, UNIX_EPOCH}}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[allow(dead_code)] #[derive(Debug, Default)] @@ -60,18 +60,18 @@ enum SizeCategory { SizeLastElemMarker, } -#[allow(dead_code)] -impl SizeCategory { - fn to_string(&self) -> String { - match *self { - SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB".to_string(), - SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB".to_string(), - SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB".to_string(), - SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB".to_string(), - SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB".to_string(), - SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB".to_string(), - SizeCategory::SizeLastElemMarker => "SizeLastElemMarker".to_string(), - } +impl std::fmt::Display for SizeCategory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match *self { + SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB", + SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB", + SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB", + SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB", + SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB", + SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB", + SizeCategory::SizeLastElemMarker => "SizeLastElemMarker", + }; + write!(f, "{}", s) } } @@ -490,7 +490,7 @@ mod tests { latency2.totals[0].total = 20; latency2.totals[0].n = 3; - let merged = latency1.merge(&mut latency2); + let merged = latency1.merge(&latency2); assert_eq!(merged.last_sec, 1000); assert_eq!(merged.totals[0].total, 30); // 10 + 20 @@ -509,7 +509,7 @@ mod tests { latency1.totals[0].total = 10; latency2.totals[0].total = 20; - let merged = latency1.merge(&mut latency2); + let merged = latency1.merge(&latency2); assert_eq!(merged.last_sec, 1010); // Should use the later time assert_eq!(merged.totals[0].total, 30); @@ -518,9 +518,9 @@ mod tests { #[test] fn test_last_minute_latency_merge_empty() { let mut latency1 = LastMinuteLatency::default(); - let mut latency2 = LastMinuteLatency::default(); + let latency2 = LastMinuteLatency::default(); - let merged = latency1.merge(&mut latency2); + let merged = latency1.merge(&latency2); assert_eq!(merged.last_sec, 0); for elem in &merged.totals { @@ -633,7 +633,7 @@ mod tests { n: 5, }; - let cloned = elem.clone(); + let cloned = elem; assert_eq!(elem.total, cloned.total); assert_eq!(elem.size, cloned.size); assert_eq!(elem.n, cloned.n); @@ -831,11 +831,8 @@ mod tests { } } - - const SIZE_LAST_ELEM_MARKER: usize = 10; // 这里假设你的 marker 是 10,请根据实际情况修改 - #[allow(dead_code)] #[derive(Debug, Default)] pub struct LastMinuteHistogram { @@ -843,7 +840,6 @@ pub struct LastMinuteHistogram { size: u32, } - impl LastMinuteHistogram { pub fn merge(&mut self, other: &LastMinuteHistogram) { for i in 0..self.histogram.len() { @@ -867,11 +863,11 @@ impl LastMinuteHistogram { fn size_to_tag(size: i64) -> usize { match size { - _ if size < 1024 => 0, // sizeLessThan1KiB - _ if size < 1024 * 1024 => 1, // sizeLessThan1MiB - _ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB - _ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB + _ if size < 1024 => 0, // sizeLessThan1KiB + _ if size < 1024 * 1024 => 1, // sizeLessThan1MiB + _ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB + _ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB _ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB - _ => 5, // sizeGreaterThan1GiB + _ => 5, // sizeGreaterThan1GiB } -} \ No newline at end of file +} diff --git a/ecstore/Cargo.toml b/ecstore/Cargo.toml index dfb2c9ab..b9fd940c 100644 --- a/ecstore/Cargo.toml +++ b/ecstore/Cargo.toml @@ -57,11 +57,8 @@ tempfile.workspace = true tokio = { workspace = true, features = ["io-util", "sync", "signal"] } tokio-stream = { workspace = true } tonic.workspace = true -tower.workspace = true xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] } -num = { workspace = true } num_cpus = { workspace = true } -s3s-policy.workspace = true rand.workspace = true pin-project-lite.workspace = true md-5.workspace = true @@ -70,9 +67,7 @@ workers.workspace = true reqwest = { workspace = true } aws-sdk-s3 = { workspace = true } once_cell = { workspace = true } -aws-smithy-types = "1.2.13" -rustfs-rsc = "2025.220.3" -hyper.workspace = true +rustfs-rsc = { workspace = true } urlencoding = { workspace = true } smallvec = { workspace = true } shadow-rs.workspace = true diff --git a/ecstore/src/cmd/bucket_replication.rs b/ecstore/src/cmd/bucket_replication.rs index 09b5173a..5dcf14d1 100644 --- a/ecstore/src/cmd/bucket_replication.rs +++ b/ecstore/src/cmd/bucket_replication.rs @@ -1,12 +1,5 @@ #![allow(unused_variables)] #![allow(dead_code)] -use aws_sdk_s3::config::BehaviorVersion; -use aws_sdk_s3::config::Credentials; -use aws_sdk_s3::config::Region; -use aws_sdk_s3::Client as S3Client; -use aws_sdk_s3::Config; -use bytes::Bytes; -use uuid::Uuid; // use error::Error; use crate::bucket::metadata_sys::get_replication_config; use crate::bucket::versioning_sys::BucketVersioningSys; @@ -19,6 +12,12 @@ use crate::store_api::ObjectInfo; use crate::store_api::ObjectOptions; use crate::store_api::ObjectToDelete; use crate::StorageAPI; +use aws_sdk_s3::config::BehaviorVersion; +use aws_sdk_s3::config::Credentials; +use aws_sdk_s3::config::Region; +use aws_sdk_s3::Client as S3Client; +use aws_sdk_s3::Config; +use bytes::Bytes; use chrono::DateTime; use chrono::Duration; use chrono::Utc; @@ -28,6 +27,8 @@ use futures::StreamExt; use http::HeaderMap; use http::Method; use lazy_static::lazy_static; +// use std::time::SystemTime; +use once_cell::sync::Lazy; use regex::Regex; use rustfs_rsc::provider::StaticProvider; use rustfs_rsc::Minio; @@ -47,14 +48,13 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::vec; use time::OffsetDateTime; -use tokio::sync::Mutex; -use tokio::task; -use tracing::{debug, error, warn, info}; -use xxhash_rust::xxh3::xxh3_64; -// use std::time::SystemTime; -use once_cell::sync::Lazy; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::Mutex; use tokio::sync::RwLock; +use tokio::task; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; +use xxhash_rust::xxh3::xxh3_64; // use bucket_targets::{self, GLOBAL_Bucket_Target_Sys}; #[derive(Serialize, Deserialize, Debug)] @@ -222,9 +222,7 @@ pub async fn queue_replication_heal( return None; } - if oi.replication_status == ReplicationStatusType::Completed - && !roi.existing_obj_resync.must_resync() - { + if oi.replication_status == ReplicationStatusType::Completed && !roi.existing_obj_resync.must_resync() { return None; } @@ -340,10 +338,7 @@ pub async fn check_replicate_delete( user_tags: Some(oi.user_tags.clone()), delete_marker: oi.delete_marker, //version_id: dobj.version_id.clone().map(|v| v.to_string()), - version_id: oi - .version_id - .map(|uuid| uuid.to_string()) - .unwrap_or_default(), + version_id: oi.version_id.map(|uuid| uuid.to_string()).unwrap_or_default(), op_type: ReplicationType::DeleteReplicationType, target_arn: None, replica: true, @@ -455,9 +450,9 @@ pub async fn get_heal_replicate_object_info( &oi.bucket, &ObjectToDelete { object_name: oi.name.clone(), - version_id: oi.version_id.clone(), + version_id: oi.version_id, }, - &oi, + oi, &ObjectOptions { // versioned: global_bucket_versioning_sys::prefix_enabled(&oi.bucket, &oi.name), // version_suspended: global_bucket_versioning_sys::prefix_suspended(&oi.bucket, &oi.name), @@ -466,7 +461,8 @@ pub async fn get_heal_replicate_object_info( ..Default::default() }, None, - ).await + ) + .await } else { // let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(mt)) // .await @@ -480,8 +476,13 @@ pub async fn get_heal_replicate_object_info( mod_time: oi.mod_time, ..Default::default() }; - let repoptions = - get_must_replicate_options(mt2.as_ref().unwrap_or(&HashMap::new()), "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts); + let repoptions = get_must_replicate_options( + mt2.as_ref().unwrap_or(&HashMap::new()), + "", + ReplicationStatusType::Unknown, + ReplicationType::ObjectReplicationType, + &opts, + ); let decision = must_replicate(&oi.bucket, &oi.name, &repoptions).await; error!("decision:"); @@ -509,7 +510,10 @@ pub async fn get_heal_replicate_object_info( let asz = oi.get_actual_size().unwrap_or(0); let key = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, REPLICATION_TIMESTAMP); - let tm: Option> = user_defined.as_ref().unwrap().get(&key) + let tm: Option> = user_defined + .as_ref() + .unwrap() + .get(&key) .and_then(|v| DateTime::parse_from_rfc3339(v).ok()) .map(|dt| dt.with_timezone(&Utc)); @@ -536,7 +540,7 @@ pub async fn get_heal_replicate_object_info( existing_obj_resync: Default::default(), target_statuses: tgt_statuses, target_purge_statuses: purge_statuses, - replication_timestamp: tm.unwrap_or_else(|| chrono::Utc::now()), + replication_timestamp: tm.unwrap_or_else(|| Utc::now), //ssec: crypto::is_encrypted(&oi.user_defined), ssec: false, user_tags: oi.user_tags.clone(), @@ -551,7 +555,10 @@ pub async fn get_heal_replicate_object_info( result.checksum = oi.checksum.clone(); } - warn!("Replication heal for object {} in bucket {} is configured {:?}", oi.name, oi.bucket, oi.version_id); + warn!( + "Replication heal for object {} in bucket {} is configured {:?}", + oi.name, oi.bucket, oi.version_id + ); result } @@ -691,7 +698,7 @@ impl ReplicationPool { mrf_worker_size: workers, priority, max_workers, - obj_layer: obj_layer, + obj_layer, }; warn!("work size is: {}", workers); @@ -932,7 +939,7 @@ impl ReplicationPool { let max_workers = max_workers.min(WORKER_MAX_LIMIT); if worker_count < max_workers as i32 { //self.resize_workers((worker_count + 1 as usize).try_into().unwrap(), worker_count).await; - self.resize_workers(worker_count as usize + 1 as usize, Some(worker_count as usize)) + self.resize_workers(worker_count as usize + 1_usize, Some(worker_count as usize)) .await; } @@ -949,6 +956,12 @@ impl ReplicationPool { pub struct ReplicationResyncer; +impl Default for ReplicationResyncer { + fn default() -> Self { + Self + } +} + impl ReplicationResyncer { pub fn new() -> Self { Self @@ -967,6 +980,7 @@ pub async fn init_bucket_replication_pool() { warn!("init bucket replication pool"); ReplicationPool::init_bucket_replication_pool(store, opts, stat).await; } else { + // TODO: to be added } } @@ -1736,10 +1750,10 @@ impl TraitForObjectInfo for ObjectInfo { fn convert_offsetdatetime_to_chrono(offset_dt: Option) -> Option> { //offset_dt.map(|odt| { - let tm = offset_dt.unwrap().unix_timestamp(); - //let naive = NaiveDateTime::from_timestamp_opt(tm, 0).expect("Invalid timestamp"); - DateTime::::from_timestamp(tm, 0) - //DateTime::from_naive_utc_and_offset(naive, Utc) // Convert to Utc first + let tm = offset_dt.unwrap().unix_timestamp(); + //let naive = NaiveDateTime::from_timestamp_opt(tm, 0).expect("Invalid timestamp"); + DateTime::::from_timestamp(tm, 0) + //DateTime::from_naive_utc_and_offset(naive, Utc) // Convert to Utc first //}) } @@ -1860,8 +1874,8 @@ pub async fn must_replicate(bucket: &str, object: &str, mopts: &MustReplicateOpt //decision.set(ReplicateTargetDecision::new(replicate,synchronous)); info!("targe decision arn is:{}", tgt_arn.clone()); decision.set(ReplicateTargetDecision { - replicate: replicate, - synchronous: synchronous, + replicate, + synchronous, arn: tgt_arn.clone(), id: 0.to_string(), }); @@ -2227,7 +2241,7 @@ async fn replicate_object_with_multipart( return Err(err.into()); } } - return Ok(()); + Ok(()) } impl ReplicateObjectInfo { @@ -2374,7 +2388,7 @@ impl ReplicateObjectInfo { .await; match ret { Ok(_res) => { - warn!("replicate suc: {} {} {}", self.bucket, self.name, self.version_id); + warn!("replicate suc: {} {} {}", self.bucket, self.name, self.version_id); rinfo.replication_status = ReplicationStatusType::Completed; } Err(err) => { @@ -2465,7 +2479,7 @@ pub fn get_must_replicate_options( MustReplicateOptions { meta, - status: status, + status, op_type: op, replication_request: opts.replication_request, } @@ -2561,7 +2575,6 @@ impl ReplicatedInfos { } } - impl ReplicatedTargetInfo { fn empty(&self) -> bool { // Implement your logic to check if the target is empty diff --git a/ecstore/src/cmd/bucket_targets.rs b/ecstore/src/cmd/bucket_targets.rs index f359dbce..3cfba660 100644 --- a/ecstore/src/cmd/bucket_targets.rs +++ b/ecstore/src/cmd/bucket_targets.rs @@ -1,26 +1,27 @@ #![allow(unused_variables)] #![allow(dead_code)] +use crate::{ + bucket::{self, target::BucketTargets}, + new_object_layer_fn, peer, store_api, +}; use crate::{ bucket::{metadata_sys, target::BucketTarget}, endpoints::Node, peer::{PeerS3Client, RemotePeerS3Client}, StorageAPI, }; -use crate::{ - bucket::{self, target::BucketTargets}, - new_object_layer_fn, peer, store_api, -}; +//use tokio::sync::RwLock; +use aws_sdk_s3::Client as S3Client; use chrono::Utc; +use futures::future::err; use lazy_static::lazy_static; +use std::sync::Arc; use std::{ collections::HashMap, time::{Duration, SystemTime}, }; -//use tokio::sync::RwLock; -use aws_sdk_s3::Client as S3Client; -use std::sync::{Arc}; -use tokio::sync::RwLock; use thiserror::Error; +use tokio::sync::RwLock; pub struct TClient { pub s3cli: S3Client, @@ -78,21 +79,21 @@ pub struct ArnTarget { last_refresh: chrono::DateTime, } impl ArnTarget { - pub fn new(bucket: String, endpoint: String, ak:String, sk:String) -> Self { + pub fn new(bucket: String, endpoint: String, ak: String, sk: String) -> Self { Self { client: TargetClient { - bucket: bucket, + bucket, storage_class: "STANDRD".to_string(), disable_proxy: false, health_check_duration: Duration::from_secs(100), - endpoint: endpoint, + endpoint, reset_id: "0".to_string(), replicate_sync: false, secure: false, arn: "".to_string(), client: reqwest::Client::new(), - ak:ak, - sk:sk, + ak, + sk, }, last_refresh: Utc::now(), } @@ -149,9 +150,9 @@ pub struct BucketRemoteTargetNotFound { pub async fn init_bucket_targets(bucket: &str, meta: Arc) { println!("140 {}", bucket); if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() { - if let Some(tgts) = meta.bucket_target_config.clone() { + if let Some(tgts) = meta.bucket_target_config.clone() { for tgt in tgts.targets { - warn!("ak and sk is:{:?}",tgt.credentials); + warn!("ak and sk is:{:?}", tgt.credentials); let _ = sys.set_target(bucket, &tgt, false, true).await; //sys.targets_map. } @@ -159,12 +160,17 @@ pub async fn init_bucket_targets(bucket: &str, meta: Arc Self { + Self::new() + } +} impl BucketTargetSys { pub fn new() -> Self { @@ -238,7 +244,7 @@ impl BucketTargetSys { targets } - pub async fn remove_target(&self, bucket:&str, arn_str:&str) -> Result<(), SetTargetError> { + pub async fn remove_target(&self, bucket: &str, arn_str: &str) -> Result<(), SetTargetError> { //to do need lock; let mut targets_map = self.targets_map.write().await; let tgts = targets_map.get(bucket); @@ -263,14 +269,13 @@ impl BucketTargetSys { // 如果没有找到匹配的 ARN,则返回错误 if !found { - return Ok(()); + return Ok(()); } // 更新 targets_map targets_map.insert(bucket.to_string(), targets); arn_remotes_map.remove(arn_str); - let targets = self.list_targets(Some(&bucket), None).await; println!("targets is {}", targets.len()); match serde_json::to_vec(&targets) { @@ -376,11 +381,11 @@ impl BucketTargetSys { match store.get_bucket_info(_bucket, &store_api::BucketOptions::default()).await { Ok(info) => { println!("Bucket Info: {:?}", info); - return info.versionning; + info.versionning } Err(err) => { eprintln!("Error: {:?}", err); - return false; + false } } } @@ -419,13 +424,11 @@ impl BucketTargetSys { // } //let client = self.get_remote_target_client(tgt).await?; - if tgt.type_ == Some("replication".to_string()) { - if !fromdisk { - let versioning_config = self.local_is_bucket_versioned(bucket).await; - if !versioning_config { - // println!("111111111"); - return Err(SetTargetError::TargetNotVersioned(bucket.to_string())); - } + if tgt.type_ == Some("replication".to_string()) && !fromdisk { + let versioning_config = self.local_is_bucket_versioned(bucket).await; + if !versioning_config { + // println!("111111111"); + return Err(SetTargetError::TargetNotVersioned(bucket.to_string())); } } @@ -502,7 +505,12 @@ impl BucketTargetSys { println!("437 exist:{}", tgt.arn.clone().unwrap()); targets.push(tgt.clone()); } - let arntgt: ArnTarget = ArnTarget::new(tgt.target_bucket.clone(), tgt.endpoint.clone(),tgt.credentials.clone().unwrap().access_key.clone(), tgt.credentials.clone().unwrap().secret_key); + let arntgt: ArnTarget = ArnTarget::new( + tgt.target_bucket.clone(), + tgt.endpoint.clone(), + tgt.credentials.clone().unwrap().access_key.clone(), + tgt.credentials.clone().unwrap().secret_key, + ); arn_remotes_map.insert(tgt.arn.clone().unwrap().clone(), arntgt); //self.update_bandwidth_limit(bucket, &tgt.arn, tgt.bandwidth_limit).await; @@ -523,8 +531,8 @@ pub struct TargetClient { pub reset_id: String, pub endpoint: String, pub secure: bool, - pub ak:String, - pub sk:String, + pub ak: String, + pub sk: String, } impl TargetClient { @@ -539,8 +547,8 @@ impl TargetClient { reset_id: String, endpoint: String, secure: bool, - ak:String, - sk:String, + ak: String, + sk: String, ) -> Self { TargetClient { client, @@ -561,7 +569,7 @@ impl TargetClient { Ok(true) // Mocked implementation } } -use tracing::{error, warn, info}; +use tracing::{error, info, warn}; use uuid::Uuid; #[derive(Debug, Clone)] @@ -613,7 +621,7 @@ impl ARN { /// 检查 ARN 是否为空 pub fn is_empty(&self) -> bool { //!self.arn_type.is_valid() - return false; + false } /// 将 ARN 转为字符串格式 @@ -625,12 +633,12 @@ impl ARN { pub fn parse(s: &str) -> Result { // ARN 必须是格式 arn:rustfs:::: if !s.starts_with("arn:rustfs:") { - return Err(format!("Invalid ARN {}", s).into()); + return Err(format!("Invalid ARN {}", s)); } let tokens: Vec<&str> = s.split(':').collect(); if tokens.len() != 6 || tokens[4].is_empty() || tokens[5].is_empty() { - return Err(format!("Invalid ARN {}", s).into()); + return Err(format!("Invalid ARN {}", s)); } Ok(ARN { @@ -661,7 +669,7 @@ fn must_get_uuid() -> String { } fn generate_arn(target: BucketTarget, depl_id: String) -> String { let mut uuid: String = depl_id; - if uuid == "" { + if uuid.is_empty() { uuid = must_get_uuid(); } @@ -671,7 +679,7 @@ fn generate_arn(target: BucketTarget, depl_id: String) -> String { region: "us-east-1".to_string(), bucket: (target.target_bucket), }; - return arn.to_string(); + arn.to_string() } // use std::collections::HashMap; diff --git a/ecstore/src/peer.rs b/ecstore/src/peer.rs index 7d68744d..ff0f225b 100644 --- a/ecstore/src/peer.rs +++ b/ecstore/src/peer.rs @@ -434,7 +434,7 @@ impl PeerS3Client for LocalPeerS3Client { // TODO: reduceWriteQuorumErrs let mut versioned = false; if let Ok(sys) = metadata_sys::get(bucket).await { - versioned = sys.versioning(); + versioned = sys.versioning(); } ress.iter() @@ -505,13 +505,13 @@ impl RemotePeerS3Client { let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string(); Self { node, pools, addr } } - pub fn get_addr(&self)->String { - return self.addr.clone(); + pub fn get_addr(&self) -> String { + self.addr.clone() } } #[async_trait] -impl PeerS3Client for RemotePeerS3Client { +impl PeerS3Client for RemotePeerS3Client { fn get_pools(&self) -> Option> { self.pools.clone() } diff --git a/s3select/query/Cargo.toml b/s3select/query/Cargo.toml index 6629927c..0240b880 100644 --- a/s3select/query/Cargo.toml +++ b/s3select/query/Cargo.toml @@ -11,7 +11,7 @@ datafusion = { workspace = true } derive_builder = { workspace = true } futures = { workspace = true } lazy_static = { workspace = true } -parking_lot = { version = "0.12.3" } +parking_lot = { workspace = true } s3s.workspace = true snafu = { workspace = true, features = ["backtrace"] } tokio = { workspace = true }