fix heal replication

This commit is contained in:
weisd
2025-12-09 17:07:39 +08:00
parent 1768e7bbdb
commit d8f8bfa5b7
2 changed files with 80 additions and 39 deletions

View File

@@ -27,8 +27,14 @@ use rustfs_common::data_usage::{BucketUsageInfo, DataUsageInfo, SizeSummary};
use rustfs_common::metrics::{Metric, Metrics, global_metrics};
use rustfs_ecstore::{
self as ecstore, StorageAPI,
bucket::versioning_sys::BucketVersioningSys,
bucket::{object_lock::objectlock_sys::BucketObjectLockSys, versioning::VersioningApi},
bucket::{
bucket_target_sys::BucketTargetSys,
metadata_sys::get_replication_config,
object_lock::objectlock_sys::BucketObjectLockSys,
replication::{ReplicationConfig, ReplicationConfigurationExt},
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
},
data_usage::{aggregate_local_snapshots, store_data_usage_in_backend},
disk::{Disk, DiskAPI, DiskStore, RUSTFS_META_BUCKET, WalkDirOptions},
set_disk::SetDisks,
@@ -389,7 +395,10 @@ impl Scanner {
enable_deep_scan: bool,
run_lifecycle: bool,
) -> Result<u64> {
debug!("Listing buckets for lifecycle/replication evaluation");
debug!(
"Listing buckets for lifecycle/replication evaluation, enable_healing: {}, enable_deep_scan: {}, run_lifecycle: {}",
enable_healing, enable_deep_scan, run_lifecycle
);
let buckets = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions::default())
.await
@@ -401,6 +410,8 @@ impl Scanner {
cfg.replication_pending_grace
};
let bucket_target_sys = BucketTargetSys::get();
for bucket_info in buckets {
let bucket_name = &bucket_info.name;
@@ -467,8 +478,25 @@ impl Scanner {
let object_lock_config = BucketObjectLockSys::get(bucket_name).await;
let replication_metrics =
Some(ReplicationMetricsHandle::new(Arc::clone(&self.metrics), Arc::clone(&self.bucket_metrics)));
let replication_config = {
if let Ok((cfg, _)) = get_replication_config(bucket_name).await {
if cfg.has_active_rules("", true) {
Some(ReplicationConfig::new(
Some(cfg),
bucket_target_sys.list_bucket_targets(bucket_name).await.ok(),
))
} else {
None
}
} else {
None
}
};
let mut scanner_item = ScannerItem::new(
bucket_name.to_string(),
replication_config,
lifecycle_config.clone(),
Some(versioning_config.clone()),
object_lock_config,
@@ -477,7 +505,7 @@ impl Scanner {
);
if let Err(e) = self
.process_bucket_objects_for_lifecycle(ecstore, bucket_name, &mut scanner_item, records)
.process_bucket_objects_for_lifecycle(bucket_name, &mut scanner_item, records)
.await
{
warn!("Failed to process lifecycle/replication actions for bucket {}: {}", bucket_name, e);
@@ -750,7 +778,6 @@ impl Scanner {
/// Process bucket objects for lifecycle actions
async fn process_bucket_objects_for_lifecycle(
&self,
ecstore: &Arc<rustfs_ecstore::store::ECStore>,
bucket_name: &str,
scanner_item: &mut ScannerItem,
records: &[LocalObjectRecord],
@@ -763,14 +790,7 @@ impl Scanner {
continue;
}
let mut object_info = Self::convert_record_to_object_info(record);
if object_info.replication_status.is_empty() && object_info.replication_status_internal.is_none() {
let object_name = object_info.name.clone();
let object_opts = rustfs_ecstore::store_api::ObjectOptions::default();
if let Ok(fresh_info) = ecstore.get_object_info(bucket_name, &object_name, &object_opts).await {
object_info = fresh_info;
}
}
let object_info = Self::convert_record_to_object_info(record);
let mut size_summary = SizeSummary::default();
let (deleted, _size) = scanner_item.apply_actions(&object_info, &mut size_summary).await;
if deleted {
@@ -2405,8 +2425,21 @@ impl Scanner {
let cfg = self.config.read().await;
cfg.replication_pending_grace
};
let replication_config = {
if let Ok((cfg, _)) = get_replication_config(bucket).await {
Some(ReplicationConfig::new(
Some(cfg),
BucketTargetSys::get().list_bucket_targets(bucket).await.ok(),
))
} else {
None
}
};
let mut scanner_item = ScannerItem::new(
bucket.to_string(),
replication_config,
Some(lifecycle_config.clone()),
versioning_config.clone(),
object_lock_config,
@@ -2762,12 +2795,13 @@ impl Scanner {
_ = interval.tick() => {
// Check if scanner should still be running
if !self.state.read().await.is_running {
warn!("Scanner is not running, exiting scanner loop");
break;
}
// check cancel signal
if cancel_token.is_cancelled() {
info!("Cancellation requested, exiting scanner loop");
warn!("Cancellation requested, exiting scanner loop");
break;
}
@@ -2787,13 +2821,13 @@ impl Scanner {
}
}
_ = cancel_token.cancelled() => {
info!("Received cancellation, stopping scanner loop");
warn!("Received cancellation, exiting scanner loop");
break;
}
}
}
info!("Scanner loop stopped");
warn!("Scanner loop stopped");
Ok(())
}

View File

@@ -25,7 +25,7 @@ use rustfs_ecstore::bucket::{
lifecycle,
lifecycle::Lifecycle,
},
metadata_sys::{get_bucket_targets_config, get_object_lock_config, get_replication_config},
metadata_sys::get_object_lock_config,
object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion},
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
@@ -58,6 +58,7 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
pub struct ScannerItem {
pub bucket: String,
pub object_name: String,
pub replication: Option<ReplicationConfig>,
pub lifecycle: Option<Arc<LifecycleConfig>>,
pub versioning: Option<Arc<VersioningConfiguration>>,
pub object_lock_config: Option<DefaultRetention>,
@@ -129,6 +130,7 @@ impl ScannerItem {
pub fn new(
bucket: String,
replication: Option<ReplicationConfig>,
lifecycle: Option<Arc<LifecycleConfig>>,
versioning: Option<Arc<VersioningConfiguration>>,
object_lock_config: Option<DefaultRetention>,
@@ -138,6 +140,7 @@ impl ScannerItem {
Self {
bucket,
object_name: "".to_string(),
replication,
lifecycle,
versioning,
object_lock_config,
@@ -377,32 +380,36 @@ impl ScannerItem {
enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal
);
if !self.needs_replication_heal(&enriched, pending_lagging) {
return Ok(());
}
// if !self.needs_replication_heal(&enriched, pending_lagging) {
// return Ok(());
// }
let replication_cfg = match get_replication_config(&self.bucket).await {
Ok((cfg, _)) => Some(cfg),
Err(err) => {
debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err);
None
}
};
// let replication_cfg = match get_replication_config(&self.bucket).await {
// Ok((cfg, _)) => Some(cfg),
// Err(err) => {
// debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err);
// None
// }
// };
if replication_cfg.is_none() {
return Ok(());
}
// if replication_cfg.is_none() {
// return Ok(());
// }
let bucket_targets = match get_bucket_targets_config(&self.bucket).await {
Ok(targets) => Some(targets),
Err(err) => {
debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err);
None
}
};
// let bucket_targets = match get_bucket_targets_config(&self.bucket).await {
// Ok(targets) => Some(targets),
// Err(err) => {
// debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err);
// None
// }
// };
let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets);
if replication_cfg.is_empty() {
// let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets);
let replication_cfg = self.replication.clone().unwrap_or_default();
if replication_cfg.config.is_none() && replication_cfg.remotes.is_none() {
debug!("heal_replication: no replication config for {}/{}", enriched.bucket, enriched.name);
return Ok(());
}