fix: replication delete (#1714)

This commit is contained in:
weisd
2026-02-04 13:39:35 +08:00
committed by GitHub
parent a4563f7b41
commit 4d19b069c3
6 changed files with 123 additions and 26 deletions

View File

@@ -28,6 +28,7 @@ use crate::error::{Error, Result, is_err_object_not_found, is_err_version_not_fo
use crate::event::name::EventName;
use crate::event_notification::{EventArgs, send_event};
use crate::global::GLOBAL_LocalNodeName;
use crate::set_disk::get_lock_acquire_timeout;
use crate::store_api::{DeletedObject, ObjectInfo, ObjectOptions, ObjectToDelete, WalkOptions};
use crate::{StorageAPI, new_object_layer_fn};
use aws_sdk_s3::error::SdkError;
@@ -1273,7 +1274,58 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
}
};
//TODO: nslock
let ns_lock = match storage
.new_ns_lock(&bucket, format!("/[replicate]/{}", dobj.delete_object.object_name).as_str())
.await
{
Ok(ns_lock) => ns_lock,
Err(e) => {
warn!(
"failed to get ns lock for bucket:{} object:{} error:{}",
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
name: dobj.delete_object.object_name.clone(),
version_id,
delete_marker: dobj.delete_object.delete_marker,
..Default::default()
},
user_agent: "Internal: [Replication]".to_string(),
host: GLOBAL_LocalNodeName.to_string(),
..Default::default()
});
return;
}
};
let _lock_guard = match ns_lock.get_write_lock(get_lock_acquire_timeout()).await {
Ok(lock_guard) => lock_guard,
Err(e) => {
warn!(
"failed to get write lock for bucket:{} object:{} error:{}",
bucket, dobj.delete_object.object_name, e
);
send_event(EventArgs {
event_name: EventName::ObjectReplicationNotTracked.as_ref().to_string(),
bucket_name: bucket.clone(),
object: ObjectInfo {
bucket: bucket.clone(),
name: dobj.delete_object.object_name.clone(),
version_id,
delete_marker: dobj.delete_object.delete_marker,
..Default::default()
},
user_agent: "Internal: [Replication]".to_string(),
host: GLOBAL_LocalNodeName.to_string(),
..Default::default()
});
return;
}
};
// Initialize replicated infos
let mut rinfos = ReplicatedInfos {
@@ -1379,7 +1431,7 @@ pub async fn replicate_delete<S: StorageAPI>(dobj: DeletedObjectReplicationInfo,
dobj.delete_object.version_id.map(|v| v.to_string()),
);
if replication_status != prev_status {
drs.replica_timestamp = Some(OffsetDateTime::now_utc());
drs.replication_timestamp = Some(OffsetDateTime::now_utc());
}
let event_name = if replication_status == ReplicationStatusType::Completed {
@@ -1521,6 +1573,13 @@ async fn replicate_delete_to_target(dobj: &DeletedObjectReplicationInfo, tgt_cli
}
}
if rinfo.replication_status == ReplicationStatusType::Completed
&& !tgt_client.reset_id.is_empty()
&& dobj.op_type == ReplicationType::ExistingObject
{
rinfo.resync_timestamp = format!("{};{}", OffsetDateTime::now_utc().format(&Rfc3339).unwrap(), tgt_client.reset_id);
}
rinfo
}

View File

@@ -2421,11 +2421,12 @@ impl DiskAPI for LocalDisk {
return self.write_metadata("", volume, path, fi).await;
}
return if fi.version_id.is_some() {
Err(DiskError::FileVersionNotFound)
let ret_err = if fi.version_id.is_some() {
DiskError::FileVersionNotFound
} else {
Err(DiskError::FileNotFound)
DiskError::FileNotFound
};
return Err(ret_err);
}
};

View File

