Compare commits

...

12 Commits

Author    SHA1        Message                             Date
weisd     00787cbce4  todo                                2025-12-10 15:36:52 +08:00
weisd     3ac004510a  fix clippy                          2025-12-09 17:33:08 +08:00
weisd     d8f8bfa5b7  fix heal replication                2025-12-09 17:07:39 +08:00
weisd     1768e7bbdb  Merge branch 'main' into feat/scan  2025-12-09 14:44:08 +08:00
houseme   3326737c01  Merge branch 'main' into feat/scan  2025-12-08 22:46:28 +08:00
weisd     91770ffd1b  Merge branch 'main' into feat/scan  2025-12-08 21:25:51 +08:00
weisd     7940b69bf8  Merge branch 'main' into feat/scan  2025-12-08 21:24:36 +08:00
weisd     427d31d09c  Merge branch 'main' into feat/scan  2025-12-08 17:31:56 +08:00
weisd     dbdcecb9c5  optimize                            2025-12-08 17:26:07 +08:00
houseme   ad34f1b031  Merge branch 'main' into feat/scan  2025-12-07 21:45:36 +08:00
weisd     2a5ccd2211  fix meta bucket check               2025-12-04 17:12:01 +08:00
guojidan  c43166c4c6  Enhance heal and lifecycle integration tests, improve replication han… (#944)  2025-12-04 16:14:47 +08:00
                      Signed-off-by: junxiang Mu <1948535941@qq.com>
                      Co-authored-by: houseme <housemecn@gmail.com>
                      Co-authored-by: weisd <im@weisd.in>
11 changed files with 2274 additions and 398 deletions

View File

@@ -31,7 +31,7 @@ use tokio::{
time::interval,
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
/// Priority queue wrapper for heal requests
/// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items
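The wrapper's implementation is not visible in this hunk. As an illustration of the technique the doc comment describes (a BinaryHeap whose ordering falls back to insertion order for equal priorities), a minimal standalone sketch could look like the following; the Priority enum and type names here are hypothetical, not the crate's actual heal request types.

use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;

// Hypothetical priority levels; the crate's real HealPriority is defined elsewhere.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority { Low, Normal, High, Urgent }

// Heap entry: priority decides ordering, a monotonically increasing sequence
// number breaks ties so that equal-priority items pop in FIFO order.
struct Entry<T> { priority: Priority, seq: u64, item: T }

impl<T> PartialEq for Entry<T> {
    fn eq(&self, other: &Self) -> bool { self.priority == other.priority && self.seq == other.seq }
}
impl<T> Eq for Entry<T> {}
impl<T> PartialOrd for Entry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl<T> Ord for Entry<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Higher priority wins; for equal priority, the smaller (older) seq wins.
        self.priority.cmp(&other.priority).then_with(|| Reverse(self.seq).cmp(&Reverse(other.seq)))
    }
}

struct FifoPriorityQueue<T> { heap: BinaryHeap<Entry<T>>, next_seq: u64 }

impl<T> FifoPriorityQueue<T> {
    fn new() -> Self { Self { heap: BinaryHeap::new(), next_seq: 0 } }
    fn push(&mut self, priority: Priority, item: T) {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.heap.push(Entry { priority, seq, item });
    }
    fn pop(&mut self) -> Option<T> { self.heap.pop().map(|e| e.item) }
}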
@@ -418,7 +418,12 @@ impl HealManager {
/// Get statistics
pub async fn get_statistics(&self) -> HealStatistics {
self.statistics.read().await.clone()
let stats = self.statistics.read().await.clone();
debug!(
"HealManager stats snapshot: total_tasks={}, successful_tasks={}, failed_tasks={}, running_tasks={}",
stats.total_tasks, stats.successful_tasks, stats.failed_tasks, stats.running_tasks
);
stats
}
/// Get active task count

File diff suppressed because it is too large

View File

@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::Result;
use crate::{
Result,
scanner::metrics::{BucketMetrics, MetricsCollector},
};
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::IlmAction;
use rustfs_ecstore::bucket::{
@@ -27,15 +30,26 @@ use rustfs_ecstore::bucket::{
versioning::VersioningApi,
versioning_sys::BucketVersioningSys,
};
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
use rustfs_filemeta::FileInfo;
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
use rustfs_ecstore::bucket::{
replication::{GLOBAL_REPLICATION_POOL, ReplicationConfig, get_heal_replicate_object_info},
utils::is_meta_bucketname,
};
use time::OffsetDateTime;
use tracing::info;
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
use rustfs_filemeta::{FileInfo, ReplicationStatusType, replication_statuses_map};
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, HeaderExt, VERSION_PURGE_STATUS_KEY};
use s3s::dto::DefaultRetention;
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration as StdDuration,
};
use time::{Duration as TimeDuration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
@@ -44,21 +58,94 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
pub struct ScannerItem {
pub bucket: String,
pub object_name: String,
pub replication: Option<ReplicationConfig>,
pub lifecycle: Option<Arc<LifecycleConfig>>,
pub versioning: Option<Arc<VersioningConfiguration>>,
pub object_lock_config: Option<DefaultRetention>,
pub replication_pending_grace: StdDuration,
pub replication_metrics: Option<ReplicationMetricsHandle>,
}
#[derive(Clone)]
pub struct ReplicationMetricsHandle {
inner: Arc<ReplicationMetricsInner>,
}
struct ReplicationMetricsInner {
metrics: Arc<MetricsCollector>,
bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>,
}
impl ReplicationMetricsHandle {
pub fn new(metrics: Arc<MetricsCollector>, bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>) -> Self {
Self {
inner: Arc::new(ReplicationMetricsInner { metrics, bucket_metrics }),
}
}
pub async fn record_status(&self, bucket: &str, status: ReplicationStatusType, lagging: bool) {
match status {
ReplicationStatusType::Pending => self.inner.metrics.increment_replication_pending_objects(1),
ReplicationStatusType::Failed => self.inner.metrics.increment_replication_failed_objects(1),
_ => {}
}
if lagging {
self.inner.metrics.increment_replication_lagging_objects(1);
}
let mut guard = self.inner.bucket_metrics.lock().await;
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
bucket: bucket.to_string(),
..Default::default()
});
match status {
ReplicationStatusType::Pending => {
entry.replication_pending = entry.replication_pending.saturating_add(1);
}
ReplicationStatusType::Failed => {
entry.replication_failed = entry.replication_failed.saturating_add(1);
}
_ => {}
}
if lagging {
entry.replication_lagging = entry.replication_lagging.saturating_add(1);
}
}
pub async fn record_task_submission(&self, bucket: &str) {
self.inner.metrics.increment_replication_tasks_queued(1);
let mut guard = self.inner.bucket_metrics.lock().await;
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
bucket: bucket.to_string(),
..Default::default()
});
entry.replication_tasks_queued = entry.replication_tasks_queued.saturating_add(1);
}
}
impl ScannerItem {
const INTERNAL_REPLICATION_STATUS_KEY: &'static str = "x-rustfs-internal-replication-status";
pub fn new(
bucket: String,
replication: Option<ReplicationConfig>,
lifecycle: Option<Arc<LifecycleConfig>>,
versioning: Option<Arc<VersioningConfiguration>>,
object_lock_config: Option<DefaultRetention>,
replication_pending_grace: StdDuration,
replication_metrics: Option<ReplicationMetricsHandle>,
) -> Self {
Self {
bucket,
object_name: "".to_string(),
replication,
lifecycle,
versioning,
object_lock_config,
replication_pending_grace,
replication_metrics,
}
}
@@ -164,6 +251,23 @@ impl ScannerItem {
}
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
let object_locked = self.is_object_lock_protected(oi);
if let Err(err) = self.heal_replication(oi).await {
warn!(
"heal_replication failed for {}/{} (version {:?}): {}",
oi.bucket, oi.name, oi.version_id, err
);
}
if object_locked {
info!(
"apply_actions: Skipping lifecycle for {}/{} because object lock retention or legal hold is active",
oi.bucket, oi.name
);
return (false, oi.size);
}
let (action, _size) = self.apply_lifecycle(oi).await;
info!(
@@ -174,16 +278,6 @@ impl ScannerItem {
oi.user_defined.clone()
);
// Create a mutable clone if you need to modify fields
/*let mut oi = oi.clone();
oi.replication_status = ReplicationStatusType::from(
oi.user_defined
.get("x-amz-bucket-replication-status")
.unwrap_or(&"PENDING".to_string()),
);
info!("apply status is: {:?}", oi.replication_status);
self.heal_replication(&oi, _size_s).await;*/
if action.delete_all() {
return (true, 0);
}
@@ -200,7 +294,7 @@ impl ScannerItem {
info!("apply_lifecycle: Lifecycle config exists for object: {}", oi.name);
let (olcfg, rcfg) = if self.bucket != ".minio.sys" {
let (olcfg, rcfg) = if !is_meta_bucketname(&self.bucket) {
(
get_object_lock_config(&self.bucket).await.ok(),
None, // FIXME: replication config
@@ -266,4 +360,202 @@ impl ScannerItem {
(lc_evt.action, new_size)
}
fn is_object_lock_protected(&self, oi: &ObjectInfo) -> bool {
enforce_retention_for_deletion(oi)
}
async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> {
warn!("heal_replication: healing replication for {}/{}", oi.bucket, oi.name);
warn!("heal_replication: ObjectInfo oi: {:?}", oi);
let enriched = Self::hydrate_replication_metadata(oi);
let pending_lagging = self.is_pending_lagging(&enriched);
if let Some(handle) = &self.replication_metrics {
handle
.record_status(&self.bucket, enriched.replication_status.clone(), pending_lagging)
.await;
}
debug!(
"heal_replication: evaluating {}/{} with status {:?} and internal {:?}",
enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal
);
// if !self.needs_replication_heal(&enriched, pending_lagging) {
// return Ok(());
// }
// let replication_cfg = match get_replication_config(&self.bucket).await {
// Ok((cfg, _)) => Some(cfg),
// Err(err) => {
// debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err);
// None
// }
// };
// if replication_cfg.is_none() {
// return Ok(());
// }
// let bucket_targets = match get_bucket_targets_config(&self.bucket).await {
// Ok(targets) => Some(targets),
// Err(err) => {
// debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err);
// None
// }
// };
// let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets);
let replication_cfg = self.replication.clone().unwrap_or_default();
if replication_cfg.config.is_none() && replication_cfg.remotes.is_none() {
debug!("heal_replication: no replication config for {}/{}", enriched.bucket, enriched.name);
return Ok(());
}
let replicate_info = get_heal_replicate_object_info(&enriched, &replication_cfg).await;
let should_replicate = replicate_info.dsc.replicate_any()
|| matches!(
enriched.replication_status,
ReplicationStatusType::Failed | ReplicationStatusType::Pending
);
if !should_replicate {
debug!("heal_replication: no actionable targets for {}/{}", enriched.bucket, enriched.name);
return Ok(());
}
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
pool.queue_replica_task(replicate_info).await;
if let Some(handle) = &self.replication_metrics {
handle.record_task_submission(&self.bucket).await;
}
warn!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name);
} else {
warn!(
"heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}",
enriched.bucket, enriched.name
);
}
Ok(())
}
#[allow(dead_code)]
fn needs_replication_heal(&self, oi: &ObjectInfo, pending_lagging: bool) -> bool {
if matches!(oi.replication_status, ReplicationStatusType::Failed) {
return true;
}
if pending_lagging && matches!(oi.replication_status, ReplicationStatusType::Pending) {
return true;
}
if let Some(raw) = oi.replication_status_internal.as_ref() {
let statuses = replication_statuses_map(raw);
if statuses
.values()
.any(|status| matches!(status, ReplicationStatusType::Failed))
{
return true;
}
if pending_lagging
&& statuses
.values()
.any(|status| matches!(status, ReplicationStatusType::Pending))
{
return true;
}
}
false
}
fn hydrate_replication_metadata(oi: &ObjectInfo) -> ObjectInfo {
let mut enriched = oi.clone();
if enriched.replication_status.is_empty() {
if let Some(status) = enriched.user_defined.lookup(AMZ_BUCKET_REPLICATION_STATUS) {
enriched.replication_status = ReplicationStatusType::from(status);
}
}
if enriched.replication_status_internal.is_none() {
if let Some(raw) = enriched.user_defined.lookup(Self::INTERNAL_REPLICATION_STATUS_KEY) {
if !raw.is_empty() {
enriched.replication_status_internal = Some(raw.to_string());
}
}
}
if enriched.version_purge_status_internal.is_none() {
if let Some(raw) = enriched.user_defined.lookup(VERSION_PURGE_STATUS_KEY) {
if !raw.is_empty() {
enriched.version_purge_status_internal = Some(raw.to_string());
}
}
}
enriched
}
fn is_pending_lagging(&self, oi: &ObjectInfo) -> bool {
if !matches!(oi.replication_status, ReplicationStatusType::Pending) {
return false;
}
let Some(mod_time) = oi.mod_time else {
return false;
};
let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| {
warn!(
"replication_pending_grace is invalid, using default value: 0 seconds, grace: {:?}",
self.replication_pending_grace
);
TimeDuration::seconds(0)
});
if grace.is_zero() {
return true;
}
let elapsed = OffsetDateTime::now_utc() - mod_time;
elapsed >= grace
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn replication_metrics_handle_tracks_counts() {
let metrics = Arc::new(MetricsCollector::new());
let bucket_metrics = Arc::new(Mutex::new(HashMap::new()));
let handle = ReplicationMetricsHandle::new(metrics.clone(), bucket_metrics.clone());
handle
.record_status("test-bucket", ReplicationStatusType::Pending, true)
.await;
handle
.record_status("test-bucket", ReplicationStatusType::Failed, false)
.await;
handle.record_task_submission("test-bucket").await;
let snapshot = metrics.get_metrics();
assert_eq!(snapshot.replication_pending_objects, 1);
assert_eq!(snapshot.replication_failed_objects, 1);
assert_eq!(snapshot.replication_lagging_objects, 1);
assert_eq!(snapshot.replication_tasks_queued, 1);
let guard = bucket_metrics.lock().await;
let bucket_entry = guard.get("test-bucket").expect("bucket metrics exists");
assert_eq!(bucket_entry.replication_pending, 1);
assert_eq!(bucket_entry.replication_failed, 1);
assert_eq!(bucket_entry.replication_lagging, 1);
assert_eq!(bucket_entry.replication_tasks_queued, 1);
}
}
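For reference, the widened ScannerItem::new signature shown above could be invoked as in this hedged sketch; the values are purely illustrative and it assumes the imports already present in this file.

// Hypothetical ScannerItem construction exercising the new parameters
// (replication config, object-lock retention, pending grace, metrics handle):
let item = ScannerItem::new(
    "demo-bucket".to_string(),
    None,                        // replication: Option<ReplicationConfig>
    None,                        // lifecycle: Option<Arc<LifecycleConfig>>
    None,                        // versioning: Option<Arc<VersioningConfiguration>>
    None,                        // object_lock_config: Option<DefaultRetention>
    StdDuration::from_secs(300), // replication_pending_grace (illustrative value)
    None,                        // replication_metrics: Option<ReplicationMetricsHandle>
);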

View File

@@ -62,6 +62,7 @@ struct DiskScanResult {
pub struct LocalObjectRecord {
pub usage: LocalObjectUsage,
pub object_info: Option<rustfs_ecstore::store_api::ObjectInfo>,
pub file_info: Option<FileInfo>,
}
#[derive(Debug, Default)]
@@ -223,9 +224,11 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
record.usage.last_modified_ns = mtime_ns;
state.objects.insert(rel_path.clone(), record.usage.clone());
emitted.insert(rel_path.clone());
warn!("compute_object_usage: record: {:?}", record.clone());
objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record);
}
Ok(None) => {
warn!("compute_object_usage: None, rel_path: {:?}", rel_path);
state.objects.remove(&rel_path);
}
Err(err) => {
@@ -240,24 +243,27 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
warn!("Failed to read xl.meta {:?}: {}", xl_path, err);
}
}
} else {
warn!("should_parse: false, rel_path: {:?}", rel_path);
}
}
state.objects.retain(|key, _| visited.contains(key));
state.last_scan_ns = Some(now_ns);
for (key, usage) in &state.objects {
if emitted.contains(key) {
continue;
}
objects_by_bucket
.entry(usage.bucket.clone())
.or_default()
.push(LocalObjectRecord {
usage: usage.clone(),
object_info: None,
});
}
// for (key, usage) in &state.objects {
// if emitted.contains(key) {
// continue;
// }
// objects_by_bucket
// .entry(usage.bucket.clone())
// .or_default()
// .push(LocalObjectRecord {
// usage: usage.clone(),
// object_info: None,
// file_info: None,
// });
// }
let snapshot = build_snapshot(meta, &state.objects, now);
status.snapshot_exists = true;
@@ -319,6 +325,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
let versioned = fi.version_id.is_some();
ObjectInfo::from_file_info(fi, bucket, object, versioned)
});
let file_info = latest_file_info.clone();
Ok(Some(LocalObjectRecord {
usage: LocalObjectUsage {
@@ -331,6 +338,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
has_live_object,
},
object_info,
file_info,
}))
}

View File

@@ -45,6 +45,14 @@ pub struct ScannerMetrics {
pub healthy_objects: u64,
/// Total corrupted objects found
pub corrupted_objects: u64,
/// Replication heal tasks queued
pub replication_tasks_queued: u64,
/// Objects observed with pending replication
pub replication_pending_objects: u64,
/// Objects observed with failed replication
pub replication_failed_objects: u64,
/// Objects with replication pending longer than grace period
pub replication_lagging_objects: u64,
/// Last scan activity time
pub last_activity: Option<SystemTime>,
/// Current scan cycle
@@ -86,6 +94,14 @@ pub struct BucketMetrics {
pub heal_tasks_completed: u64,
/// Heal tasks failed for this bucket
pub heal_tasks_failed: u64,
/// Objects observed with pending replication status
pub replication_pending: u64,
/// Objects observed with failed replication status
pub replication_failed: u64,
/// Objects exceeding replication grace period
pub replication_lagging: u64,
/// Replication heal tasks queued for this bucket
pub replication_tasks_queued: u64,
}
/// Disk-specific metrics
@@ -127,6 +143,10 @@ pub struct MetricsCollector {
total_cycles: AtomicU64,
healthy_objects: AtomicU64,
corrupted_objects: AtomicU64,
replication_tasks_queued: AtomicU64,
replication_pending_objects: AtomicU64,
replication_failed_objects: AtomicU64,
replication_lagging_objects: AtomicU64,
}
impl MetricsCollector {
@@ -146,6 +166,10 @@ impl MetricsCollector {
total_cycles: AtomicU64::new(0),
healthy_objects: AtomicU64::new(0),
corrupted_objects: AtomicU64::new(0),
replication_tasks_queued: AtomicU64::new(0),
replication_pending_objects: AtomicU64::new(0),
replication_failed_objects: AtomicU64::new(0),
replication_lagging_objects: AtomicU64::new(0),
}
}
@@ -194,6 +218,26 @@ impl MetricsCollector {
self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed);
}
/// Increment replication tasks queued
pub fn increment_replication_tasks_queued(&self, count: u64) {
self.replication_tasks_queued.fetch_add(count, Ordering::Relaxed);
}
/// Increment replication pending objects
pub fn increment_replication_pending_objects(&self, count: u64) {
self.replication_pending_objects.fetch_add(count, Ordering::Relaxed);
}
/// Increment replication failed objects
pub fn increment_replication_failed_objects(&self, count: u64) {
self.replication_failed_objects.fetch_add(count, Ordering::Relaxed);
}
/// Increment replication lagging objects
pub fn increment_replication_lagging_objects(&self, count: u64) {
self.replication_lagging_objects.fetch_add(count, Ordering::Relaxed);
}
/// Set current cycle
pub fn set_current_cycle(&self, cycle: u64) {
self.current_cycle.store(cycle, Ordering::Relaxed);
@@ -228,6 +272,10 @@ impl MetricsCollector {
heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed),
healthy_objects: self.healthy_objects.load(Ordering::Relaxed),
corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed),
replication_tasks_queued: self.replication_tasks_queued.load(Ordering::Relaxed),
replication_pending_objects: self.replication_pending_objects.load(Ordering::Relaxed),
replication_failed_objects: self.replication_failed_objects.load(Ordering::Relaxed),
replication_lagging_objects: self.replication_lagging_objects.load(Ordering::Relaxed),
last_activity: Some(SystemTime::now()),
current_cycle: self.current_cycle.load(Ordering::Relaxed),
total_cycles: self.total_cycles.load(Ordering::Relaxed),
@@ -255,6 +303,10 @@ impl MetricsCollector {
self.total_cycles.store(0, Ordering::Relaxed);
self.healthy_objects.store(0, Ordering::Relaxed);
self.corrupted_objects.store(0, Ordering::Relaxed);
self.replication_tasks_queued.store(0, Ordering::Relaxed);
self.replication_pending_objects.store(0, Ordering::Relaxed);
self.replication_failed_objects.store(0, Ordering::Relaxed);
self.replication_lagging_objects.store(0, Ordering::Relaxed);
info!("Scanner metrics reset");
}
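The new replication counters can be exercised directly on the collector; a minimal sketch using only the methods added in this diff:

// Minimal usage sketch of the new replication counters.
let metrics = MetricsCollector::new();
metrics.increment_replication_pending_objects(1);
metrics.increment_replication_failed_objects(1);
metrics.increment_replication_lagging_objects(1);
metrics.increment_replication_tasks_queued(1);

let snapshot = metrics.get_metrics();
assert_eq!(snapshot.replication_pending_objects, 1);
assert_eq!(snapshot.replication_failed_objects, 1);
assert_eq!(snapshot.replication_lagging_objects, 1);
assert_eq!(snapshot.replication_tasks_queued, 1);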

View File

@@ -19,6 +19,7 @@ use crate::scanner::{
};
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::StorageAPI;
use rustfs_ecstore::bucket::utils::is_meta_bucketname;
use rustfs_ecstore::disk::{DiskAPI, DiskStore};
use serde::{Deserialize, Serialize};
use std::{
@@ -879,7 +880,7 @@ impl NodeScanner {
let bucket_name = &bucket_info.name;
// skip system internal buckets
if bucket_name == ".minio.sys" {
if is_meta_bucketname(bucket_name) {
continue;
}

View File

@@ -12,31 +12,52 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::heal::{
manager::{HealConfig, HealManager},
storage::{ECStoreHealStorage, HealStorageAPI},
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
use async_trait::async_trait;
use rustfs_ahm::{
heal::{
manager::{HealConfig, HealManager},
storage::{ECStoreHealStorage, HealStorageAPI},
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
},
scanner::{ScanMode, Scanner},
};
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
use rustfs_ecstore::bucket::metadata_sys::{self, set_bucket_metadata};
use rustfs_ecstore::bucket::replication::{
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
};
use rustfs_ecstore::bucket::target::{BucketTarget, BucketTargetType, BucketTargets};
use rustfs_ecstore::bucket::utils::serialize;
use rustfs_ecstore::error::Error as EcstoreError;
use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
};
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
use s3s::dto::{
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
};
use serial_test::serial;
use std::{
os::unix::fs::PermissionsExt,
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::info;
use walkdir::WalkDir;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
static INIT: Once = Once::new();
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-replication-heal-target";
fn init_tracing() {
INIT.call_once(|| {
@@ -145,6 +166,225 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
fn delete_first_part_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
for disk_path in disk_paths {
let obj_dir = disk_path.join(bucket).join(object);
if !obj_dir.exists() {
continue;
}
if let Some(part_path) = WalkDir::new(&obj_dir)
.min_depth(2)
.max_depth(2)
.into_iter()
.filter_map(Result::ok)
.find(|entry| {
entry.file_type().is_file()
&& entry
.file_name()
.to_str()
.map(|name| name.starts_with("part."))
.unwrap_or(false)
})
.map(|entry| entry.into_path())
{
std::fs::remove_file(&part_path).expect("Failed to delete part file");
return part_path;
}
}
panic!("Failed to locate part file for {}/{}", bucket, object);
}
fn delete_xl_meta_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
for disk_path in disk_paths {
let xl_meta_path = disk_path.join(bucket).join(object).join("xl.meta");
if xl_meta_path.exists() {
std::fs::remove_file(&xl_meta_path).expect("Failed to delete xl.meta file");
return xl_meta_path;
}
}
panic!("Failed to locate xl.meta for {}/{}", bucket, object);
}
struct FormatPathGuard {
original: PathBuf,
backup: PathBuf,
}
impl FormatPathGuard {
fn new(original: PathBuf) -> std::io::Result<Self> {
let backup = original.with_extension("bak");
if backup.exists() {
std::fs::remove_file(&backup)?;
}
std::fs::rename(&original, &backup)?;
Ok(Self { original, backup })
}
}
impl Drop for FormatPathGuard {
fn drop(&mut self) {
if self.backup.exists() {
let _ = std::fs::rename(&self.backup, &self.original);
}
}
}
struct PermissionGuard {
path: PathBuf,
original_mode: u32,
}
impl PermissionGuard {
fn new(path: PathBuf, new_mode: u32) -> std::io::Result<Self> {
let metadata = std::fs::metadata(&path)?;
let original_mode = metadata.permissions().mode();
std::fs::set_permissions(&path, std::fs::Permissions::from_mode(new_mode))?;
Ok(Self { path, original_mode })
}
}
impl Drop for PermissionGuard {
fn drop(&mut self) {
if self.path.exists() {
let _ = std::fs::set_permissions(&self.path, std::fs::Permissions::from_mode(self.original_mode));
}
}
}
#[derive(Debug, Default)]
struct RecordingReplicationPool {
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
}
impl RecordingReplicationPool {
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
let mut guard = self.replica_tasks.lock().await;
guard.drain(..).collect()
}
async fn clear(&self) {
self.replica_tasks.lock().await.clear();
self.delete_tasks.lock().await.clear();
}
}
#[async_trait]
impl ReplicationPoolTrait for RecordingReplicationPool {
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
self.replica_tasks.lock().await.push(ri);
}
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
self.delete_tasks.lock().await.push(ri);
}
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
async fn init_resync(
self: Arc<Self>,
_cancellation_token: CancellationToken,
_buckets: Vec<String>,
) -> Result<(), EcstoreError> {
Ok(())
}
}
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
static TEST_POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
if let Some(pool) = TEST_POOL.get() {
pool.clear().await;
return pool.clone();
}
let pool = Arc::new(RecordingReplicationPool::default());
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
let global_pool = GLOBAL_REPLICATION_POOL
.get_or_init(|| {
let pool_clone = dyn_pool.clone();
async move { pool_clone }
})
.await
.clone();
assert!(
Arc::ptr_eq(&dyn_pool, &global_pool),
"GLOBAL_REPLICATION_POOL initialized before test replication pool"
);
let _ = TEST_POOL.set(pool.clone());
pool.clear().await;
pool
}
async fn configure_bucket_replication(bucket: &str, target_arn: &str) {
let meta = metadata_sys::get(bucket)
.await
.expect("bucket metadata should exist for replication configuration");
let mut metadata = (*meta).clone();
let replication_rule = ReplicationRule {
delete_marker_replication: None,
delete_replication: None,
destination: Destination {
access_control_translation: None,
account: None,
bucket: target_arn.to_string(),
encryption_configuration: None,
metrics: None,
replication_time: None,
storage_class: None,
},
existing_object_replication: Some(ExistingObjectReplication {
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
}),
filter: None,
id: Some("heal-replication-rule".to_string()),
prefix: Some(String::new()),
priority: Some(1),
source_selection_criteria: None,
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
};
let replication_cfg = ReplicationConfiguration {
role: target_arn.to_string(),
rules: vec![replication_rule],
};
let bucket_targets = BucketTargets {
targets: vec![BucketTarget {
source_bucket: bucket.to_string(),
endpoint: "replication.invalid".to_string(),
target_bucket: "replication-target".to_string(),
arn: target_arn.to_string(),
target_type: BucketTargetType::ReplicationService,
..Default::default()
}],
};
metadata.replication_config = Some(replication_cfg.clone());
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
metadata.replication_config_updated_at = OffsetDateTime::now_utc();
metadata.bucket_target_config = Some(bucket_targets.clone());
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
metadata.bucket_targets_config_updated_at = OffsetDateTime::now_utc();
let versioning_cfg = VersioningConfiguration {
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
..Default::default()
};
metadata.versioning_config = Some(versioning_cfg.clone());
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
metadata.versioning_config_updated_at = OffsetDateTime::now_utc();
set_bucket_metadata(bucket.to_string(), metadata)
.await
.expect("failed to update bucket metadata for replication");
}
mod serial_tests {
use super::*;
@@ -430,4 +670,380 @@ mod serial_tests {
info!("Direct heal storage API test passed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_submits_heal_task_when_part_missing() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-heal-bucket-{}", uuid::Uuid::new_v4().simple());
let object_name = "scanner-heal-object.txt";
create_test_bucket(&ecstore, &bucket_name).await;
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner auto-heal data").await;
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 4,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Deep).await;
scanner
.scan_cycle()
.await
.expect("Initial scan should succeed before simulating failures");
let baseline_stats = heal_manager.get_statistics().await;
let deleted_part_path = delete_first_part_file(&disk_paths, &bucket_name, object_name);
assert!(!deleted_part_path.exists(), "Deleted part file should not exist before healing");
scanner
.scan_cycle()
.await
.expect("Scan after part deletion should finish and enqueue heal task");
tokio::time::sleep(Duration::from_millis(500)).await;
let updated_stats = heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should submit heal tasks when data parts go missing"
);
// Allow heal manager to restore the missing part
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(
deleted_part_path.exists(),
"Missing part should be restored after heal: {:?}",
deleted_part_path
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_submits_metadata_heal_when_xl_meta_missing() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-meta-bucket-{}", uuid::Uuid::new_v4().simple());
let object_name = "scanner-meta-object.txt";
create_test_bucket(&ecstore, &bucket_name).await;
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner metadata heal data").await;
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 4,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Deep).await;
scanner
.scan_cycle()
.await
.expect("Initial scan should succeed before metadata deletion");
let baseline_stats = heal_manager.get_statistics().await;
let deleted_meta_path = delete_xl_meta_file(&disk_paths, &bucket_name, object_name);
assert!(!deleted_meta_path.exists(), "Deleted xl.meta should not exist before healing");
scanner
.scan_cycle()
.await
.expect("Scan after metadata deletion should finish and enqueue heal task");
tokio::time::sleep(Duration::from_millis(800)).await;
let updated_stats = heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should submit metadata heal tasks when xl.meta is missing"
);
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(
deleted_meta_path.exists(),
"xl.meta should be restored after heal: {:?}",
deleted_meta_path
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_triggers_replication_heal_when_status_failed() {
let (_disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-replication-bucket-{}", uuid::Uuid::new_v4().simple());
let object_name = "scanner-replication-heal-object";
create_test_bucket(&ecstore, &bucket_name).await;
configure_bucket_replication(&bucket_name, TEST_REPLICATION_TARGET_ARN).await;
let replication_pool = ensure_test_replication_pool().await;
replication_pool.clear().await;
let mut opts = ObjectOptions::default();
opts.user_defined.insert(
AMZ_BUCKET_REPLICATION_STATUS.to_string(),
ReplicationStatusType::Failed.as_str().to_string(),
);
let replication_status_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
opts.user_defined.insert(
replication_status_key.clone(),
format!("{}={};", TEST_REPLICATION_TARGET_ARN, ReplicationStatusType::Failed.as_str()),
);
let mut reader = PutObjReader::from_vec(b"replication heal data".to_vec());
ecstore
.put_object(&bucket_name, object_name, &mut reader, &opts)
.await
.expect("Failed to upload replication test object");
let object_info = ecstore
.get_object_info(&bucket_name, object_name, &ObjectOptions::default())
.await
.expect("Failed to read object info for replication test");
assert_eq!(
object_info
.user_defined
.get(AMZ_BUCKET_REPLICATION_STATUS)
.map(|s| s.as_str()),
Some(ReplicationStatusType::Failed.as_str()),
"Uploaded object should contain replication status metadata"
);
assert!(
object_info
.user_defined
.get(&replication_status_key)
.map(|s| s.contains(ReplicationStatusType::Failed.as_str()))
.unwrap_or(false),
"Uploaded object should preserve internal replication status metadata"
);
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 4,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Deep).await;
scanner
.scan_cycle()
.await
.expect("Scan cycle should succeed and evaluate replication state");
let replica_tasks = replication_pool.take_replica_tasks().await;
assert!(
replica_tasks
.iter()
.any(|info| info.bucket == bucket_name && info.name == object_name),
"Scanner should enqueue replication heal task when replication status is FAILED (recorded tasks: {:?})",
replica_tasks
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_submits_erasure_set_heal_when_disk_offline() {
let (disk_paths, _ecstore, heal_storage) = setup_test_env().await;
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");
assert!(format_path.exists(), "format.json should exist before simulating offline disk");
let _format_guard = FormatPathGuard::new(format_path.clone()).expect("failed to move format.json");
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 2,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Normal).await;
let baseline_stats = heal_manager.get_statistics().await;
scanner
.scan_cycle()
.await
.expect("Scan cycle should complete even when a disk is offline");
tokio::time::sleep(Duration::from_millis(200)).await;
let updated_stats = heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should enqueue erasure set heal when disk is offline (before {}, after {})",
baseline_stats.total_tasks,
updated_stats.total_tasks
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_submits_erasure_set_heal_when_listing_volumes_fails() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-list-volumes-{}", uuid::Uuid::new_v4().simple());
let object_name = "scanner-list-volumes-object";
create_test_bucket(&ecstore, &bucket_name).await;
upload_test_object(&ecstore, &bucket_name, object_name, b"disk list volumes failure").await;
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 2,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Deep).await;
scanner
.scan_cycle()
.await
.expect("Initial scan should succeed before simulating disk permission issues");
let baseline_stats = heal_manager.get_statistics().await;
let disk_root = disk_paths[0].clone();
assert!(disk_root.exists(), "Disk root should exist so we can simulate permission failures");
{
let _root_perm_guard =
PermissionGuard::new(disk_root.clone(), 0o000).expect("Failed to change disk root permissions");
let scan_result = scanner.scan_cycle().await;
assert!(
scan_result.is_ok(),
"Scan cycle should continue even if disk volumes cannot be listed: {:?}",
scan_result
);
tokio::time::sleep(Duration::from_millis(200)).await;
let updated_stats = heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should enqueue erasure set heal when listing volumes fails (before {}, after {})",
baseline_stats.total_tasks,
updated_stats.total_tasks
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_submits_erasure_set_heal_when_disk_access_fails() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-access-error-{}", uuid::Uuid::new_v4().simple());
let object_name = "scanner-access-error-object.txt";
create_test_bucket(&ecstore, &bucket_name).await;
upload_test_object(&ecstore, &bucket_name, object_name, b"disk access failure").await;
let bucket_path = disk_paths[0].join(&bucket_name);
assert!(bucket_path.exists(), "Bucket path should exist on disk for access test");
let _perm_guard = PermissionGuard::new(bucket_path.clone(), 0o000).expect("Failed to change permissions");
let heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 2,
..Default::default()
};
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Deep).await;
let baseline_stats = heal_manager.get_statistics().await;
let scan_result = scanner.scan_cycle().await;
assert!(
scan_result.is_ok(),
"Scan cycle should complete even if a disk volume has access errors: {:?}",
scan_result
);
tokio::time::sleep(Duration::from_millis(200)).await;
let updated_stats = heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should enqueue erasure set heal when disk access fails (before {}, after {})",
baseline_stats.total_tasks,
updated_stats.total_tasks
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_scanner_detects_missing_bucket_directory_and_queues_bucket_heal() {
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
let bucket_name = format!("scanner-missing-bucket-{}", uuid::Uuid::new_v4().simple());
create_test_bucket(&ecstore, &bucket_name).await;
upload_test_object(&ecstore, &bucket_name, "seed-object", b"bucket heal data").await;
let scanner_heal_cfg = HealConfig {
enable_auto_heal: true,
heal_interval: Duration::from_millis(20),
max_concurrent_heals: 4,
..Default::default()
};
let scanner_heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(scanner_heal_cfg)));
scanner_heal_manager.start().await.unwrap();
let scanner = Scanner::new(None, Some(scanner_heal_manager.clone()));
scanner.initialize_with_ecstore().await;
scanner.set_config_enable_healing(true).await;
scanner.set_config_scan_mode(ScanMode::Normal).await;
scanner
.scan_cycle()
.await
.expect("Initial scan should succeed before deleting bucket directory");
let baseline_stats = scanner_heal_manager.get_statistics().await;
let missing_dir = disk_paths[0].join(&bucket_name);
assert!(missing_dir.exists());
std::fs::remove_dir_all(&missing_dir).expect("Failed to remove bucket directory for heal simulation");
assert!(!missing_dir.exists(), "Bucket directory should be removed on disk to trigger heal");
scanner
.run_volume_consistency_check()
.await
.expect("Volume consistency check should run after bucket removal");
tokio::time::sleep(Duration::from_millis(800)).await;
let updated_stats = scanner_heal_manager.get_statistics().await;
assert!(
updated_stats.total_tasks > baseline_stats.total_tasks,
"Scanner should submit bucket heal tasks when a bucket directory is missing"
);
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(missing_dir.exists(), "Bucket directory should be restored after heal");
}
}

View File

@@ -12,10 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
bucket::{
metadata::BUCKET_LIFECYCLE_CONFIG,
metadata_sys,
replication::{
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
},
target::{BucketTarget, BucketTargetType, BucketTargets},
utils::serialize,
},
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
global::GLOBAL_TierConfigMgr,
@@ -23,18 +31,27 @@ use rustfs_ecstore::{
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
use s3s::dto::{
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
};
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use time::{Duration as TimeDuration, OffsetDateTime};
use tokio::fs;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::info;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-lifecycle-replication-test";
fn init_tracing() {
INIT.call_once(|| {
@@ -159,6 +176,167 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
#[derive(Debug, Default)]
struct RecordingReplicationPool {
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
}
impl RecordingReplicationPool {
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
let mut guard = self.replica_tasks.lock().await;
guard.drain(..).collect()
}
}
#[async_trait]
impl ReplicationPoolTrait for RecordingReplicationPool {
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
self.replica_tasks.lock().await.push(ri);
}
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
self.delete_tasks.lock().await.push(ri);
}
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
async fn init_resync(
self: Arc<Self>,
_cancellation_token: CancellationToken,
_buckets: Vec<String>,
) -> Result<(), rustfs_ecstore::error::Error> {
Ok(())
}
}
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
static POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
if let Some(existing) = POOL.get() {
existing.replica_tasks.lock().await.clear();
existing.delete_tasks.lock().await.clear();
return existing.clone();
}
let pool = Arc::new(RecordingReplicationPool::default());
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
GLOBAL_REPLICATION_POOL
.get_or_init(|| {
let pool_clone = dyn_pool.clone();
async move { pool_clone }
})
.await;
let _ = POOL.set(pool.clone());
pool
}
async fn configure_bucket_replication(bucket: &str) {
let meta = metadata_sys::get(bucket)
.await
.expect("bucket metadata should exist for replication configuration");
let mut metadata = (*meta).clone();
let replication_rule = ReplicationRule {
delete_marker_replication: None,
delete_replication: None,
destination: Destination {
access_control_translation: None,
account: None,
bucket: TEST_REPLICATION_TARGET_ARN.to_string(),
encryption_configuration: None,
metrics: None,
replication_time: None,
storage_class: None,
},
existing_object_replication: Some(ExistingObjectReplication {
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
}),
filter: None,
id: Some("lifecycle-replication-rule".to_string()),
prefix: Some(String::new()),
priority: Some(1),
source_selection_criteria: None,
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
};
let replication_cfg = ReplicationConfiguration {
role: TEST_REPLICATION_TARGET_ARN.to_string(),
rules: vec![replication_rule],
};
let bucket_targets = BucketTargets {
targets: vec![BucketTarget {
source_bucket: bucket.to_string(),
endpoint: "replication.invalid".to_string(),
target_bucket: "replication-target".to_string(),
arn: TEST_REPLICATION_TARGET_ARN.to_string(),
target_type: BucketTargetType::ReplicationService,
..Default::default()
}],
};
metadata.replication_config = Some(replication_cfg.clone());
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
metadata.bucket_target_config = Some(bucket_targets.clone());
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
let versioning_cfg = VersioningConfiguration {
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
..Default::default()
};
metadata.versioning_config = Some(versioning_cfg.clone());
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
metadata_sys::set_bucket_metadata(bucket.to_string(), metadata)
.await
.expect("failed to persist bucket metadata with replication config");
}
async fn upload_object_with_replication_status(
ecstore: &Arc<ECStore>,
bucket: &str,
object: &str,
status: ReplicationStatusType,
) {
let mut reader = PutObjReader::from_vec(b"replication-state".to_vec());
let mut opts = ObjectOptions::default();
opts.user_defined
.insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), status.as_str().to_string());
let internal_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
opts.user_defined
.insert(internal_key, format!("{}={};", TEST_REPLICATION_TARGET_ARN, status.as_str()));
(**ecstore)
.put_object(bucket, object, &mut reader, &opts)
.await
.expect("failed to upload replication test object");
}
async fn upload_object_with_retention(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8], retain_for: Duration) {
use s3s::header::{X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};
use time::format_description::well_known::Rfc3339;
let mut reader = PutObjReader::from_vec(data.to_vec());
let mut opts = ObjectOptions::default();
let retain_duration = TimeDuration::try_from(retain_for).unwrap_or_else(|_| TimeDuration::seconds(0));
let retain_until = OffsetDateTime::now_utc() + retain_duration;
let retain_until_str = retain_until.format(&Rfc3339).expect("format retain date");
let lock_mode_key = X_AMZ_OBJECT_LOCK_MODE.as_str().to_string();
let lock_mode_lower = lock_mode_key.to_lowercase();
opts.user_defined.insert(lock_mode_lower, "GOVERNANCE".to_string());
opts.user_defined.insert(lock_mode_key, "GOVERNANCE".to_string());
let retain_key = X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string();
let retain_key_lower = retain_key.to_lowercase();
opts.user_defined.insert(retain_key_lower, retain_until_str.clone());
opts.user_defined.insert(retain_key, retain_until_str);
(**ecstore)
.put_object(bucket, object, &mut reader, &opts)
.await
.expect("Failed to upload retained object");
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
@@ -694,4 +872,127 @@ mod serial_tests {
println!("Lifecycle transition basic test completed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_lifecycle_respects_object_lock_retention() {
let (_disk_paths, ecstore) = setup_test_env().await;
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-lock-retention-{}", &suffix[..8]);
let object_name = "test/locked-object.txt";
let test_data = b"retained payload";
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_object_with_retention(&ecstore, bucket_name.as_str(), object_name, test_data, Duration::from_secs(3600)).await;
assert!(
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
"Object should exist before lifecycle processing"
);
set_bucket_lifecycle(bucket_name.as_str())
.await
.expect("Failed to set lifecycle configuration");
let scanner_config = ScannerConfig {
scan_interval: Duration::from_millis(100),
deep_scan_interval: Duration::from_millis(500),
max_concurrent_scans: 1,
..Default::default()
};
let scanner = Scanner::new(Some(scanner_config), None);
scanner.start().await.expect("Failed to start scanner");
for _ in 0..3 {
scanner.scan_cycle().await.expect("scan cycle should succeed");
tokio::time::sleep(Duration::from_millis(200)).await;
}
assert!(
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
"Object with active retention should not be deleted by lifecycle"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_lifecycle_triggers_replication_heal_for_lagging_and_failed_objects() {
let (_disk_paths, ecstore) = setup_test_env().await;
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("lc-replication-{}", &suffix[..8]);
create_test_bucket(&ecstore, bucket_name.as_str()).await;
configure_bucket_replication(bucket_name.as_str()).await;
let replication_pool = ensure_test_replication_pool().await;
upload_object_with_replication_status(
&ecstore,
bucket_name.as_str(),
"test/lagging-pending",
ReplicationStatusType::Pending,
)
.await;
upload_object_with_replication_status(
&ecstore,
bucket_name.as_str(),
"test/failed-object",
ReplicationStatusType::Failed,
)
.await;
let scanner_config = ScannerConfig {
scan_interval: Duration::from_millis(100),
deep_scan_interval: Duration::from_millis(500),
max_concurrent_scans: 2,
replication_pending_grace: Duration::from_secs(0),
..Default::default()
};
let scanner = Scanner::new(Some(scanner_config), None);
scanner.scan_cycle().await.expect("scan cycle should complete");
tokio::time::sleep(Duration::from_millis(200)).await;
let replica_tasks = replication_pool.take_replica_tasks().await;
assert!(
replica_tasks.iter().any(|t| t.name == "test/lagging-pending"),
"Pending object should be enqueued for replication heal: {:?}",
replica_tasks
);
assert!(
replica_tasks.iter().any(|t| t.name == "test/failed-object"),
"Failed object should be enqueued for replication heal: {:?}",
replica_tasks
);
let metrics = scanner.get_metrics().await;
assert_eq!(
metrics.replication_tasks_queued,
replica_tasks.len() as u64,
"Replication tasks queued metric should match recorded tasks"
);
assert!(
metrics.replication_pending_objects >= 1,
"Pending replication metric should be incremented"
);
assert!(metrics.replication_failed_objects >= 1, "Failed replication metric should be incremented");
assert!(
metrics.replication_lagging_objects >= 1,
"Lagging replication metric should track pending object beyond grace"
);
let bucket_metrics = metrics
.bucket_metrics
.get(&bucket_name)
.expect("bucket metrics should contain replication counters");
assert!(
bucket_metrics.replication_pending >= 1 && bucket_metrics.replication_failed >= 1,
"Bucket-level replication metrics should reflect observed statuses"
);
assert_eq!(
bucket_metrics.replication_tasks_queued,
replica_tasks.len() as u64,
"Bucket-level queued counter should match enqueued tasks"
);
}
}

View File

@@ -37,7 +37,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
let mut mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str().to_lowercase().as_str());
if mode_str.is_none() {
mode_str = Some(&meta[X_AMZ_OBJECT_LOCK_MODE.as_str()]);
mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str());
}
let mode = if let Some(mode_str) = mode_str {
parse_ret_mode(mode_str.as_str())
@@ -50,7 +50,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
let mut till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_lowercase().as_str());
if till_str.is_none() {
till_str = Some(&meta[X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str()]);
till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str());
}
if let Some(till_str) = till_str {
let t = OffsetDateTime::parse(till_str, &format_description::well_known::Iso8601::DEFAULT);
@@ -67,7 +67,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
pub fn get_object_legalhold_meta(meta: HashMap<String, String>) -> ObjectLockLegalHold {
let mut hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str().to_lowercase().as_str());
if hold_str.is_none() {
hold_str = Some(&meta[X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()]);
hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str());
}
if let Some(hold_str) = hold_str {
return ObjectLockLegalHold {

View File

@@ -34,8 +34,8 @@ use rustfs_madmin::heal_commands::HealResultItem;
use rustfs_rio::Checksum;
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
use rustfs_utils::CompressionAlgorithm;
use rustfs_utils::http::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_STORAGE_CLASS};
use rustfs_utils::path::decode_dir_object;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -745,7 +745,22 @@ impl ObjectInfo {
let inlined = fi.inline_data();
// TODO:expires
// TODO:ReplicationState
let mut replication_status_internal = None;
let mut version_purge_status_internal = None;
if let Some(replication_state) = fi.replication_state_internal.as_ref() {
replication_status_internal = replication_state.replication_status_internal.clone();
version_purge_status_internal = replication_state.version_purge_status_internal.clone();
}
let mut replication_status = fi.replication_status();
if replication_status.is_empty()
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS)
&& status == ReplicationStatusType::Replica.as_str()
{
replication_status = ReplicationStatusType::Replica;
}
let version_purge_status = fi.version_purge_status();
let transitioned_object = TransitionedObject {
name: fi.transitioned_objname.clone(),
@@ -810,6 +825,10 @@ impl ObjectInfo {
transitioned_object,
checksum: fi.checksum.clone(),
storage_class,
replication_status_internal,
version_purge_status_internal,
replication_status,
version_purge_status,
..Default::default()
}
}

View File

@@ -314,13 +314,15 @@ impl ECStore {
// contextCanceled
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
&list_result.entries.unwrap_or_default(),
bucket,
prefix,
delimiter.clone(),
)
.await;
let entries = list_result.entries.unwrap_or_default();
for entry in entries.entries() {
if entry.is_object() {
let fi = entry.to_fileinfo(bucket).unwrap();
tracing::warn!("list_objects_generic file_info: {:?}", fi);
}
}
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(&entries, bucket, prefix, delimiter.clone()).await;
let is_truncated = {
if max_keys > 0 && get_objects.len() > max_keys as usize {