bucket replication

This commit is contained in:
lygn128
2025-06-05 10:00:00 +00:00
parent b52198d7f2
commit 5850c3e8a3
36 changed files with 4748 additions and 11146 deletions

2
.gitignore vendored
View File

@@ -18,4 +18,4 @@ deploy/certs/*
.rustfs.sys
.cargo
profile.json
.docker/openobserve-otel/data
.docker/openobserve-otel/data

11059
Cargo.lock generated

File diff suppressed because it is too large

45
bucket_replicate_test.md Normal file
View File

@@ -0,0 +1,45 @@
### Start two rustfs instances
rustfs --address 0.0.0.0:9000 /rustfs-data9000
rustfs --address 0.0.0.0:9001 /rustfs-data9001
### Use the MinIO mc client to set up aliases rustfs and rustfs2
### Create the source bucket
mc mb rustfs/srcbucket
### Create the destination bucket
mc mb rustfs2/destbucket
### Enable versioning on both buckets
mc version enable rustfs/srcbucket
mc version enable rustfs2/destbucket
#### A modified mc build is required to add bucket replication
./mc replication add rustfs/srcbucket --remote-bucket rustfs2/destbucket
###### Replicate a small file
mc cp ./1.txt rustfs/srcbucket
###### Check that the replication succeeded
mc ls --versions rustfs/srcbucket/1.txt
mc ls --versions rustfs2/destbucket/1.txt
##### Replicate a large file
1. Create a large file
dd if=/dev/zero of=./dd.out bs=4096000 count=1000
mc cp ./dd.out rustfs/srcbucket/
##### Check that the replication succeeded
mc ls --versions rustfs/srcbucket/dd.out
mc ls --versions rustfs2/destbucket/dd.out
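The same check can be scripted against the destination instead of using mc ls. Below is a minimal sketch using the aws-sdk-s3 crate (added as a dependency in this commit), assuming the second instance above listens on 127.0.0.1:9001 and uses the rustfsadmin/rustfsadmin credentials:

use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::Client;

#[tokio::main]
async fn main() {
    // Assumed endpoint and credentials; adjust them to match the rustfs2 alias.
    let creds = Credentials::new("rustfsadmin", "rustfsadmin", None, None, "static");
    let conf = aws_sdk_s3::Config::builder()
        .behavior_version(BehaviorVersion::latest())
        .region(Region::new("us-east-1"))
        .endpoint_url("http://127.0.0.1:9001")
        .credentials_provider(creds)
        .force_path_style(true)
        .build();
    let client = Client::from_conf(conf);

    // HEAD the replicated object on the destination bucket.
    match client.head_object().bucket("destbucket").key("dd.out").send().await {
        Ok(head) => println!("replicated, size = {:?}", head.content_length()),
        Err(err) => eprintln!("not replicated yet: {err}"),
    }
}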

21
build_rustfs.sh Executable file
View File

@@ -0,0 +1,21 @@
#!/bin/bash
clear
# Detect the current platform architecture
ARCH=$(uname -m)
# Choose the target directory based on the architecture
if [ "$ARCH" == "x86_64" ]; then
TARGET_DIR="target/x86_64"
elif [ "$ARCH" == "aarch64" ]; then
TARGET_DIR="target/arm64"
else
TARGET_DIR="target/unknown"
fi
# Set CARGO_TARGET_DIR and build the project
CARGO_TARGET_DIR=$TARGET_DIR RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build --package rustfs
echo -e "\a"
echo -e "\a"
echo -e "\a"

View File

@@ -0,0 +1,73 @@
use std::{collections::HashMap, i64, u64};
use crate::last_minute::{self};
pub struct ReplicationLatency {
// Latency of single and multipart PUT requests
upload_histogram: last_minute::LastMinuteHistogram,
}
impl ReplicationLatency {
// Merge two ReplicationLatency values
pub fn merge(&mut self, other: &mut ReplicationLatency) -> &ReplicationLatency {
self.upload_histogram.merge(&other.upload_histogram);
return self;
}
// Get the upload latency, bucketed by object size range
pub fn get_upload_latency(&mut self) -> HashMap<String, u64> {
let mut ret = HashMap::new();
let avg = self.upload_histogram.get_avg_data();
for (i, v) in avg.iter().enumerate() {
let avg_duration = v.avg();
ret.insert(self.size_tag_to_string(i), avg_duration.as_millis() as u64);
}
ret
}
pub fn update(&mut self, size: i64, during: std::time::Duration) {
self.upload_histogram.add(size, during);
}
// Convert a size tag index to its string representation
fn size_tag_to_string(&self, tag: usize) -> String {
match tag {
0 => String::from("Size < 1 KiB"),
1 => String::from("Size < 1 MiB"),
2 => String::from("Size < 10 MiB"),
3 => String::from("Size < 100 MiB"),
4 => String::from("Size < 1 GiB"),
_ => String::from("Size > 1 GiB"),
}
}
}
// #[derive(Debug, Clone, Default)]
// pub struct ReplicationLastMinute {
// pub last_minute: LastMinuteLatency,
// }
// impl ReplicationLastMinute {
// pub fn merge(&mut self, other: ReplicationLastMinute) -> ReplicationLastMinute {
// let mut nl = ReplicationLastMinute::default();
// nl.last_minute = self.last_minute.merge(&mut other.last_minute);
// nl
// }
// pub fn add_size(&mut self, n: i64) {
// let t = SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .expect("Time went backwards")
// .as_secs();
// self.last_minute.add_all(t - 1, &AccElem { total: t - 1, size: n as u64, n: 1 });
// }
// pub fn get_total(&self) -> AccElem {
// self.last_minute.get_total()
// }
// }
// impl fmt::Display for ReplicationLastMinute {
// fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// let t = self.last_minute.get_total();
// write!(f, "ReplicationLastMinute sz= {}, n= {}, dur= {}", t.size, t.n, t.total)
// }
// }

View File

@@ -1,6 +1,81 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{time::{Duration, SystemTime, UNIX_EPOCH}};
#[derive(Clone, Debug, Default)]
#[allow(dead_code)]
#[derive(Debug, Default)]
struct TimedAction {
count: u64,
acc_time: u64,
min_time: Option<u64>,
max_time: Option<u64>,
bytes: u64,
}
#[allow(dead_code)]
impl TimedAction {
// Avg returns the average time spent on the action.
pub fn avg(&self) -> Option<std::time::Duration> {
if self.count == 0 {
return None;
}
Some(std::time::Duration::from_nanos(self.acc_time / self.count))
}
// AvgBytes returns the average bytes processed.
pub fn avg_bytes(&self) -> u64 {
if self.count == 0 {
return 0;
}
self.bytes / self.count
}
// Merge other into t.
pub fn merge(&mut self, other: TimedAction) {
self.count += other.count;
self.acc_time += other.acc_time;
self.bytes += other.bytes;
if self.count == 0 {
self.min_time = other.min_time;
}
if let Some(other_min) = other.min_time {
self.min_time = self.min_time.map_or(Some(other_min), |min| Some(min.min(other_min)));
}
self.max_time = self
.max_time
.map_or(other.max_time, |max| Some(max.max(other.max_time.unwrap_or(0))));
}
}
#[allow(dead_code)]
#[derive(Debug)]
enum SizeCategory {
SizeLessThan1KiB = 0,
SizeLessThan1MiB,
SizeLessThan10MiB,
SizeLessThan100MiB,
SizeLessThan1GiB,
SizeGreaterThan1GiB,
// Add new entries here
SizeLastElemMarker,
}
#[allow(dead_code)]
impl SizeCategory {
fn to_string(&self) -> String {
match *self {
SizeCategory::SizeLessThan1KiB => "SizeLessThan1KiB".to_string(),
SizeCategory::SizeLessThan1MiB => "SizeLessThan1MiB".to_string(),
SizeCategory::SizeLessThan10MiB => "SizeLessThan10MiB".to_string(),
SizeCategory::SizeLessThan100MiB => "SizeLessThan100MiB".to_string(),
SizeCategory::SizeLessThan1GiB => "SizeLessThan1GiB".to_string(),
SizeCategory::SizeGreaterThan1GiB => "SizeGreaterThan1GiB".to_string(),
SizeCategory::SizeLastElemMarker => "SizeLastElemMarker".to_string(),
}
}
}
#[derive(Clone, Debug, Default, Copy)]
pub struct AccElem {
pub total: u64,
pub size: u64,
@@ -28,7 +103,7 @@ impl AccElem {
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct LastMinuteLatency {
pub totals: Vec<AccElem>,
pub last_sec: u64,
@@ -44,10 +119,11 @@ impl Default for LastMinuteLatency {
}
impl LastMinuteLatency {
pub fn merge(&mut self, o: &mut LastMinuteLatency) -> LastMinuteLatency {
pub fn merge(&mut self, o: &LastMinuteLatency) -> LastMinuteLatency {
let mut merged = LastMinuteLatency::default();
let mut x = o.clone();
if self.last_sec > o.last_sec {
o.forward_to(self.last_sec);
x.forward_to(self.last_sec);
merged.last_sec = self.last_sec;
} else {
self.forward_to(o.last_sec);
@@ -111,7 +187,6 @@ impl LastMinuteLatency {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -755,3 +830,48 @@ mod tests {
assert_eq!(elem.avg(), Duration::from_secs(0));
}
}
const SIZE_LAST_ELEM_MARKER: usize = 10; // Assumes the marker is 10; adjust to the actual SizeLastElemMarker value if it differs
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct LastMinuteHistogram {
histogram: Vec<LastMinuteLatency>,
size: u32,
}
impl LastMinuteHistogram {
pub fn merge(&mut self, other: &LastMinuteHistogram) {
for i in 0..self.histogram.len() {
self.histogram[i].merge(&other.histogram[i]);
}
}
pub fn add(&mut self, size: i64, t: std::time::Duration) {
let index = size_to_tag(size);
self.histogram[index].add(&t);
}
pub fn get_avg_data(&mut self) -> [AccElem; SIZE_LAST_ELEM_MARKER] {
let mut res = [AccElem::default(); SIZE_LAST_ELEM_MARKER];
for (i, elem) in self.histogram.iter_mut().enumerate() {
res[i] = elem.get_total();
}
res
}
}
fn size_to_tag(size: i64) -> usize {
match size {
_ if size < 1024 => 0, // sizeLessThan1KiB
_ if size < 1024 * 1024 => 1, // sizeLessThan1MiB
_ if size < 10 * 1024 * 1024 => 2, // sizeLessThan10MiB
_ if size < 100 * 1024 * 1024 => 3, // sizeLessThan100MiB
_ if size < 1024 * 1024 * 1024 => 4, // sizeLessThan1GiB
_ => 5, // sizeGreaterThan1GiB
}
}
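A quick test-style sketch of the bucketing above (assuming it sits next to size_to_tag in last_minute.rs):

#[cfg(test)]
mod size_tag_sketch {
    use super::size_to_tag;

    #[test]
    fn sizes_fall_into_expected_buckets() {
        assert_eq!(size_to_tag(512), 0);                    // < 1 KiB
        assert_eq!(size_to_tag(512 * 1024), 1);             // < 1 MiB
        assert_eq!(size_to_tag(5 * 1024 * 1024), 2);        // < 10 MiB
        assert_eq!(size_to_tag(50 * 1024 * 1024), 3);       // < 100 MiB
        assert_eq!(size_to_tag(512 * 1024 * 1024), 4);      // < 1 GiB
        assert_eq!(size_to_tag(2 * 1024 * 1024 * 1024), 5); // >= 1 GiB
    }
}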

View File

@@ -1,6 +1,7 @@
pub mod error;
pub mod globals;
pub mod last_minute;
pub mod bucket_stats;
// is ','
pub static DEFAULT_DELIMITER: u8 = 44;

View File

@@ -28,6 +28,97 @@ pub fn decrypt_data(password: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Erro
}
}
// use argon2::{Argon2, PasswordHasher};
// use argon2::password_hash::{SaltString};
// use aes_gcm::{Aes256Gcm, Key, Nonce}; // For AES-GCM
// use chacha20poly1305::{ChaCha20Poly1305, Key as ChaChaKey, Nonce as ChaChaNonce}; // For ChaCha20
// use pbkdf2::pbkdf2;
// use sha2::Sha256;
// use std::io::{self, Read};
// use thiserror::Error;
// #[derive(Debug, Error)]
// pub enum DecryptError {
// #[error("unexpected header")]
// UnexpectedHeader,
// #[error("invalid encryption algorithm ID")]
// InvalidAlgorithmId,
// #[error("IO error")]
// Io(#[from] io::Error),
// #[error("decryption error")]
// DecryptionError,
// }
// pub fn decrypt_data2<R: Read>(password: &str, mut data: R) -> Result<Vec<u8>, DecryptError> {
// // Parse the stream header
// let mut hdr = [0u8; 32 + 1 + 8];
// if data.read_exact(&mut hdr).is_err() {
// return Err(DecryptError::UnexpectedHeader);
// }
// let salt = &hdr[0..32];
// let id = hdr[32];
// let nonce = &hdr[33..41];
// let key = match id {
// // Argon2id + AES-GCM
// 0x01 => {
// let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
// let argon2 = Argon2::default();
// let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
// .map_err(|_| DecryptError::DecryptionError)?;
// hashed_key.hash.unwrap().as_bytes().to_vec()
// }
// // Argon2id + ChaCha20Poly1305
// 0x02 => {
// let salt = SaltString::encode_b64(salt).map_err(|_| DecryptError::DecryptionError)?;
// let argon2 = Argon2::default();
// let hashed_key = argon2.hash_password(password.as_bytes(), &salt)
// .map_err(|_| DecryptError::DecryptionError)?;
// hashed_key.hash.unwrap().as_bytes().to_vec()
// }
// // PBKDF2 + AES-GCM
// // 0x03 => {
// // let mut key = [0u8; 32];
// // pbkdf2::<Sha256>(password.as_bytes(), salt, 10000, &mut key);
// // key.to_vec()
// // }
// _ => return Err(DecryptError::InvalidAlgorithmId),
// };
// // Decrypt data using the corresponding cipher
// let mut encrypted_data = Vec::new();
// data.read_to_end(&mut encrypted_data)?;
// let plaintext = match id {
// 0x01 => {
// let cipher = Aes256Gcm::new(Key::from_slice(&key));
// let nonce = Nonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// 0x02 => {
// let cipher = ChaCha20Poly1305::new(ChaChaKey::from_slice(&key));
// let nonce = ChaChaNonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// 0x03 => {
// let cipher = Aes256Gcm::new(Key::from_slice(&key));
// let nonce = Nonce::from_slice(nonce);
// cipher
// .decrypt(nonce, encrypted_data.as_ref())
// .map_err(|_| DecryptError::DecryptionError)?
// }
// _ => return Err(DecryptError::InvalidAlgorithmId),
// };
// Ok(plaintext)
// }
#[cfg(any(test, feature = "crypto"))]
#[inline]
fn decryp<T: aes_gcm::aead::Aead>(stream: T, nonce: &[u8], data: &[u8]) -> Result<Vec<u8>, crate::Error> {

View File

@@ -58,7 +58,7 @@ tokio-stream = { workspace = true }
tonic.workspace = true
tower.workspace = true
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.15", features = ["xxh64"] }
xxhash-rust = { version = "0.8.15", features = ["xxh64","xxh3"] }
num = "0.4.3"
num_cpus = { workspace = true }
s3s-policy.workspace = true
@@ -67,7 +67,14 @@ pin-project-lite.workspace = true
md-5.workspace = true
madmin.workspace = true
workers.workspace = true
reqwest = { workspace = true }
reqwest = "0.12.12"
aws-sdk-s3 = "1.29.0"
#log.workspace = true
once_cell = "1.20.3"
aws-smithy-types = "1.2.13"
rustfs-rsc = "2025.220.3"
hyper.workspace = true
#reqwest = { workspace = true }
urlencoding = "2.1.3"
smallvec = { workspace = true }
shadow-rs.workspace = true

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use time::OffsetDateTime;
use tracing::error;
use crate::bucket::target::BucketTarget;
use crate::config::com::{read_config, save_config};
use crate::{config, new_object_layer_fn};
use common::error::{Error, Result};
@@ -278,8 +279,11 @@ impl BucketMetadata {
self.replication_config_updated_at = updated;
}
BUCKET_TARGETS_FILE => {
self.tagging_config_xml = data;
self.tagging_config_updated_at = updated;
// let x = data.clone();
// let str = std::str::from_utf8(&x).expect("Invalid UTF-8");
// println!("update config:{}", str);
self.bucket_targets_config_json = data.clone();
self.bucket_targets_config_updated_at = updated;
}
_ => return Err(Error::msg(format!("config file not found : {}", config_file))),
}
@@ -340,8 +344,10 @@ impl BucketMetadata {
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(deserialize::<ReplicationConfiguration>(&self.replication_config_xml)?);
}
//let temp = self.bucket_targets_config_json.clone();
if !self.bucket_targets_config_json.is_empty() {
self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?);
let arr:Vec<BucketTarget> = serde_json::from_slice(&self.bucket_targets_config_json)?;
self.bucket_target_config = Some(BucketTargets { targets: arr });
} else {
self.bucket_target_config = Some(BucketTargets::default())
}

View File

@@ -6,6 +6,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::bucket::error::BucketMetadataError;
use crate::bucket::metadata::{load_bucket_metadata_parse, BUCKET_LIFECYCLE_CONFIG};
use crate::bucket::utils::is_meta_bucketname;
use crate::cmd::bucket_targets;
use crate::config::error::ConfigError;
use crate::disk::error::DiskError;
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
@@ -228,7 +229,9 @@ impl BucketMetadataSys {
match res {
Ok(res) => {
if let Some(bucket) = buckets.get(idx) {
mp.insert(bucket.clone(), Arc::new(res));
let x = Arc::new(res);
mp.insert(bucket.clone(), x.clone());
bucket_targets::init_bucket_targets(bucket, x.clone()).await;
}
}
Err(e) => {
@@ -340,6 +343,7 @@ impl BucketMetadataSys {
}
pub async fn get_config_from_disk(&self, bucket: &str) -> Result<BucketMetadata> {
println!("load data from disk");
if is_meta_bucketname(bucket) {
return Err(Error::msg("errInvalidArgument"));
}
@@ -549,7 +553,12 @@ impl BucketMetadataSys {
pub async fn get_replication_config(&self, bucket: &str) -> Result<(ReplicationConfiguration, OffsetDateTime)> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Ok(res) => {
if res.0.replication_config.is_none() {
return Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound));
}
res
},
Err(err) => {
warn!("get_replication_config err {:?}", &err);
return if config::error::is_err_config_not_found(&err) {
@@ -564,7 +573,7 @@ impl BucketMetadataSys {
if reload {
// TODO: globalBucketTargetSys
}
//println!("549 {:?}", config.clone());
Ok((config.clone(), bm.replication_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound))
@@ -584,9 +593,13 @@ impl BucketMetadataSys {
}
};
println!("573");
if let Some(config) = &bm.bucket_target_config {
if reload {
// TODO: globalBucketTargetSys
//config.
}
Ok(config.clone())

View File

@@ -5,7 +5,8 @@ pub mod object_lock;
pub mod policy_sys;
mod quota;
pub mod tagging;
mod target;
pub mod target;
pub mod utils;
pub mod versioning;
pub mod replication;
pub mod versioning_sys;

View File

@@ -0,0 +1,27 @@
// Replication status type for x-amz-replication-status header
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatusType {
Pending,
Completed,
CompletedLegacy,
Failed,
Replica,
}
impl StatusType {
// Converts the enum variant to its string representation
pub fn as_str(&self) -> &'static str {
match self {
StatusType::Pending => "PENDING",
StatusType::Completed => "COMPLETED",
StatusType::CompletedLegacy => "COMPLETE",
StatusType::Failed => "FAILED",
StatusType::Replica => "REPLICA",
}
}
// Checks if the status is empty (not set)
pub fn is_empty(&self) -> bool {
matches!(self, StatusType::Pending) // Adjust this as needed
}
}
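For context, a hypothetical helper (not part of this commit) showing how a raw x-amz-replication-status header value would map back onto these variants:

impl StatusType {
    // Hypothetical sketch, not in this commit: parse a raw
    // x-amz-replication-status header value into a StatusType.
    pub fn from_header(value: &str) -> Option<StatusType> {
        match value {
            "PENDING" => Some(StatusType::Pending),
            "COMPLETED" => Some(StatusType::Completed),
            "COMPLETE" => Some(StatusType::CompletedLegacy),
            "FAILED" => Some(StatusType::Failed),
            "REPLICA" => Some(StatusType::Replica),
            _ => None,
        }
    }
}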

View File

@@ -0,0 +1 @@
pub mod datatypes;

View File

@@ -25,6 +25,7 @@ pub fn encode_tags(tags: Vec<Tag>) -> String {
for tag in tags.iter() {
if let (Some(k), Some(v)) = (tag.key.as_ref(), tag.value.as_ref()) {
//encoded.append_pair(k.as_ref().unwrap().as_str(), v.as_ref().unwrap().as_str());
encoded.append_pair(k.as_str(), v.as_str());
}
}

View File

@@ -1,15 +1,16 @@
use common::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Credentials {
access_key: String,
secret_key: String,
session_token: Option<String>,
expiration: Option<OffsetDateTime>,
#[serde(rename = "accessKey")]
pub access_key: String,
#[serde(rename = "secretKey")]
pub secret_key: String,
pub session_token: Option<String>,
pub expiration: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
@@ -20,52 +21,53 @@ pub enum ServiceType {
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct LatencyStat {
curr: Duration, // current latency
avg: Duration, // average latency
max: Duration, // maximum latency
curr: u64, // current latency
avg: u64, // average latency
max: u64, // maximum latency
}
// BucketTarget struct definition
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketTarget {
source_bucket: String,
#[serde(rename = "sourcebucket")]
pub source_bucket: String,
endpoint: String,
pub endpoint: String,
credentials: Option<Credentials>,
target_bucket: String,
pub credentials: Option<Credentials>,
#[serde(rename = "targetbucket")]
pub target_bucket: String,
secure: bool,
path: Option<String>,
pub path: Option<String>,
api: Option<String>,
arn: Option<String>,
pub arn: Option<String>,
#[serde(rename = "type")]
pub type_: Option<String>,
type_: ServiceType,
region: Option<String>,
pub region: Option<String>,
bandwidth_limit: Option<i64>,
#[serde(rename = "replicationSync")]
replication_sync: bool,
storage_class: Option<String>,
health_check_duration: Option<Duration>,
#[serde(rename = "healthCheckDuration")]
health_check_duration: u64,
#[serde(rename = "disableProxy")]
disable_proxy: bool,
reset_before_date: Option<OffsetDateTime>,
#[serde(rename = "resetBeforeDate")]
reset_before_date: String,
reset_id: Option<String>,
total_downtime: Duration,
#[serde(rename = "totalDowntime")]
total_downtime: u64,
last_online: Option<OffsetDateTime>,
#[serde(rename = "isOnline")]
online: bool,
latency: LatencyStat,
@@ -73,6 +75,15 @@ pub struct BucketTarget {
deployment_id: Option<String>,
edge: bool,
#[serde(rename = "edgeSyncBeforeExpiry")]
edge_sync_before_expiry: bool,
}
impl BucketTarget {
pub fn is_empty(self) -> bool {
//self.target_bucket.is_empty() && self.endpoint.is_empty() && self.arn.is_empty()
self.target_bucket.is_empty() && self.endpoint.is_empty() && self.arn.is_none()
}
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
@@ -93,4 +104,18 @@ impl BucketTargets {
let t: BucketTargets = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn is_empty(&self) -> bool {
if self.targets.is_empty() {
return true;
}
for target in &self.targets {
if !target.clone().is_empty() {
return false;
}
}
true
}
}

File diff suppressed because it is too large

View File

@@ -0,0 +1,55 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
// Representation of the replication status
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StatusType {
Pending,
Completed,
CompletedLegacy,
Failed,
Replica,
}
// Representation of version purge status type (customize as needed)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VersionPurgeStatusType {
Pending,
Completed,
Failed,
}
// ReplicationState struct definition
#[derive(Debug, Clone)]
pub struct ReplicationState {
// Timestamp when the last replica update was received
pub replica_time_stamp: DateTime<Utc>,
// Replica status
pub replica_status: StatusType,
// Represents DeleteMarker replication state
pub delete_marker: bool,
// Timestamp when the last replication activity happened
pub replication_time_stamp: DateTime<Utc>,
// Stringified representation of all replication activity
pub replication_status_internal: String,
// Stringified representation of all version purge statuses
// Example format: "arn1=PENDING;arn2=COMPLETED;"
pub version_purge_status_internal: String,
// Stringified representation of replication decision for each target
pub replicate_decision_str: String,
// Map of ARN -> replication status for ongoing replication activity
pub targets: HashMap<String, StatusType>,
// Map of ARN -> VersionPurgeStatus for all the targets
pub purge_targets: HashMap<String, VersionPurgeStatusType>,
// Map of ARN -> stringified reset id and timestamp for all the targets
pub reset_statuses_map: HashMap<String, String>,
}
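The *_internal fields use the stringified "arn=STATUS;" layout noted in the comments above; a hypothetical helper (not part of this commit) illustrating how such a value splits into per-target pairs:

// Hypothetical sketch, not in this commit: split a stringified status such as
// "arn1=PENDING;arn2=COMPLETED;" into (arn, status) pairs.
pub fn parse_internal_status(s: &str) -> Vec<(String, String)> {
    s.split(';')
        .filter(|part| !part.is_empty())
        .filter_map(|part| part.split_once('='))
        .map(|(arn, status)| (arn.to_string(), status.to_string()))
        .collect()
}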

View File

@@ -0,0 +1,864 @@
#![allow(unused_variables)]
#![allow(dead_code)]
use crate::{
bucket::{metadata_sys, target::BucketTarget},
endpoints::Node,
peer::{PeerS3Client, RemotePeerS3Client},
StorageAPI,
};
use crate::{
bucket::{self, target::BucketTargets},
new_object_layer_fn, peer, store_api,
};
use chrono::Utc;
use lazy_static::lazy_static;
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};
//use tokio::sync::RwLock;
use aws_sdk_s3::Client as S3Client;
use std::sync::{Arc};
use tokio::sync::RwLock;
use thiserror::Error;
pub struct TClient {
pub s3cli: S3Client,
pub remote_peer_client: peer::RemotePeerS3Client,
pub arn: String,
}
impl TClient {
pub fn new(s3cli: S3Client, remote_peer_client: RemotePeerS3Client, arn: String) -> Self {
TClient {
s3cli,
remote_peer_client,
arn,
}
}
}
pub struct EpHealth {
pub endpoint: String,
pub scheme: String,
pub online: bool,
pub last_online: SystemTime,
pub last_hc_at: SystemTime,
pub offline_duration: Duration,
pub latency: LatencyStat, // Assuming LatencyStat is a custom struct
}
impl EpHealth {
pub fn new(
endpoint: String,
scheme: String,
online: bool,
last_online: SystemTime,
last_hc_at: SystemTime,
offline_duration: Duration,
latency: LatencyStat,
) -> Self {
EpHealth {
endpoint,
scheme,
online,
last_online,
last_hc_at,
offline_duration,
latency,
}
}
}
pub struct LatencyStat {
// Define the fields of LatencyStat as per your requirements
}
pub struct ArnTarget {
client: TargetClient,
last_refresh: chrono::DateTime<Utc>,
}
impl ArnTarget {
pub fn new(bucket: String, endpoint: String, ak:String, sk:String) -> Self {
Self {
client: TargetClient {
bucket: bucket,
storage_class: "STANDRD".to_string(),
disable_proxy: false,
health_check_duration: Duration::from_secs(100),
endpoint: endpoint,
reset_id: "0".to_string(),
replicate_sync: false,
secure: false,
arn: "".to_string(),
client: reqwest::Client::new(),
ak:ak,
sk:sk,
},
last_refresh: Utc::now(),
}
}
}
// pub fn get_s3client_from_para(
// ak: &str,
// sk: &str,
// url: &str,
// _region: &str,
// ) -> Result<S3Client, Box<dyn Error>> {
// let credentials = Credentials::new(ak, sk, None, None, "");
// let region = Region::new("us-east-1".to_string());
// let config = Config::builder()
// .region(region)
// .endpoint_url(url.to_string())
// .credentials_provider(credentials)
// .behavior_version(BehaviorVersion::latest()) // Adjust as necessary
// .build();
// Ok(S3Client::from_conf(config))
// }
pub struct BucketTargetSys {
arn_remote_map: Arc<RwLock<HashMap<String, ArnTarget>>>,
targets_map: Arc<RwLock<HashMap<String, Vec<bucket::target::BucketTarget>>>>,
hc: HashMap<String, EpHealth>,
//store:Option<Arc<ecstore::store::ECStore>>,
}
lazy_static! {
pub static ref GLOBAL_Bucket_Target_Sys: std::sync::OnceLock<BucketTargetSys> = BucketTargetSys::new().into();
}
//#[derive(Debug)]
// pub enum SetTargetError {
// NotFound,
// }
pub async fn get_bucket_target_client(bucket: &str, arn: &str) -> Result<TargetClient, SetTargetError> {
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
sys.get_remote_target_client2(arn).await
} else {
Err(SetTargetError::TargetNotFound(bucket.to_string()))
}
}
#[derive(Debug)]
pub struct BucketRemoteTargetNotFound {
pub bucket: String,
}
pub async fn init_bucket_targets(bucket: &str, meta: Arc<bucket::metadata::BucketMetadata>) {
println!("140 {}", bucket);
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
if let Some(tgts) = meta.bucket_target_config.clone() {
for tgt in tgts.targets {
warn!("ak and sk is:{:?}",tgt.credentials);
let _ = sys.set_target(bucket, &tgt, false, true).await;
//sys.targets_map.
}
}
}
}
pub async fn remove_bucket_target(bucket: &str, arn_str:&str) {
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let _ = sys.remove_target(bucket, arn_str).await;
}
}
impl BucketTargetSys {
pub fn new() -> Self {
BucketTargetSys {
arn_remote_map: Arc::new(RwLock::new(HashMap::new())),
targets_map: Arc::new(RwLock::new(HashMap::new())),
hc: HashMap::new(),
}
}
pub async fn list_bucket_targets(&self, bucket: &str) -> Result<BucketTargets, BucketRemoteTargetNotFound> {
let targets_map = self.targets_map.read().await;
if let Some(targets) = targets_map.get(bucket) {
Ok(BucketTargets {
targets: targets.clone(),
})
} else {
Err(BucketRemoteTargetNotFound {
bucket: bucket.to_string(),
})
}
}
pub async fn list_targets(&self, bucket: Option<&str>, _arn_type: Option<&str>) -> Vec<BucketTarget> {
let _ = _arn_type;
//let health_stats = self.health_stats();
let mut targets = Vec::new();
if let Some(bucket_name) = bucket {
if let Ok(ts) = self.list_bucket_targets(bucket_name).await {
for t in ts.targets {
//if arn_type.map_or(true, |arn| t.target_type == arn) {
//if let Some(hs) = health_stats.get(&t.url().host) {
// t.total_downtime = hs.offline_duration;
// t.online = hs.online;
// t.last_online = hs.last_online;
// t.latency = LatencyStat {
// curr: hs.latency.curr,
// avg: hs.latency.avg,
// max: hs.latency.peak,
// };
//}
targets.push(t.clone());
//}
}
}
return targets;
}
// Locking and iterating over all targets in the system
let targets_map = self.targets_map.read().await;
for tgts in targets_map.values() {
for t in tgts {
//if arn_type.map_or(true, |arn| t.target_type == arn) {
// if let Some(hs) = health_stats.get(&t.url().host) {
// t.total_downtime = hs.offline_duration;
// t.online = hs.online;
// t.last_online = hs.last_online;
// t.latency = LatencyStat {
// curr: hs.latency.curr,
// avg: hs.latency.avg,
// max: hs.latency.peak,
// };
// }
targets.push(t.clone());
//}
}
}
targets
}
pub async fn remove_target(&self, bucket:&str, arn_str:&str) -> Result<(), SetTargetError> {
//to do need lock;
let mut targets_map = self.targets_map.write().await;
let tgts = targets_map.get(bucket);
let mut arn_remotes_map = self.arn_remote_map.write().await;
if tgts.is_none() {
//Err(SetTargetError::TargetNotFound(bucket.to_string()));
return Ok(());
}
let tgts = tgts.unwrap(); // safe: checked for None above
let mut targets = Vec::with_capacity(tgts.len());
let mut found = false;
// Iterate over the targets, keeping every entry whose ARN does not match
for tgt in tgts {
if tgt.arn != Some(arn_str.to_string()) {
targets.push(tgt.clone()); // keep the non-matching entry
} else {
found = true; // found the matching ARN
}
}
// If no matching ARN was found, there is nothing to remove
if !found {
return Ok(());
}
// Update targets_map
targets_map.insert(bucket.to_string(), targets);
arn_remotes_map.remove(arn_str);
let targets = self.list_targets(Some(&bucket), None).await;
println!("targets is {}", targets.len());
match serde_json::to_vec(&targets) {
Ok(json) => {
let _ = metadata_sys::update(bucket, "bucket-targets.json", json).await;
}
Err(e) => {
println!("序列化失败{}", e);
}
}
Ok(())
}
pub async fn get_remote_arn(&self, bucket: &str, target: Option<&BucketTarget>, depl_id: &str) -> (Option<String>, bool) {
if target.is_none() {
return (None, false);
}
let target = target.unwrap();
let targets_map = self.targets_map.read().await;
// Acquire the lock to access arn_remote_map
let mut _arn_remotes_map = self.arn_remote_map.read().await;
if let Some(tgts) = targets_map.get(bucket) {
for tgt in tgts {
if tgt.type_ == target.type_
&& tgt.target_bucket == target.target_bucket
&& tgt.endpoint == target.endpoint
&& tgt.credentials.as_ref().unwrap().access_key == target.credentials.as_ref().unwrap().access_key
{
return (tgt.arn.clone(), true);
}
}
}
// if !target.type_.is_valid() {
// return (None, false);
// }
println!("generate_arn");
(Some(generate_arn(target.clone(), depl_id.to_string())), false)
}
pub async fn get_remote_target_client2(&self, arn: &str) -> Result<TargetClient, SetTargetError> {
let map = self.arn_remote_map.read().await;
info!("get remote target client and arn is: {}", arn);
if let Some(value) = map.get(arn) {
let mut x = value.client.clone();
x.arn = arn.to_string();
Ok(x)
} else {
error!("not find target");
Err(SetTargetError::TargetNotFound(arn.to_string()))
}
}
// pub async fn get_remote_target_client(&self, _tgt: &BucketTarget) -> Result<TargetClient, SetTargetError> {
// // Mocked implementation for obtaining a remote client
// let tcli = TargetClient {
// bucket: _tgt.target_bucket.clone(),
// storage_class: "STANDRD".to_string(),
// disable_proxy: false,
// health_check_duration: Duration::from_secs(100),
// endpoint: _tgt.endpoint.clone(),
// reset_id: "0".to_string(),
// replicate_sync: false,
// secure: false,
// arn: "".to_string(),
// client: reqwest::Client::new(),
// ak: _tgt.
// };
// Ok(tcli)
// }
// pub async fn get_remote_target_client_with_bucket(&self, _bucket: String) -> Result<TargetClient, SetTargetError> {
// // Mocked implementation for obtaining a remote client
// let tcli = TargetClient {
// bucket: _tgt.target_bucket.clone(),
// storage_class: "STANDRD".to_string(),
// disable_proxy: false,
// health_check_duration: Duration::from_secs(100),
// endpoint: _tgt.endpoint.clone(),
// reset_id: "0".to_string(),
// replicate_sync: false,
// secure: false,
// arn: "".to_string(),
// client: reqwest::Client::new(),
// };
// Ok(tcli)
// }
async fn local_is_bucket_versioned(&self, _bucket: &str) -> bool {
let Some(store) = new_object_layer_fn() else {
return false;
};
//store.get_bucket_info(bucket, opts)
// let binfo:BucketInfo = store
// .get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default()).await;
match store.get_bucket_info(_bucket, &store_api::BucketOptions::default()).await {
Ok(info) => {
println!("Bucket Info: {:?}", info);
return info.versionning;
}
Err(err) => {
eprintln!("Error: {:?}", err);
return false;
}
}
}
async fn is_bucket_versioned(&self, _bucket: &str) -> bool {
return true;
// let url_str = "http://127.0.0.1:9001";
// // Convert to a Url
// let parsed_url = url::Url::parse(url_str).unwrap();
// let node = Node {
// url: parsed_url,
// pools: vec![],
// is_local: false,
// grid_host: "".to_string(),
// };
// let cli = ecstore::peer::RemotePeerS3Client::new(Some(node), None);
// match cli.get_bucket_info(_bucket, &ecstore::store_api::BucketOptions::default()).await
// {
// Ok(info) => {
// println!("Bucket Info: {:?}", info);
// info.versionning
// }
// Err(err) => {
// eprintln!("Error: {:?}", err);
// return false;
// }
// }
}
pub async fn set_target(&self, bucket: &str, tgt: &BucketTarget, update: bool, fromdisk: bool) -> Result<(), SetTargetError> {
// if !tgt.type_.is_valid() && !update {
// return Err(SetTargetError::InvalidTargetType(bucket.to_string()));
// }
//let client = self.get_remote_target_client(tgt).await?;
if tgt.type_ == Some("replication".to_string()) {
if !fromdisk {
let versioning_config = self.local_is_bucket_versioned(bucket).await;
if !versioning_config {
// println!("111111111");
return Err(SetTargetError::TargetNotVersioned(bucket.to_string()));
}
}
}
let url_str = format!("http://{}", tgt.endpoint.clone());
println!("url str is {}", url_str);
// Convert to a Url
let parsed_url = url::Url::parse(&url_str).unwrap();
let node = Node {
url: parsed_url,
pools: vec![],
is_local: false,
grid_host: "".to_string(),
};
let cli = peer::RemotePeerS3Client::new(Some(node), None);
match cli
.get_bucket_info(&tgt.target_bucket, &store_api::BucketOptions::default())
.await
{
Ok(info) => {
println!("Bucket Info: {:?}", info);
if !info.versionning {
println!("2222222222 {}", info.versionning);
return Err(SetTargetError::TargetNotVersioned(tgt.target_bucket.to_string()));
}
}
Err(err) => {
println!("remote bucket 369 is:{}", tgt.target_bucket);
eprintln!("Error: {:?}", err);
return Err(SetTargetError::SourceNotVersioned(tgt.target_bucket.to_string()));
}
}
//if tgt.target_type == BucketTargetType::ReplicationService {
// Check if target is a MinIO server and alive
// let hc_result = tokio::time::timeout(Duration::from_secs(3), client.health_check(&tgt.endpoint)).await;
// match hc_result {
// Ok(Ok(true)) => {} // Server is alive
// Ok(Ok(false)) | Ok(Err(_)) | Err(_) => {
// return Err(SetTargetError::HealthCheckFailed(tgt.target_bucket.clone()));
// }
// }
//Lock and update target maps
let mut targets_map = self.targets_map.write().await;
let mut arn_remotes_map = self.arn_remote_map.write().await;
let targets = targets_map.entry(bucket.to_string()).or_default();
let mut found = false;
for existing_target in targets.iter_mut() {
println!("418 exist:{}", existing_target.source_bucket.clone());
if existing_target.type_ == tgt.type_ {
if existing_target.arn == tgt.arn {
if !update {
return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
}
*existing_target = tgt.clone();
found = true;
break;
}
if existing_target.endpoint == tgt.endpoint {
println!("endpoint is same:{}", tgt.endpoint.clone());
return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
}
}
}
if !found && !update {
println!("437 exist:{}", tgt.arn.clone().unwrap());
targets.push(tgt.clone());
}
let arntgt: ArnTarget = ArnTarget::new(tgt.target_bucket.clone(), tgt.endpoint.clone(),tgt.credentials.clone().unwrap().access_key.clone(), tgt.credentials.clone().unwrap().secret_key);
arn_remotes_map.insert(tgt.arn.clone().unwrap().clone(), arntgt);
//self.update_bandwidth_limit(bucket, &tgt.arn, tgt.bandwidth_limit).await;
Ok(())
}
}
#[derive(Clone)]
pub struct TargetClient {
pub client: reqwest::Client, // Using reqwest HTTP client
pub health_check_duration: Duration,
pub bucket: String, // Remote bucket target
pub replicate_sync: bool,
pub storage_class: String, // Storage class on remote
pub disable_proxy: bool,
pub arn: String, // ARN to uniquely identify remote target
pub reset_id: String,
pub endpoint: String,
pub secure: bool,
pub ak:String,
pub sk:String,
}
impl TargetClient {
pub fn new(
client: reqwest::Client,
health_check_duration: Duration,
bucket: String,
replicate_sync: bool,
storage_class: String,
disable_proxy: bool,
arn: String,
reset_id: String,
endpoint: String,
secure: bool,
ak:String,
sk:String,
) -> Self {
TargetClient {
client,
health_check_duration,
bucket,
replicate_sync,
storage_class,
disable_proxy,
arn,
reset_id,
endpoint,
secure,
ak,
sk,
}
}
pub async fn bucket_exists(&self, _bucket: &str) -> Result<bool, SetTargetError> {
Ok(true) // Mocked implementation
}
}
use tracing::{error, warn, info};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct VersioningConfig {
pub enabled: bool,
}
impl VersioningConfig {
pub fn is_enabled(&self) -> bool {
self.enabled
}
}
#[derive(Debug)]
pub struct Client;
impl Client {
pub async fn bucket_exists(&self, _bucket: &str) -> Result<bool, SetTargetError> {
Ok(true) // Mocked implementation
}
pub async fn get_bucket_versioning(&self, _bucket: &str) -> Result<VersioningConfig, SetTargetError> {
Ok(VersioningConfig { enabled: true })
}
pub async fn health_check(&self, _endpoint: &str) -> Result<bool, SetTargetError> {
Ok(true) // Mocked health check
}
}
#[derive(Debug, PartialEq)]
pub struct ServiceType(String);
impl ServiceType {
pub fn is_valid(&self) -> bool {
!self.0.is_empty() // add concrete validation logic as needed
}
}
#[derive(Debug, PartialEq)]
pub struct ARN {
pub arn_type: String,
pub id: String,
pub region: String,
pub bucket: String,
}
impl ARN {
/// Check whether the ARN is empty
pub fn is_empty(&self) -> bool {
//!self.arn_type.is_valid()
return false;
}
/// Convert the ARN to its string form
pub fn to_string(&self) -> String {
format!("arn:minio:{}:{}:{}:{}", self.arn_type, self.region, self.id, self.bucket)
}
/// Parse an ARN from a string
pub fn parse(s: &str) -> Result<Self, String> {
// The ARN must have the form arn:minio:<Type>:<REGION>:<ID>:<remote-bucket>
if !s.starts_with("arn:minio:") {
return Err(format!("Invalid ARN {}", s).into());
}
let tokens: Vec<&str> = s.split(':').collect();
if tokens.len() != 6 || tokens[4].is_empty() || tokens[5].is_empty() {
return Err(format!("Invalid ARN {}", s).into());
}
Ok(ARN {
arn_type: tokens[2].to_string(),
region: tokens[3].to_string(),
id: tokens[4].to_string(),
bucket: tokens[5].to_string(),
})
}
}
// Implement the `Display` trait so an ARN can be printed directly with `format!` or `{}`
impl std::fmt::Display for ARN {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_string())
}
}
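// Illustrative sketch, not part of this commit: round-trip the
// "arn:minio:<Type>:<REGION>:<ID>:<remote-bucket>" format through parse and to_string.
#[cfg(test)]
mod arn_format_sketch {
    use super::ARN;

    #[test]
    fn arn_round_trip() {
        let s = "arn:minio:replication:us-east-1:1f9f7b3e:destbucket";
        let arn = ARN::parse(s).expect("valid ARN");
        assert_eq!(arn.arn_type, "replication");
        assert_eq!(arn.bucket, "destbucket");
        assert_eq!(arn.to_string(), s);
    }
}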
fn must_get_uuid() -> String {
Uuid::new_v4().to_string()
// match Uuid::new_v4() {
// Ok(uuid) => uuid.to_string(),
// Err(err) => {
// error!("Critical error: {}", err);
// panic!("Failed to generate UUID: {}", err); // Ensures similar behavior as Go's logger.CriticalIf
// }
// }
}
fn generate_arn(target: BucketTarget, depl_id: String) -> String {
let mut uuid: String = depl_id;
if uuid == "" {
uuid = must_get_uuid();
}
let arn: ARN = ARN {
arn_type: target.type_.unwrap(),
id: (uuid),
region: "us-east-1".to_string(),
bucket: (target.target_bucket),
};
return arn.to_string();
}
// use std::collections::HashMap;
// use std::sync::{Arc, Mutex, RwLock};
// use std::time::Duration;
// use tokio::time::timeout;
// use tokio::sync::RwLock as AsyncRwLock;
// use serde::Deserialize;
// use thiserror::Error;
// #[derive(Debug, Clone, PartialEq)]
// pub enum BucketTargetType {
// ReplicationService,
// // Add other service types as needed
// }
// impl BucketTargetType {
// pub fn is_valid(&self) -> bool {
// matches!(self, BucketTargetType::ReplicationService)
// }
// }
// #[derive(Debug, Clone)]
// pub struct BucketTarget {
// pub arn: String,
// pub target_bucket: String,
// pub endpoint: String,
// pub credentials: Credentials,
// pub secure: bool,
// pub bandwidth_limit: Option<u64>,
// pub target_type: BucketTargetType,
// }
// #[derive(Debug, Clone)]
// pub struct Credentials {
// pub access_key: String,
// pub secret_key: String,
// }
// #[derive(Debug)]
// pub struct BucketTargetSys {
// targets_map: Arc<RwLock<HashMap<String, Vec<BucketTarget>>>>,
// arn_remotes_map: Arc<Mutex<HashMap<String, ArnTarget>>>,
// }
// impl BucketTargetSys {
// pub fn new() -> Self {
// Self {
// targets_map: Arc::new(RwLock::new(HashMap::new())),
// arn_remotes_map: Arc::new(Mutex::new(HashMap::new())),
// }
// }
// pub async fn set_target(
// &self,
// bucket: &str,
// tgt: &BucketTarget,
// update: bool,
// ) -> Result<(), SetTargetError> {
// if !tgt.target_type.is_valid() && !update {
// return Err(SetTargetError::InvalidTargetType(bucket.to_string()));
// }
// let client = self.get_remote_target_client(tgt).await?;
// // Validate if target credentials are OK
// let exists = client.bucket_exists(&tgt.target_bucket).await?;
// if !exists {
// return Err(SetTargetError::TargetNotFound(tgt.target_bucket.clone()));
// }
// if tgt.target_type == BucketTargetType::ReplicationService {
// if !self.is_bucket_versioned(bucket).await {
// return Err(SetTargetError::SourceNotVersioned(bucket.to_string()));
// }
// let versioning_config = client.get_bucket_versioning(&tgt.target_bucket).await?;
// if !versioning_config.is_enabled() {
// return Err(SetTargetError::TargetNotVersioned(tgt.target_bucket.clone()));
// }
// }
// // Check if target is a MinIO server and alive
// let hc_result = timeout(Duration::from_secs(3), client.health_check(&tgt.endpoint)).await;
// match hc_result {
// Ok(Ok(true)) => {} // Server is alive
// Ok(Ok(false)) | Ok(Err(_)) | Err(_) => {
// return Err(SetTargetError::HealthCheckFailed(tgt.target_bucket.clone()));
// }
// }
// // Lock and update target maps
// let mut targets_map = self.targets_map.write().await;
// let mut arn_remotes_map = self.arn_remotes_map.lock().unwrap();
// let targets = targets_map.entry(bucket.to_string()).or_default();
// let mut found = false;
// for existing_target in targets.iter_mut() {
// if existing_target.target_type == tgt.target_type {
// if existing_target.arn == tgt.arn {
// if !update {
// return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
// }
// *existing_target = tgt.clone();
// found = true;
// break;
// }
// if existing_target.endpoint == tgt.endpoint {
// return Err(SetTargetError::TargetAlreadyExists(existing_target.target_bucket.clone()));
// }
// }
// }
// if !found && !update {
// targets.push(tgt.clone());
// }
// arn_remotes_map.insert(tgt.arn.clone(), ArnTarget { client });
// self.update_bandwidth_limit(bucket, &tgt.arn, tgt.bandwidth_limit).await;
// Ok(())
// }
// async fn get_remote_target_client(&self, tgt: &BucketTarget) -> Result<Client, SetTargetError> {
// // Mocked implementation for obtaining a remote client
// Ok(Client {})
// }
// async fn is_bucket_versioned(&self, bucket: &str) -> bool {
// // Mocked implementation for checking if a bucket is versioned
// true
// }
// async fn update_bandwidth_limit(
// &self,
// bucket: &str,
// arn: &str,
// limit: Option<u64>,
// ) {
// // Mocked implementation for updating bandwidth limits
// }
// }
// #[derive(Debug)]
// pub struct Client;
// impl Client {
// pub async fn bucket_exists(&self, _bucket: &str) -> Result<bool, SetTargetError> {
// Ok(true) // Mocked implementation
// }
// pub async fn get_bucket_versioning(
// &self,
// _bucket: &str,
// ) -> Result<VersioningConfig, SetTargetError> {
// Ok(VersioningConfig { enabled: true })
// }
// pub async fn health_check(&self, _endpoint: &str) -> Result<bool, SetTargetError> {
// Ok(true) // Mocked health check
// }
// }
// #[derive(Debug, Clone)]
// pub struct ArnTarget {
// pub client: Client,
// }
#[derive(Debug, Error)]
pub enum SetTargetError {
#[error("Invalid target type for bucket {0}")]
InvalidTargetType(String),
#[error("Target bucket {0} not found")]
TargetNotFound(String),
#[error("Source bucket {0} is not versioned")]
SourceNotVersioned(String),
#[error("Target bucket {0} is not versioned")]
TargetNotVersioned(String),
#[error("Health check failed for bucket {0}")]
HealthCheckFailed(String),
#[error("Target bucket {0} already exists")]
TargetAlreadyExists(String),
}

2
ecstore/src/cmd/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod bucket_targets;
pub mod bucket_replication;

View File

@@ -2430,7 +2430,7 @@ impl DiskAPI for LocalDisk {
for info in obj_infos.iter() {
let done = ScannerMetrics::time(ScannerMetric::ApplyVersion);
let sz: usize;
(obj_deleted, sz) = item.apply_actions(info, &size_s).await;
(obj_deleted, sz) = item.apply_actions(info, &mut size_s).await;
done();
if obj_deleted {

View File

@@ -19,9 +19,9 @@ use super::{
heal_commands::{HealScanMode, HEAL_DEEP_SCAN, HEAL_NORMAL_SCAN},
};
use crate::{
bucket::{versioning::VersioningApi, versioning_sys::BucketVersioningSys},
heal::data_usage::DATA_USAGE_ROOT,
bucket::{metadata_sys, versioning::VersioningApi, versioning_sys::BucketVersioningSys}, cmd::bucket_replication::{ReplicationStatusType}, heal::data_usage::DATA_USAGE_ROOT
};
use crate::cmd::bucket_replication::queue_replication_heal;
use crate::{
cache_value::metacache_set::{list_path_raw, ListPathRawOptions},
config::{
@@ -62,7 +62,7 @@ use tokio::{
},
time::sleep,
};
use tracing::{error, info};
use tracing::{error, info, debug};
const DATA_SCANNER_SLEEP_PER_FOLDER: Duration = Duration::from_millis(1); // Time to wait between folders.
const DATA_USAGE_UPDATE_DIR_CYCLES: u32 = 16; // Visit all folders every n cycles.
@@ -550,13 +550,107 @@ impl ScannerItem {
Ok(object_infos)
}
pub async fn apply_actions(&self, oi: &ObjectInfo, _size_s: &SizeSummary) -> (bool, usize) {
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, usize) {
let done = ScannerMetrics::time(ScannerMetric::Ilm);
//todo: lifecycle
info!("apply_actions {} {} {:?} {:?}", oi.bucket.clone(), oi.name.clone(), oi.version_id.clone(), oi.user_defined.clone());
// Create a mutable clone if you need to modify fields
let mut oi = oi.clone();
oi.replication_status = ReplicationStatusType::from(
oi.user_defined
.as_ref()
.and_then(|map| map.get("x-amz-bucket-replication-status"))
.unwrap_or(&"PENDING".to_string())
);
info!("apply status is: {:?}", oi.replication_status);
self.heal_replication(&oi, _size_s).await;
done();
(false, oi.size)
}
pub async fn heal_replication(&mut self, oi: &ObjectInfo, size_s: &mut SizeSummary) {
if oi.version_id.is_none() {
error!("heal_replication: no version_id or replication config {} {} {}", oi.bucket, oi.name, oi.version_id.is_none());
return;
}
//let config = s3s::dto::ReplicationConfiguration{ role: todo!(), rules: todo!() };
// Use the provided variable instead of borrowing self mutably.
let replication = match metadata_sys::get_replication_config(&oi.bucket).await {
Ok((replication, _)) => replication,
Err(_) => {
error!("heal_replication: failed to get replication config for bucket {} {}", oi.bucket, oi.name);
return;
}
};
if replication.rules.is_empty() {
error!("heal_replication: no replication rules for bucket {} {}", oi.bucket, oi.name);
return;
}
if replication.role.is_empty() {
// error!("heal_replication: no replication role for bucket {} {}", oi.bucket, oi.name);
// return;
}
//if oi.delete_marker || !oi.version_purge_status.is_empty() {
if oi.delete_marker {
error!("heal_replication: delete marker or version purge status {} {} {:?} {} {:?}", oi.bucket, oi.name, oi.version_id, oi.delete_marker, oi.version_purge_status);
return;
}
if oi.replication_status == ReplicationStatusType::Completed {
return;
}
info!("replication status is: {:?} and user define {:?}", oi.replication_status, oi.user_defined);
let roi = queue_replication_heal(&oi.bucket, oi, &replication, 3).await;
if roi.is_none() {
info!("not need heal {} {} {:?}", oi.bucket, oi.name, oi.version_id);
return;
}
for (arn, tgt_status) in &roi.unwrap().target_statuses {
let tgt_size_s = size_s
.repl_target_stats
.entry(arn.clone())
.or_default();
match tgt_status {
ReplicationStatusType::Pending => {
tgt_size_s.pending_count += 1;
tgt_size_s.pending_size += oi.size;
size_s.pending_count += 1;
size_s.pending_size += oi.size;
}
ReplicationStatusType::Failed => {
tgt_size_s.failed_count += 1;
tgt_size_s.failed_size += oi.size;
size_s.failed_count += 1;
size_s.failed_size += oi.size;
}
ReplicationStatusType::Completed | ReplicationStatusType::CompletedLegacy => {
tgt_size_s.replicated_count += 1;
tgt_size_s.replicated_size += oi.size;
size_s.replicated_count += 1;
size_s.replicated_size += oi.size;
}
_ => {}
}
}
if matches!(oi.replication_status, ReplicationStatusType::Replica) {
size_s.replica_count += 1;
size_s.replica_size += oi.size;
}
}
}
#[derive(Debug, Default)]

View File

@@ -32,6 +32,7 @@ pub mod store_list_objects;
mod store_utils;
pub mod utils;
pub mod xhttp;
pub mod cmd;
pub use global::new_object_layer_fn;
pub use global::set_global_endpoints;

View File

@@ -1,3 +1,17 @@
use async_trait::async_trait;
use futures::future::join_all;
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::RwLock;
use tonic::Request;
use tracing::info;
use crate::bucket::metadata_sys;
use crate::disk::error::is_all_buckets_not_found;
use crate::disk::{DiskAPI, DiskStore};
use crate::error::clone_err;
@@ -15,19 +29,7 @@ use crate::{
endpoints::{EndpointServerPools, Node},
store_api::{BucketInfo, BucketOptions, DeleteBucketOptions, MakeBucketOptions},
};
use async_trait::async_trait;
use common::error::{Error, Result};
use futures::future::join_all;
use madmin::heal_commands::{HealDriveInfo, HealResultItem};
use protos::node_service_time_out_client;
use protos::proto_gen::node_service::{
DeleteBucketRequest, GetBucketInfoRequest, HealBucketRequest, ListBucketRequest, MakeBucketRequest,
};
use regex::Regex;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use tokio::sync::RwLock;
use tonic::Request;
use tracing::info;
type Client = Arc<Box<dyn PeerS3Client>>;
@@ -430,14 +432,17 @@ impl PeerS3Client for LocalPeerS3Client {
}
// TODO: reduceWriteQuorumErrs
// debug!("get_bucket_info errs:{:?}", errs);
let mut versioned = false;
if let Ok(sys) = metadata_sys::get(bucket).await {
versioned = sys.versioning();
}
ress.iter()
.find_map(|op| {
op.as_ref().map(|v| BucketInfo {
name: v.name.clone(),
created: v.created,
versionning: versioned,
..Default::default()
})
})
@@ -496,14 +501,17 @@ pub struct RemotePeerS3Client {
}
impl RemotePeerS3Client {
fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
pub fn new(node: Option<Node>, pools: Option<Vec<usize>>) -> Self {
let addr = node.as_ref().map(|v| v.url.to_string()).unwrap_or_default().to_string();
Self { node, pools, addr }
}
pub fn get_addr(&self)->String {
return self.addr.clone();
}
}
#[async_trait]
impl PeerS3Client for RemotePeerS3Client {
impl PeerS3Client for RemotePeerS3Client {
fn get_pools(&self) -> Option<Vec<usize>> {
self.pools.clone()
}

View File

@@ -1374,7 +1374,7 @@ impl StorageAPI for ECStore {
info.versionning = sys.versioning();
info.object_locking = sys.object_locking();
}
Ok(info)
}
#[tracing::instrument(skip(self))]

View File

@@ -1,3 +1,4 @@
use crate::cmd::bucket_replication::{ReplicationStatusType, VersionPurgeStatusType};
use crate::heal::heal_ops::HealSequence;
use crate::io::FileReader;
use crate::store_utils::clean_metadata;
@@ -175,7 +176,6 @@ impl FileInfo {
let content_type = meta.get("content-type").cloned();
let content_encoding = meta.get("content-encoding").cloned();
let etag = meta.get("etag").cloned();
(content_type, content_encoding, etag)
} else {
(None, None, None)
@@ -686,6 +686,12 @@ pub struct ObjectInfo {
pub inlined: bool,
pub metadata_only: bool,
pub version_only: bool,
pub replication_status_internal:String,
pub replication_status:ReplicationStatusType,
pub version_purge_status_internal: String,
pub version_purge_status: VersionPurgeStatusType,
pub checksum:Vec<u8>,
}
impl Clone for ObjectInfo {
@@ -714,8 +720,14 @@ impl Clone for ObjectInfo {
inlined: self.inlined,
metadata_only: self.metadata_only,
version_only: self.version_only,
replication_status_internal: self.replication_status_internal.clone(),
replication_status: self.replication_status.clone(),
version_purge_status_internal: self.version_purge_status_internal.clone(),
version_purge_status: self.version_purge_status.clone(),
checksum:Default::default(),
}
}
}
impl ObjectInfo {

View File

@@ -1,3 +1,4 @@
pub const AMZ_OBJECT_TAGGING: &str = "X-Amz-Tagging";
pub const AMZ_BUCKET_REPLICATION_STATUS: &str = "X-Amz-Replication-Status";
pub const AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";
pub const AMZ_DECODED_CONTENT_LENGTH: &str = "X-Amz-Decoded-Content-Length";

View File

@@ -92,6 +92,26 @@ libsystemd.workspace = true
[target.'cfg(all(target_os = "linux", target_env = "gnu"))'.dependencies]
tikv-jemallocator = "0.6"
uuid = "1.12.1"
url.workspace = true
#admin = { path = "../api/admin" }
axum.workspace = true
matchit = "0.8.3"
shadow-rs = "0.38.0"
const-str = { version = "0.6.1", features = ["std", "proc"] }
atoi = "2.0.0"
serde_urlencoded = "0.7.1"
crypto = { path = "../crypto" }
iam = { path = "../iam" }
jsonwebtoken = "9.3.0"
tower-http = { version = "0.6.2", features = ["cors"] }
chrono.workspace = true
thiserror.workspace = true
regex = "1.11.1"
aws-sdk-s3 = "1.29.0"
include_dir = "0.7.4"
percent-encoding = "2.3.1"
urlencoding = "2.1.3"
[build-dependencies]
prost-build.workspace = true

View File

@@ -6,6 +6,11 @@ use bytes::Bytes;
use common::error::Error as ec_Error;
use ecstore::admin_server_info::get_server_info;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::bucket::metadata_sys::{self, get_replication_config};
use ecstore::bucket::target::{BucketTarget};
use ecstore::cmd::bucket_targets::{self, GLOBAL_Bucket_Target_Sys};
//use ecstore::error::Error as ec_Error;
use crate::storage::error::to_s3_error;
use ecstore::global::GLOBAL_ALlHealState;
use ecstore::heal::data_usage::load_data_usage_from_backend;
use ecstore::heal::heal_commands::HealOpts;
@@ -16,13 +21,14 @@ use ecstore::peer::is_reserved_or_invalid_bucket;
use ecstore::pools::{get_total_usable_capacity, get_total_usable_capacity_free};
use ecstore::store::is_valid_object_prefix;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::StorageAPI;
use ecstore::store_api::{StorageAPI};
use ecstore::utils::path::path_join;
use futures::{Stream, StreamExt};
use http::{HeaderMap, Uri};
use hyper::StatusCode;
use iam::get_global_action_cred;
use iam::store::MappedPolicy;
// use lazy_static::lazy_static;
use madmin::metrics::RealtimeMetrics;
use madmin::utils::parse_duration;
use matchit::Params;
@@ -31,11 +37,13 @@ use policy::policy::action::S3Action;
use policy::policy::default::DEFAULT_POLICIES;
use policy::policy::Args;
use policy::policy::BucketPolicy;
use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
use s3s::header::CONTENT_TYPE;
use s3s::stream::{ByteStream, DynByteStream};
use s3s::{s3_error, Body, S3Error, S3Request, S3Response, S3Result};
use s3s::{S3ErrorCode, StdError};
use serde::{Deserialize, Serialize};
// use serde_json::to_vec;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::pin::Pin;
@@ -47,6 +55,7 @@ use tokio::time::interval;
use tokio::{select, spawn};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info, warn};
// use url::UrlQuery;
pub mod event;
pub mod group;
@@ -57,6 +66,7 @@ pub mod service_account;
pub mod sts;
pub mod trace;
pub mod user;
use urlencoding::decode;
#[derive(Debug, Serialize, Default)]
#[serde(rename_all = "PascalCase", default)]
@@ -745,6 +755,278 @@ impl Operation for BackgroundHealStatusHandler {
}
}
fn extract_query_params(uri: &Uri) -> HashMap<String, String> {
let mut params = HashMap::new();
if let Some(query) = uri.query() {
query.split('&').for_each(|pair| {
if let Some((key, value)) = pair.split_once('=') {
params.insert(key.to_string(), value.to_string());
}
});
}
params
}
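// Sketch, not part of this commit: what extract_query_params yields for the
// replication-metrics request shown below.
#[cfg(test)]
mod query_param_sketch {
    use super::extract_query_params;
    use http::Uri;

    #[test]
    fn parses_bucket_parameter() {
        let uri: Uri = "/minio/admin/v3/replicationmetrics?bucket=srcbucket".parse().expect("valid URI");
        let params = extract_query_params(&uri);
        assert_eq!(params.get("bucket"), Some(&"srcbucket".to_string()));
    }
}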
// Disable client-side encryption: minio uses an 8-byte nonce while rustfs uses a 12-byte nonce
#[allow(dead_code)]
fn is_local_host(_host: String) -> bool {
return false;
}
//awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin "http://:9000/minio/admin/v3/replicationmetrics?bucket=1"
pub struct GetReplicationMetricsHandler {}
#[async_trait::async_trait]
impl Operation for GetReplicationMetricsHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
error!("GetReplicationMetricsHandler");
let querys = extract_query_params(&_req.uri);
if let Some(bucket) = querys.get("bucket") {
error!("get bucket:{} metris", bucket);
}
//return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
//Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
}
}
pub struct SetRemoteTargetHandler {}
#[async_trait::async_trait]
impl Operation for SetRemoteTargetHandler {
async fn call(&self, mut _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
//return Ok(S3Response::new((StatusCode::OK, Body::from("OK".to_string()))));
// println!("handle MetricsHandler, params: {:?}", _req.input);
info!("handle MetricsHandler, params: {:?}", _req.credentials);
let querys = extract_query_params(&_req.uri);
let Some(_cred) = _req.credentials else {
error!("credentials null");
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
let _is_owner = true; // treat as the owner for now; derive from the request later
let body = _req.input.store_all_unlimited().await.unwrap();
//println!("body: {}", std::str::from_utf8(&body.clone()).unwrap());
//println!("bucket is:{}", bucket.clone());
if let Some(bucket) = querys.get("bucket") {
if bucket.is_empty() {
println!("have bucket: {}", bucket);
return Ok(S3Response::new((StatusCode::OK, Body::from("fuck".to_string()))));
}
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
// let binfo:BucketInfo = store
// .get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default()).await;
match store
.get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default())
.await
{
Ok(info) => {
println!("Bucket Info: {:?}", info);
if !info.versionning {
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("bucket need versioned".to_string()))));
}
}
Err(err) => {
eprintln!("Error: {:?}", err);
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("empty bucket".to_string()))));
}
}
let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(|arg0| to_s3_error(arg0.into()))?; // 错误会被传播
remote_target.source_bucket = bucket.clone();
info!("remote target {} And arn is:", remote_target.source_bucket.clone());
if let Some(val) = remote_target.arn.clone() {
info!("arn is {}", val);
}
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let (arn, exist) = sys.get_remote_arn(&bucket, Some(&remote_target), "").await;
info!("exist: {} {}", exist, arn.clone().unwrap_or_default());
if exist && arn.is_some() {
let jsonarn = serde_json::to_string(&arn).expect("failed to serialize");
//Ok(S3Response::new)
return Ok(S3Response::new((StatusCode::OK, Body::from(jsonarn))));
} else {
remote_target.arn = arn;
match sys.set_target(bucket, &remote_target, false, false).await {
Ok(_) => {
{
// TODO: persist the updated targets and any related metadata
let targets = sys.list_targets(Some(&bucket), None).await;
info!("targets is {}", targets.len());
match serde_json::to_vec(&targets) {
Ok(json) => {
//println!("json is:{:?}", json.clone().to_ascii_lowercase());
//metadata_sys::GLOBAL_BucketMetadataSys::
//BUCKET_TARGETS_FILE: &str = "bucket-targets.json"
let _ = metadata_sys::update(bucket, "bucket-targets.json", json).await;
// if let Err(err) = metadata_sys::GLOBAL_BucketMetadataSys.get().
// .update(ctx, bucket, "bucketTargetsFile", tgt_bytes)
// .await
// {
// write_error_response(ctx, &err)?;
// return Err(err);
// }
}
Err(e) => {
error!("序列化失败{}", e);
}
}
}
let jsonarn = serde_json::to_string(&remote_target.arn.clone()).expect("failed to serialize");
return Ok(S3Response::new((StatusCode::OK, Body::from(jsonarn))));
}
Err(e) => {
error!("set target error {}", e);
return Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("remote target not ready".to_string()),
)));
}
}
}
} else {
error!("GLOBAL_BUCKET _TARGET_SYS is not initialized");
return Err(S3Error::with_message(
S3ErrorCode::InternalError,
"GLOBAL_BUCKET_TARGET_SYS is not initialized".to_string(),
));
}
}
// return Err(s3_error!(InvalidArgument));
return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
}
}
pub struct ListRemoteTargetHandler {}
#[async_trait::async_trait]
impl Operation for ListRemoteTargetHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("list GetRemoteTargetHandler, params: {:?}", _req.credentials);
let querys = extract_query_params(&_req.uri);
let Some(_cred) = _req.credentials else {
error!("credentials null");
return Err(s3_error!(InvalidRequest, "get cred failed"));
};
if let Some(bucket) = querys.get("bucket") {
if bucket.is_empty() {
error!("bucket parameter is empty");
return Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Bucket parameter is required".to_string()),
)));
}
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not initialized".to_string()));
};
match store
.get_bucket_info(bucket, &ecstore::store_api::BucketOptions::default())
.await
{
Ok(info) => {
println!("Bucket Info: {:?}", info);
if !info.versionning {
return Ok(S3Response::new((
StatusCode::FORBIDDEN,
Body::from("Bucket needs versioning".to_string()),
)));
}
}
Err(err) => {
eprintln!("Error fetching bucket info: {:?}", err);
return Ok(S3Response::new((StatusCode::BAD_REQUEST, Body::from("Invalid bucket".to_string()))));
}
}
if let Some(sys) = GLOBAL_Bucket_Target_Sys.get() {
let targets = sys.list_targets(Some(&bucket), None).await;
error!("target sys len {}", targets.len());
if targets.is_empty() {
return Ok(S3Response::new((
StatusCode::NOT_FOUND,
Body::from("No remote targets found".to_string()),
)));
}
let json_targets = serde_json::to_string(&targets).map_err(|e| {
error!("Serialization error: {}", e);
S3Error::with_message(S3ErrorCode::InternalError, "Failed to serialize targets".to_string())
})?;
return Ok(S3Response::new((StatusCode::OK, Body::from(json_targets))));
} else {
println!("GLOBAL_BUCKET_TARGET_SYS is not initialized");
return Err(S3Error::with_message(
S3ErrorCode::InternalError,
"GLOBAL_BUCKET_TARGET_SYS is not initialized".to_string(),
));
}
}
println!("Bucket parameter missing in request");
Ok(S3Response::new((
StatusCode::BAD_REQUEST,
Body::from("Bucket parameter is required".to_string()),
)))
//return Err(s3_error!(NotImplemented));
}
}
const COLON: AsciiSet = CONTROLS.add(b':');
pub struct RemoveRemoteTargetHandler {}
#[async_trait::async_trait]
impl Operation for RemoveRemoteTargetHandler {
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
error!("remove remote target called");
let querys = extract_query_params(&_req.uri);
if let Some(arnstr) = querys.get("arn") {
if let Some(bucket) = querys.get("bucket") {
if bucket.is_empty() {
error!("bucket parameter is empty");
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from("bucket not found".to_string()))));
}
let _arn = bucket_targets::ARN::parse(&arnstr);
match get_replication_config(&bucket).await {
Ok((conf, _ts)) => {
for ru in conf.rules {
let encoded = percent_encode(ru.destination.bucket.as_bytes(), &COLON);
let encoded_str = encoded.to_string();
if *arnstr == encoded_str {
error!("target is in use by a replication rule");
return Ok(S3Response::new((StatusCode::FORBIDDEN, Body::from("target is in use".to_string()))));
}
info!("encoded destination bucket: {}, arn: {}", encoded_str, arnstr);
}
}
Err(err) => {
error!("get replication config err: {}", err);
return Ok(S3Response::new((StatusCode::NOT_FOUND, Body::from(err.to_string()))));
}
}
let decoded_str = decode(arnstr).unwrap_or_else(|_| arnstr.as_str().into());
info!("target to delete: {}", decoded_str);
bucket_targets::remove_bucket_target(bucket, arnstr).await;
}
}
//return Err(s3_error!(InvalidArgument, "Invalid bucket name"));
//Ok(S3Response::with_headers((StatusCode::OK, Body::from()), header))
return Ok(S3Response::new((StatusCode::OK, Body::from("Ok".to_string()))));
}
}
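// A minimal sketch of the ARN encode/compare/decode round trip performed by
// RemoveRemoteTargetHandler above. The ARN literal is a made-up example; the COLON set and
// the percent_encode / decode helpers are the ones already used in this module.
#[allow(dead_code)]
fn arn_round_trip_sketch() {
    let destination = "arn:rustfs:replication::dest:destbucket"; // hypothetical ARN
    // Encode the ':' bytes (plus control bytes), as done when matching replication rules.
    let encoded = percent_encode(destination.as_bytes(), &COLON).to_string();
    // encoded == "arn%3Arustfs%3Areplication%3A%3Adest%3Adestbucket"
    // The arn query parameter arrives in this encoded form, so the handler compares against
    // the encoded string and only decodes it for logging and removal.
    let decoded = decode(&encoded).expect("valid percent-encoding");
    assert_eq!(decoded, destination);
}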
#[cfg(test)]
mod test {
use ecstore::heal::heal_commands::HealOpts;

View File

@@ -11,12 +11,14 @@ use handlers::{
sts, user,
};
use handlers::{ListRemoteTargetHandler, SetRemoteTargetHandler, RemoveRemoteTargetHandler, GetReplicationMetricsHandler};
use hyper::Method;
use router::{AdminOperation, S3Router};
use rpc::regist_rpc_route;
use s3s::route::S3Route;
const ADMIN_PREFIX: &str = "/rustfs/admin";
const MINIO_ADMIN_PREFIX: &str = "/minio/admin";
pub fn make_admin_route() -> Result<impl S3Route> {
let mut r: S3Router<AdminOperation> = S3Router::new();
@@ -228,6 +230,55 @@ fn register_user_route(r: &mut S3Router<AdminOperation>) -> Result<()> {
AdminOperation(&AddServiceAccount {}),
)?;
r.insert(
Method::GET,
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
AdminOperation(&ListRemoteTargetHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/list-remote-targets").as_str(),
AdminOperation(&ListRemoteTargetHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
AdminOperation(&GetReplicationMetricsHandler {}),
)?;
r.insert(
Method::GET,
format!("{}{}", ADMIN_PREFIX, "/v3/replicationmetrics").as_str(),
AdminOperation(&GetReplicationMetricsHandler {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
AdminOperation(&SetRemoteTargetHandler {}),
)?;
r.insert(
Method::PUT,
format!("{}{}", ADMIN_PREFIX, "/v3/set-remote-target").as_str(),
AdminOperation(&SetRemoteTargetHandler {}),
)?;
r.insert(
Method::DELETE,
format!("{}{}", MINIO_ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;
r.insert(
Method::DELETE,
format!("{}{}", ADMIN_PREFIX, "/v3/remove-remote-target").as_str(),
AdminOperation(&RemoveRemoteTargetHandler {}),
)?;
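    // Manual smoke tests for the routes registered above, in the same style as the awscurl
    // example next to GetReplicationMetricsHandler. Host, credentials, bucket and the body
    // file are placeholders; the exact JSON fields for set-remote-target are dictated by the
    // BucketTarget struct, not by this sketch, and flag handling mirrors curl (adjust to your
    // awscurl version):
    //
    // awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin \
    //   -X PUT -d @remote-target.json "http://127.0.0.1:9000/minio/admin/v3/set-remote-target?bucket=srcbucket"
    // awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin \
    //   "http://127.0.0.1:9000/minio/admin/v3/list-remote-targets?bucket=srcbucket"
    // awscurl --service s3 --region us-east-1 --access_key rustfsadmin --secret_key rustfsadmin \
    //   -X DELETE "http://127.0.0.1:9000/minio/admin/v3/remove-remote-target?bucket=srcbucket&arn=<encoded-arn>"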
// list-canned-policies?bucket=xxx
r.insert(
Method::GET,

View File

@@ -16,6 +16,7 @@ use s3s::S3Result;
use super::rpc::RPC_PREFIX;
use super::ADMIN_PREFIX;
use super::MINIO_ADMIN_PREFIX;
pub struct S3Router<T> {
router: Router<T>,
@@ -64,7 +65,7 @@ where
}
}
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX)
uri.path().starts_with(ADMIN_PREFIX) || uri.path().starts_with(RPC_PREFIX) || uri.path().starts_with(MINIO_ADMIN_PREFIX)
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<Body>> {

View File

@@ -240,6 +240,8 @@ impl Node for NodeService {
}));
}
};
println!("bucket info {}", bucket_info.clone());
Ok(tonic::Response::new(GetBucketInfoResponse {
success: true,
bucket_info,
@@ -247,6 +249,8 @@ impl Node for NodeService {
}))
}
// println!("vuc")
Err(err) => Ok(tonic::Response::new(GetBucketInfoResponse {
success: false,
bucket_info: String::new(),

View File

@@ -17,6 +17,7 @@ use crate::server::{wait_for_shutdown, ServiceState, ServiceStateManager, Shutdo
use bytes::Bytes;
use chrono::Datelike;
use clap::Parser;
use ecstore::cmd::bucket_replication::init_bucket_replication_pool;
use common::{
error::{Error, Result},
globals::set_global_addr,
@@ -534,6 +535,11 @@ async fn run(opt: config::Opt) -> Result<()> {
init_console_cfg(local_ip, server_port);
print_server_info();
init_bucket_replication_pool().await;
if opt.console_enable {

View File

@@ -4,14 +4,12 @@ use super::options::extract_metadata;
use super::options::put_opts;
use crate::auth::get_condition_values;
use crate::storage::access::ReqInfo;
use crate::storage::error::to_s3_error;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime, get_opts};
use api::query::Context;
use api::query::Query;
use api::server::dbms::DatabaseManagerSystem;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use common::error::Result;
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
use datafusion::arrow::json::writer::JsonArray;
@@ -31,6 +29,9 @@ use ecstore::bucket::tagging::decode_tags;
use ecstore::bucket::tagging::encode_tags;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::io::READ_BUFFER_SIZE;
use ecstore::cmd::bucket_replication::get_must_replicate_options;
use ecstore::cmd::bucket_replication::must_replicate;
use ecstore::cmd::bucket_replication::schedule_replication;
use ecstore::new_object_layer_fn;
use ecstore::store_api::BucketOptions;
use ecstore::store_api::CompletePart;
@@ -43,6 +44,7 @@ use ecstore::store_api::ObjectOptions;
use ecstore::store_api::ObjectToDelete;
use ecstore::store_api::PutObjReader;
use ecstore::store_api::StorageAPI;
// use ecstore::store_api::RESERVED_METADATA_PREFIX;
use ecstore::store_api::RESERVED_METADATA_PREFIX_LOWER;
use ecstore::utils::path::path_join_buf;
use ecstore::utils::xml;
@@ -51,6 +53,7 @@ use futures::pin_mut;
use futures::{Stream, StreamExt};
use http::HeaderMap;
use lazy_static::lazy_static;
use tracing::debug;
use policy::auth;
use policy::policy::action::Action;
use policy::policy::action::S3Action;
@@ -78,13 +81,19 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_tar::Archive;
use tokio_util::io::ReaderStream;
use tokio_util::io::StreamReader;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
use crate::storage::error::to_s3_error;
use crate::storage::options::copy_dst_opts;
use crate::storage::options::copy_src_opts;
use crate::storage::options::{extract_metadata_from_mime, get_opts};
use ecstore::cmd::bucket_replication::ReplicationStatusType;
use ecstore::cmd::bucket_replication::ReplicationType;
macro_rules! try_ {
($result:expr) => {
match $result {
@@ -924,6 +933,7 @@ impl S3 for FS {
return self.put_object_extract(req).await;
}
info!("put object");
let input = req.input;
if let Some(ref storage_class) = input.storage_class {
@@ -976,10 +986,29 @@ impl S3 for FS {
metadata.insert(xhttp::AMZ_OBJECT_TAGGING.to_owned(), tags);
}
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(metadata))
let mt = metadata.clone();
let mt2 = metadata.clone();
let opts: ObjectOptions = put_opts(&bucket, &key, version_id, &req.headers, Some(mt))
.await
.map_err(to_s3_error)?;
let repoptions = get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
let dsc = must_replicate(&bucket, &key, &repoptions).await;
debug!("replicate_any: {}", dsc.replicate_any());
if dsc.replicate_any() {
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-timestamp");
let now: DateTime<Utc> = Utc::now();
let formatted_time = now.to_rfc3339();
metadata.insert(k, formatted_time);
let k = format!("{}{}", RESERVED_METADATA_PREFIX_LOWER, "replication-status");
metadata.insert(k, dsc.pending_status());
}
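    // Illustration of the two keys written above (the prefix value and the pending marker are
    // assumptions for readability, not taken from this change): with a prefix of
    // "x-rustfs-internal-" the object metadata would carry roughly
    //   x-rustfs-internal-replication-timestamp = 2025-06-05T10:00:00+00:00
    //   x-rustfs-internal-replication-status    = <dsc.pending_status()>
    // which the replication path presumably reads back when the object is pushed to the
    // remote target.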
debug!("put_object opts {:?}", &opts);
let obj_info = store
@@ -987,9 +1016,17 @@ impl S3 for FS {
.await
.map_err(to_s3_error)?;
let e_tag = obj_info.etag;
let e_tag = obj_info.etag.clone();
// store.put_object(bucket, object, data, opts);
let repoptions = get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
let dsc = must_replicate(&bucket, &key, &repoptions).await;
if dsc.replicate_any() {
let objectlayer = new_object_layer_fn();
schedule_replication(obj_info, objectlayer.unwrap(), dsc, 1).await;
}
let output = PutObjectOutput {
e_tag,
@@ -1152,17 +1189,30 @@ impl S3 for FS {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
let oi = store
let obj_info = store
.complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, opts)
.await
.map_err(to_s3_error)?;
let output = CompleteMultipartUploadOutput {
bucket: Some(bucket),
key: Some(key),
e_tag: oi.etag,
bucket: Some(bucket.clone()),
key: Some(key.clone()),
e_tag: obj_info.etag.clone(),
location: Some("us-east-1".to_string()),
..Default::default()
};
let mt2 = HashMap::new();
let repoptions = get_must_replicate_options(&mt2, "", ReplicationStatusType::Unknown, ReplicationType::ObjectReplicationType, &opts);
let dsc = must_replicate(&bucket, &key, &repoptions).await;
if dsc.replicate_any() {
warn!("need multipart replication");
let objectlayer = new_object_layer_fn();
schedule_replication(obj_info, objectlayer.unwrap(), dsc, 1).await;
}
Ok(S3Response::new(output))
}
@@ -1725,17 +1775,40 @@ impl S3 for FS {
.await
.map_err(to_s3_error)?;
let replication_configuration = match metadata_sys::get_replication_config(&bucket).await {
let rcfg = match metadata_sys::get_replication_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
warn!("get_object_lock_config err {:?}", err);
None
error!("get_replication_config err {:?}", err);
return Err(to_s3_error(err));
}
};
Ok(S3Response::new(GetBucketReplicationOutput {
replication_configuration,
}))
if rcfg.is_none() {
return Err(S3Error::with_message(S3ErrorCode::NoSuchBucket, "replication configuration not found".to_string()));
}
Ok(S3Response::new(GetBucketReplicationOutput {
replication_configuration: rcfg,
}))
}
async fn put_bucket_replication(
@@ -1747,6 +1820,7 @@ impl S3 for FS {
replication_configuration,
..
} = req.input;
warn!("put bucket replication");
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
@@ -1786,6 +1860,7 @@ impl S3 for FS {
.map_err(to_s3_error)?;
// TODO: remove targets
error!("delete bucket");
Ok(S3Response::new(DeleteBucketReplicationOutput::default()))
}

View File

@@ -1,5 +1,5 @@
use common::error::Error;
use ecstore::{disk::error::is_err_file_not_found, store_err::StorageError};
use ecstore::{bucket::error::BucketMetadataError, disk::error::is_err_file_not_found, store_err::StorageError};
use s3s::{s3_error, S3Error, S3ErrorCode};
pub fn to_s3_error(err: Error) -> S3Error {
if let Some(storage_err) = err.downcast_ref::<StorageError>() {
@@ -80,6 +80,15 @@ pub fn to_s3_error(err: Error) -> S3Error {
StorageError::DoneForNow => s3_error!(InternalError, "DoneForNow"),
};
}
// map a missing bucket replication config to the dedicated S3 error code
if let Some(meta_err) = err.downcast_ref::<BucketMetadataError>() {
return match meta_err {
BucketMetadataError::BucketReplicationConfigNotFound => {
S3Error::with_message(S3ErrorCode::ReplicationConfigurationNotFoundError, format!("{}", err))
}
_ => S3Error::with_message(S3ErrorCode::InternalError, format!("{}", err)), // 处理其他情况
};
}
if is_err_file_not_found(&err) {
return S3Error::with_message(S3ErrorCode::NoSuchKey, format!(" ec err {}", err));