refact bucket replication

This commit is contained in:
lygn128
2025-07-01 04:08:02 +00:00
parent 4fa389c047
commit f62db65a39
7 changed files with 27 additions and 35 deletions

View File

@@ -14,7 +14,7 @@ else
fi
# Set CARGO_TARGET_DIR and build the project
CARGO_TARGET_DIR=$TARGET_DIR RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build --package rustfs
CARGO_TARGET_DIR=$TARGET_DIR RUSTFLAGS="-C link-arg=-fuse-ld=mold" cargo build --release --package rustfs
echo -e "\a"
echo -e "\a"

View File

@@ -70,7 +70,7 @@ pub struct BucketTarget {
#[serde(rename = "isOnline")]
online: bool,
latency: LatencyStat,
latency: Option<LatencyStat>,
deployment_id: Option<String>,

View File

@@ -1136,19 +1136,6 @@ impl FromStr for ReplicationAction {
// }
// }
// 定义 MinioObjectInfo 结构体
#[derive(Debug)]
pub struct MinioObjectInfo {
pub e_tag: String,
pub version_id: String,
pub size: i64,
pub last_modified: DateTime<Utc>,
pub is_delete_marker: bool,
pub content_type: String,
pub metadata: HashMap<String, Vec<String>>,
pub user_tag_count: usize,
pub user_tags: HashMap<String, String>,
}
// 忽略大小写比较字符串列表
// fn equals(k1: &str, keys: &[&str]) -> bool {
@@ -1551,7 +1538,7 @@ impl ConfigProcess for s3s::dto::ReplicationConfiguration {
if obj.op_type == ReplicationType::DeleteReplicationType {
return if !obj.version_id.is_empty() {
// MinIO 扩展:检查版本化删除
// 扩展:检查版本化删除
if rule.delete_replication.is_none() {
warn!("need replicate failed");
return false;
@@ -1680,8 +1667,8 @@ pub trait TraitForObjectInfo {
fn replication_state(&self) -> ReplicationState;
}
const RESERVED_METADATA_PREFIX: &str = "X-Minio-Internal-";
const RESERVED_METADATA_PREFIX_LOWER: &str = "x-minio-internal-";
const RESERVED_METADATA_PREFIX: &str = "X-Rustfs-Internal-";
const RESERVED_METADATA_PREFIX_LOWER: &str = "x-rustfs-internal-";
lazy_static! {
static ref THROTTLE_DEADLINE: std::time::Duration = std::time::Duration::from_secs(3600);
}
@@ -1695,7 +1682,7 @@ pub const REPLICA_TIMESTAMP: &str = "replica-timestamp";
pub const TAGGING_TIMESTAMP: &str = "tagging-timestamp";
pub const OBJECT_LOCK_RETENTION_TIMESTAMP: &str = "objectlock-retention-timestamp";
pub const OBJECT_LOCK_LEGAL_HOLD_TIMESTAMP: &str = "objectlock-legalhold-timestamp";
pub const REPLICATION_SSEC_CHECKSUM_HEADER: &str = "X-Minio-Replication-Ssec-Crc";
pub const REPLICATION_SSEC_CHECKSUM_HEADER: &str = "X-Rustfs-Replication-Ssec-Crc";
impl TraitForObjectInfo for ObjectInfo {
fn replication_state(&self) -> ReplicationState {
@@ -2118,14 +2105,14 @@ async fn replicate_object_with_multipart(
) -> Result<(), Error> {
let store = new_object_layer_fn().unwrap();
let provider = StaticProvider::new(&tgt_cli.ak, &tgt_cli.sk, None);
let minio_cli = Minio::builder()
let rustfs_cli = Minio::builder()
.endpoint(target_info.endpoint.clone())
.provider(provider)
.secure(false)
.build()
.map_err(|e| Error::other(format!("build minio client failed: {e}")))?;
.map_err(|e| Error::other(format!("build rustfs client failed: {e}")))?;
let ret = minio_cli
let ret = rustfs_cli
.create_multipart_upload_with_versionid(tgt_cli.bucket.clone(), local_obj_info.name.clone(), rep_obj.version_id.clone())
.await;
match ret {
@@ -2135,13 +2122,13 @@ async fn replicate_object_with_multipart(
let version_id = local_obj_info.version_id.expect("missing version_id");
let task = Arc::new(task); // clone safe
let store = Arc::new(store);
let minio_cli = Arc::new(minio_cli);
let rustfs_cli = Arc::new(rustfs_cli);
let mut upload_futures = FuturesUnordered::new();
for (index, _) in local_obj_info.parts.iter().enumerate() {
let store = Arc::clone(&store);
let minio_cli = Arc::clone(&minio_cli);
let rustfs_cli = Arc::clone(&rustfs_cli);
let task = Arc::clone(&task);
let bucket = local_obj_info.bucket.clone();
let name = local_obj_info.name.clone();
@@ -2161,7 +2148,7 @@ async fn replicate_object_with_multipart(
Ok(ret) => {
debug!("readall suc:");
let body = Bytes::from(ret);
match minio_cli.upload_part(&task, index + 1, body).await {
match rustfs_cli.upload_part(&task, index + 1, body).await {
Ok(part) => {
debug!("multipar upload suc:");
Ok((index, part))
@@ -2203,7 +2190,7 @@ async fn replicate_object_with_multipart(
let parts: Vec<_> = part_results.into_iter().flatten().collect();
let ret = minio_cli.complete_multipart_upload(&task, parts, None).await;
let ret = rustfs_cli.complete_multipart_upload(&task, parts, None).await;
match ret {
Ok(res) => {
warn!("finish upload suc:{:?} version_id={:?}", res, local_obj_info.version_id);
@@ -2350,14 +2337,14 @@ impl ReplicateObjectInfo {
match res {
Ok(ret) => {
let body = rustfs_rsc::Data::from(ret);
let minio_cli = Minio::builder()
let rustfs_cli = Minio::builder()
.endpoint(rinfo.endpoint.clone())
.provider(provider)
.secure(false)
.build()
.unwrap();
let ex = minio_cli.executor(Method::PUT);
let ex = rustfs_cli.executor(Method::PUT);
let ret = ex
.bucket_name(target.bucket.clone())
.object_name(self.name.clone())
@@ -2662,7 +2649,7 @@ pub async fn replicate_object(ri: ReplicateObjectInfo, object_api: Arc<store::EC
for rinfo in &rs.targets {
// if !rinfo.resync_timestamp.is_empty() {
// eval_metadata.insert(
// format!("x-minio-replication-reset-status-{}", rinfo.arn),
// format!("x-rustfs-replication-reset-status-{}", rinfo.arn),
// rinfo.resync_timestamp.clone(),
// );
// }

View File

@@ -465,7 +465,7 @@ impl BucketTargetSys {
}
//if tgt.target_type == BucketTargetType::ReplicationService {
// Check if target is a RustFS server and alive
// Check if target is a rustfs server and alive
// let hc_result = tokio::time::timeout(Duration::from_secs(3), client.health_check(&tgt.endpoint)).await;
// match hc_result {
// Ok(Ok(true)) => {} // Server is alive
@@ -758,7 +758,7 @@ fn generate_arn(target: BucketTarget, depl_id: String) -> String {
// }
// }
// // Check if target is a RustFS server and alive
// // Check if target is a rustfs server and alive
// let hc_result = timeout(Duration::from_secs(3), client.health_check(&tgt.endpoint)).await;
// match hc_result {
// Ok(Ok(true)) => {} // Server is alive

View File

@@ -759,7 +759,7 @@ impl ScannerItem {
let replication = match metadata_sys::get_replication_config(&oi.bucket).await {
Ok((replication, _)) => replication,
Err(_) => {
error!("heal_replication: failed to get replication config for bucket {} {}", oi.bucket, oi.name);
error!("heal_replication: failed to get replication config for bucket: {} and object name: {}", oi.bucket, oi.name);
return;
}
};

View File

@@ -796,7 +796,7 @@ impl Operation for SetRemoteTargetHandler {
async fn call(&self, mut _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
//return Ok(S3Response::new((StatusCode::OK, Body::from("OK".to_string()))));
// println!("handle MetricsHandler, params: {:?}", _req.input);
info!("handle MetricsHandler, params: {:?}", _req.credentials);
info!("SetRemoteTargetHandler params: {:?}", _req.credentials);
let querys = extract_query_params(&_req.uri);
let Some(_cred) = _req.credentials else {
error!("credentials null");
@@ -834,7 +834,12 @@ impl Operation for SetRemoteTargetHandler {
}
}
let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(ApiError::other)?; // 错误会被传播
tracing::debug!("body is: {}", std::str::from_utf8(&body).unwrap_or("Invalid UTF-8"));
let mut remote_target: BucketTarget = serde_json::from_slice(&body).map_err(|e| {
tracing::error!("Failed to parse BucketTarget from body: {}", e);
ApiError::other(e)
})?;
remote_target.source_bucket = bucket.clone();
info!("remote target {} And arn is:", remote_target.source_bucket.clone());

View File

@@ -2177,13 +2177,13 @@ impl S3 for FS {
let rcfg = match metadata_sys::get_replication_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
error!("get_replication_config err {:?}", err);
if err == StorageError::ConfigNotFound {
return Err(S3Error::with_message(
S3ErrorCode::ReplicationConfigurationNotFoundError,
"replication not found".to_string(),
));
}
error!("get_replication_config err {:?}", err);
return Err(ApiError::from(err).into());
}
};