Mirror of https://github.com/rustfs/rustfs.git (synced 2026-01-16 17:20:33 +00:00)
Merge branch 'main' of https://github.com/rustfs/s3-rustfs into dev_objectEncrypt_v1
@@ -1,6 +1,6 @@
 services:
   otel-collector:
-    image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.124.0
+    image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.127.0
     environment:
       - TZ=Asia/Shanghai
     volumes:
@@ -16,7 +16,7 @@ services:
     networks:
       - otel-network
   jaeger:
-    image: jaegertracing/jaeger:2.5.0
+    image: jaegertracing/jaeger:2.6.0
     environment:
       - TZ=Asia/Shanghai
     ports:
@@ -26,7 +26,7 @@ services:
     networks:
       - otel-network
   prometheus:
-    image: prom/prometheus:v3.3.0
+    image: prom/prometheus:v3.4.1
     environment:
       - TZ=Asia/Shanghai
     volumes:
@@ -36,7 +36,7 @@ services:
     networks:
       - otel-network
   loki:
-    image: grafana/loki:3.5.0
+    image: grafana/loki:3.5.1
     environment:
       - TZ=Asia/Shanghai
     volumes:
@@ -47,7 +47,7 @@ services:
     networks:
       - otel-network
   grafana:
-    image: grafana/grafana:11.6.1
+    image: grafana/grafana:12.0.1
     ports:
      - "3000:3000" # Web UI
     environment:
2 .github/workflows/ci.yml vendored
@@ -19,7 +19,7 @@ jobs:
    permissions:
      actions: write
      contents: read
-    runs-on: self-hosted
+    runs-on: ubuntu-latest
    outputs:
      should_skip: ${{ steps.skip_check.outputs.should_skip }}
    steps:
2 .gitignore vendored
@@ -18,4 +18,4 @@ deploy/certs/*
 .rustfs.sys
 .cargo
 profile.json
-.docker/openobserve-otel/data
+.docker/openobserve-otel/data
2060 Cargo.lock generated
File diff suppressed because it is too large
59 Cargo.toml
@@ -59,13 +59,15 @@ atoi = "2.0.0"
async-recursion = "1.1.1"
async-trait = "0.1.88"
atomic_enum = "0.3.0"
aws-sdk-s3 = "1.29.0"
axum = "0.8.4"
axum-extra = "0.10.1"
axum-server = { version = "0.7.2", features = ["tls-rustls"] }
-backon = "1.5.0"
+backon = "1.5.1"
blake2 = "0.10.6"
bytes = "1.10.1"
bytesize = "2.0.1"
byteorder = "1.5.0"
chrono = { version = "0.4.41", features = ["serde"] }
clap = { version = "4.5.39", features = ["derive", "env"] }
config = "0.15.11"
@@ -83,7 +85,7 @@ glob = "0.3.2"
hex = "0.4.3"
highway = { version = "1.3.0" }
hyper = "1.6.0"
-hyper-util = { version = "0.1.11", features = [
+hyper-util = { version = "0.1.14", features = [
"tokio",
"server-auto",
"server-graceful",
@@ -91,6 +93,7 @@ hyper-util = { version = "0.1.11", features = [
http = "1.3.1"
http-body = "1.0.1"
humantime = "2.2.0"
include_dir = "0.7.4"
jsonwebtoken = "9.3.1"
keyring = { version = "3.6.2", features = [
"apple-native",
@@ -107,36 +110,40 @@ mime_guess = "2.0.5"
netif = "0.1.6"
nix = { version = "0.30.1", features = ["fs"] }
nu-ansi-term = "0.50.1"
-num_cpus = { version = "1.16.0" }
-nvml-wrapper = "0.10.0"
+num_cpus = { version = "1.17.0" }
+nvml-wrapper = "0.11.0"
object_store = "0.11.2"
-opentelemetry = { version = "0.29.1" }
-opentelemetry-appender-tracing = { version = "0.29.1", features = [
once_cell = "1.21.3"
+opentelemetry = { version = "0.30.0" }
+opentelemetry-appender-tracing = { version = "0.30.1", features = [
"experimental_use_tracing_span_context",
"experimental_metadata_attributes",
"spec_unstable_logs_enabled"
] }
-opentelemetry_sdk = { version = "0.29.0" }
-opentelemetry-stdout = { version = "0.29.0" }
-opentelemetry-otlp = { version = "0.29.0" }
-opentelemetry-semantic-conventions = { version = "0.29.0", features = [
+opentelemetry_sdk = { version = "0.30.0" }
+opentelemetry-stdout = { version = "0.30.0" }
+opentelemetry-otlp = { version = "0.30.0", default-features = false, features = [
+"grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"
+] }
+opentelemetry-semantic-conventions = { version = "0.30.0", features = [
"semconv_experimental",
] }
-parking_lot = "0.12.3"
+parking_lot = "0.12.4"
percent-encoding = "2.3.1"
pin-project-lite = "0.2.16"
# pin-utils = "0.1.0"
prost = "0.13.5"
prost-build = "0.13.5"
prost-types = "0.13.5"
protobuf = "3.7"
rand = "0.8.5"
rdkafka = { version = "0.37.0", features = ["tokio"] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
regex = { version = "1.11.1" }
-reqwest = { version = "0.12.16", default-features = false, features = [
+reqwest = { version = "0.12.19", default-features = false, features = [
"rustls-tls",
"charset",
"http2",
"macos-system-configuration",
"system-proxy",
"stream",
"json",
"blocking",
@@ -148,9 +155,10 @@ rfd = { version = "0.15.3", default-features = false, features = [
rmp = "0.8.14"
rmp-serde = "1.3.0"
rumqttc = { version = "0.24" }
-rust-embed = { version = "8.7.1" }
+rust-embed = { version = "8.7.2" }
rustfs-rsc = "2025.506.1"
rustls = { version = "0.23.27" }
-rustls-pki-types = "1.11.0"
+rustls-pki-types = "1.12.0"
rustls-pemfile = "2.2.0"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "4733cdfb27b2713e832967232cbff413bb768c10" }
@@ -161,11 +169,11 @@ serde_urlencoded = "0.7.1"
serde_with = "3.12.0"
sha2 = "0.10.9"
smallvec = { version = "1.15.0", features = ["serde"] }
-snafu = "0.8.5"
-socket2 = "0.5.9"
+snafu = "0.8.6"
+socket2 = "0.5.10"
strum = { version = "0.27.1", features = ["derive"] }
-sysinfo = "0.35.1"
-tempfile = "3.19.1"
+sysinfo = "0.35.2"
+tempfile = "3.20.0"
test-case = "3.3.1"
thiserror = "2.0.12"
time = { version = "0.3.41", features = [
@@ -182,16 +190,17 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = { version = "0.1.17" }
tokio-util = { version = "0.7.15", features = ["io", "compat"] }
tower = { version = "0.5.2", features = ["timeout"] }
-tower-http = { version = "0.6.2", features = ["cors"] }
+tower-http = { version = "0.6.6", features = ["cors"] }
tracing = "0.1.41"
-tracing-core = "0.1.33"
+tracing-core = "0.1.34"
tracing-error = "0.2.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
tracing-appender = "0.2.3"
-tracing-opentelemetry = "0.30.0"
+tracing-opentelemetry = "0.31.0"
transform-stream = "0.3.1"
url = "2.5.4"
-uuid = { version = "1.16.0", features = [
+urlencoding = "2.1.3"
+uuid = { version = "1.17.0", features = [
"v4",
"fast-rng",
"macro-diagnostics",
@@ -223,4 +232,4 @@ codegen-units = 1

[profile.profiling]
inherits = "release"
-debug = true
+debug = true
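Note on the OpenTelemetry bumps above: the workspace moves from the 0.29.x to the 0.30.x series and gives opentelemetry-otlp an explicit gRPC/tonic feature set ("grpc-tonic", "trace", "metrics", "logs", ...). The sketch below shows the kind of exporter construction those features enable; it is illustrative only (the endpoint value and the exact builder calls are assumptions against the 0.30 API, not code from this commit).

```rust
// Hedged sketch: build an OTLP/gRPC span exporter with opentelemetry-otlp 0.30.
// The collector endpoint is an assumed example value.
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
use opentelemetry_sdk::trace::SdkTracerProvider;

fn init_tracer_provider() -> Result<SdkTracerProvider, Box<dyn std::error::Error>> {
    let exporter = SpanExporter::builder()
        .with_tonic()                           // requires the "grpc-tonic" feature
        .with_endpoint("http://localhost:4317") // assumed OTLP collector endpoint
        .build()?;
    Ok(SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build())
}
```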
45 bucket_replicate_test.md Normal file
@@ -0,0 +1,45 @@
Start two rustfs instances:
rustfs --address 0.0.0.0:9000 /rustfs-data9000
rustfs --address 0.0.0.0:9001 /rustfs-data9001

### Use MinIO's mc to set up the aliases rustfs and rustfs2

### Create the source bucket
mc mb rustfs/srcbucket

### Create the destination bucket
mc mb rustfs2/destbucket

### Enable versioning
mc version enable rustfs/srcbucket
mc version enable rustfs2/destbucket

#### A modified mc build is required to add bucket replication
./mc replication add rustfs/srcbucket --remote-bucket rustfs2/destbucket

###### Replicate a small file
mc cp ./1.txt rustfs/srcbucket

###### Check whether it succeeded
mc ls --versions rustfs/srcbucket/1.txt
mc ls --versions rustfs/destbucket/1.txt

##### Replicate a large file
1. Create a large file:
dd if=/dev/zero of=./dd.out bs=4096000 count=1000

mc cp ./dd.out rustfs/srcbucket/

##### Check whether it succeeded
mc ls --versions rustfs/srcbucket/dd.out
mc ls --versions rustfs2/destbucket/dd.out
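The mc commands above verify replication by listing versions on both clusters. Because this branch also pulls in aws-sdk-s3, the same check can be scripted from Rust; the sketch below is illustrative only (it assumes an already-configured client pointing at the source instance) and is not part of this commit.

```rust
// Hypothetical verification helper, not part of this commit: HeadObject exposes the
// x-amz-replication-status value, so a COMPLETED status means replication finished.
use aws_sdk_s3::Client;

async fn print_replication_status(
    client: &Client,
    bucket: &str,
    key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let head = client.head_object().bucket(bucket).key(key).send().await?;
    match head.replication_status() {
        Some(status) => println!("{key}: replication status = {}", status.as_str()),
        None => println!("{key}: no replication status reported"),
    }
    Ok(())
}
```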
21 build_rustfs.sh Executable file
@@ -0,0 +1,21 @@
#!/bin/bash
clear

# Detect the current platform architecture
ARCH=$(uname -m)

# Choose the target directory based on the architecture
if [ "$ARCH" == "x86_64" ]; then
    TARGET_DIR="target/x86_64"
elif [ "$ARCH" == "aarch64" ]; then
    TARGET_DIR="target/arm64"
else
    TARGET_DIR="target/unknown"
fi

# Set CARGO_TARGET_DIR and build the project
CARGO_TARGET_DIR=$TARGET_DIR RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build --package rustfs

echo -e "\a"
echo -e "\a"
echo -e "\a"
73 common/common/src/bucket_stats.rs Normal file
@@ -0,0 +1,73 @@
use std::collections::HashMap;

use crate::last_minute::{self};
pub struct ReplicationLatency {
    // Latency of single-part and multipart PUT requests
    upload_histogram: last_minute::LastMinuteHistogram,
}

impl ReplicationLatency {
    // Merge two ReplicationLatency values
    pub fn merge(&mut self, other: &mut ReplicationLatency) -> &ReplicationLatency {
        self.upload_histogram.merge(&other.upload_histogram);
        self
    }

    // Get the upload latency, grouped by object-size range
    pub fn get_upload_latency(&mut self) -> HashMap<String, u64> {
        let mut ret = HashMap::new();
        let avg = self.upload_histogram.get_avg_data();
        for (i, v) in avg.iter().enumerate() {
            let avg_duration = v.avg();
            ret.insert(self.size_tag_to_string(i), avg_duration.as_millis() as u64);
        }
        ret
    }
    pub fn update(&mut self, size: i64, during: std::time::Duration) {
        self.upload_histogram.add(size, during);
    }

    // Map a size tag to its string representation
    fn size_tag_to_string(&self, tag: usize) -> String {
        match tag {
            0 => String::from("Size < 1 KiB"),
            1 => String::from("Size < 1 MiB"),
            2 => String::from("Size < 10 MiB"),
            3 => String::from("Size < 100 MiB"),
            4 => String::from("Size < 1 GiB"),
            _ => String::from("Size > 1 GiB"),
        }
    }
}

// #[derive(Debug, Clone, Default)]
// pub struct ReplicationLastMinute {
//     pub last_minute: LastMinuteLatency,
// }

// impl ReplicationLastMinute {
//     pub fn merge(&mut self, other: ReplicationLastMinute) -> ReplicationLastMinute {
//         let mut nl = ReplicationLastMinute::default();
//         nl.last_minute = self.last_minute.merge(&mut other.last_minute);
//         nl
//     }

//     pub fn add_size(&mut self, n: i64) {
//         let t = SystemTime::now()
//             .duration_since(UNIX_EPOCH)
//             .expect("Time went backwards")
//             .as_secs();
//         self.last_minute.add_all(t - 1, &AccElem { total: t - 1, size: n as u64, n: 1 });
//     }

//     pub fn get_total(&self) -> AccElem {
//         self.last_minute.get_total()
//     }
// }

// impl fmt::Display for ReplicationLastMinute {
//     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
//         let t = self.last_minute.get_total();
//         write!(f, "ReplicationLastMinute sz= {}, n= {}, dur= {}", t.size, t.n, t.total)
//     }
// }
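For orientation, the labels produced by size_tag_to_string correspond one-to-one to the size_to_tag buckets added to last_minute.rs further down. A standalone illustrative sketch of that mapping (not crate API):

```rust
// Illustrative only: the size-class label a PUT of a given size falls into,
// using the same thresholds as size_tag_to_string / size_to_tag in this commit.
fn size_label(size: i64) -> &'static str {
    match size {
        s if s < 1 << 10 => "Size < 1 KiB",
        s if s < 1 << 20 => "Size < 1 MiB",
        s if s < 10 << 20 => "Size < 10 MiB",
        s if s < 100 << 20 => "Size < 100 MiB",
        s if s < 1 << 30 => "Size < 1 GiB",
        _ => "Size > 1 GiB",
    }
}

fn main() {
    assert_eq!(size_label(512), "Size < 1 KiB");
    assert_eq!(size_label(5 * 1024 * 1024), "Size < 10 MiB");
    assert_eq!(size_label(2_i64 << 30), "Size > 1 GiB");
}
```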
@@ -1,6 +1,81 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Clone, Debug, Default)]
#[allow(dead_code)]
#[derive(Debug, Default)]
struct TimedAction {
    count: u64,
    acc_time: u64,
    min_time: Option<u64>,
    max_time: Option<u64>,
    bytes: u64,
}

#[allow(dead_code)]
impl TimedAction {
    // Avg returns the average time spent on the action.
    pub fn avg(&self) -> Option<std::time::Duration> {
        if self.count == 0 {
            return None;
        }
        Some(std::time::Duration::from_nanos(self.acc_time / self.count))
    }

    // AvgBytes returns the average bytes processed.
    pub fn avg_bytes(&self) -> u64 {
        if self.count == 0 {
            return 0;
        }
        self.bytes / self.count
    }

    // Merge other into t.
    pub fn merge(&mut self, other: TimedAction) {
        self.count += other.count;
        self.acc_time += other.acc_time;
        self.bytes += other.bytes;

        if self.count == 0 {
            self.min_time = other.min_time;
        }
        if let Some(other_min) = other.min_time {
            self.min_time = self.min_time.map_or(Some(other_min), |min| Some(min.min(other_min)));
        }

        self.max_time = self
            .max_time
            .map_or(other.max_time, |max| Some(max.max(other.max_time.unwrap_or(0))));
    }
}

#[allow(dead_code)]
#[derive(Debug)]
enum SizeCategory {
    SizeLessThan1KiB = 0,
    SizeLessThan1MiB,
    SizeLessThan10MiB,
    SizeLessThan100MiB,
    SizeLessThan1GiB,
    SizeGreaterThan1GiB,
    // Add new entries here
    SizeLastElemMarker,
}

impl std::fmt::Display for SizeCategory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match *self {
            SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB",
            SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB",
            SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB",
            SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB",
            SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB",
            SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB",
            SizeCategory::SizeLastElemMarker => "SizeLastElemMarker",
        };
        write!(f, "{}", s)
    }
}

#[derive(Clone, Debug, Default, Copy)]
pub struct AccElem {
    pub total: u64,
    pub size: u64,
@@ -28,7 +103,7 @@ impl AccElem {
    }
}

-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct LastMinuteLatency {
    pub totals: Vec<AccElem>,
    pub last_sec: u64,
@@ -44,10 +119,11 @@ impl Default for LastMinuteLatency {
}

impl LastMinuteLatency {
-    pub fn merge(&mut self, o: &mut LastMinuteLatency) -> LastMinuteLatency {
+    pub fn merge(&mut self, o: &LastMinuteLatency) -> LastMinuteLatency {
        let mut merged = LastMinuteLatency::default();
+        let mut x = o.clone();
        if self.last_sec > o.last_sec {
-            o.forward_to(self.last_sec);
+            x.forward_to(self.last_sec);
            merged.last_sec = self.last_sec;
        } else {
            self.forward_to(o.last_sec);
@@ -111,7 +187,6 @@ impl LastMinuteLatency {
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
@@ -415,7 +490,7 @@ mod tests {
        latency2.totals[0].total = 20;
        latency2.totals[0].n = 3;

-        let merged = latency1.merge(&mut latency2);
+        let merged = latency1.merge(&latency2);

        assert_eq!(merged.last_sec, 1000);
        assert_eq!(merged.totals[0].total, 30); // 10 + 20
@@ -434,7 +509,7 @@ mod tests {
        latency1.totals[0].total = 10;
        latency2.totals[0].total = 20;

-        let merged = latency1.merge(&mut latency2);
+        let merged = latency1.merge(&latency2);

        assert_eq!(merged.last_sec, 1010); // Should use the later time
        assert_eq!(merged.totals[0].total, 30);
@@ -443,9 +518,9 @@ mod tests {
    #[test]
    fn test_last_minute_latency_merge_empty() {
        let mut latency1 = LastMinuteLatency::default();
-        let mut latency2 = LastMinuteLatency::default();
+        let latency2 = LastMinuteLatency::default();

-        let merged = latency1.merge(&mut latency2);
+        let merged = latency1.merge(&latency2);

        assert_eq!(merged.last_sec, 0);
        for elem in &merged.totals {
@@ -558,7 +633,7 @@ mod tests {
            n: 5,
        };

-        let cloned = elem.clone();
+        let cloned = elem;
        assert_eq!(elem.total, cloned.total);
        assert_eq!(elem.size, cloned.size);
        assert_eq!(elem.n, cloned.n);
@@ -755,3 +830,44 @@ mod tests {
        assert_eq!(elem.avg(), Duration::from_secs(0));
    }
}

const SIZE_LAST_ELEM_MARKER: usize = 10; // Assumption: the marker is 10 here; adjust to the real value as needed

#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct LastMinuteHistogram {
    histogram: Vec<LastMinuteLatency>,
    size: u32,
}

impl LastMinuteHistogram {
    pub fn merge(&mut self, other: &LastMinuteHistogram) {
        for i in 0..self.histogram.len() {
            self.histogram[i].merge(&other.histogram[i]);
        }
    }

    pub fn add(&mut self, size: i64, t: std::time::Duration) {
        let index = size_to_tag(size);
        self.histogram[index].add(&t);
    }

    pub fn get_avg_data(&mut self) -> [AccElem; SIZE_LAST_ELEM_MARKER] {
        let mut res = [AccElem::default(); SIZE_LAST_ELEM_MARKER];
        for (i, elem) in self.histogram.iter_mut().enumerate() {
            res[i] = elem.get_total();
        }
        res
    }
}

fn size_to_tag(size: i64) -> usize {
    match size {
        _ if size < 1024 => 0,               // sizeLessThan1KiB
        _ if size < 1024 * 1024 => 1,        // sizeLessThan1MiB
        _ if size < 10 * 1024 * 1024 => 2,   // sizeLessThan10MiB
        _ if size < 100 * 1024 * 1024 => 3,  // sizeLessThan100MiB
        _ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB
        _ => 5,                              // sizeGreaterThan1GiB
    }
}
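One caveat in the new LastMinuteHistogram: #[derive(Default)] leaves the histogram vector empty, so add() would index out of bounds until the vector is sized (merge() and get_avg_data() simply see no buckets). The commit does not show a constructor; below is a hedged sketch of the explicit initialization a caller inside common/common/src/last_minute.rs would need. Note also that SIZE_LAST_ELEM_MARKER is set to 10 here, while the SizeCategory::SizeLastElemMarker variant corresponds to 6.

```rust
// Hedged sketch (not in this commit): pre-size the per-bucket latency vector so that
// add()/get_avg_data() can index every slot safely.
impl LastMinuteHistogram {
    pub fn with_buckets() -> Self {
        LastMinuteHistogram {
            histogram: vec![LastMinuteLatency::default(); SIZE_LAST_ELEM_MARKER],
            size: 0,
        }
    }
}
```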
@@ -1,3 +1,4 @@
+pub mod bucket_stats;
pub mod error;
pub mod globals;
pub mod last_minute;
@@ -27,7 +27,7 @@ opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true, features = ["experimental_use_tracing_span_context", "experimental_metadata_attributes"] }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-stdout = { workspace = true }
-opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic"] }
+opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "gzip-tonic", "trace", "metrics", "logs", "internal-logs"] }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"] }
rustfs-utils = { workspace = true, features = ["ip"] }
serde = { workspace = true }
@@ -38,7 +38,7 @@ tracing-error = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std", "fmt", "env-filter", "tracing-log", "time", "local-time", "json"] }
tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] }
-reqwest = { workspace = true, optional = true, default-features = false }
+reqwest = { workspace = true, optional = true }
serde_json = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
@@ -329,7 +329,16 @@ pub(crate) fn init_telemetry(config: &OtelConfig) -> OtelGuard {
    };

    // Configure the flexi_logger
-    let flexi_logger_result = flexi_logger::Logger::with(log_spec)
+    let flexi_logger_result = flexi_logger::Logger::try_with_env_or_str(logger_level)
+        .unwrap_or_else(|e| {
+            eprintln!(
+                "Invalid logger level: {}, using default: {},failed error:{}",
+                logger_level,
+                DEFAULT_LOG_LEVEL,
+                e.to_string()
+            );
+            flexi_logger::Logger::with(log_spec.clone())
+        })
        .log_to_file(
            FileSpec::default()
                .directory(log_directory)
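The logger change above prefers flexi_logger's try_with_env_or_str and only falls back to the pre-built log_spec when the configured level cannot be parsed. A minimal standalone sketch of that pattern (the level string and fallback spec are illustrative):

```rust
// Minimal sketch of the fallback pattern used above: try the configured level (or
// RUST_LOG), and fall back to a known-good specification if parsing fails.
use flexi_logger::{LogSpecification, Logger};

fn build_logger(configured_level: &str) -> Logger {
    Logger::try_with_env_or_str(configured_level).unwrap_or_else(|e| {
        eprintln!("invalid logger level {configured_level:?}: {e}; falling back to info");
        Logger::with(LogSpecification::info())
    })
}
```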
@@ -18,7 +18,7 @@ async-compression = { version = "0.4.0", features = [
] }
async_zip = { version = "0.0.17", features = ["tokio"] }
zip = "2.2.0"
-tokio = { version = "1.45.0", features = ["full"] }
+tokio = { workspace = true, features = ["full"] }
tokio-stream = "0.1.17"
tokio-tar = { workspace = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
@@ -28,6 +28,97 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error>
    }
}

// use argon2::{Argon2, PasswordHasher};
// use argon2::password_hash::{SaltString};
// use aes_gcm::{Aes256Gcm, Key, Nonce}; // For AES-GCM
// use chacha20poly1305::{ChaCha20Poly1305, Key as ChaChaKey, Nonce as ChaChaNonce}; // For ChaCha20
// use pbkdf2::pbkdf2;
// use sha2::Sha256;
// use std::io::{self, Read};
// use thiserror::Error;

// #[derive(Debug, Error)]
// pub enum DecryptError {
//     #[error("unexpected header")]
//     UnexpectedHeader,
//     #[error("invalid encryption algorithm ID")]
//     InvalidAlgorithmId,
//     #[error("IO error")]
//     Io(#[from] io::Error),
//     #[error("decryption error")]
//     DecryptionError,
// }

// pub fn decrypt_data2<R: Read>(password: &str, mut data: R) -> Result<Vec<u8>, DecryptError> {
//     // Parse the stream header
//     let mut hdr = [0u8; 32 + 1 + 8];
//     if data.read_exact(&mut hdr).is_err() {
//         return Err(DecryptError::UnexpectedHeader);
//     }

//     let salt = &hdr[0..32];
//     let id = hdr[32];
//     let nonce = &hdr[33..41];

//     let key = match id {
//         // Argon2id + AES-GCM
//         0x01 => {
//             let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
//             let argon2 = Argon2::default();
//             let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
//                 .map_err(|_| DecryptError::DecryptionError)?;
//             hashed_key.hash.unwrap().as_bytes().to_vec()
//         }
//         // Argon2id + ChaCha20Poly1305
//         0x02 => {
//             let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
//             let argon2 = Argon2::default();
//             let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
//                 .map_err(|_| DecryptError::DecryptionError)?;
//             hashed_key.hash.unwrap().as_bytes().to_vec()
//         }
//         // PBKDF2 + AES-GCM
//         // 0x03 => {
//         //     let mut key = [0u8; 32];
//         //     pbkdf2::<Sha256>(password.as_bytes(), salt, 10000, &mut key);
//         //     key.to_vec()
//         // }
//         _ => return Err(DecryptError::InvalidAlgorithmId),
//     };

//     // Decrypt data using the corresponding cipher
//     let mut encrypted_data = Vec::new();
//     data.read_to_end(&mut encrypted_data)?;

//     let plaintext = match id {
//         0x01 => {
//             let cipher = Aes256Gcm::new(Key::from_slice(&key));
//             let nonce = Nonce::from_slice(nonce);
//             cipher
//                 .decrypt(nonce, encrypted_data.as_ref())
//                 .map_err(|_| DecryptError::DecryptionError)?
//         }
//         0x02 => {
//             let cipher = ChaCha20Poly1305::new(ChaChaKey::from_slice(&key));
//             let nonce = ChaChaNonce::from_slice(nonce);
//             cipher
//                 .decrypt(nonce, encrypted_data.as_ref())
//                 .map_err(|_| DecryptError::DecryptionError)?
//         }
//         0x03 => {

//             let cipher = Aes256Gcm::new(Key::from_slice(&key));
//             let nonce = Nonce::from_slice(nonce);
//             cipher
//                 .decrypt(nonce, encrypted_data.as_ref())
//                 .map_err(|_| DecryptError::DecryptionError)?
//         }
//         _ => return Err(DecryptError::InvalidAlgorithmId),
//     };

//     Ok(plaintext)
// }

#[cfg(any(test, feature = "crypto"))]
#[inline]
fn decryp<T: aes_gcm::aead::Aead>(stream: T, nonce: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error> {
@@ -18,6 +18,7 @@ backon.workspace = true
blake2 = { workspace = true }
bytes.workspace = true
base64 = "0.22.1"
byteorder = { workspace = true }
common.workspace = true
policy.workspace = true
chrono.workspace = true
@@ -58,12 +59,8 @@ tempfile.workspace = true
tokio = { workspace = true, features = ["io-util", "sync", "signal"] }
tokio-stream = { workspace = true }
tonic.workspace = true
tower.workspace = true
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.15", features = ["xxh64"] }
num = "0.4.3"
xxhash-rust = { version = "0.8.15", features = ["xxh64", "xxh3"] }
num_cpus = { workspace = true }
s3s-policy.workspace = true
rand.workspace = true
pin-project-lite.workspace = true
md-5.workspace = true
@@ -71,7 +68,10 @@ madmin.workspace = true
workers.workspace = true
reqwest = { workspace = true }
ring = "0.17.14"
-urlencoding = "2.1.3"
+aws-sdk-s3 = { workspace = true }
+once_cell = { workspace = true }
+rustfs-rsc = { workspace = true }
+urlencoding = { workspace = true }
smallvec = { workspace = true }
shadow-rs.workspace = true
@@ -16,6 +16,7 @@ use std::sync::Arc;
use time::OffsetDateTime;
use tracing::error;

+use crate::bucket::target::BucketTarget;
use crate::config::com::{read_config, save_config};
use crate::{config, new_object_layer_fn};
use common::error::{Error, Result};
@@ -278,8 +279,11 @@ impl BucketMetadata {
                self.replication_config_updated_at = updated;
            }
            BUCKET_TARGETS_FILE => {
-                self.tagging_config_xml = data;
-                self.tagging_config_updated_at = updated;
+                // let x = data.clone();
+                // let str = std::str::from_utf8(&x).expect("Invalid UTF-8");
+                // println!("update config:{}", str);
+                self.bucket_targets_config_json = data.clone();
+                self.bucket_targets_config_updated_at = updated;
            }
            _ => return Err(Error::msg(format!("config file not found : {}", config_file))),
        }
@@ -340,8 +344,10 @@ impl BucketMetadata {
        if !self.replication_config_xml.is_empty() {
            self.replication_config = Some(deserialize::<ReplicationConfiguration>(&self.replication_config_xml)?);
        }
+        //let temp = self.bucket_targets_config_json.clone();
        if !self.bucket_targets_config_json.is_empty() {
-            self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?);
+            let arr: Vec<BucketTarget> = serde_json::from_slice(&self.bucket_targets_config_json)?;
+            self.bucket_target_config = Some(BucketTargets { targets: arr });
        } else {
            self.bucket_target_config = Some(BucketTargets::default())
        }
@@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::{load_bucket_metadata_parse, BUCKET_LIFECYCLE_CONFIG};
use crate::bucket::utils::is_meta_bucketname;
+use crate::cmd::bucket_targets;
use crate::config::error::ConfigError;
use crate::disk::error::DiskError;
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
@@ -228,7 +229,9 @@ impl BucketMetadataSys {
            match res {
                Ok(res) => {
                    if let Some(bucket) = buckets.get(idx) {
-                        mp.insert(bucket.clone(), Arc::new(res));
+                        let x = Arc::new(res);
+                        mp.insert(bucket.clone(), x.clone());
+                        bucket_targets::init_bucket_targets(bucket, x.clone()).await;
                    }
                }
                Err(e) => {
@@ -340,6 +343,7 @@ impl BucketMetadataSys {
    }

    pub async fn get_config_from_disk(&self, bucket: &str) -> Result<BucketMetadata> {
+        println!("load data from disk");
        if is_meta_bucketname(bucket) {
            return Err(Error::msg("errInvalidArgument"));
        }
@@ -549,7 +553,12 @@ impl BucketMetadataSys {

    pub async fn get_replication_config(&self, bucket: &str) -> Result<(ReplicationConfiguration, OffsetDateTime)> {
        let (bm, reload) = match self.get_config(bucket).await {
-            Ok(res) => res,
+            Ok(res) => {
+                if res.0.replication_config.is_none() {
+                    return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
+                }
+                res
+            }
            Err(err) => {
                warn!("get_replication_config err {:?}", &err);
                return if config::error::is_err_config_not_found(&err) {
@@ -564,7 +573,7 @@ impl BucketMetadataSys {
            if reload {
                // TODO: globalBucketTargetSys
            }

            //println!("549 {:?}", config.clone());
            Ok((config.clone(), bm.replication_config_updated_at))
        } else {
            Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound))
@@ -584,9 +593,12 @@
            }
        };

+        println!("573");

        if let Some(config) = &bm.bucket_target_config {
            if reload {
                // TODO: globalBucketTargetSys
                //config.
            }

            Ok(config.clone())
@@ -4,8 +4,9 @@ pub mod metadata_sys;
pub mod object_lock;
pub mod policy_sys;
mod quota;
+pub mod replication;
pub mod tagging;
-mod target;
+pub mod target;
pub mod utils;
pub mod versioning;
pub mod versioning_sys;
27 ecstore/src/bucket/replication/datatypes.rs Normal file
@@ -0,0 +1,27 @@
// Replication status type for x-amz-replication-status header
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatusType {
    Pending,
    Completed,
    CompletedLegacy,
    Failed,
    Replica,
}

impl StatusType {
    // Converts the enum variant to its string representation
    pub fn as_str(&self) -> &'static str {
        match self {
            StatusType::Pending => "PENDING",
            StatusType::Completed => "COMPLETED",
            StatusType::CompletedLegacy => "COMPLETE",
            StatusType::Failed => "FAILED",
            StatusType::Replica => "REPLICA",
        }
    }

    // Checks if the status is empty (not set)
    pub fn is_empty(&self) -> bool {
        matches!(self, StatusType::Pending) // Adjust this as needed
    }
}
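The as_str() values above are exactly the strings S3 uses for the x-amz-replication-status header. A small illustrative round-trip (the parse_status helper is hypothetical and not part of this commit; the import path assumes the module layout added here):

```rust
// Illustrative only. parse_status is a hypothetical inverse of StatusType::as_str().
use ecstore::bucket::replication::datatypes::StatusType;

fn parse_status(value: &str) -> Option<StatusType> {
    match value {
        "PENDING" => Some(StatusType::Pending),
        "COMPLETED" => Some(StatusType::Completed),
        "COMPLETE" => Some(StatusType::CompletedLegacy),
        "FAILED" => Some(StatusType::Failed),
        "REPLICA" => Some(StatusType::Replica),
        _ => None,
    }
}

fn main() {
    assert_eq!(StatusType::Completed.as_str(), "COMPLETED");
    assert_eq!(parse_status("COMPLETE"), Some(StatusType::CompletedLegacy));
}
```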
1 ecstore/src/bucket/replication/mod.rs Normal file
@@ -0,0 +1 @@
pub mod datatypes;
@@ -25,6 +25,7 @@ pub fn encode_tags(tags: Vec<Tag>) -> String {

    for tag in tags.iter() {
        if let (Some(k), Some(v)) = (tag.key.as_ref(), tag.value.as_ref()) {
            //encoded.append_pair(k.as_ref().unwrap().as_str(), v.as_ref().unwrap().as_str());
            encoded.append_pair(k.as_str(), v.as_str());
        }
    }
@@ -1,15 +1,16 @@
use common::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Credentials {
-    access_key: String,
-    secret_key: String,
-    session_token: Option<String>,
-    expiration: Option<OffsetDateTime>,
+    #[serde(rename = "accessKey")]
+    pub access_key: String,
+    #[serde(rename = "secretKey")]
+    pub secret_key: String,
+    pub session_token: Option<String>,
+    pub expiration: Option<chrono::DateTime<chrono::Utc>>,
}

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
@@ -20,52 +21,53 @@ pub enum ServiceType {

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct LatencyStat {
-    curr: Duration, // current latency
-    avg: Duration,  // average latency
-    max: Duration,  // maximum latency
+    curr: u64, // current latency
+    avg: u64,  // average latency
+    max: u64,  // maximum latency
}

// BucketTarget struct definition
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
-    source_bucket: String,
+    #[serde(rename = "sourcebucket")]
+    pub source_bucket: String,

-    endpoint: String,
+    pub endpoint: String,

-    credentials: Option<Credentials>,

-    target_bucket: String,
+    pub credentials: Option<Credentials>,
+    #[serde(rename = "targetbucket")]
+    pub target_bucket: String,

    secure: bool,

-    path: Option<String>,
+    pub path: Option<String>,

    api: Option<String>,

-    arn: Option<String>,
+    pub arn: Option<String>,
+    #[serde(rename = "type")]
+    pub type_: Option<String>,

-    type_: ServiceType,

-    region: Option<String>,
+    pub region: Option<String>,

    bandwidth_limit: Option<i64>,

    #[serde(rename = "replicationSync")]
    replication_sync: bool,

    storage_class: Option<String>,

-    health_check_duration: Option<Duration>,

+    #[serde(rename = "healthCheckDuration")]
+    health_check_duration: u64,
    #[serde(rename = "disableProxy")]
    disable_proxy: bool,

-    reset_before_date: Option<OffsetDateTime>,

+    #[serde(rename = "resetBeforeDate")]
+    reset_before_date: String,
    reset_id: Option<String>,

-    total_downtime: Duration,
+    #[serde(rename = "totalDowntime")]
+    total_downtime: u64,

    last_online: Option<OffsetDateTime>,

    #[serde(rename = "isOnline")]
    online: bool,

    latency: LatencyStat,
@@ -73,6 +75,15 @@ pub struct BucketTarget {
    deployment_id: Option<String>,

    edge: bool,
+    #[serde(rename = "edgeSyncBeforeExpiry")]
+    edge_sync_before_expiry: bool,
}

impl BucketTarget {
    pub fn is_empty(self) -> bool {
        //self.target_bucket.is_empty() && self.endpoint.is_empty() && self.arn.is_empty()
        self.target_bucket.is_empty() && self.endpoint.is_empty() && self.arn.is_none()
    }
}

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
@@ -93,4 +104,18 @@ impl BucketTargets {
        let t: BucketTargets = rmp_serde::from_slice(buf)?;
        Ok(t)
    }

    pub fn is_empty(&self) -> bool {
        if self.targets.is_empty() {
            return true;
        }

        for target in &self.targets {
            if !target.clone().is_empty() {
                return false;
            }
        }

        true
    }
}
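The serde renames above matter because the bucket-target configuration is now persisted and parsed as JSON (see the serde_json::from_slice change in metadata.rs earlier), so the on-disk keys are camelCase. A standalone sketch mirroring the Credentials shape (not the crate's own type; the values are placeholders):

```rust
// Standalone illustration of the "accessKey"/"secretKey" key shape produced by the
// renames above. Creds mirrors the diff's Credentials struct for the example only.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Creds {
    #[serde(rename = "accessKey")]
    access_key: String,
    #[serde(rename = "secretKey")]
    secret_key: String,
}

fn main() -> Result<(), serde_json::Error> {
    let c = Creds { access_key: "AKIA...".into(), secret_key: "secret".into() };
    let json = serde_json::to_string(&c)?;
    assert_eq!(json, r#"{"accessKey":"AKIA...","secretKey":"secret"}"#);
    assert_eq!(serde_json::from_str::<Creds>(&json)?, c);
    Ok(())
}
```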
2755 ecstore/src/cmd/bucket_replication.rs Normal file
File diff suppressed because it is too large
55 ecstore/src/cmd/bucket_replication_utils.rs Normal file
@@ -0,0 +1,55 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};

// Representation of the replication status
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatusType {
    Pending,
    Completed,
    CompletedLegacy,
    Failed,
    Replica,
}

// Representation of version purge status type (customize as needed)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VersionPurgeStatusType {
    Pending,
    Completed,
    Failed,
}

// ReplicationState struct definition
#[derive(Debug, Clone)]
pub struct ReplicationState {
    // Timestamp when the last replica update was received
    pub replica_time_stamp: DateTime<Utc>,

    // Replica status
    pub replica_status: StatusType,

    // Represents DeleteMarker replication state
    pub delete_marker: bool,

    // Timestamp when the last replication activity happened
    pub replication_time_stamp: DateTime<Utc>,

    // Stringified representation of all replication activity
    pub replication_status_internal: String,

    // Stringified representation of all version purge statuses
    // Example format: "arn1=PENDING;arn2=COMPLETED;"
    pub version_purge_status_internal: String,

    // Stringified representation of replication decision for each target
    pub replicate_decision_str: String,

    // Map of ARN -> replication status for ongoing replication activity
    pub targets: HashMap<String, StatusType>,

    // Map of ARN -> VersionPurgeStatus for all the targets
    pub purge_targets: HashMap<String, VersionPurgeStatusType>,

    // Map of ARN -> stringified reset id and timestamp for all the targets
    pub reset_statuses_map: HashMap<String, String>,
}
871 ecstore/src/cmd/bucket_targets.rs Normal file
@@ -0,0 +1,871 @@
#![allow(unused_variables)]
#![allow(dead_code)]
use crate::{
    bucket::{self, target::BucketTargets},
    new_object_layer_fn, peer, store_api,
};
use crate::{
    bucket::{metadata_sys, target::BucketTarget},
    endpoints::Node,
    peer::{PeerS3Client, RemotePeerS3Client},
    StorageAPI,
};
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
use chrono::Utc;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::{
    collections::HashMap,
    time::{Duration, SystemTime},
};
use thiserror::Error;
use tokio::sync::RwLock;

pub struct TClient {
    pub s3cli: S3Client,
    pub remote_peer_client: peer::RemotePeerS3Client,
    pub arn: String,
}
impl TClient {
    pub fn new(s3cli: S3Client, remote_peer_client: RemotePeerS3Client, arn: String) -> Self {
        TClient {
            s3cli,
            remote_peer_client,
            arn,
        }
    }
}

pub struct EpHealth {
    pub endpoint: String,
    pub scheme: String,
    pub online: bool,
    pub last_online: SystemTime,
    pub last_hc_at: SystemTime,
    pub offline_duration: Duration,
    pub latency: LatencyStat, // Assuming LatencyStat is a custom struct
}

impl EpHealth {
    pub fn new(
        endpoint: String,
        scheme: String,
        online: bool,
        last_online: SystemTime,
        last_hc_at: SystemTime,
        offline_duration: Duration,
        latency: LatencyStat,
    ) -> Self {
        EpHealth {
            endpoint,
            scheme,
            online,
            last_online,
            last_hc_at,
            offline_duration,
            latency,
        }
    }
}

pub struct LatencyStat {
    // Define the fields of LatencyStat as per your requirements
}

pub struct ArnTarget {
    client: TargetClient,
    last_refresh: chrono::DateTime<Utc>,
}
impl ArnTarget {
    pub fn new(bucket: String, endpoint: String, ak: String, sk: String) -> Self {
        Self {
            client: TargetClient {
                bucket,
                storage_class: "STANDRD".to_string(),
                disable_proxy: false,
                health_check_duration: Duration::from_secs(100),
                endpoint,
                reset_id: "0".to_string(),
                replicate_sync: false,
                secure: false,
                arn: "".to_string(),
                client: reqwest::Client::new(),
                ak,
                sk,
            },
            last_refresh: Utc::now(),
        }
    }
}

// pub fn get_s3client_from_para(
//     ak: &str,
//     sk: &str,
//     url: &str,
//     _region: &str,
// ) -> Result<S3Client, Box<dyn Error>> {
//     let credentials = Credentials::new(ak, sk, None, None, "");
//     let region = Region::new("us-east-1".to_string());

//     let config = Config::builder()
//         .region(region)
//         .endpoint_url(url.to_string())
//         .credentials_provider(credentials)
//         .behavior_version(BehaviorVersion::latest()) // Adjust as necessary
//         .build();
//     Ok(S3Client::from_conf(config))
// }

pub struct BucketTargetSys {
    arn_remote_map: Arc<RwLock<HashMap<String, ArnTarget>>>,
    targets_map: Arc<RwLock<HashMap<String, Vec<bucket::target::BucketTarget>>>>,
    hc: HashMap<String, EpHealth>,
    //store:Option<Arc<ecstore::store::ECStore>>,
}

lazy_static! {
    pub static ref GLOBAL_Bucket_Target_Sys: std::sync::OnceLock<BucketTargetSys> = BucketTargetSys::new().into();
}

//#[derive(Debug)]
// pub enum SetTargetError {
//     NotFound,
// }

pub async fn get_bucket_target_client(bucket: &str, arn: &str) -> Result<TargetClient, SetTargetError> {
    if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
        sys.get_remote_target_client2(arn).await
    } else {
        Err(SetTargetError::TargetNotFound(bucket.to_string()))
    }
}

#[derive(Debug)]
pub struct BucketRemoteTargetNotFound {
    pub bucket: String,
}

pub async fn init_bucket_targets(bucket: &str, meta: Arc<bucket::metadata::BucketMetadata>) {
    println!("140 {}", bucket);
    if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
        if let Some(tgts) = meta.bucket_target_config.clone() {
            for tgt in tgts.targets {
                warn!("ak and sk is:{:?}", tgt.credentials);
                let _ = sys.set_target(bucket, &tgt, false, true).await;
                //sys.targets_map.
            }
        }
    }
}

pub async fn remove_bucket_target(bucket: &str, arn_str: &str) {
    if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
        let _ = sys.remove_target(bucket, arn_str).await;
    }
}

impl Default for BucketTargetSys {
    fn default() -> Self {
        Self::new()
    }
}

impl BucketTargetSys {
    pub fn new() -> Self {
        BucketTargetSys {
            arn_remote_map: Arc::new(RwLock::new(HashMap::new())),
            targets_map: Arc::new(RwLock::new(HashMap::new())),
            hc: HashMap::new(),
        }
    }

    pub async fn list_bucket_targets(&self, bucket: &str) -> Result<BucketTargets, BucketRemoteTargetNotFound> {
        let targets_map = self.targets_map.read().await;
        if let Some(targets) = targets_map.get(bucket) {
            Ok(BucketTargets {
                targets: targets.clone(),
            })
        } else {
            Err(BucketRemoteTargetNotFound {
                bucket: bucket.to_string(),
            })
        }
    }

    pub async fn list_targets(&self, bucket: Option<&str>, _arn_type: Option<&str>) -> Vec<BucketTarget> {
        let _ = _arn_type;
        //let health_stats = self.health_stats();

        let mut targets = Vec::new();

        if let Some(bucket_name) = bucket {
            if let Ok(ts) = self.list_bucket_targets(bucket_name).await {
                for t in ts.targets {
                    //if arn_type.map_or(true, |arn| t.target_type == arn) {
                    //if let Some(hs) = health_stats.get(&t.url().host) {
                    // t.total_downtime = hs.offline_duration;
                    // t.online = hs.online;
                    // t.last_online = hs.last_online;
                    // t.latency = LatencyStat {
                    // curr: hs.latency.curr,
                    // avg: hs.latency.avg,
                    // max: hs.latency.peak,
                    // };
                    //}
                    targets.push(t.clone());
                    //}
                }
            }
            return targets;
        }

        // Locking and iterating over all targets in the system
        let targets_map = self.targets_map.read().await;
        for tgts in targets_map.values() {
            for t in tgts {
                //if arn_type.map_or(true, |arn| t.target_type == arn) {
                // if let Some(hs) = health_stats.get(&t.url().host) {
                // t.total_downtime = hs.offline_duration;
                // t.online = hs.online;
                // t.last_online = hs.last_online;
                // t.latency = LatencyStat {
                // curr: hs.latency.curr,
                // avg: hs.latency.avg,
                // max: hs.latency.peak,
                // };
                // }
                targets.push(t.clone());
                //}
            }
        }

        targets
    }

    pub async fn remove_target(&self, bucket: &str, arn_str: &str) -> Result<(), SetTargetError> {
        //to do need lock;
        let mut targets_map = self.targets_map.write().await;
        let tgts = targets_map.get(bucket);
        let mut arn_remotes_map = self.arn_remote_map.write().await;
        if tgts.is_none() {
            //Err(SetTargetError::TargetNotFound(bucket.to_string()));
            return Ok(());
        }

        let tgts = tgts.unwrap(); // safe to unwrap here
        let mut targets = Vec::with_capacity(tgts.len());
        let mut found = false;

        // Walk the targets and keep those whose ARN does not match
        for tgt in tgts {
            if tgt.arn != Some(arn_str.to_string()) {
                targets.push(tgt.clone()); // clone the entries we keep
            } else {
                found = true; // found the matching ARN
            }
        }

        // If no matching ARN was found, return an error
        if !found {
            return Ok(());
        }

        // Update targets_map
        targets_map.insert(bucket.to_string(), targets);
        arn_remotes_map.remove(arn_str);

        let targets = self.list_targets(Some(&bucket), None).await;
        println!("targets is {}", targets.len());
        match serde_json::to_vec(&targets) {
            Ok(json) => {
                let _ = metadata_sys::update(bucket, "bucket-targets.json", json).await;
            }
            Err(e) => {
                println!("serialization failed: {}", e);
            }
        }

        Ok(())
    }

    pub async fn get_remote_arn(&self, bucket: &str, target: Option<&BucketTarget>, depl_id: &str) -> (Option<String>, bool) {
        if target.is_none() {
            return (None, false);
        }

        let target = target.unwrap();

        let targets_map = self.targets_map.read().await;

        // Acquire the lock to access arn_remote_map
        let mut _arn_remotes_map = self.arn_remote_map.read().await;
        if let Some(tgts) = targets_map.get(bucket) {
            for tgt in tgts {
                if tgt.type_ == target.type_
                    && tgt.target_bucket == target.target_bucket
                    && tgt.endpoint == target.endpoint
                    && tgt.credentials.as_ref().unwrap().access_key == target.credentials.as_ref().unwrap().access_key
                {
                    return (tgt.arn.clone(), true);
                }
            }
        }

        // if !target.type_.is_valid() {
        //     return (None, false);
        // }

        println!("generate_arn");

        (Some(generate_arn(target.clone(), depl_id.to_string())), false)
    }

    pub async fn get_remote_target_client2(&self, arn: &str) -> Result<TargetClient, SetTargetError> {
        let map = self.arn_remote_map.read().await;
        info!("get remote target client and arn is: {}", arn);
        if let Some(value) = map.get(arn) {
            let mut x = value.client.clone();
            x.arn = arn.to_string();
            Ok(x)
        } else {
            error!("not find target");
            Err(SetTargetError::TargetNotFound(arn.to_string()))
        }
    }

    // pub async fn get_remote_target_client(&self, _tgt: &BucketTarget) -> Result<TargetClient, SetTargetError> {
    //     // Mocked implementation for obtaining a remote client
    //     let tcli = TargetClient {
    //         bucket: _tgt.target_bucket.clone(),
    //         storage_class: "STANDRD".to_string(),
    //         disable_proxy: false,
    //         health_check_duration: Duration::from_secs(100),
    //         endpoint: _tgt.endpoint.clone(),
    //         reset_id: "0".to_string(),
    //         replicate_sync: false,
    //         secure: false,
    //         arn: "".to_string(),
    //         client: reqwest::Client::new(),
    //         ak: _tgt.

    //     };
    //     Ok(tcli)
    // }
    // pub async fn get_remote_target_client_with_bucket(&self, _bucket: String) -> Result<TargetClient, SetTargetError> {
    //     // Mocked implementation for obtaining a remote client
    //     let tcli = TargetClient {
    //         bucket: _tgt.target_bucket.clone(),
    //         storage_class: "STANDRD".to_string(),
    //         disable_proxy: false,
    //         health_check_duration: Duration::from_secs(100),
    //         endpoint: _tgt.endpoint.clone(),
    //         reset_id: "0".to_string(),
    //         replicate_sync: false,
    //         secure: false,
    //         arn: "".to_string(),
    //         client: reqwest::Client::new(),
    //     };
    //     Ok(tcli)
    // }

    async fn local_is_bucket_versioned(&self, _bucket: &str) -> bool {
        let Some(store) = new_object_layer_fn() else {
            return false;
        };
        //store.get_bucket_info(bucket, opts)

        // let binfo:BucketInfo = store
        //     .get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default()).await;
        match store.get_bucket_info(_bucket, &store_api::BucketOptions::default()).await {
            Ok(info) => {
                println!("Bucket Info: {:?}", info);
                info.versionning
            }
            Err(err) => {
                eprintln!("Error: {:?}", err);
                false
            }
        }
    }

    async fn is_bucket_versioned(&self, _bucket: &str) -> bool {
        return true;
        // let url_str = "http://127.0.0.1:9001";

        // // Convert to a Url
        // let parsed_url = url::Url::parse(url_str).unwrap();

        // let node = Node {
        //     url: parsed_url,
        //     pools: vec![],
        //     is_local: false,
        //     grid_host: "".to_string(),
        // };
        // let cli = ecstore::peer::RemotePeerS3Client::new(Some(node), None);

        // match cli.get_bucket_info(_bucket, &ecstore::store_api::BucketOptions::default()).await
        // {
        //     Ok(info) => {
        //         println!("Bucket Info: {:?}", info);
        //         info.versionning
        //     }
        //     Err(err) => {
        //         eprintln!("Error: {:?}", err);
        //         return false;
        //     }
        // }
    }
pub async fn set_target(&self, bucket: &str, tgt: &BucketTarget, update: bool, fromdisk: bool) -> Result<(), SetTargetError> {
|
||||
// if !tgt.type_.is_valid() && !update {
|
||||
// return Err(SetTargetError::InvalidTargetType(bucket.to_string()));
|
||||
// }
|
||||
|
||||
//let client = self.get_remote_target_client(tgt).await?;
|
||||
if tgt.type_ == Some("replication".to_string()) && !fromdisk {
|
||||
let versioning_config = self.local_is_bucket_versioned(bucket).await;
|
||||
if !versioning_config {
|
||||
// println!("111111111");
|
||||
return Err(SetTargetError::TargetNotVersioned(bucket.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let url_str = format!("http://{}", tgt.endpoint.clone());
|
||||
|
||||
println!("url str is {}", url_str);
|
||||
// 转换为 Url 类型
|
||||
let parsed_url = url::Url::parse(&url_str).unwrap();
|
||||
|
||||
let node = Node {
|
||||
url: parsed_url,
|
||||
pools: vec![],
|
||||
is_local: false,
|
||||
grid_host: "".to_string(),
|
||||
};
|
||||
|
||||
let cli = peer::RemotePeerS3Client::new(Some(node), None);
|
||||
|
||||
match cli
|
||||
.get_bucket_info(&tgt.target_bucket, &store_api::BucketOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(info) => {
|
||||
println!("Bucket Info: {:?}", info);
|
||||
if !info.versionning {
|
||||
println!("2222222222 {}", info.versionning);
|
||||
return Err(SetTargetError::TargetNotVersioned(tgt.target_bucket.to_string()));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
println!("remote bucket 369 is:{}", tgt.target_bucket);
|
||||
eprintln!("Error: {:?}", err);
|
||||
return Err(SetTargetError::SourceNotVersioned(tgt.target_bucket.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
//if tgt.target_type == BucketTargetType::ReplicationService {
|
||||
// Check if target is a MinIO server and alive
|
||||
// let hc_result = tokio::time::timeout(Duration::from_secs(3), client.health_check(&tgt.endpoint)).await;
|
||||
// match hc_result {
|
||||
// Ok(Ok(true)) => {} // Server is alive
|
||||
// Ok(Ok(false)) | Ok(Err(_)) | Err(_) => {
|
||||
// return Err(SetTargetError::HealthCheckFailed(tgt.target_bucket.clone()));
|
||||
// }
|
||||
// }
|
||||
|
||||
//Lock and update target maps
|
||||
let mut targets_map = self.targets_map.write().await;
|
||||
let mut arn_remotes_map = self.arn_remote_map.write().await;
|
||||
|
||||
let targets = targets_map.entry(bucket.to_string()).or_default();
|
||||
let mut found = false;
|
||||
|
||||
for existing_target in targets.iter_mut() {
|
||||
println!("418 exist:{}", existing_target.source_bucket.clone());
|
||||
if existing_target.type_ == tgt.type_ {
|
||||
if existing_target.arn == tgt.arn {
|
||||
if !update {
|
||||
return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
|
||||
}
|
||||
*existing_target = tgt.clone();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if existing_target.endpoint == tgt.endpoint {
|
||||
println!("endpoint is same:{}", tgt.endpoint.clone());
|
||||
return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !found && !update {
|
||||
println!("437 exist:{}", tgt.arn.clone().unwrap());
|
||||
targets.push(tgt.clone());
|
||||
}
|
||||
let arntgt: ArnTarget = ArnTarget::new(
|
||||
tgt.target_bucket.clone(),
|
||||
tgt.endpoint.clone(),
|
||||
tgt.credentials.clone().unwrap().access_key.clone(),
|
||||
tgt.credentials.clone().unwrap().secret_key,
|
||||
);
|
||||
|
||||
arn_remotes_map.insert(tgt.arn.clone().unwrap().clone(), arntgt);
|
||||
//self.update_bandwidth_limit(bucket, &tgt.arn, tgt.bandwidth_limit).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TargetClient {
|
||||
pub client: reqwest::Client, // Using reqwest HTTP client
|
||||
pub health_check_duration: Duration,
|
||||
pub bucket: String, // Remote bucket target
|
||||
pub replicate_sync: bool,
|
||||
pub storage_class: String, // Storage class on remote
|
||||
pub disable_proxy: bool,
|
||||
pub arn: String, // ARN to uniquely identify remote target
|
||||
pub reset_id: String,
|
||||
pub endpoint: String,
|
||||
pub secure: bool,
|
||||
pub ak: String,
|
||||
pub sk: String,
|
||||
}
|
||||
|
||||
impl TargetClient {
|
||||
pub fn new(
|
||||
client: reqwest::Client,
|
||||
health_check_duration: Duration,
|
||||
bucket: String,
|
||||
replicate_sync: bool,
|
||||
storage_class: String,
|
||||
disable_proxy: bool,
|
||||
arn: String,
|
||||
reset_id: String,
|
||||
endpoint: String,
|
||||
secure: bool,
|
||||
ak: String,
|
||||
sk: String,
|
||||
) -> Self {
|
||||
TargetClient {
|
||||
client,
|
||||
health_check_duration,
|
||||
bucket,
|
||||
replicate_sync,
|
||||
storage_class,
|
||||
disable_proxy,
|
||||
arn,
|
||||
reset_id,
|
||||
endpoint,
|
||||
secure,
|
||||
ak,
|
||||
sk,
|
||||
}
|
||||
}
|
||||
pub async fn bucket_exists(&self, _bucket: &str) -> Result<bool, SetTargetError> {
|
||||
Ok(true) // Mocked implementation
|
||||
}
|
||||
}
|
||||
use tracing::{error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VersioningConfig {
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
impl VersioningConfig {
|
||||
pub fn is_enabled(&self) -> bool {
|
||||
self.enabled
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Client;
|
||||
|
||||
impl Client {
|
||||
pub async fn bucket_exists(&self, _bucket: &str) -> Result<bool, SetTargetError> {
|
||||
Ok(true) // Mocked implementation
|
||||
}
|
||||
|
||||
pub async fn get_bucket_versioning(&self, _bucket: &str) -> Result<VersioningConfig, SetTargetError> {
|
||||
Ok(VersioningConfig { enabled: true })
|
||||
}
|
||||
|
||||
pub async fn health_check(&self, _endpoint: &str) -> Result<bool, SetTargetError> {
|
||||
Ok(true) // Mocked health check
|
||||
}
|
||||
}

#[derive(Debug, PartialEq)]
pub struct ServiceType(String);

impl ServiceType {
    pub fn is_valid(&self) -> bool {
        !self.0.is_empty() // Tighten this check as the supported service types are finalized
    }
}

#[derive(Debug, PartialEq)]
pub struct ARN {
    pub arn_type: String,
    pub id: String,
    pub region: String,
    pub bucket: String,
}

impl ARN {
    /// Returns true if the ARN is empty
    pub fn is_empty(&self) -> bool {
        //!self.arn_type.is_valid()
        false
    }

    /// Parse an ARN from its string form
    pub fn parse(s: &str) -> Result<Self, String> {
        // The ARN must have the form arn:minio:<Type>:<REGION>:<ID>:<remote-bucket>
        if !s.starts_with("arn:minio:") {
            return Err(format!("Invalid ARN {}", s));
        }

        let tokens: Vec<&str> = s.split(':').collect();
        if tokens.len() != 6 || tokens[4].is_empty() || tokens[5].is_empty() {
            return Err(format!("Invalid ARN {}", s));
        }

        Ok(ARN {
            arn_type: tokens[2].to_string(),
            region: tokens[3].to_string(),
            id: tokens[4].to_string(),
            bucket: tokens[5].to_string(),
        })
    }
}

// Implement `Display` so an ARN can be rendered with `format!` or `{}`;
// the blanket `ToString` impl then provides `to_string()` as well.
impl std::fmt::Display for ARN {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "arn:minio:{}:{}:{}:{}", self.arn_type, self.region, self.id, self.bucket)
    }
}
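
A small round-trip check of the parser and the `Display` impl above; the id and bucket are made-up example values.

    let s = "arn:minio:replication:us-east-1:deadbeef:dest-bucket";
    let arn = ARN::parse(s).expect("valid ARN");
    assert_eq!(arn.bucket, "dest-bucket");
    assert_eq!(arn.to_string(), s);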

fn must_get_uuid() -> String {
    // Uuid::new_v4() cannot fail, so no extra error handling is needed here.
    Uuid::new_v4().to_string()
}

fn generate_arn(target: BucketTarget, depl_id: String) -> String {
    let mut uuid: String = depl_id;
    if uuid.is_empty() {
        uuid = must_get_uuid();
    }

    let arn: ARN = ARN {
        arn_type: target.type_.unwrap(),
        id: uuid,
        region: "us-east-1".to_string(),
        bucket: target.target_bucket,
    };
    arn.to_string()
}

#[derive(Debug, Error)]
pub enum SetTargetError {
    #[error("Invalid target type for bucket {0}")]
    InvalidTargetType(String),

    #[error("Target bucket {0} not found")]
    TargetNotFound(String),

    #[error("Source bucket {0} is not versioned")]
    SourceNotVersioned(String),

    #[error("Target bucket {0} is not versioned")]
    TargetNotVersioned(String),

    #[error("Health check failed for bucket {0}")]
    HealthCheckFailed(String),

    #[error("Target bucket {0} already exists")]
    TargetAlreadyExists(String),
}
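
Since `thiserror` derives `Display` from the `#[error]` attributes, callers can surface these messages directly; for example (the bucket name is illustrative):

    let err = SetTargetError::TargetNotFound("backup-bucket".to_string());
    assert_eq!(err.to_string(), "Target bucket backup-bucket not found");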
0
ecstore/src/cmd/bucketreplicationhandler.rs
Normal file
2
ecstore/src/cmd/mod.rs
Normal file
@@ -0,0 +1,2 @@
pub mod bucket_replication;
pub mod bucket_targets;
@@ -2430,7 +2430,7 @@ impl DiskAPI for LocalDisk {
|
||||
for info in obj_infos.iter() {
|
||||
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
|
||||
let sz: usize;
|
||||
(obj_deleted, sz) = item.apply_actions(info, &size_s).await;
|
||||
(obj_deleted, sz) = item.apply_actions(info, &mut size_s).await;
|
||||
done();
|
||||
|
||||
if obj_deleted {
|
||||
|
||||
@@ -606,6 +606,28 @@ impl FileMeta {
|
||||
versions.push(fi);
|
||||
}
|
||||
|
||||
let num = versions.len();
|
||||
let mut prev_mod_time = None;
|
||||
for (i, fi) in versions.iter_mut().enumerate() {
|
||||
if i == 0 {
|
||||
fi.is_latest = true;
|
||||
} else {
|
||||
fi.successor_mod_time = prev_mod_time;
|
||||
}
|
||||
fi.num_versions = num;
|
||||
prev_mod_time = fi.mod_time;
|
||||
}
|
||||
|
||||
if versions.is_empty() {
|
||||
versions.push(FileInfo {
|
||||
name: path.to_string(),
|
||||
volume: volume.to_string(),
|
||||
deleted: true,
|
||||
is_latest: true,
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
Ok(FileInfoVersions {
|
||||
volume: volume.to_string(),
|
||||
name: path.to_string(),
|
||||
|
||||
@@ -18,8 +18,10 @@ use super::{
|
||||
data_usage_cache::{DataUsageCache, DataUsageEntry, DataUsageHash},
|
||||
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
|
||||
};
|
||||
use crate::cmd::bucket_replication::queue_replication_heal;
|
||||
use crate::{
|
||||
bucket::{versioning::VersioningApi, versioning_sys::BucketVersioningSys},
|
||||
bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys},
|
||||
cmd::bucket_replication::ReplicationStatusType,
|
||||
heal::data_usage::DATA_USAGE_ROOT,
|
||||
};
|
||||
use crate::{
|
||||
@@ -550,13 +552,114 @@ impl ScannerItem {
|
||||
Ok(object_infos)
|
||||
}
|
||||
|
||||
pub async fn apply_actions(&self, oi: &ObjectInfo, _size_s: &SizeSummary) -> (bool, usize) {
|
||||
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, usize) {
|
||||
let done = ScannerMetrics::time(ScannerMetric::Ilm);
|
||||
//todo: lifecycle
|
||||
info!(
|
||||
"apply_actions {} {} {:?} {:?}",
|
||||
oi.bucket.clone(),
|
||||
oi.name.clone(),
|
||||
oi.version_id.clone(),
|
||||
oi.user_defined.clone()
|
||||
);
|
||||
|
||||
// Create a mutable clone if you need to modify fields
|
||||
let mut oi = oi.clone();
|
||||
oi.replication_status = ReplicationStatusType::from(
|
||||
oi.user_defined
|
||||
.as_ref()
|
||||
.and_then(|map| map.get("x-amz-bucket-replication-status"))
|
||||
.unwrap_or(&"PENDING".to_string()),
|
||||
);
|
||||
info!("apply status is: {:?}", oi.replication_status);
|
||||
self.heal_replication(&oi, _size_s).await;
|
||||
done();
|
||||
|
||||
(false, oi.size)
|
||||
}
|
||||
|
||||
pub async fn heal_replication(&mut self, oi: &ObjectInfo, size_s: &mut SizeSummary) {
|
||||
if oi.version_id.is_none() {
|
||||
error!(
|
||||
"heal_replication: no version_id or replication config {} {} {}",
|
||||
oi.bucket,
|
||||
oi.name,
|
||||
oi.version_id.is_none()
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
//let config = s3s::dto::ReplicationConfiguration{ role: todo!(), rules: todo!() };
|
||||
// Use the provided variable instead of borrowing self mutably.
|
||||
let replication = match metadata_sys::get_replication_config(&oi.bucket).await {
|
||||
Ok((replication, _)) => replication,
|
||||
Err(_) => {
|
||||
error!("heal_replication: failed to get replication config for bucket {} {}", oi.bucket, oi.name);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if replication.rules.is_empty() {
|
||||
error!("heal_replication: no replication rules for bucket {} {}", oi.bucket, oi.name);
|
||||
return;
|
||||
}
|
||||
if replication.role.is_empty() {
|
||||
// error!("heal_replication: no replication role for bucket {} {}", oi.bucket, oi.name);
|
||||
// return;
|
||||
}
|
||||
|
||||
//if oi.delete_marker || !oi.version_purge_status.is_empty() {
|
||||
if oi.delete_marker {
|
||||
error!(
|
||||
"heal_replication: delete marker or version purge status {} {} {:?} {} {:?}",
|
||||
oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if oi.replication_status == ReplicationStatusType::Completed {
|
||||
return;
|
||||
}
|
||||
|
||||
info!("replication status is: {:?} and user define {:?}", oi.replication_status, oi.user_defined);
|
||||
|
||||
let roi = queue_replication_heal(&oi.bucket, oi, &replication, 3).await;
|
||||
|
||||
if roi.is_none() {
|
||||
info!("not need heal {} {} {:?}", oi.bucket, oi.name, oi.version_id);
|
||||
return;
|
||||
}
|
||||
|
||||
for (arn, tgt_status) in &roi.unwrap().target_statuses {
|
||||
let tgt_size_s = size_s.repl_target_stats.entry(arn.clone()).or_default();
|
||||
|
||||
match tgt_status {
|
||||
ReplicationStatusType::Pending => {
|
||||
tgt_size_s.pending_count += 1;
|
||||
tgt_size_s.pending_size += oi.size;
|
||||
size_s.pending_count += 1;
|
||||
size_s.pending_size += oi.size;
|
||||
}
|
||||
ReplicationStatusType::Failed => {
|
||||
tgt_size_s.failed_count += 1;
|
||||
tgt_size_s.failed_size += oi.size;
|
||||
size_s.failed_count += 1;
|
||||
size_s.failed_size += oi.size;
|
||||
}
|
||||
ReplicationStatusType::Completed | ReplicationStatusType::CompletedLegacy => {
|
||||
tgt_size_s.replicated_count += 1;
|
||||
tgt_size_s.replicated_size += oi.size;
|
||||
size_s.replicated_count += 1;
|
||||
size_s.replicated_size += oi.size;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
if matches!(oi.replication_status, ReplicationStatusType::Replica) {
|
||||
size_s.replica_count += 1;
|
||||
size_s.replica_size += oi.size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
||||
@@ -3,6 +3,7 @@ pub mod bitrot;
|
||||
pub mod bucket;
|
||||
pub mod cache_value;
|
||||
mod chunk_stream;
|
||||
pub mod cmd;
|
||||
pub mod config;
|
||||
pub mod disk;
|
||||
pub mod disks_layout;
|
||||
|
||||
@@ -1,3 +1,17 @@
|
||||
use async_trait::async_trait;
|
||||
use futures::future::join_all;
|
||||
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
|
||||
use protos::node_service_time_out_client;
|
||||
use protos::proto_gen::node_service::{
|
||||
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
|
||||
};
|
||||
use regex::Regex;
|
||||
use std::{collections::HashMap, fmt::Debug, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
use tonic::Request;
|
||||
use tracing::info;
|
||||
|
||||
use crate::bucket::metadata_sys;
|
||||
use crate::disk::error::is_all_buckets_not_found;
|
||||
use crate::disk::{DiskAPI, DiskStore};
|
||||
use crate::error::clone_err;
|
||||
@@ -15,19 +29,7 @@ use crate::{
|
||||
endpoints::{EndpointServerPools, Node},
|
||||
store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use common::error::{Error, Result};
|
||||
use futures::future::join_all;
|
||||
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
|
||||
use protos::node_service_time_out_client;
|
||||
use protos::proto_gen::node_service::{
|
||||
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
|
||||
};
|
||||
use regex::Regex;
|
||||
use std::{collections::HashMap, fmt::Debug, sync::Arc};
|
||||
use tokio::sync::RwLock;
|
||||
use tonic::Request;
|
||||
use tracing::info;
|
||||
|
||||
type Client = Arc<Box<dyn PeerS3Client>>;
|
||||
|
||||
@@ -430,14 +432,17 @@ impl PeerS3Client for LocalPeerS3Client {
|
||||
}
|
||||
|
||||
// TODO: reduceWriteQuorumErrs
|
||||
|
||||
// debug!("get_bucket_info errs:{:?}", errs);
|
||||
let mut versioned = false;
|
||||
if let Ok(sys) = metadata_sys::get(bucket).await {
|
||||
versioned = sys.versioning();
|
||||
}
|
||||
|
||||
ress.iter()
|
||||
.find_map(|op| {
|
||||
op.as_ref().map(|v| BucketInfo {
|
||||
name: v.name.clone(),
|
||||
created: v.created,
|
||||
versionning: versioned,
|
||||
..Default::default()
|
||||
})
|
||||
})
|
||||
@@ -496,10 +501,13 @@ pub struct RemotePeerS3Client {
|
||||
}
|
||||
|
||||
impl RemotePeerS3Client {
|
||||
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
|
||||
pub fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
|
||||
let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string();
|
||||
Self { node, pools, addr }
|
||||
}
|
||||
pub fn get_addr(&self) -> String {
|
||||
self.addr.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::cmd::bucket_replication::{ReplicationStatusType, VersionPurgeStatusType};
|
||||
use crate::heal::heal_ops::HealSequence;
|
||||
use crate::io::FileReader;
|
||||
use crate::store_utils::clean_metadata;
|
||||
@@ -175,7 +176,6 @@ impl FileInfo {
|
||||
let content_type = meta.get("content-type").cloned();
|
||||
let content_encoding = meta.get("content-encoding").cloned();
|
||||
let etag = meta.get("etag").cloned();
|
||||
|
||||
(content_type, content_encoding, etag)
|
||||
} else {
|
||||
(None, None, None)
|
||||
@@ -686,6 +686,11 @@ pub struct ObjectInfo {
|
||||
pub inlined: bool,
|
||||
pub metadata_only: bool,
|
||||
pub version_only: bool,
|
||||
pub replication_status_internal: String,
|
||||
pub replication_status: ReplicationStatusType,
|
||||
pub version_purge_status_internal: String,
|
||||
pub version_purge_status: VersionPurgeStatusType,
|
||||
pub checksum: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Clone for ObjectInfo {
|
||||
@@ -714,6 +719,11 @@ impl Clone for ObjectInfo {
|
||||
inlined: self.inlined,
|
||||
metadata_only: self.metadata_only,
|
||||
version_only: self.version_only,
|
||||
replication_status_internal: self.replication_status_internal.clone(),
|
||||
replication_status: self.replication_status.clone(),
|
||||
version_purge_status_internal: self.version_purge_status_internal.clone(),
|
||||
version_purge_status: self.version_purge_status.clone(),
|
||||
checksum: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub const AMZ_OBJECT_TAGGING: &str = "X-Amz-Tagging";
|
||||
pub const AMZ_BUCKET_REPLICATION_STATUS: &str = "X-Amz-Replication-Status";
|
||||
pub const AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
|
||||
pub const AMZ_DECODED_CONTENT_LENGTH: &str = "X-Amz-Decoded-Content-Length";
|
||||
|
||||
@@ -22,6 +22,7 @@ api = { workspace = true }
|
||||
appauth = { workspace = true }
|
||||
atoi = { workspace = true }
|
||||
atomic_enum = { workspace = true }
|
||||
aws-sdk-s3 = { workspace = true }
|
||||
axum.workspace = true
|
||||
axum-extra = { workspace = true }
|
||||
axum-server = { workspace = true }
|
||||
@@ -43,14 +44,18 @@ hyper-util.workspace = true
|
||||
http.workspace = true
|
||||
http-body.workspace = true
|
||||
iam = { workspace = true }
|
||||
include_dir = { workspace = true }
|
||||
jsonwebtoken = { workspace = true }
|
||||
lock.workspace = true
|
||||
matchit = { workspace = true }
|
||||
mime.workspace = true
|
||||
mime_guess = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
percent-encoding = { workspace = true }
|
||||
pin-project-lite.workspace = true
|
||||
protos.workspace = true
|
||||
query = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
rmp-serde.workspace = true
|
||||
rustfs-config = { workspace = true }
|
||||
rustfs-event-notifier = { workspace = true }
|
||||
@@ -64,6 +69,7 @@ serde_json.workspace = true
|
||||
serde_urlencoded = { workspace = true }
|
||||
shadow-rs = { workspace = true, features = ["build", "metadata"] }
|
||||
socket2 = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing.workspace = true
|
||||
time = { workspace = true, features = ["parsing", "formatting", "serde"] }
|
||||
tokio-util.workspace = true
|
||||
@@ -85,6 +91,7 @@ tower-http = { workspace = true, features = [
|
||||
"compression-gzip",
|
||||
"cors",
|
||||
] }
|
||||
urlencoding = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -2,10 +2,15 @@ use super::router::Operation;
|
||||
use crate::auth::check_key_valid;
|
||||
use crate::auth::get_condition_values;
|
||||
use crate::auth::get_session_token;
|
||||
//use ecstore::error::Error as ec_Error;
|
||||
use crate::storage::error::to_s3_error;
|
||||
use bytes::Bytes;
|
||||
use common::error::Error as ec_Error;
|
||||
use ecstore::admin_server_info::get_server_info;
|
||||
use ecstore::bucket::metadata_sys::{self, get_replication_config};
|
||||
use ecstore::bucket::target::BucketTarget;
|
||||
use ecstore::bucket::versioning_sys::BucketVersioningSys;
|
||||
use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
|
||||
use ecstore::global::GLOBAL_ALlHealState;
|
||||
use ecstore::heal::data_usage::load_data_usage_from_backend;
|
||||
use ecstore::heal::heal_commands::HealOpts;
|
||||
@@ -23,9 +28,11 @@ use http::{HeaderMap, Uri};
|
||||
use hyper::StatusCode;
|
||||
use iam::get_global_action_cred;
|
||||
use iam::store::MappedPolicy;
|
||||
// use lazy_static::lazy_static;
|
||||
use madmin::metrics::RealtimeMetrics;
|
||||
use madmin::utils::parse_duration;
|
||||
use matchit::Params;
|
||||
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
|
||||
use policy::policy::action::Action;
|
||||
use policy::policy::action::S3Action;
|
||||
use policy::policy::default::DEFAULT_POLICIES;
|
||||
@@ -36,6 +43,7 @@ use s3s::stream::{ByteStream, DynByteStream};
|
||||
use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result};
|
||||
use s3s::{S3ErrorCode, StdError};
|
||||
use serde::{Deserialize, Serialize};
|
||||
// use serde_json::to_vec;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
@@ -47,6 +55,7 @@ use tokio::time::interval;
|
||||
use tokio::{select, spawn};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::{error, info, warn};
|
||||
// use url::UrlQuery;
|
||||
|
||||
pub mod event;
|
||||
pub mod group;
|
||||
@@ -57,6 +66,7 @@ pub mod service_account;
|
||||
pub mod sts;
|
||||
pub mod trace;
|
||||
pub mod user;
|
||||
use urlencoding::decode;
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
#[serde(rename_all = "PascalCase", default)]
|
||||
@@ -745,6 +755,278 @@ impl Operation for BackgroundHealStatusHandler {
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_query_params(uri: &Uri) -> HashMap<String, String> {
    let mut params = HashMap::new();

    if let Some(query) = uri.query() {
        query.split('&').for_each(|pair| {
            if let Some((key, value)) = pair.split_once('=') {
                params.insert(key.to_string(), value.to_string());
            }
        });
    }

    params
}
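
A quick illustration of what this helper returns for a typical admin request; the URI is a made-up example, and note that values are not percent-decoded here.

    let uri: Uri = "http://localhost:9000/minio/admin/v3/list-remote-targets?bucket=photos&type=replication"
        .parse()
        .unwrap();
    let params = extract_query_params(&uri);
    assert_eq!(params.get("bucket").map(String::as_str), Some("photos"));
    assert_eq!(params.get("type").map(String::as_str), Some("replication"));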

// Client-side encryption from the client is disabled because MinIO uses an 8-byte nonce while rustfs uses a 12-byte nonce.

#[allow(dead_code)]
fn is_local_host(_host: String) -> bool {
    false
}

//awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin "http://:9000/minio/admin/v3/replicationmetrics?bucket=1"
pub struct GetReplicationMetricsHandler {}
#[async_trait::async_trait]
impl Operation for GetReplicationMetricsHandler {
    async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
        info!("GetReplicationMetricsHandler");
        let querys = extract_query_params(&_req.uri);
        if let Some(bucket) = querys.get("bucket") {
            info!("get bucket:{} metrics", bucket);
        }
        //return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
        //Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
        Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))))
    }
}
|
||||
|
||||
pub struct SetRemoteTargetHandler {}
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for SetRemoteTargetHandler {
|
||||
async fn call(&self, mut _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
//return Ok(S3Response::new((StatusCode::OK, Body::from("OK".to_string()))));
|
||||
// println!("handle MetricsHandler, params: {:?}", _req.input);
|
||||
info!("handle MetricsHandler, params: {:?}", _req.credentials);
|
||||
let querys = extract_query_params(&_req.uri);
|
||||
let Some(_cred) = _req.credentials else {
|
||||
error!("credentials null");
|
||||
return Err(s3_error!(InvalidRequest, "get cred failed"));
|
||||
};
|
||||
let _is_owner = true; // treat the caller as owner for now; derive this from the request later
|
||||
let body = _req.input.store_all_unlimited().await.unwrap();
|
||||
//println!("body: {}", std::str::from_utf8(&body.clone()).unwrap());
|
||||
|
||||
//println!("bucket is:{}", bucket.clone());
|
||||
if let Some(bucket) = querys.get("bucket") {
|
||||
if bucket.is_empty() {
|
||||
println!("have bucket: {}", bucket);
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from("fuck".to_string()))));
|
||||
}
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
// let binfo:BucketInfo = store
|
||||
// .get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default()).await;
|
||||
match store
|
||||
.get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(info) => {
|
||||
println!("Bucket Info: {:?}", info);
|
||||
if !info.versionning {
|
||||
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("Bucket needs versioning".to_string()))));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("Error: {:?}", err);
|
||||
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("empty bucket".to_string()))));
|
||||
}
|
||||
}
|
||||
|
||||
let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(|arg0| to_s3_error(arg0.into()))?; // the error propagates to the caller
|
||||
remote_target.source_bucket = bucket.clone();
|
||||
|
||||
info!("remote target {} And arn is:", remote_target.source_bucket.clone());
|
||||
|
||||
if let Some(val) = remote_target.arn.clone() {
|
||||
info!("arn is {}", val);
|
||||
}
|
||||
|
||||
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
|
||||
let (arn, exist) = sys.get_remote_arn(bucket, Some(&remote_target), "").await;
|
||||
info!("exist: {} {}", exist, arn.clone().unwrap_or_default());
|
||||
if exist && arn.is_some() {
|
||||
let jsonarn = serde_json::to_string(&arn).expect("failed to serialize");
|
||||
//Ok(S3Response::new)
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from(jsonarn))));
|
||||
} else {
|
||||
remote_target.arn = arn;
|
||||
match sys.set_target(bucket, &remote_target, false, false).await {
|
||||
Ok(_) => {
|
||||
{
|
||||
//todo: persist the updated targets and related state
|
||||
let targets = sys.list_targets(Some(bucket), None).await;
|
||||
info!("targets is {}", targets.len());
|
||||
match serde_json::to_vec(&targets) {
|
||||
Ok(json) => {
|
||||
//println!("json is:{:?}", json.clone().to_ascii_lowercase());
|
||||
//metadata_sys::GLOBAL_BucketMetadataSys::
|
||||
//BUCKET_TARGETS_FILE: &str = "bucket-targets.json"
|
||||
let _ = metadata_sys::update(bucket, "bucket-targets.json", json).await;
|
||||
// if let Err(err) = metadata_sys::GLOBAL_BucketMetadataSys.get().
|
||||
// .update(ctx, bucket, "bucketTargetsFile", tgt_bytes)
|
||||
// .await
|
||||
// {
|
||||
// write_error_response(ctx, &err)?;
|
||||
// return Err(err);
|
||||
// }
|
||||
}
|
||||
Err(e) => {
|
||||
error!("序列化失败{}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let jsonarn = serde_json::to_string(&remote_target.arn.clone()).expect("failed to serialize");
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from(jsonarn))));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("set target error {}", e);
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Body::from("remote target not ready".to_string()),
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
error!("GLOBAL_BUCKET _TARGET_SYS is not initialized");
|
||||
return Err(S3Error::with_message(
|
||||
S3ErrorCode::InternalError,
|
||||
"GLOBAL_BUCKET_TARGET_SYS is not initialized".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
// return Err(s3_error!(InvalidArgument));
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ListRemoteTargetHandler {}
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for ListRemoteTargetHandler {
|
||||
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
warn!("list GetRemoteTargetHandler, params: {:?}", _req.credentials);
|
||||
|
||||
let querys = extract_query_params(&_req.uri);
|
||||
let Some(_cred) = _req.credentials else {
|
||||
error!("credentials null");
|
||||
return Err(s3_error!(InvalidRequest, "get cred failed"));
|
||||
};
|
||||
|
||||
if let Some(bucket) = querys.get("bucket") {
|
||||
if bucket.is_empty() {
|
||||
error!("bucket parameter is empty");
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Body::from("Bucket parameter is required".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string()));
|
||||
};
|
||||
|
||||
match store
|
||||
.get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(info) => {
|
||||
println!("Bucket Info: {:?}", info);
|
||||
if !info.versionning {
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::FORBIDDEN,
|
||||
Body::from("Bucket needs versioning".to_string()),
|
||||
)));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("Error fetching bucket info: {:?}", err);
|
||||
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string()))));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
|
||||
let targets = sys.list_targets(Some(bucket), None).await;
|
||||
error!("target sys len {}", targets.len());
|
||||
if targets.is_empty() {
|
||||
return Ok(S3Response::new((
|
||||
StatusCode::NOT_FOUND,
|
||||
Body::from("No remote targets found".to_string()),
|
||||
)));
|
||||
}
|
||||
|
||||
let json_targets = serde_json::to_string(&targets).map_err(|e| {
|
||||
error!("Serialization error: {}", e);
|
||||
S3Error::with_message(S3ErrorCode::InternalError, "Failed to serialize targets".to_string())
|
||||
})?;
|
||||
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from(json_targets))));
|
||||
} else {
|
||||
println!("GLOBAL_BUCKET_TARGET_SYS is not initialized");
|
||||
return Err(S3Error::with_message(
|
||||
S3ErrorCode::InternalError,
|
||||
"GLOBAL_BUCKET_TARGET_SYS is not initialized".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
println!("Bucket parameter missing in request");
|
||||
Ok(S3Response::new((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Body::from("Bucket parameter is required".to_string()),
|
||||
)))
|
||||
//return Err(s3_error!(NotImplemented));
|
||||
}
|
||||
}
|
||||
const COLON: AsciiSet = CONTROLS.add(b':');
|
||||
pub struct RemoveRemoteTargetHandler {}
|
||||
#[async_trait::async_trait]
|
||||
impl Operation for RemoveRemoteTargetHandler {
|
||||
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
|
||||
error!("remove remote target called");
|
||||
let querys = extract_query_params(&_req.uri);
|
||||
|
||||
if let Some(arnstr) = querys.get("arn") {
|
||||
if let Some(bucket) = querys.get("bucket") {
|
||||
if bucket.is_empty() {
|
||||
error!("bucket parameter is empty");
|
||||
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string()))));
|
||||
}
|
||||
let _arn = bucket_targets::ARN::parse(arnstr);
|
||||
|
||||
match get_replication_config(bucket).await {
|
||||
Ok((conf, _ts)) => {
|
||||
for ru in conf.rules {
|
||||
let encoded = percent_encode(ru.destination.bucket.as_bytes(), &COLON);
|
||||
let encoded_str = encoded.to_string();
|
||||
if *arnstr == encoded_str {
|
||||
error!("target in use");
|
||||
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("Ok".to_string()))));
|
||||
}
|
||||
info!("bucket: {} and arn str is {} ", encoded_str, arnstr);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("get replication config err: {}", err);
|
||||
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from(err.to_string()))));
|
||||
}
|
||||
}
|
||||
//percent_decode_str(&arnstr);
|
||||
let decoded_str = decode(arnstr).unwrap();
|
||||
error!("need delete target is {}", decoded_str);
|
||||
bucket_targets::remove_bucket_target(bucket, arnstr).await;
|
||||
}
|
||||
}
|
||||
//return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
|
||||
//Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
|
||||
return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
|
||||
}
|
||||
}
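
As a side note on the in-use check above: the `COLON` set escapes only ':' (plus control bytes), so a rule destination is compared against an ARN encoded like this (the ARN value is an example):

    let encoded = percent_encode(b"arn:minio:replication:us-east-1:id:dest", &COLON).to_string();
    assert_eq!(encoded, "arn%3Aminio%3Areplication%3Aus-east-1%3Aid%3Adest");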
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use ecstore::heal::heal_commands::HealOpts;
|
||||
|
||||
@@ -11,12 +11,14 @@ use handlers::{
|
||||
sts, user,
|
||||
};
|
||||
|
||||
use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler};
|
||||
use hyper::Method;
|
||||
use router::{AdminOperation, S3Router};
|
||||
use rpc::regist_rpc_route;
|
||||
use s3s::route::S3Route;
|
||||
|
||||
const ADMIN_PREFIX: &str = "/rustfs/admin";
|
||||
const MINIO_ADMIN_PREFIX: &str = "/minio/admin";
|
||||
|
||||
pub fn make_admin_route() -> Result<impl S3Route> {
|
||||
let mut r: S3Router<AdminOperation> = S3Router::new();
|
||||
@@ -228,6 +230,53 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
|
||||
AdminOperation(&AddServiceAccount {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
|
||||
AdminOperation(&ListRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
|
||||
AdminOperation(&ListRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
|
||||
AdminOperation(&GetReplicationMetricsHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::GET,
|
||||
format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
|
||||
AdminOperation(&GetReplicationMetricsHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::PUT,
|
||||
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
|
||||
AdminOperation(&SetRemoteTargetHandler {}),
|
||||
)?;
|
||||
r.insert(
|
||||
Method::PUT,
|
||||
format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
|
||||
AdminOperation(&SetRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::DELETE,
|
||||
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
|
||||
AdminOperation(&RemoveRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
r.insert(
|
||||
Method::DELETE,
|
||||
format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
|
||||
AdminOperation(&RemoveRemoteTargetHandler {}),
|
||||
)?;
|
||||
|
||||
// list-canned-policies?bucket=xxx
|
||||
r.insert(
|
||||
Method::GET,
|
||||
|
||||
@@ -16,6 +16,7 @@ use s3s::S3Result;
|
||||
|
||||
use super::rpc::RPC_PREFIX;
|
||||
use super::ADMIN_PREFIX;
|
||||
use super::MINIO_ADMIN_PREFIX;
|
||||
|
||||
pub struct S3Router<T> {
|
||||
router: Router<T>,
|
||||
@@ -64,7 +65,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX)
|
||||
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(MINIO_ADMIN_PREFIX)
|
||||
}
|
||||
|
||||
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {
|
||||
|
||||
@@ -240,6 +240,8 @@ impl Node for NodeService {
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
println!("bucket info {}", bucket_info.clone());
|
||||
Ok(tonic::Response::new(GetBucketInfoResponse {
|
||||
success: true,
|
||||
bucket_info,
|
||||
@@ -247,6 +249,7 @@ impl Node for NodeService {
|
||||
}))
|
||||
}
|
||||
|
||||
// println!("vuc")
|
||||
Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
|
||||
success: false,
|
||||
bucket_info: String::new(),
|
||||
|
||||
@@ -23,6 +23,7 @@ use common::{
|
||||
};
|
||||
use crypto::init_kms;
|
||||
use ecstore::bucket::metadata_sys::init_bucket_metadata_sys;
|
||||
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
|
||||
use ecstore::config as ecconfig;
|
||||
use ecstore::config::GLOBAL_ConfigSys;
|
||||
use ecstore::heal::background_heal_ops::init_auto_heal;
|
||||
@@ -544,6 +545,11 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
|
||||
init_console_cfg(local_ip, server_port);
|
||||
|
||||
print_server_info();
|
||||
init_bucket_replication_pool().await;
|
||||
|
||||
init_console_cfg(local_ip, server_port);
|
||||
|
||||
print_server_info();
|
||||
|
||||
if opt.console_enable {
|
||||
|
||||
@@ -4,14 +4,12 @@ use super::options::extract_metadata;
|
||||
use super::options::put_opts;
|
||||
use crate::auth::get_condition_values;
|
||||
use crate::storage::access::ReqInfo;
|
||||
use crate::storage::error::to_s3_error;
|
||||
use crate::storage::options::copy_dst_opts;
|
||||
use crate::storage::options::copy_src_opts;
|
||||
use crate::storage::options::{extract_metadata_from_mime, get_opts};
|
||||
use api::query::Context;
|
||||
use api::query::Query;
|
||||
use api::server::dbms::DatabaseManagerSystem;
|
||||
use bytes::Bytes;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use common::error::Result;
|
||||
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
|
||||
use datafusion::arrow::json::writer::JsonArray;
|
||||
@@ -30,6 +28,9 @@ use ecstore::bucket::policy_sys::PolicySys;
|
||||
use ecstore::bucket::tagging::decode_tags;
|
||||
use ecstore::bucket::tagging::encode_tags;
|
||||
use ecstore::bucket::versioning_sys::BucketVersioningSys;
|
||||
use ecstore::cmd::bucket_replication::get_must_replicate_options;
|
||||
use ecstore::cmd::bucket_replication::must_replicate;
|
||||
use ecstore::cmd::bucket_replication::schedule_replication;
|
||||
use ecstore::io::READ_BUFFER_SIZE;
|
||||
use ecstore::new_object_layer_fn;
|
||||
use ecstore::store_api::BucketOptions;
|
||||
@@ -43,6 +44,7 @@ use ecstore::store_api::ObjectOptions;
|
||||
use ecstore::store_api::ObjectToDelete;
|
||||
use ecstore::store_api::PutObjReader;
|
||||
use ecstore::store_api::StorageAPI;
|
||||
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
|
||||
use ecstore::store_api::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use ecstore::utils::path::path_join_buf;
|
||||
use ecstore::utils::xml;
|
||||
@@ -85,6 +87,13 @@ use tracing::warn;
|
||||
use transform_stream::AsyncTryStream;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::storage::error::to_s3_error;
|
||||
use crate::storage::options::copy_dst_opts;
|
||||
use crate::storage::options::copy_src_opts;
|
||||
use crate::storage::options::{extract_metadata_from_mime, get_opts};
|
||||
use ecstore::cmd::bucket_replication::ReplicationStatusType;
|
||||
use ecstore::cmd::bucket_replication::ReplicationType;
|
||||
|
||||
macro_rules! try_ {
|
||||
($result:expr) => {
|
||||
match $result {
|
||||
@@ -924,6 +933,7 @@ impl S3 for FS {
|
||||
return self.put_object_extract(req).await;
|
||||
}
|
||||
|
||||
info!("put object");
|
||||
let input = req.input;
|
||||
|
||||
if let Some(ref storage_class) = input.storage_class {
|
||||
@@ -976,10 +986,27 @@ impl S3 for FS {
|
||||
metadata.insert(xhttp::AMZ_OBJECT_TAGGING.to_owned(), tags);
|
||||
}
|
||||
|
||||
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(metadata))
|
||||
let mt = metadata.clone();
|
||||
let mt2 = metadata.clone();
|
||||
|
||||
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(mt))
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
let repoptions =
|
||||
get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
|
||||
|
||||
let dsc = must_replicate(&bucket, &key, &repoptions).await;
|
||||
warn!("dsc {}", &dsc.replicate_any().clone());
|
||||
if dsc.replicate_any() {
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-timestamp");
|
||||
let now: DateTime<Utc> = Utc::now();
|
||||
let formatted_time = now.to_rfc3339();
|
||||
metadata.insert(k, formatted_time);
|
||||
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-status");
|
||||
metadata.insert(k, dsc.pending_status());
|
||||
}
|
||||
|
||||
debug!("put_object opts {:?}", &opts);
|
||||
|
||||
let obj_info = store
|
||||
@@ -987,9 +1014,17 @@ impl S3 for FS {
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
let e_tag = obj_info.etag;
|
||||
let e_tag = obj_info.etag.clone();
|
||||
|
||||
// store.put_object(bucket, object, data, opts);
|
||||
let repoptions =
|
||||
get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
|
||||
|
||||
let dsc = must_replicate(&bucket, &key, &repoptions).await;
|
||||
|
||||
if dsc.replicate_any() {
|
||||
let objectlayer = new_object_layer_fn();
|
||||
schedule_replication(obj_info, objectlayer.unwrap(), dsc, 1).await;
|
||||
}
|
||||
|
||||
let output = PutObjectOutput {
|
||||
e_tag,
|
||||
@@ -1152,17 +1187,30 @@ impl S3 for FS {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
let oi = store
|
||||
let obj_info = store
|
||||
.complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, opts)
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
let output = CompleteMultipartUploadOutput {
|
||||
bucket: Some(bucket),
|
||||
key: Some(key),
|
||||
e_tag: oi.etag,
|
||||
bucket: Some(bucket.clone()),
|
||||
key: Some(key.clone()),
|
||||
e_tag: obj_info.etag.clone(),
|
||||
location: Some("us-east-1".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mt2 = HashMap::new();
|
||||
let repoptions =
|
||||
get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, opts);
|
||||
|
||||
let dsc = must_replicate(&bucket, &key, &repoptions).await;
|
||||
|
||||
if dsc.replicate_any() {
|
||||
warn!("need multipart replication");
|
||||
let objectlayer = new_object_layer_fn();
|
||||
schedule_replication(obj_info, objectlayer.unwrap(), dsc, 1).await;
|
||||
}
|
||||
Ok(S3Response::new(output))
|
||||
}
|
||||
|
||||
@@ -1725,17 +1773,35 @@ impl S3 for FS {
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
let replication_configuration = match metadata_sys::get_replication_config(&bucket).await {
|
||||
let rcfg = match metadata_sys::get_replication_config(&bucket).await {
|
||||
Ok((cfg, _created)) => Some(cfg),
|
||||
Err(err) => {
|
||||
warn!("get_object_lock_config err {:?}", err);
|
||||
None
|
||||
error!("get_replication_config err {:?}", err);
|
||||
return Err(to_s3_error(err));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(S3Response::new(GetBucketReplicationOutput {
|
||||
replication_configuration,
|
||||
}))
|
||||
if rcfg.is_none() {
|
||||
return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "replication not found".to_string()));
|
||||
}
|
||||
|
||||
// Ok(S3Response::new(GetBucketReplicationOutput {
|
||||
// replication_configuration: rcfg,
|
||||
// }))
|
||||
|
||||
if rcfg.is_some() {
|
||||
Ok(S3Response::new(GetBucketReplicationOutput {
|
||||
replication_configuration: rcfg,
|
||||
}))
|
||||
} else {
|
||||
let rep = ReplicationConfiguration {
|
||||
role: "".to_string(),
|
||||
rules: vec![],
|
||||
};
|
||||
Ok(S3Response::new(GetBucketReplicationOutput {
|
||||
replication_configuration: Some(rep),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
async fn put_bucket_replication(
|
||||
@@ -1747,6 +1813,7 @@ impl S3 for FS {
|
||||
replication_configuration,
|
||||
..
|
||||
} = req.input;
|
||||
warn!("put bucket replication");
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
@@ -1786,6 +1853,7 @@ impl S3 for FS {
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
// TODO: remove targets
|
||||
error!("delete bucket");
|
||||
|
||||
Ok(S3Response::new(DeleteBucketReplicationOutput::default()))
|
||||
}
|
||||
@@ -2105,13 +2173,15 @@ impl S3 for FS {
|
||||
None
|
||||
};
|
||||
|
||||
if legal_hold.is_none() {
|
||||
return Err(s3_error!(InvalidRequest, "Object does not have legal hold"));
|
||||
}
|
||||
let status = if let Some(v) = legal_hold {
|
||||
v
|
||||
} else {
|
||||
ObjectLockLegalHoldStatus::OFF.to_string()
|
||||
};
|
||||
|
||||
Ok(S3Response::new(GetObjectLegalHoldOutput {
|
||||
legal_hold: Some(ObjectLockLegalHold {
|
||||
status: Some(ObjectLockLegalHoldStatus::from(legal_hold.unwrap_or_default())),
|
||||
status: Some(ObjectLockLegalHoldStatus::from(status)),
|
||||
}),
|
||||
}))
|
||||
}
|
||||
@@ -2173,6 +2243,103 @@ impl S3 for FS {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get_object_retention(
|
||||
&self,
|
||||
req: S3Request<GetObjectRetentionInput>,
|
||||
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
|
||||
let GetObjectRetentionInput {
|
||||
bucket, key, version_id, ..
|
||||
} = req.input;
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
// check object lock
|
||||
let _ = metadata_sys::get_object_lock_config(&bucket).await.map_err(to_s3_error)?;
|
||||
|
||||
let opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
|
||||
let object_info = store.get_object_info(&bucket, &key, &opts).await.map_err(|e| {
|
||||
error!("get_object_info failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let mode = if let Some(ref ud) = object_info.user_defined {
|
||||
ud.get("x-amz-object-lock-mode")
|
||||
.map(|v| ObjectLockRetentionMode::from(v.as_str().to_string()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let retain_until_date = if let Some(ref ud) = object_info.user_defined {
|
||||
ud.get("x-amz-object-lock-retain-until-date")
|
||||
.and_then(|v| OffsetDateTime::parse(v.as_str(), &Rfc3339).ok())
|
||||
.map(Timestamp::from)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(S3Response::new(GetObjectRetentionOutput {
|
||||
retention: Some(ObjectLockRetention { mode, retain_until_date }),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn put_object_retention(
|
||||
&self,
|
||||
req: S3Request<PutObjectRetentionInput>,
|
||||
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
|
||||
let PutObjectRetentionInput {
|
||||
bucket,
|
||||
key,
|
||||
retention,
|
||||
version_id,
|
||||
..
|
||||
} = req.input;
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
// check object lock
|
||||
let _ = metadata_sys::get_object_lock_config(&bucket).await.map_err(to_s3_error)?;
|
||||
|
||||
// TODO: check allow
|
||||
|
||||
let mut eval_metadata = HashMap::new();
|
||||
|
||||
if let Some(v) = retention {
|
||||
let mode = v.mode.map(|v| v.as_str().to_string()).unwrap_or_default();
|
||||
let retain_until_date = v
|
||||
.retain_until_date
|
||||
.map(|v| OffsetDateTime::from(v).format(&Rfc3339).unwrap())
|
||||
.unwrap_or_default();
|
||||
let now = OffsetDateTime::now_utc();
|
||||
eval_metadata.insert("x-amz-object-lock-mode".to_string(), mode);
|
||||
eval_metadata.insert("x-amz-object-lock-retain-until-date".to_string(), retain_until_date);
|
||||
eval_metadata.insert(
|
||||
format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "objectlock-retention-timestamp"),
|
||||
format!("{}.{:09}Z", now.format(&Rfc3339).unwrap(), now.nanosecond()),
|
||||
);
|
||||
}
|
||||
|
||||
let mut opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(to_s3_error)?;
|
||||
opts.eval_metadata = Some(eval_metadata);
|
||||
|
||||
store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
Ok(S3Response::new(PutObjectRetentionOutput {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use common::error::Error;
|
||||
use ecstore::{disk::error::is_err_file_not_found, store_err::StorageError};
|
||||
use ecstore::{bucket::error::BucketMetadataError, disk::error::is_err_file_not_found, store_err::StorageError};
|
||||
use s3s::{s3_error, S3Error, S3ErrorCode};
|
||||
pub fn to_s3_error(err: Error) -> S3Error {
|
||||
if let Some(storage_err) = err.downcast_ref::<StorageError>() {
|
||||
@@ -80,6 +80,15 @@ pub fn to_s3_error(err: Error) -> S3Error {
|
||||
StorageError::DoneForNow => s3_error!(InternalError, "DoneForNow"),
|
||||
};
|
||||
}
|
||||
// Map the missing bucket replication config to its dedicated S3 error code
|
||||
if let Some(meta_err) = err.downcast_ref::<BucketMetadataError>() {
|
||||
return match meta_err {
|
||||
BucketMetadataError::BucketReplicationConfigNotFound => {
|
||||
S3Error::with_message(S3ErrorCode::ReplicationConfigurationNotFoundError, format!("{}", err))
|
||||
}
|
||||
_ => S3Error::with_message(S3ErrorCode::InternalError, format!("{}", err)), // 处理其他情况
|
||||
};
|
||||
}
|
||||
|
||||
if is_err_file_not_found(&err) {
|
||||
return S3Error::with_message(S3ErrorCode::NoSuchKey, format!(" ec err {}", err));
|
||||
|
||||
@@ -11,7 +11,7 @@ datafusion = { workspace = true }
|
||||
derive_builder = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
parking_lot = { version = "0.12.3" }
|
||||
parking_lot = { workspace = true }
|
||||
s3s.workspace = true
|
||||
snafu = { workspace = true, features = ["backtrace"] }
|
||||
tokio = { workspace = true }
|
||||
|
||||