fix: remove dep crate openssl relation

This commit is contained in:
houseme
2025-06-06 15:13:55 +08:00
parent 6302764512
commit 2fec6d161c
9 changed files with 140 additions and 351 deletions

236
Cargo.lock generated
View File

@@ -1709,7 +1709,7 @@ dependencies = [
"cocoa-foundation 0.1.2",
"core-foundation 0.9.4",
"core-graphics 0.23.2",
"foreign-types 0.5.0",
"foreign-types",
"libc",
"objc",
]
@@ -1725,7 +1725,7 @@ dependencies = [
"cocoa-foundation 0.2.1",
"core-foundation 0.10.1",
"core-graphics 0.24.0",
"foreign-types 0.5.0",
"foreign-types",
"libc",
"objc",
]
@@ -1982,7 +1982,7 @@ dependencies = [
"bitflags 1.3.2",
"core-foundation 0.9.4",
"core-graphics-types 0.1.3",
"foreign-types 0.5.0",
"foreign-types",
"libc",
]
@@ -1995,7 +1995,7 @@ dependencies = [
"bitflags 2.9.1",
"core-foundation 0.10.1",
"core-graphics-types 0.2.0",
"foreign-types 0.5.0",
"foreign-types",
"libc",
]
@@ -3523,7 +3523,6 @@ version = "0.0.1"
dependencies = [
"async-trait",
"aws-sdk-s3",
"aws-smithy-types",
"backon",
"base64-simd",
"blake2",
@@ -3539,14 +3538,12 @@ dependencies = [
"hex-simd",
"highway",
"http 1.3.1",
"hyper 1.6.0",
"lazy_static",
"lock",
"madmin",
"md-5",
"netif",
"nix 0.30.1",
"num",
"num_cpus",
"once_cell",
"path-absolutize",
@@ -3563,7 +3560,6 @@ dependencies = [
"rustfs-config",
"rustfs-rsc",
"s3s",
"s3s-policy",
"serde",
"serde_json",
"sha2 0.11.0-pre.5",
@@ -3577,7 +3573,6 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tower 0.5.2",
"tracing",
"tracing-error",
"transform-stream",
@@ -3630,18 +3625,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3d8a32ae18130a3c84dd492d4215c3d913c3b07c6b63c2eb3eb7ff1101ab7bf"
[[package]]
name = "enum-as-inner"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "enumflags2"
version = "0.7.11"
@@ -3868,15 +3851,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared 0.1.1",
]
[[package]]
name = "foreign-types"
version = "0.5.0"
@@ -3884,7 +3858,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965"
dependencies = [
"foreign-types-macros",
"foreign-types-shared 0.3.1",
"foreign-types-shared",
]
[[package]]
@@ -3898,12 +3872,6 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "foreign-types-shared"
version = "0.3.1"
@@ -4582,51 +4550,6 @@ dependencies = [
"vsimd",
]
[[package]]
name = "hickory-proto"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248"
dependencies = [
"async-trait",
"cfg-if",
"data-encoding",
"enum-as-inner",
"futures-channel",
"futures-io",
"futures-util",
"idna",
"ipnet",
"once_cell",
"rand 0.8.5",
"thiserror 1.0.69",
"tinyvec",
"tokio",
"tracing",
"url",
]
[[package]]
name = "hickory-resolver"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e"
dependencies = [
"cfg-if",
"futures-util",
"hickory-proto",
"ipconfig",
"lru-cache",
"once_cell",
"parking_lot 0.12.4",
"rand 0.8.5",
"resolv-conf",
"smallvec",
"thiserror 1.0.69",
"tokio",
"tracing",
]
[[package]]
name = "highway"
version = "1.3.0"
@@ -4849,22 +4772,6 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"native-tls",
"tokio",
"tokio-native-tls",
"tower-service",
]
[[package]]
name = "hyper-util"
version = "0.1.14"
@@ -5151,18 +5058,6 @@ version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]]
name = "ipconfig"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
dependencies = [
"socket2",
"widestring",
"windows-sys 0.48.0",
"winreg",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -5598,12 +5493,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
@@ -5698,15 +5587,6 @@ dependencies = [
"hashbrown 0.15.3",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
@@ -5963,23 +5843,6 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]]
name = "native-tls"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework 2.11.1",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "ndk"
version = "0.9.0"
@@ -6593,50 +6456,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl"
version = "0.10.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"foreign-types 0.3.2",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "openssl-probe"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-sys"
version = "0.9.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.29.1"
@@ -7969,20 +7794,17 @@ dependencies = [
"futures-core",
"futures-util",
"h2 0.4.10",
"hickory-resolver",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
"hyper 1.6.0",
"hyper-rustls 0.27.6",
"hyper-tls",
"hyper-util",
"ipnet",
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@@ -7994,7 +7816,6 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tokio-native-tls",
"tokio-rustls 0.26.2",
"tokio-util",
"tower 0.5.2",
@@ -8008,12 +7829,6 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "resolv-conf"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95325155c684b1c89f7765e30bc1c42e4a6da51ca513615660cb8a62ef9a88e3"
[[package]]
name = "rfc6979"
version = "0.3.1"
@@ -8399,9 +8214,9 @@ dependencies = [
[[package]]
name = "rustfs-rsc"
version = "2025.220.3"
version = "2025.506.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dedb827b094751bfc35c6f27b9332ea35eaacd564f8b985a135649fc69dfebdb"
checksum = "8229535cdd7a9d1f5757bd7b588f342e6ea984d66c6b1f6350f9de85d3ce9c25"
dependencies = [
"async-stream",
"base64 0.22.1",
@@ -8678,17 +8493,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "s3s-policy"
version = "0.12.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=4733cdfb27b2713e832967232cbff413bb768c10#4733cdfb27b2713e832967232cbff413bb768c10"
dependencies = [
"indexmap 2.9.0",
"serde",
"serde_json",
"thiserror 2.0.12",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -9916,16 +9720,6 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@@ -10914,12 +10708,6 @@ dependencies = [
"rustix 0.38.44",
]
[[package]]
name = "widestring"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d"
[[package]]
name = "winapi"
version = "0.3.9"
@@ -11442,16 +11230,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"

View File

@@ -110,7 +110,6 @@ mime_guess = "2.0.5"
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
num = "0.4.3"
num_cpus = { version = "1.17.0" }
nvml-wrapper = "0.10.0"
object_store = "0.11.2"
@@ -132,7 +131,6 @@ pin-project-lite = "0.2.16"
# pin-utils = "0.1.0"
prost = "0.13.5"
prost-build = "0.13.5"
prost-types = "0.13.5"
protobuf = "3.7"
rand = "0.8.5"
rdkafka = { version = "0.37.0", features = ["tokio"] }
@@ -155,6 +153,7 @@ rmp = "0.8.14"
rmp-serde = "1.3.0"
rumqttc = { version = "0.24" }
rust-embed = { version = "8.7.2" }
rustfs-rsc = "2025.506.1"
rustls = { version = "0.23.27" }
rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, i64, u64};
use std::collections::HashMap;
use crate::last_minute::{self};
pub struct ReplicationLatency {
@@ -10,7 +10,7 @@ impl ReplicationLatency {
// 合并两个 ReplicationLatency
pub fn merge(&mut self, other: &mut ReplicationLatency) -> &ReplicationLatency {
self.upload_histogram.merge(&other.upload_histogram);
return self;
self
}
// 获取上传延迟(按对象大小区间分类)

View File

@@ -1,4 +1,4 @@
use std::{time::{Duration, SystemTime, UNIX_EPOCH}};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[allow(dead_code)]
#[derive(Debug, Default)]
@@ -60,18 +60,18 @@ enum SizeCategory {
SizeLastElemMarker,
}
#[allow(dead_code)]
impl SizeCategory {
fn to_string(&self) -> String {
match *self {
SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB".to_string(),
SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB".to_string(),
SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB".to_string(),
SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB".to_string(),
SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB".to_string(),
SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB".to_string(),
SizeCategory::SizeLastElemMarker => "SizeLastElemMarker".to_string(),
}
impl std::fmt::Display for SizeCategory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match *self {
SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB",
SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB",
SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB",
SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB",
SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB",
SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB",
SizeCategory::SizeLastElemMarker => "SizeLastElemMarker",
};
write!(f, "{}", s)
}
}
@@ -490,7 +490,7 @@ mod tests {
latency2.totals[0].total = 20;
latency2.totals[0].n = 3;
let merged = latency1.merge(&mut latency2);
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 1000);
assert_eq!(merged.totals[0].total, 30); // 10 + 20
@@ -509,7 +509,7 @@ mod tests {
latency1.totals[0].total = 10;
latency2.totals[0].total = 20;
let merged = latency1.merge(&mut latency2);
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 1010); // Should use the later time
assert_eq!(merged.totals[0].total, 30);
@@ -518,9 +518,9 @@ mod tests {
#[test]
fn test_last_minute_latency_merge_empty() {
let mut latency1 = LastMinuteLatency::default();
let mut latency2 = LastMinuteLatency::default();
let latency2 = LastMinuteLatency::default();
let merged = latency1.merge(&mut latency2);
let merged = latency1.merge(&latency2);
assert_eq!(merged.last_sec, 0);
for elem in &merged.totals {
@@ -633,7 +633,7 @@ mod tests {
n: 5,
};
let cloned = elem.clone();
let cloned = elem;
assert_eq!(elem.total, cloned.total);
assert_eq!(elem.size, cloned.size);
assert_eq!(elem.n, cloned.n);
@@ -831,11 +831,8 @@ mod tests {
}
}
const SIZE_LAST_ELEM_MARKER: usize = 10; // 这里假设你的 marker 是 10请根据实际情况修改
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct LastMinuteHistogram {
@@ -843,7 +840,6 @@ pub struct LastMinuteHistogram {
size: u32,
}
impl LastMinuteHistogram {
pub fn merge(&mut self, other: &LastMinuteHistogram) {
for i in 0..self.histogram.len() {
@@ -867,11 +863,11 @@ impl LastMinuteHistogram {
fn size_to_tag(size: i64) -> usize {
match size {
_ if size < 1024 => 0, // sizeLessThan1KiB
_ if size < 1024 * 1024 => 1, // sizeLessThan1MiB
_ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB
_ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB
_ if size < 1024 => 0, // sizeLessThan1KiB
_ if size < 1024 * 1024 => 1, // sizeLessThan1MiB
_ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB
_ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB
_ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB
_ => 5, // sizeGreaterThan1GiB
_ => 5, // sizeGreaterThan1GiB
}
}
}

View File

@@ -57,11 +57,8 @@ tempfile.workspace = true
tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
tokio-stream = { workspace = true }
tonic.workspace = true
tower.workspace = true
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
num = { workspace = true }
num_cpus = { workspace = true }
s3s-policy.workspace = true
rand.workspace = true
pin-project-lite.workspace = true
md-5.workspace = true
@@ -70,9 +67,7 @@ workers.workspace = true
reqwest = { workspace = true }
aws-sdk-s3 = { workspace = true }
once_cell = { workspace = true }
aws-smithy-types = "1.2.13"
rustfs-rsc = "2025.220.3"
hyper.workspace = true
rustfs-rsc = { workspace = true }
urlencoding = { workspace = true }
smallvec = { workspace = true }
shadow-rs.workspace = true

View File

@@ -1,12 +1,5 @@
#![allow(unused_variables)]
#![allow(dead_code)]
use aws_sdk_s3::config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use bytes::Bytes;
use uuid::Uuid;
// use error::Error;
use crate::bucket::metadata_sys::get_replication_config;
use crate::bucket::versioning_sys::BucketVersioningSys;
@@ -19,6 +12,12 @@ use crate::store_api::ObjectInfo;
use crate::store_api::ObjectOptions;
use crate::store_api::ObjectToDelete;
use crate::StorageAPI;
use aws_sdk_s3::config::BehaviorVersion;
use aws_sdk_s3::config::Credentials;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::Config;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
@@ -28,6 +27,8 @@ use futures::StreamExt;
use http::HeaderMap;
use http::Method;
use lazy_static::lazy_static;
// use std::time::SystemTime;
use once_cell::sync::Lazy;
use regex::Regex;
use rustfs_rsc::provider::StaticProvider;
use rustfs_rsc::Minio;
@@ -47,14 +48,13 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::task;
use tracing::{debug, error, warn, info};
use xxhash_rust::xxh3::xxh3_64;
// use std::time::SystemTime;
use once_cell::sync::Lazy;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use xxhash_rust::xxh3::xxh3_64;
// use bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
#[derive(Serialize, Deserialize, Debug)]
@@ -222,9 +222,7 @@ pub async fn queue_replication_heal(
return None;
}
if oi.replication_status == ReplicationStatusType::Completed
&& !roi.existing_obj_resync.must_resync()
{
if oi.replication_status == ReplicationStatusType::Completed && !roi.existing_obj_resync.must_resync() {
return None;
}
@@ -340,10 +338,7 @@ pub async fn check_replicate_delete(
user_tags: Some(oi.user_tags.clone()),
delete_marker: oi.delete_marker,
//version_id: dobj.version_id.clone().map(|v| v.to_string()),
version_id: oi
.version_id
.map(|uuid| uuid.to_string())
.unwrap_or_default(),
version_id: oi.version_id.map(|uuid| uuid.to_string()).unwrap_or_default(),
op_type: ReplicationType::DeleteReplicationType,
target_arn: None,
replica: true,
@@ -455,9 +450,9 @@ pub async fn get_heal_replicate_object_info(
&oi.bucket,
&ObjectToDelete {
object_name: oi.name.clone(),
version_id: oi.version_id.clone(),
version_id: oi.version_id,
},
&oi,
oi,
&ObjectOptions {
// versioned: global_bucket_versioning_sys::prefix_enabled(&oi.bucket, &oi.name),
// version_suspended: global_bucket_versioning_sys::prefix_suspended(&oi.bucket, &oi.name),
@@ -466,7 +461,8 @@ pub async fn get_heal_replicate_object_info(
..Default::default()
},
None,
).await
)
.await
} else {
// let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(mt))
// .await
@@ -480,8 +476,13 @@ pub async fn get_heal_replicate_object_info(
mod_time: oi.mod_time,
..Default::default()
};
let repoptions =
get_must_replicate_options(mt2.as_ref().unwrap_or(&HashMap::new()), "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
let repoptions = get_must_replicate_options(
mt2.as_ref().unwrap_or(&HashMap::new()),
"",
ReplicationStatusType::Unknown,
ReplicationType::ObjectReplicationType,
&opts,
);
let decision = must_replicate(&oi.bucket, &oi.name, &repoptions).await;
error!("decision:");
@@ -509,7 +510,10 @@ pub async fn get_heal_replicate_object_info(
let asz = oi.get_actual_size().unwrap_or(0);
let key = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, REPLICATION_TIMESTAMP);
let tm: Option<DateTime<Utc>> = user_defined.as_ref().unwrap().get(&key)
let tm: Option<DateTime<Utc>> = user_defined
.as_ref()
.unwrap()
.get(&key)
.and_then(|v| DateTime::parse_from_rfc3339(v).ok())
.map(|dt| dt.with_timezone(&Utc));
@@ -536,7 +540,7 @@ pub async fn get_heal_replicate_object_info(
existing_obj_resync: Default::default(),
target_statuses: tgt_statuses,
target_purge_statuses: purge_statuses,
replication_timestamp: tm.unwrap_or_else(|| chrono::Utc::now()),
replication_timestamp: tm.unwrap_or_else(|| Utc::now),
//ssec: crypto::is_encrypted(&oi.user_defined),
ssec: false,
user_tags: oi.user_tags.clone(),
@@ -551,7 +555,10 @@ pub async fn get_heal_replicate_object_info(
result.checksum = oi.checksum.clone();
}
warn!("Replication heal for object {} in bucket {} is configured {:?}", oi.name, oi.bucket, oi.version_id);
warn!(
"Replication heal for object {} in bucket {} is configured {:?}",
oi.name, oi.bucket, oi.version_id
);
result
}
@@ -691,7 +698,7 @@ impl ReplicationPool {
mrf_worker_size: workers,
priority,
max_workers,
obj_layer: obj_layer,
obj_layer,
};
warn!("work size is: {}", workers);
@@ -932,7 +939,7 @@ impl ReplicationPool {
let max_workers = max_workers.min(WORKER_MAX_LIMIT);
if worker_count < max_workers as i32 {
//self.resize_workers((worker_count + 1 as usize).try_into().unwrap(), worker_count).await;
self.resize_workers(worker_count as usize + 1 as usize, Some(worker_count as usize))
self.resize_workers(worker_count as usize + 1_usize, Some(worker_count as usize))
.await;
}
@@ -949,6 +956,12 @@ impl ReplicationPool {
pub struct ReplicationResyncer;
impl Default for ReplicationResyncer {
fn default() -> Self {
Self
}
}
impl ReplicationResyncer {
pub fn new() -> Self {
Self
@@ -967,6 +980,7 @@ pub async fn init_bucket_replication_pool() {
warn!("init bucket replication pool");
ReplicationPool::init_bucket_replication_pool(store, opts, stat).await;
} else {
// TODO: to be added
}
}
@@ -1736,10 +1750,10 @@ impl TraitForObjectInfo for ObjectInfo {
fn convert_offsetdatetime_to_chrono(offset_dt: Option<OffsetDateTime>) -> Option<DateTime<Utc>> {
//offset_dt.map(|odt| {
let tm = offset_dt.unwrap().unix_timestamp();
//let naive = NaiveDateTime::from_timestamp_opt(tm, 0).expect("Invalid timestamp");
DateTime::<Utc>::from_timestamp(tm, 0)
//DateTime::from_naive_utc_and_offset(naive, Utc) // Convert to Utc first
let tm = offset_dt.unwrap().unix_timestamp();
//let naive = NaiveDateTime::from_timestamp_opt(tm, 0).expect("Invalid timestamp");
DateTime::<Utc>::from_timestamp(tm, 0)
//DateTime::from_naive_utc_and_offset(naive, Utc) // Convert to Utc first
//})
}
@@ -1860,8 +1874,8 @@ pub async fn must_replicate(bucket: &str, object: &str, mopts: &MustReplicateOpt
//decision.set(ReplicateTargetDecision::new(replicate,synchronous));
info!("targe decision arn is:{}", tgt_arn.clone());
decision.set(ReplicateTargetDecision {
replicate: replicate,
synchronous: synchronous,
replicate,
synchronous,
arn: tgt_arn.clone(),
id: 0.to_string(),
});
@@ -2227,7 +2241,7 @@ async fn replicate_object_with_multipart(
return Err(err.into());
}
}
return Ok(());
Ok(())
}
impl ReplicateObjectInfo {
@@ -2374,7 +2388,7 @@ impl ReplicateObjectInfo {
.await;
match ret {
Ok(_res) => {
warn!("replicate suc: {} {} {}", self.bucket, self.name, self.version_id);
warn!("replicate suc: {} {} {}", self.bucket, self.name, self.version_id);
rinfo.replication_status = ReplicationStatusType::Completed;
}
Err(err) => {
@@ -2465,7 +2479,7 @@ pub fn get_must_replicate_options(
MustReplicateOptions {
meta,
status: status,
status,
op_type: op,
replication_request: opts.replication_request,
}
@@ -2561,7 +2575,6 @@ impl ReplicatedInfos {
}
}
impl ReplicatedTargetInfo {
fn empty(&self) -> bool {
// Implement your logic to check if the target is empty

View File

@@ -1,26 +1,27 @@
#![allow(unused_variables)]
#![allow(dead_code)]
use crate::{
bucket::{self, target::BucketTargets},
new_object_layer_fn, peer, store_api,
};
use crate::{
bucket::{metadata_sys, target::BucketTarget},
endpoints::Node,
peer::{PeerS3Client, RemotePeerS3Client},
StorageAPI,
};
use crate::{
bucket::{self, target::BucketTargets},
new_object_layer_fn, peer, store_api,
};
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
use chrono::Utc;
use futures::future::err;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
use std::sync::{Arc};
use tokio::sync::RwLock;
use thiserror::Error;
use tokio::sync::RwLock;
pub struct TClient {
pub s3cli: S3Client,
@@ -78,21 +79,21 @@ pub struct ArnTarget {
last_refresh: chrono::DateTime<Utc>,
}
impl ArnTarget {
pub fn new(bucket: String, endpoint: String, ak:String, sk:String) -> Self {
pub fn new(bucket: String, endpoint: String, ak: String, sk: String) -> Self {
Self {
client: TargetClient {
bucket: bucket,
bucket,
storage_class: "STANDRD".to_string(),
disable_proxy: false,
health_check_duration: Duration::from_secs(100),
endpoint: endpoint,
endpoint,
reset_id: "0".to_string(),
replicate_sync: false,
secure: false,
arn: "".to_string(),
client: reqwest::Client::new(),
ak:ak,
sk:sk,
ak,
sk,
},
last_refresh: Utc::now(),
}
@@ -149,9 +150,9 @@ pub struct BucketRemoteTargetNotFound {
pub async fn init_bucket_targets(bucket: &str, meta: Arc<bucket::metadata::BucketMetadata>) {
println!("140 {}", bucket);
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
if let Some(tgts) = meta.bucket_target_config.clone() {
if let Some(tgts) = meta.bucket_target_config.clone() {
for tgt in tgts.targets {
warn!("ak and sk is:{:?}",tgt.credentials);
warn!("ak and sk is:{:?}", tgt.credentials);
let _ = sys.set_target(bucket, &tgt, false, true).await;
//sys.targets_map.
}
@@ -159,12 +160,17 @@ pub async fn init_bucket_targets(bucket: &str, meta: Arc<bucket::metadata::Bucke
}
}
pub async fn remove_bucket_target(bucket: &str, arn_str:&str) {
pub async fn remove_bucket_target(bucket: &str, arn_str: &str) {
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let _ = sys.remove_target(bucket, arn_str).await;
}
}
impl Default for BucketTargetSys {
fn default() -> Self {
Self::new()
}
}
impl BucketTargetSys {
pub fn new() -> Self {
@@ -238,7 +244,7 @@ impl BucketTargetSys {
targets
}
pub async fn remove_target(&self, bucket:&str, arn_str:&str) -> Result<(), SetTargetError> {
pub async fn remove_target(&self, bucket: &str, arn_str: &str) -> Result<(), SetTargetError> {
//to do need lock;
let mut targets_map = self.targets_map.write().await;
let tgts = targets_map.get(bucket);
@@ -263,14 +269,13 @@ impl BucketTargetSys {
// 如果没有找到匹配的 ARN则返回错误
if !found {
return Ok(());
return Ok(());
}
// 更新 targets_map
targets_map.insert(bucket.to_string(), targets);
arn_remotes_map.remove(arn_str);
let targets = self.list_targets(Some(&bucket), None).await;
println!("targets is {}", targets.len());
match serde_json::to_vec(&targets) {
@@ -376,11 +381,11 @@ impl BucketTargetSys {
match store.get_bucket_info(_bucket, &store_api::BucketOptions::default()).await {
Ok(info) => {
println!("Bucket Info: {:?}", info);
return info.versionning;
info.versionning
}
Err(err) => {
eprintln!("Error: {:?}", err);
return false;
false
}
}
}
@@ -419,13 +424,11 @@ impl BucketTargetSys {
// }
//let client = self.get_remote_target_client(tgt).await?;
if tgt.type_ == Some("replication".to_string()) {
if !fromdisk {
let versioning_config = self.local_is_bucket_versioned(bucket).await;
if !versioning_config {
// println!("111111111");
return Err(SetTargetError::TargetNotVersioned(bucket.to_string()));
}
if tgt.type_ == Some("replication".to_string()) && !fromdisk {
let versioning_config = self.local_is_bucket_versioned(bucket).await;
if !versioning_config {
// println!("111111111");
return Err(SetTargetError::TargetNotVersioned(bucket.to_string()));
}
}
@@ -502,7 +505,12 @@ impl BucketTargetSys {
println!("437 exist:{}", tgt.arn.clone().unwrap());
targets.push(tgt.clone());
}
let arntgt: ArnTarget = ArnTarget::new(tgt.target_bucket.clone(), tgt.endpoint.clone(),tgt.credentials.clone().unwrap().access_key.clone(), tgt.credentials.clone().unwrap().secret_key);
let arntgt: ArnTarget = ArnTarget::new(
tgt.target_bucket.clone(),
tgt.endpoint.clone(),
tgt.credentials.clone().unwrap().access_key.clone(),
tgt.credentials.clone().unwrap().secret_key,
);
arn_remotes_map.insert(tgt.arn.clone().unwrap().clone(), arntgt);
//self.update_bandwidth_limit(bucket, &tgt.arn, tgt.bandwidth_limit).await;
@@ -523,8 +531,8 @@ pub struct TargetClient {
pub reset_id: String,
pub endpoint: String,
pub secure: bool,
pub ak:String,
pub sk:String,
pub ak: String,
pub sk: String,
}
impl TargetClient {
@@ -539,8 +547,8 @@ impl TargetClient {
reset_id: String,
endpoint: String,
secure: bool,
ak:String,
sk:String,
ak: String,
sk: String,
) -> Self {
TargetClient {
client,
@@ -561,7 +569,7 @@ impl TargetClient {
Ok(true) // Mocked implementation
}
}
use tracing::{error, warn, info};
use tracing::{error, info, warn};
use uuid::Uuid;
#[derive(Debug, Clone)]
@@ -613,7 +621,7 @@ impl ARN {
/// 检查 ARN 是否为空
pub fn is_empty(&self) -> bool {
//!self.arn_type.is_valid()
return false;
false
}
/// 将 ARN 转为字符串格式
@@ -625,12 +633,12 @@ impl ARN {
pub fn parse(s: &str) -> Result<Self, String> {
// ARN 必须是格式 arn:rustfs:<Type>:<REGION>:<ID>:<remote-bucket>
if !s.starts_with("arn:rustfs:") {
return Err(format!("Invalid ARN {}", s).into());
return Err(format!("Invalid ARN {}", s));
}
let tokens: Vec<&str> = s.split(':').collect();
if tokens.len() != 6 || tokens[4].is_empty() || tokens[5].is_empty() {
return Err(format!("Invalid ARN {}", s).into());
return Err(format!("Invalid ARN {}", s));
}
Ok(ARN {
@@ -661,7 +669,7 @@ fn must_get_uuid() -> String {
}
fn generate_arn(target: BucketTarget, depl_id: String) -> String {
let mut uuid: String = depl_id;
if uuid == "" {
if uuid.is_empty() {
uuid = must_get_uuid();
}
@@ -671,7 +679,7 @@ fn generate_arn(target: BucketTarget, depl_id: String) -> String {
region: "us-east-1".to_string(),
bucket: (target.target_bucket),
};
return arn.to_string();
arn.to_string()
}
// use std::collections::HashMap;

View File

@@ -434,7 +434,7 @@ impl PeerS3Client for LocalPeerS3Client {
// TODO: reduceWriteQuorumErrs
let mut versioned = false;
if let Ok(sys) = metadata_sys::get(bucket).await {
versioned = sys.versioning();
versioned = sys.versioning();
}
ress.iter()
@@ -505,13 +505,13 @@ impl RemotePeerS3Client {
let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string();
Self { node, pools, addr }
}
pub fn get_addr(&self)->String {
return self.addr.clone();
pub fn get_addr(&self) -> String {
self.addr.clone()
}
}
#[async_trait]
impl PeerS3Client for RemotePeerS3Client {
impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}

View File

@@ -11,7 +11,7 @@ datafusion = { workspace = true }
derive_builder = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
parking_lot = { version = "0.12.3" }
parking_lot = { workspace = true }
s3s.workspace = true
snafu = { workspace = true, features = ["backtrace"] }
tokio = { workspace = true }