@@ -71,7 +71,7 @@ use rustfs_common::heal_channel::{DriveState, HealChannelPriority, HealItemType,
use rustfs_config::MI_B;
use rustfs_filemeta::{
FileInfo, FileMeta, FileMetaShallowVersion, MetaCacheEntries, MetaCacheEntry, MetadataResolutionParams, ObjectPartInfo,
RawFileInfo, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions,
RawFileInfo, ReplicateDecision, ReplicationStatusType, VersionPurgeStatusType, file_info_from_raw, merge_file_meta_versions,
};
use rustfs_lock::LockClient;
use rustfs_lock::fast_lock::types::LockResult;
@@ -122,7 +122,7 @@ const DISK_HEALTH_CACHE_TTL: Duration = Duration::from_millis(750);
/// Get lock acquire timeout from environment variable RUSTFS_LOCK_ACQUIRE_TIMEOUT (in seconds)
/// Defaults to 30 seconds if not set or invalid
fn get_lock_acquire_timeout() -> Duration {
pub fn get_lock_acquire_timeout() -> Duration {
Duration::from_secs(rustfs_utils::get_env_u64("RUSTFS_LOCK_ACQUIRE_TIMEOUT", 5))
}
@@ -2251,23 +2251,42 @@ impl SetDisks {
Ok((fi, parts_metadata, op_online_disks))
}
async fn get_object_info_and_quorum(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<(ObjectInfo, usize)> {
let (fi, _, _) = self.get_object_fileinfo(bucket, object, opts, false).await?;
async fn get_object_info_and_quorum(
&self,
bucket: &str,
object: &str,
opts: &ObjectOptions,
) -> (ObjectInfo, usize, Option<StorageError>) {
let fi = match self.get_object_fileinfo(bucket, object, opts, false).await {
Ok((fi, _, _)) => fi,
Err(e) => return (ObjectInfo::default(), 0, Some(e)),
};
let write_quorum = fi.write_quorum(self.default_write_quorum());
let oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
// TODO: replicatio
if !fi.version_purge_status().is_empty() && opts.version_id.is_some() {
return (
oi,
write_quorum,
Some(to_object_err(StorageError::MethodNotAllowed, vec![bucket, object])),
);
}
if fi.deleted {
return if opts.version_id.is_none() || opts.delete_marker {
Err(to_object_err(StorageError::FileNotFound, vec![bucket, object]))
(oi, write_quorum, Some(to_object_err(StorageError::FileNotFound, vec![bucket, object])))
} else {
Err(to_object_err(StorageError::MethodNotAllowed, vec![bucket, object]))
(
oi,
write_quorum,
Some(to_object_err(StorageError::MethodNotAllowed, vec![bucket, object])),
)
};
}
Ok((oi, write_quorum))
(oi, write_quorum, None)
}
#[allow(clippy::too_many_arguments)]
@@ -4461,10 +4480,19 @@ impl StorageAPI for SetDisks {
return Ok(ObjectInfo::default());
}
let (mut goi, write_quorum, gerr) = match self.get_object_info_and_quorum(bucket, object, &opts).await {
Ok((oi, wq)) => (oi, wq, None),
Err(e) => (ObjectInfo::default(), 0, Some(e)),
};
// TODO: Lifecycle
let mut version_found = true;
let (mut goi, write_quorum, gerr) = self.get_object_info_and_quorum(bucket, object, &opts).await;
if let Some(err) = &gerr
&& goi.name.is_empty()
{
if opts.delete_marker {
version_found = false;
} else {
return Err(err.clone());
}
}
let otd = ObjectToDelete {
object_name: object.to_string(),
@@ -4475,9 +4503,16 @@ impl StorageAPI for SetDisks {
..Default::default()
};
let version_found = if opts.delete_marker { gerr.is_none() } else { true };
let dsc = check_replicate_delete(bucket, &otd, &goi, &opts, gerr.map(|e| e.to_string())).await;
let dsc = if opts
.delete_replication
.as_ref()
.map(|v| v.replica_status == ReplicationStatusType::Replica)
== Some(true)
{
ReplicateDecision::default()
} else {
check_replicate_delete(bucket, &otd, &goi, &opts, gerr.map(|e| e.to_string())).await
};
if dsc.replicate_any() {
opts.set_delete_replication_state(dsc);
@@ -4501,11 +4536,11 @@ impl StorageAPI for SetDisks {
mark_delete = false;
}
if opts.version_purge_status() != VersionPurgeStatusType::Complete {
if opts.version_purge_status() == VersionPurgeStatusType::Complete {
mark_delete = false;
}
if version_found && (goi.version_purge_status.is_empty() || !goi.delete_marker) {
if version_found && (!goi.version_purge_status.is_empty() || !goi.delete_marker) {
delete_marker = false;
}
}
@@ -4550,7 +4585,9 @@ impl StorageAPI for SetDisks {
.await
.map_err(|e| to_object_err(e, vec![bucket, object]))?;
return Ok(ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended));
let mut oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
oi.replication_decision = goi.replication_decision;
return Ok(oi);
}
let version_id = opts.version_id.as_ref().and_then(|v| Uuid::parse_str(v).ok());

View File

@@ -615,7 +615,7 @@ impl FileMeta {
}
// delete_version deletes version, returns data_dir
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub fn delete_version(&mut self, fi: &FileInfo) -> Result<Option<Uuid>> {
let vid = Some(fi.version_id.unwrap_or(Uuid::nil()));

View File

@@ -281,7 +281,7 @@ impl ReplicationState {
return repl_status;
}
}
} else if self.replica_status != ReplicationStatusType::default() {
} else if !self.replica_status.is_empty() {
return self.replica_status.clone();
}

View File

@@ -1642,7 +1642,7 @@ impl S3 for FS {
return Ok(S3Response::with_status(DeleteObjectOutput::default(), StatusCode::NO_CONTENT));
}
if obj_info.replication_status == ReplicationStatusType::Replica
if obj_info.replication_status == ReplicationStatusType::Pending
|| obj_info.version_purge_status == VersionPurgeStatusType::Pending
{
schedule_replication_delete(DeletedObjectReplicationInfo {