mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Compare commits
12 Commits
1.0.0-alph
...
feat/scan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
00787cbce4 | ||
|
|
3ac004510a | ||
|
|
d8f8bfa5b7 | ||
|
|
1768e7bbdb | ||
|
|
3326737c01 | ||
|
|
91770ffd1b | ||
|
|
7940b69bf8 | ||
|
|
427d31d09c | ||
|
|
dbdcecb9c5 | ||
|
|
ad34f1b031 | ||
|
|
2a5ccd2211 | ||
|
|
c43166c4c6 |
@@ -31,7 +31,7 @@ use tokio::{
|
||||
time::interval,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Priority queue wrapper for heal requests
|
||||
/// Uses BinaryHeap for priority-based ordering while maintaining FIFO for same-priority items
|
||||
@@ -418,7 +418,12 @@ impl HealManager {
|
||||
|
||||
/// Get statistics
|
||||
pub async fn get_statistics(&self) -> HealStatistics {
|
||||
self.statistics.read().await.clone()
|
||||
let stats = self.statistics.read().await.clone();
|
||||
debug!(
|
||||
"HealManager stats snapshot: total_tasks={}, successful_tasks={}, failed_tasks={}, running_tasks={}",
|
||||
stats.total_tasks, stats.successful_tasks, stats.failed_tasks, stats.running_tasks
|
||||
);
|
||||
stats
|
||||
}
|
||||
|
||||
/// Get active task count
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -12,7 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::Result;
|
||||
use crate::{
|
||||
Result,
|
||||
scanner::metrics::{BucketMetrics, MetricsCollector},
|
||||
};
|
||||
use rustfs_common::data_usage::SizeSummary;
|
||||
use rustfs_common::metrics::IlmAction;
|
||||
use rustfs_ecstore::bucket::{
|
||||
@@ -27,15 +30,26 @@ use rustfs_ecstore::bucket::{
|
||||
versioning::VersioningApi,
|
||||
versioning_sys::BucketVersioningSys,
|
||||
};
|
||||
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
|
||||
use rustfs_filemeta::FileInfo;
|
||||
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
use rustfs_ecstore::bucket::{
|
||||
replication::{GLOBAL_REPLICATION_POOL, ReplicationConfig, get_heal_replicate_object_info},
|
||||
utils::is_meta_bucketname,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::info;
|
||||
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
|
||||
use rustfs_filemeta::{FileInfo, ReplicationStatusType, replication_statuses_map};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, HeaderExt, VERSION_PURGE_STATUS_KEY};
|
||||
use s3s::dto::DefaultRetention;
|
||||
use s3s::dto::{BucketLifecycleConfiguration as LifecycleConfig, VersioningConfiguration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
},
|
||||
time::Duration as StdDuration,
|
||||
};
|
||||
use time::{Duration as TimeDuration, OffsetDateTime};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
|
||||
@@ -44,21 +58,94 @@ static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(102
|
||||
pub struct ScannerItem {
|
||||
pub bucket: String,
|
||||
pub object_name: String,
|
||||
pub replication: Option<ReplicationConfig>,
|
||||
pub lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
pub versioning: Option<Arc<VersioningConfiguration>>,
|
||||
pub object_lock_config: Option<DefaultRetention>,
|
||||
pub replication_pending_grace: StdDuration,
|
||||
pub replication_metrics: Option<ReplicationMetricsHandle>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ReplicationMetricsHandle {
|
||||
inner: Arc<ReplicationMetricsInner>,
|
||||
}
|
||||
|
||||
struct ReplicationMetricsInner {
|
||||
metrics: Arc<MetricsCollector>,
|
||||
bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>,
|
||||
}
|
||||
|
||||
impl ReplicationMetricsHandle {
|
||||
pub fn new(metrics: Arc<MetricsCollector>, bucket_metrics: Arc<Mutex<HashMap<String, BucketMetrics>>>) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(ReplicationMetricsInner { metrics, bucket_metrics }),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_status(&self, bucket: &str, status: ReplicationStatusType, lagging: bool) {
|
||||
match status {
|
||||
ReplicationStatusType::Pending => self.inner.metrics.increment_replication_pending_objects(1),
|
||||
ReplicationStatusType::Failed => self.inner.metrics.increment_replication_failed_objects(1),
|
||||
_ => {}
|
||||
}
|
||||
if lagging {
|
||||
self.inner.metrics.increment_replication_lagging_objects(1);
|
||||
}
|
||||
|
||||
let mut guard = self.inner.bucket_metrics.lock().await;
|
||||
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
|
||||
bucket: bucket.to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
match status {
|
||||
ReplicationStatusType::Pending => {
|
||||
entry.replication_pending = entry.replication_pending.saturating_add(1);
|
||||
}
|
||||
ReplicationStatusType::Failed => {
|
||||
entry.replication_failed = entry.replication_failed.saturating_add(1);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if lagging {
|
||||
entry.replication_lagging = entry.replication_lagging.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record_task_submission(&self, bucket: &str) {
|
||||
self.inner.metrics.increment_replication_tasks_queued(1);
|
||||
let mut guard = self.inner.bucket_metrics.lock().await;
|
||||
let entry = guard.entry(bucket.to_string()).or_insert_with(|| BucketMetrics {
|
||||
bucket: bucket.to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
entry.replication_tasks_queued = entry.replication_tasks_queued.saturating_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
impl ScannerItem {
|
||||
const INTERNAL_REPLICATION_STATUS_KEY: &'static str = "x-rustfs-internal-replication-status";
|
||||
|
||||
pub fn new(
|
||||
bucket: String,
|
||||
replication: Option<ReplicationConfig>,
|
||||
lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
versioning: Option<Arc<VersioningConfiguration>>,
|
||||
object_lock_config: Option<DefaultRetention>,
|
||||
replication_pending_grace: StdDuration,
|
||||
replication_metrics: Option<ReplicationMetricsHandle>,
|
||||
) -> Self {
|
||||
Self {
|
||||
bucket,
|
||||
object_name: "".to_string(),
|
||||
replication,
|
||||
lifecycle,
|
||||
versioning,
|
||||
object_lock_config,
|
||||
replication_pending_grace,
|
||||
replication_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,6 +251,23 @@ impl ScannerItem {
|
||||
}
|
||||
|
||||
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
|
||||
let object_locked = self.is_object_lock_protected(oi);
|
||||
|
||||
if let Err(err) = self.heal_replication(oi).await {
|
||||
warn!(
|
||||
"heal_replication failed for {}/{} (version {:?}): {}",
|
||||
oi.bucket, oi.name, oi.version_id, err
|
||||
);
|
||||
}
|
||||
|
||||
if object_locked {
|
||||
info!(
|
||||
"apply_actions: Skipping lifecycle for {}/{} because object lock retention or legal hold is active",
|
||||
oi.bucket, oi.name
|
||||
);
|
||||
return (false, oi.size);
|
||||
}
|
||||
|
||||
let (action, _size) = self.apply_lifecycle(oi).await;
|
||||
|
||||
info!(
|
||||
@@ -174,16 +278,6 @@ impl ScannerItem {
|
||||
oi.user_defined.clone()
|
||||
);
|
||||
|
||||
// Create a mutable clone if you need to modify fields
|
||||
/*let mut oi = oi.clone();
|
||||
oi.replication_status = ReplicationStatusType::from(
|
||||
oi.user_defined
|
||||
.get("x-amz-bucket-replication-status")
|
||||
.unwrap_or(&"PENDING".to_string()),
|
||||
);
|
||||
info!("apply status is: {:?}", oi.replication_status);
|
||||
self.heal_replication(&oi, _size_s).await;*/
|
||||
|
||||
if action.delete_all() {
|
||||
return (true, 0);
|
||||
}
|
||||
@@ -200,7 +294,7 @@ impl ScannerItem {
|
||||
|
||||
info!("apply_lifecycle: Lifecycle config exists for object: {}", oi.name);
|
||||
|
||||
let (olcfg, rcfg) = if self.bucket != ".minio.sys" {
|
||||
let (olcfg, rcfg) = if !is_meta_bucketname(&self.bucket) {
|
||||
(
|
||||
get_object_lock_config(&self.bucket).await.ok(),
|
||||
None, // FIXME: replication config
|
||||
@@ -266,4 +360,202 @@ impl ScannerItem {
|
||||
|
||||
(lc_evt.action, new_size)
|
||||
}
|
||||
|
||||
fn is_object_lock_protected(&self, oi: &ObjectInfo) -> bool {
|
||||
enforce_retention_for_deletion(oi)
|
||||
}
|
||||
|
||||
async fn heal_replication(&self, oi: &ObjectInfo) -> Result<()> {
|
||||
warn!("heal_replication: healing replication for {}/{}", oi.bucket, oi.name);
|
||||
warn!("heal_replication: ObjectInfo oi: {:?}", oi);
|
||||
|
||||
let enriched = Self::hydrate_replication_metadata(oi);
|
||||
let pending_lagging = self.is_pending_lagging(&enriched);
|
||||
|
||||
if let Some(handle) = &self.replication_metrics {
|
||||
handle
|
||||
.record_status(&self.bucket, enriched.replication_status.clone(), pending_lagging)
|
||||
.await;
|
||||
}
|
||||
|
||||
debug!(
|
||||
"heal_replication: evaluating {}/{} with status {:?} and internal {:?}",
|
||||
enriched.bucket, enriched.name, enriched.replication_status, enriched.replication_status_internal
|
||||
);
|
||||
|
||||
// if !self.needs_replication_heal(&enriched, pending_lagging) {
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
// let replication_cfg = match get_replication_config(&self.bucket).await {
|
||||
// Ok((cfg, _)) => Some(cfg),
|
||||
// Err(err) => {
|
||||
// debug!("heal_replication: failed to fetch replication config for bucket {}: {}", self.bucket, err);
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
|
||||
// if replication_cfg.is_none() {
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
// let bucket_targets = match get_bucket_targets_config(&self.bucket).await {
|
||||
// Ok(targets) => Some(targets),
|
||||
// Err(err) => {
|
||||
// debug!("heal_replication: no bucket targets for bucket {}: {}", self.bucket, err);
|
||||
// None
|
||||
// }
|
||||
// };
|
||||
|
||||
// let replication_cfg = ReplicationConfig::new(replication_cfg, bucket_targets);
|
||||
|
||||
let replication_cfg = self.replication.clone().unwrap_or_default();
|
||||
|
||||
if replication_cfg.config.is_none() && replication_cfg.remotes.is_none() {
|
||||
debug!("heal_replication: no replication config for {}/{}", enriched.bucket, enriched.name);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let replicate_info = get_heal_replicate_object_info(&enriched, &replication_cfg).await;
|
||||
let should_replicate = replicate_info.dsc.replicate_any()
|
||||
|| matches!(
|
||||
enriched.replication_status,
|
||||
ReplicationStatusType::Failed | ReplicationStatusType::Pending
|
||||
);
|
||||
if !should_replicate {
|
||||
debug!("heal_replication: no actionable targets for {}/{}", enriched.bucket, enriched.name);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(pool) = GLOBAL_REPLICATION_POOL.get() {
|
||||
pool.queue_replica_task(replicate_info).await;
|
||||
if let Some(handle) = &self.replication_metrics {
|
||||
handle.record_task_submission(&self.bucket).await;
|
||||
}
|
||||
warn!("heal_replication: queued replication heal task for {}/{}", enriched.bucket, enriched.name);
|
||||
} else {
|
||||
warn!(
|
||||
"heal_replication: GLOBAL_REPLICATION_POOL not initialized, skipping heal for {}/{}",
|
||||
enriched.bucket, enriched.name
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn needs_replication_heal(&self, oi: &ObjectInfo, pending_lagging: bool) -> bool {
|
||||
if matches!(oi.replication_status, ReplicationStatusType::Failed) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if pending_lagging && matches!(oi.replication_status, ReplicationStatusType::Pending) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(raw) = oi.replication_status_internal.as_ref() {
|
||||
let statuses = replication_statuses_map(raw);
|
||||
if statuses
|
||||
.values()
|
||||
.any(|status| matches!(status, ReplicationStatusType::Failed))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if pending_lagging
|
||||
&& statuses
|
||||
.values()
|
||||
.any(|status| matches!(status, ReplicationStatusType::Pending))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn hydrate_replication_metadata(oi: &ObjectInfo) -> ObjectInfo {
|
||||
let mut enriched = oi.clone();
|
||||
|
||||
if enriched.replication_status.is_empty() {
|
||||
if let Some(status) = enriched.user_defined.lookup(AMZ_BUCKET_REPLICATION_STATUS) {
|
||||
enriched.replication_status = ReplicationStatusType::from(status);
|
||||
}
|
||||
}
|
||||
|
||||
if enriched.replication_status_internal.is_none() {
|
||||
if let Some(raw) = enriched.user_defined.lookup(Self::INTERNAL_REPLICATION_STATUS_KEY) {
|
||||
if !raw.is_empty() {
|
||||
enriched.replication_status_internal = Some(raw.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if enriched.version_purge_status_internal.is_none() {
|
||||
if let Some(raw) = enriched.user_defined.lookup(VERSION_PURGE_STATUS_KEY) {
|
||||
if !raw.is_empty() {
|
||||
enriched.version_purge_status_internal = Some(raw.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enriched
|
||||
}
|
||||
|
||||
fn is_pending_lagging(&self, oi: &ObjectInfo) -> bool {
|
||||
if !matches!(oi.replication_status, ReplicationStatusType::Pending) {
|
||||
return false;
|
||||
}
|
||||
|
||||
let Some(mod_time) = oi.mod_time else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let grace = TimeDuration::try_from(self.replication_pending_grace).unwrap_or_else(|_| {
|
||||
warn!(
|
||||
"replication_pending_grace is invalid, using default value: 0 seconds, grace: {:?}",
|
||||
self.replication_pending_grace
|
||||
);
|
||||
TimeDuration::seconds(0)
|
||||
});
|
||||
if grace.is_zero() {
|
||||
return true;
|
||||
}
|
||||
|
||||
let elapsed = OffsetDateTime::now_utc() - mod_time;
|
||||
elapsed >= grace
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// One pending+lagging observation, one failed observation and one task
    /// submission must each show up exactly once, both in the global snapshot
    /// and in the per-bucket entry.
    #[tokio::test]
    async fn replication_metrics_handle_tracks_counts() {
        let collector = Arc::new(MetricsCollector::new());
        let per_bucket = Arc::new(Mutex::new(HashMap::new()));
        let handle = ReplicationMetricsHandle::new(collector.clone(), per_bucket.clone());

        handle
            .record_status("test-bucket", ReplicationStatusType::Pending, true)
            .await;
        handle
            .record_status("test-bucket", ReplicationStatusType::Failed, false)
            .await;
        handle.record_task_submission("test-bucket").await;

        // Global counters.
        let global = collector.get_metrics();
        assert_eq!(global.replication_pending_objects, 1);
        assert_eq!(global.replication_failed_objects, 1);
        assert_eq!(global.replication_lagging_objects, 1);
        assert_eq!(global.replication_tasks_queued, 1);

        // Per-bucket counters.
        let buckets = per_bucket.lock().await;
        let entry = buckets.get("test-bucket").expect("bucket metrics exists");
        assert_eq!(entry.replication_pending, 1);
        assert_eq!(entry.replication_failed, 1);
        assert_eq!(entry.replication_lagging, 1);
        assert_eq!(entry.replication_tasks_queued, 1);
    }
}
|
||||
|
||||
@@ -62,6 +62,7 @@ struct DiskScanResult {
|
||||
pub struct LocalObjectRecord {
|
||||
pub usage: LocalObjectUsage,
|
||||
pub object_info: Option<rustfs_ecstore::store_api::ObjectInfo>,
|
||||
pub file_info: Option<FileInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -223,9 +224,11 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
|
||||
record.usage.last_modified_ns = mtime_ns;
|
||||
state.objects.insert(rel_path.clone(), record.usage.clone());
|
||||
emitted.insert(rel_path.clone());
|
||||
warn!("compute_object_usage: record: {:?}", record.clone());
|
||||
objects_by_bucket.entry(record.usage.bucket.clone()).or_default().push(record);
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("compute_object_usage: None, rel_path: {:?}", rel_path);
|
||||
state.objects.remove(&rel_path);
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -240,24 +243,27 @@ fn scan_disk_blocking(root: PathBuf, meta: LocalUsageSnapshotMeta, mut state: In
|
||||
warn!("Failed to read xl.meta {:?}: {}", xl_path, err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("should_parse: false, rel_path: {:?}", rel_path);
|
||||
}
|
||||
}
|
||||
|
||||
state.objects.retain(|key, _| visited.contains(key));
|
||||
state.last_scan_ns = Some(now_ns);
|
||||
|
||||
for (key, usage) in &state.objects {
|
||||
if emitted.contains(key) {
|
||||
continue;
|
||||
}
|
||||
objects_by_bucket
|
||||
.entry(usage.bucket.clone())
|
||||
.or_default()
|
||||
.push(LocalObjectRecord {
|
||||
usage: usage.clone(),
|
||||
object_info: None,
|
||||
});
|
||||
}
|
||||
// for (key, usage) in &state.objects {
|
||||
// if emitted.contains(key) {
|
||||
// continue;
|
||||
// }
|
||||
// objects_by_bucket
|
||||
// .entry(usage.bucket.clone())
|
||||
// .or_default()
|
||||
// .push(LocalObjectRecord {
|
||||
// usage: usage.clone(),
|
||||
// object_info: None,
|
||||
// file_info: None,
|
||||
// });
|
||||
// }
|
||||
|
||||
let snapshot = build_snapshot(meta, &state.objects, now);
|
||||
status.snapshot_exists = true;
|
||||
@@ -319,6 +325,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
|
||||
let versioned = fi.version_id.is_some();
|
||||
ObjectInfo::from_file_info(fi, bucket, object, versioned)
|
||||
});
|
||||
let file_info = latest_file_info.clone();
|
||||
|
||||
Ok(Some(LocalObjectRecord {
|
||||
usage: LocalObjectUsage {
|
||||
@@ -331,6 +338,7 @@ fn compute_object_usage(bucket: &str, object: &str, file_meta: &FileMeta) -> Res
|
||||
has_live_object,
|
||||
},
|
||||
object_info,
|
||||
file_info,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,14 @@ pub struct ScannerMetrics {
|
||||
pub healthy_objects: u64,
|
||||
/// Total corrupted objects found
|
||||
pub corrupted_objects: u64,
|
||||
/// Replication heal tasks queued
|
||||
pub replication_tasks_queued: u64,
|
||||
/// Objects observed with pending replication
|
||||
pub replication_pending_objects: u64,
|
||||
/// Objects observed with failed replication
|
||||
pub replication_failed_objects: u64,
|
||||
/// Objects with replication pending longer than grace period
|
||||
pub replication_lagging_objects: u64,
|
||||
/// Last scan activity time
|
||||
pub last_activity: Option<SystemTime>,
|
||||
/// Current scan cycle
|
||||
@@ -86,6 +94,14 @@ pub struct BucketMetrics {
|
||||
pub heal_tasks_completed: u64,
|
||||
/// Heal tasks failed for this bucket
|
||||
pub heal_tasks_failed: u64,
|
||||
/// Objects observed with pending replication status
|
||||
pub replication_pending: u64,
|
||||
/// Objects observed with failed replication status
|
||||
pub replication_failed: u64,
|
||||
/// Objects exceeding replication grace period
|
||||
pub replication_lagging: u64,
|
||||
/// Replication heal tasks queued for this bucket
|
||||
pub replication_tasks_queued: u64,
|
||||
}
|
||||
|
||||
/// Disk-specific metrics
|
||||
@@ -127,6 +143,10 @@ pub struct MetricsCollector {
|
||||
total_cycles: AtomicU64,
|
||||
healthy_objects: AtomicU64,
|
||||
corrupted_objects: AtomicU64,
|
||||
replication_tasks_queued: AtomicU64,
|
||||
replication_pending_objects: AtomicU64,
|
||||
replication_failed_objects: AtomicU64,
|
||||
replication_lagging_objects: AtomicU64,
|
||||
}
|
||||
|
||||
impl MetricsCollector {
|
||||
@@ -146,6 +166,10 @@ impl MetricsCollector {
|
||||
total_cycles: AtomicU64::new(0),
|
||||
healthy_objects: AtomicU64::new(0),
|
||||
corrupted_objects: AtomicU64::new(0),
|
||||
replication_tasks_queued: AtomicU64::new(0),
|
||||
replication_pending_objects: AtomicU64::new(0),
|
||||
replication_failed_objects: AtomicU64::new(0),
|
||||
replication_lagging_objects: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,6 +218,26 @@ impl MetricsCollector {
|
||||
self.heal_tasks_failed.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication tasks queued
|
||||
pub fn increment_replication_tasks_queued(&self, count: u64) {
|
||||
self.replication_tasks_queued.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication pending objects
|
||||
pub fn increment_replication_pending_objects(&self, count: u64) {
|
||||
self.replication_pending_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication failed objects
|
||||
pub fn increment_replication_failed_objects(&self, count: u64) {
|
||||
self.replication_failed_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Increment replication lagging objects
|
||||
pub fn increment_replication_lagging_objects(&self, count: u64) {
|
||||
self.replication_lagging_objects.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Set current cycle
|
||||
pub fn set_current_cycle(&self, cycle: u64) {
|
||||
self.current_cycle.store(cycle, Ordering::Relaxed);
|
||||
@@ -228,6 +272,10 @@ impl MetricsCollector {
|
||||
heal_tasks_failed: self.heal_tasks_failed.load(Ordering::Relaxed),
|
||||
healthy_objects: self.healthy_objects.load(Ordering::Relaxed),
|
||||
corrupted_objects: self.corrupted_objects.load(Ordering::Relaxed),
|
||||
replication_tasks_queued: self.replication_tasks_queued.load(Ordering::Relaxed),
|
||||
replication_pending_objects: self.replication_pending_objects.load(Ordering::Relaxed),
|
||||
replication_failed_objects: self.replication_failed_objects.load(Ordering::Relaxed),
|
||||
replication_lagging_objects: self.replication_lagging_objects.load(Ordering::Relaxed),
|
||||
last_activity: Some(SystemTime::now()),
|
||||
current_cycle: self.current_cycle.load(Ordering::Relaxed),
|
||||
total_cycles: self.total_cycles.load(Ordering::Relaxed),
|
||||
@@ -255,6 +303,10 @@ impl MetricsCollector {
|
||||
self.total_cycles.store(0, Ordering::Relaxed);
|
||||
self.healthy_objects.store(0, Ordering::Relaxed);
|
||||
self.corrupted_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_tasks_queued.store(0, Ordering::Relaxed);
|
||||
self.replication_pending_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_failed_objects.store(0, Ordering::Relaxed);
|
||||
self.replication_lagging_objects.store(0, Ordering::Relaxed);
|
||||
|
||||
info!("Scanner metrics reset");
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::scanner::{
|
||||
};
|
||||
use rustfs_common::data_usage::DataUsageInfo;
|
||||
use rustfs_ecstore::StorageAPI;
|
||||
use rustfs_ecstore::bucket::utils::is_meta_bucketname;
|
||||
use rustfs_ecstore::disk::{DiskAPI, DiskStore};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
@@ -879,7 +880,7 @@ impl NodeScanner {
|
||||
let bucket_name = &bucket_info.name;
|
||||
|
||||
// skip system internal buckets
|
||||
if bucket_name == ".minio.sys" {
|
||||
if is_meta_bucketname(bucket_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -12,31 +12,52 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use rustfs_ahm::heal::{
|
||||
manager::{HealConfig, HealManager},
|
||||
storage::{ECStoreHealStorage, HealStorageAPI},
|
||||
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
|
||||
use async_trait::async_trait;
|
||||
use rustfs_ahm::{
|
||||
heal::{
|
||||
manager::{HealConfig, HealManager},
|
||||
storage::{ECStoreHealStorage, HealStorageAPI},
|
||||
task::{HealOptions, HealPriority, HealRequest, HealTaskStatus, HealType},
|
||||
},
|
||||
scanner::{ScanMode, Scanner},
|
||||
};
|
||||
use rustfs_common::heal_channel::{HealOpts, HealScanMode};
|
||||
use rustfs_ecstore::bucket::metadata_sys::{self, set_bucket_metadata};
|
||||
use rustfs_ecstore::bucket::replication::{
|
||||
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
|
||||
};
|
||||
use rustfs_ecstore::bucket::target::{BucketTarget, BucketTargetType, BucketTargets};
|
||||
use rustfs_ecstore::bucket::utils::serialize;
|
||||
use rustfs_ecstore::error::Error as EcstoreError;
|
||||
use rustfs_ecstore::{
|
||||
disk::endpoint::Endpoint,
|
||||
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
|
||||
store::ECStore,
|
||||
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
|
||||
};
|
||||
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use s3s::dto::{
|
||||
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
|
||||
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
os::unix::fs::PermissionsExt,
|
||||
path::PathBuf,
|
||||
sync::{Arc, Once, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::fs;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>, Arc<ECStoreHealStorage>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-replication-heal-target";
|
||||
|
||||
fn init_tracing() {
|
||||
INIT.call_once(|| {
|
||||
@@ -145,6 +166,225 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
|
||||
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
|
||||
}
|
||||
|
||||
fn delete_first_part_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
|
||||
for disk_path in disk_paths {
|
||||
let obj_dir = disk_path.join(bucket).join(object);
|
||||
if !obj_dir.exists() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(part_path) = WalkDir::new(&obj_dir)
|
||||
.min_depth(2)
|
||||
.max_depth(2)
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.find(|entry| {
|
||||
entry.file_type().is_file()
|
||||
&& entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.map(|name| name.starts_with("part."))
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.map(|entry| entry.into_path())
|
||||
{
|
||||
std::fs::remove_file(&part_path).expect("Failed to delete part file");
|
||||
return part_path;
|
||||
}
|
||||
}
|
||||
|
||||
panic!("Failed to locate part file for {}/{}", bucket, object);
|
||||
}
|
||||
|
||||
/// Deletes the first existing `xl.meta` for `bucket`/`object` and returns its path.
///
/// Disks are tried in the order given; only the first hit is removed.
///
/// # Panics
/// Panics when no disk holds an `xl.meta` for the object, or when the file
/// cannot be removed.
fn delete_xl_meta_file(disk_paths: &[PathBuf], bucket: &str, object: &str) -> PathBuf {
    let located = disk_paths
        .iter()
        .map(|disk| disk.join(bucket).join(object).join("xl.meta"))
        .find(|candidate| candidate.exists());

    match located {
        Some(xl_meta_path) => {
            std::fs::remove_file(&xl_meta_path).expect("Failed to delete xl.meta file");
            xl_meta_path
        }
        None => panic!("Failed to locate xl.meta for {}/{}", bucket, object),
    }
}
|
||||
|
||||
/// Guard that moves a file out of the way for its lifetime.
///
/// Construction renames `original` to a sibling with the `bak` extension;
/// dropping the guard renames it back if the backup still exists.
struct FormatPathGuard {
    /// Path the file had before the guard was created.
    original: PathBuf,
    /// Path the file is parked at while the guard is alive.
    backup: PathBuf,
}

impl FormatPathGuard {
    /// Renames `original` to its `.bak` sibling, first removing a backup file
    /// left at that path, and returns the guard.
    ///
    /// # Errors
    /// Returns the I/O error from removing the old backup or from the rename.
    fn new(original: PathBuf) -> std::io::Result<Self> {
        let backup = original.with_extension("bak");
        let stale_backup_present = backup.exists();
        if stale_backup_present {
            std::fs::remove_file(&backup)?;
        }
        std::fs::rename(&original, &backup).map(|()| Self { original, backup })
    }
}

impl Drop for FormatPathGuard {
    /// Best-effort restore: a failed rename is ignored.
    fn drop(&mut self) {
        if !self.backup.exists() {
            return;
        }
        let _restore_result = std::fs::rename(&self.backup, &self.original);
    }
}
|
||||
|
||||
/// Guard that changes a path's Unix mode for its lifetime.
///
/// Construction stores the current mode and applies the new one; dropping the
/// guard re-applies the stored mode if the path still exists.
struct PermissionGuard {
    /// Path whose permissions were changed.
    path: PathBuf,
    /// Mode reported by the file system before the change.
    original_mode: u32,
}

impl PermissionGuard {
    /// Remembers the current mode of `path`, then sets it to `new_mode`.
    ///
    /// # Errors
    /// Returns the I/O error from reading the metadata or setting the mode.
    fn new(path: PathBuf, new_mode: u32) -> std::io::Result<Self> {
        let original_mode = std::fs::metadata(&path)?.permissions().mode();
        let requested = std::fs::Permissions::from_mode(new_mode);
        std::fs::set_permissions(&path, requested)?;
        Ok(Self { path, original_mode })
    }
}

impl Drop for PermissionGuard {
    /// Best-effort restore: a failure to reset the mode is ignored.
    fn drop(&mut self) {
        if !self.path.exists() {
            return;
        }
        let previous = std::fs::Permissions::from_mode(self.original_mode);
        let _restore_result = std::fs::set_permissions(&self.path, previous);
    }
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecordingReplicationPool {
|
||||
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
|
||||
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
|
||||
}
|
||||
|
||||
impl RecordingReplicationPool {
|
||||
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
|
||||
let mut guard = self.replica_tasks.lock().await;
|
||||
guard.drain(..).collect()
|
||||
}
|
||||
|
||||
async fn clear(&self) {
|
||||
self.replica_tasks.lock().await.clear();
|
||||
self.delete_tasks.lock().await.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReplicationPoolTrait for RecordingReplicationPool {
|
||||
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
|
||||
self.replica_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
|
||||
self.delete_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
|
||||
|
||||
async fn init_resync(
|
||||
self: Arc<Self>,
|
||||
_cancellation_token: CancellationToken,
|
||||
_buckets: Vec<String>,
|
||||
) -> Result<(), EcstoreError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
|
||||
static TEST_POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
|
||||
|
||||
if let Some(pool) = TEST_POOL.get() {
|
||||
pool.clear().await;
|
||||
return pool.clone();
|
||||
}
|
||||
|
||||
let pool = Arc::new(RecordingReplicationPool::default());
|
||||
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
|
||||
let global_pool = GLOBAL_REPLICATION_POOL
|
||||
.get_or_init(|| {
|
||||
let pool_clone = dyn_pool.clone();
|
||||
async move { pool_clone }
|
||||
})
|
||||
.await
|
||||
.clone();
|
||||
|
||||
assert!(
|
||||
Arc::ptr_eq(&dyn_pool, &global_pool),
|
||||
"GLOBAL_REPLICATION_POOL initialized before test replication pool"
|
||||
);
|
||||
|
||||
let _ = TEST_POOL.set(pool.clone());
|
||||
pool.clear().await;
|
||||
pool
|
||||
}
|
||||
|
||||
async fn configure_bucket_replication(bucket: &str, target_arn: &str) {
|
||||
let meta = metadata_sys::get(bucket)
|
||||
.await
|
||||
.expect("bucket metadata should exist for replication configuration");
|
||||
let mut metadata = (*meta).clone();
|
||||
|
||||
let replication_rule = ReplicationRule {
|
||||
delete_marker_replication: None,
|
||||
delete_replication: None,
|
||||
destination: Destination {
|
||||
access_control_translation: None,
|
||||
account: None,
|
||||
bucket: target_arn.to_string(),
|
||||
encryption_configuration: None,
|
||||
metrics: None,
|
||||
replication_time: None,
|
||||
storage_class: None,
|
||||
},
|
||||
existing_object_replication: Some(ExistingObjectReplication {
|
||||
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
|
||||
}),
|
||||
filter: None,
|
||||
id: Some("heal-replication-rule".to_string()),
|
||||
prefix: Some(String::new()),
|
||||
priority: Some(1),
|
||||
source_selection_criteria: None,
|
||||
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
|
||||
};
|
||||
|
||||
let replication_cfg = ReplicationConfiguration {
|
||||
role: target_arn.to_string(),
|
||||
rules: vec![replication_rule],
|
||||
};
|
||||
|
||||
let bucket_targets = BucketTargets {
|
||||
targets: vec![BucketTarget {
|
||||
source_bucket: bucket.to_string(),
|
||||
endpoint: "replication.invalid".to_string(),
|
||||
target_bucket: "replication-target".to_string(),
|
||||
arn: target_arn.to_string(),
|
||||
target_type: BucketTargetType::ReplicationService,
|
||||
..Default::default()
|
||||
}],
|
||||
};
|
||||
|
||||
metadata.replication_config = Some(replication_cfg.clone());
|
||||
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
|
||||
metadata.replication_config_updated_at = OffsetDateTime::now_utc();
|
||||
metadata.bucket_target_config = Some(bucket_targets.clone());
|
||||
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
|
||||
metadata.bucket_targets_config_updated_at = OffsetDateTime::now_utc();
|
||||
let versioning_cfg = VersioningConfiguration {
|
||||
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
|
||||
..Default::default()
|
||||
};
|
||||
metadata.versioning_config = Some(versioning_cfg.clone());
|
||||
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
|
||||
metadata.versioning_config_updated_at = OffsetDateTime::now_utc();
|
||||
|
||||
set_bucket_metadata(bucket.to_string(), metadata)
|
||||
.await
|
||||
.expect("failed to update bucket metadata for replication");
|
||||
}
|
||||
|
||||
mod serial_tests {
|
||||
use super::*;
|
||||
|
||||
@@ -430,4 +670,380 @@ mod serial_tests {
|
||||
|
||||
info!("Direct heal storage API test passed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_heal_task_when_part_missing() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-heal-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-heal-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner auto-heal data").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before simulating failures");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let deleted_part_path = delete_first_part_file(&disk_paths, &bucket_name, object_name);
|
||||
assert!(!deleted_part_path.exists(), "Deleted part file should not exist before healing");
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan after part deletion should finish and enqueue heal task");
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit heal tasks when data parts go missing"
|
||||
);
|
||||
|
||||
// Allow heal manager to restore the missing part
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert!(
|
||||
deleted_part_path.exists(),
|
||||
"Missing part should be restored after heal: {:?}",
|
||||
deleted_part_path
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_metadata_heal_when_xl_meta_missing() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-meta-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-meta-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"Scanner metadata heal data").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before metadata deletion");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let deleted_meta_path = delete_xl_meta_file(&disk_paths, &bucket_name, object_name);
|
||||
assert!(!deleted_meta_path.exists(), "Deleted xl.meta should not exist before healing");
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan after metadata deletion should finish and enqueue heal task");
|
||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit metadata heal tasks when xl.meta is missing"
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
assert!(
|
||||
deleted_meta_path.exists(),
|
||||
"xl.meta should be restored after heal: {:?}",
|
||||
deleted_meta_path
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_triggers_replication_heal_when_status_failed() {
|
||||
let (_disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-replication-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-replication-heal-object";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
configure_bucket_replication(&bucket_name, TEST_REPLICATION_TARGET_ARN).await;
|
||||
|
||||
let replication_pool = ensure_test_replication_pool().await;
|
||||
replication_pool.clear().await;
|
||||
|
||||
let mut opts = ObjectOptions::default();
|
||||
opts.user_defined.insert(
|
||||
AMZ_BUCKET_REPLICATION_STATUS.to_string(),
|
||||
ReplicationStatusType::Failed.as_str().to_string(),
|
||||
);
|
||||
let replication_status_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
|
||||
opts.user_defined.insert(
|
||||
replication_status_key.clone(),
|
||||
format!("{}={};", TEST_REPLICATION_TARGET_ARN, ReplicationStatusType::Failed.as_str()),
|
||||
);
|
||||
let mut reader = PutObjReader::from_vec(b"replication heal data".to_vec());
|
||||
ecstore
|
||||
.put_object(&bucket_name, object_name, &mut reader, &opts)
|
||||
.await
|
||||
.expect("Failed to upload replication test object");
|
||||
|
||||
let object_info = ecstore
|
||||
.get_object_info(&bucket_name, object_name, &ObjectOptions::default())
|
||||
.await
|
||||
.expect("Failed to read object info for replication test");
|
||||
assert_eq!(
|
||||
object_info
|
||||
.user_defined
|
||||
.get(AMZ_BUCKET_REPLICATION_STATUS)
|
||||
.map(|s| s.as_str()),
|
||||
Some(ReplicationStatusType::Failed.as_str()),
|
||||
"Uploaded object should contain replication status metadata"
|
||||
);
|
||||
assert!(
|
||||
object_info
|
||||
.user_defined
|
||||
.get(&replication_status_key)
|
||||
.map(|s| s.contains(ReplicationStatusType::Failed.as_str()))
|
||||
.unwrap_or(false),
|
||||
"Uploaded object should preserve internal replication status metadata"
|
||||
);
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan cycle should succeed and evaluate replication state");
|
||||
|
||||
let replica_tasks = replication_pool.take_replica_tasks().await;
|
||||
assert!(
|
||||
replica_tasks
|
||||
.iter()
|
||||
.any(|info| info.bucket == bucket_name && info.name == object_name),
|
||||
"Scanner should enqueue replication heal task when replication status is FAILED (recorded tasks: {:?})",
|
||||
replica_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_disk_offline() {
|
||||
let (disk_paths, _ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let format_path = disk_paths[0].join(".rustfs.sys").join("format.json");
|
||||
assert!(format_path.exists(), "format.json should exist before simulating offline disk");
|
||||
let _format_guard = FormatPathGuard::new(format_path.clone()).expect("failed to move format.json");
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Normal).await;
|
||||
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Scan cycle should complete even when a disk is offline");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when disk is offline (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_listing_volumes_fails() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-list-volumes-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-list-volumes-object";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"disk list volumes failure").await;
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before simulating disk permission issues");
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
|
||||
let disk_root = disk_paths[0].clone();
|
||||
assert!(disk_root.exists(), "Disk root should exist so we can simulate permission failures");
|
||||
|
||||
{
|
||||
let _root_perm_guard =
|
||||
PermissionGuard::new(disk_root.clone(), 0o000).expect("Failed to change disk root permissions");
|
||||
|
||||
let scan_result = scanner.scan_cycle().await;
|
||||
assert!(
|
||||
scan_result.is_ok(),
|
||||
"Scan cycle should continue even if disk volumes cannot be listed: {:?}",
|
||||
scan_result
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when listing volumes fails (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_submits_erasure_set_heal_when_disk_access_fails() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-access-error-{}", uuid::Uuid::new_v4().simple());
|
||||
let object_name = "scanner-access-error-object.txt";
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, object_name, b"disk access failure").await;
|
||||
|
||||
let bucket_path = disk_paths[0].join(&bucket_name);
|
||||
assert!(bucket_path.exists(), "Bucket path should exist on disk for access test");
|
||||
let _perm_guard = PermissionGuard::new(bucket_path.clone(), 0o000).expect("Failed to change permissions");
|
||||
|
||||
let heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 2,
|
||||
..Default::default()
|
||||
};
|
||||
let heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(heal_cfg)));
|
||||
heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Deep).await;
|
||||
|
||||
let baseline_stats = heal_manager.get_statistics().await;
|
||||
let scan_result = scanner.scan_cycle().await;
|
||||
assert!(
|
||||
scan_result.is_ok(),
|
||||
"Scan cycle should complete even if a disk volume has access errors: {:?}",
|
||||
scan_result
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
let updated_stats = heal_manager.get_statistics().await;
|
||||
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should enqueue erasure set heal when disk access fails (before {}, after {})",
|
||||
baseline_stats.total_tasks,
|
||||
updated_stats.total_tasks
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_scanner_detects_missing_bucket_directory_and_queues_bucket_heal() {
|
||||
let (disk_paths, ecstore, heal_storage) = setup_test_env().await;
|
||||
|
||||
let bucket_name = format!("scanner-missing-bucket-{}", uuid::Uuid::new_v4().simple());
|
||||
create_test_bucket(&ecstore, &bucket_name).await;
|
||||
upload_test_object(&ecstore, &bucket_name, "seed-object", b"bucket heal data").await;
|
||||
|
||||
let scanner_heal_cfg = HealConfig {
|
||||
enable_auto_heal: true,
|
||||
heal_interval: Duration::from_millis(20),
|
||||
max_concurrent_heals: 4,
|
||||
..Default::default()
|
||||
};
|
||||
let scanner_heal_manager = Arc::new(HealManager::new(heal_storage.clone(), Some(scanner_heal_cfg)));
|
||||
scanner_heal_manager.start().await.unwrap();
|
||||
|
||||
let scanner = Scanner::new(None, Some(scanner_heal_manager.clone()));
|
||||
scanner.initialize_with_ecstore().await;
|
||||
scanner.set_config_enable_healing(true).await;
|
||||
scanner.set_config_scan_mode(ScanMode::Normal).await;
|
||||
|
||||
scanner
|
||||
.scan_cycle()
|
||||
.await
|
||||
.expect("Initial scan should succeed before deleting bucket directory");
|
||||
let baseline_stats = scanner_heal_manager.get_statistics().await;
|
||||
|
||||
let missing_dir = disk_paths[0].join(&bucket_name);
|
||||
assert!(missing_dir.exists());
|
||||
std::fs::remove_dir_all(&missing_dir).expect("Failed to remove bucket directory for heal simulation");
|
||||
assert!(!missing_dir.exists(), "Bucket directory should be removed on disk to trigger heal");
|
||||
|
||||
scanner
|
||||
.run_volume_consistency_check()
|
||||
.await
|
||||
.expect("Volume consistency check should run after bucket removal");
|
||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||
|
||||
let updated_stats = scanner_heal_manager.get_statistics().await;
|
||||
assert!(
|
||||
updated_stats.total_tasks > baseline_stats.total_tasks,
|
||||
"Scanner should submit bucket heal tasks when a bucket directory is missing"
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
assert!(missing_dir.exists(), "Bucket directory should be restored after heal");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,18 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
|
||||
use rustfs_ecstore::{
|
||||
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
|
||||
bucket::metadata_sys,
|
||||
bucket::{
|
||||
metadata::BUCKET_LIFECYCLE_CONFIG,
|
||||
metadata_sys,
|
||||
replication::{
|
||||
DeletedObjectReplicationInfo, DynReplicationPool, GLOBAL_REPLICATION_POOL, ReplicationPoolTrait, ReplicationPriority,
|
||||
},
|
||||
target::{BucketTarget, BucketTargetType, BucketTargets},
|
||||
utils::serialize,
|
||||
},
|
||||
disk::endpoint::Endpoint,
|
||||
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
|
||||
global::GLOBAL_TierConfigMgr,
|
||||
@@ -23,18 +31,27 @@ use rustfs_ecstore::{
|
||||
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
|
||||
tier::tier_config::{TierConfig, TierMinIO, TierType},
|
||||
};
|
||||
use rustfs_filemeta::{ReplicateObjectInfo, ReplicationStatusType};
|
||||
use rustfs_utils::http::headers::{AMZ_BUCKET_REPLICATION_STATUS, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use s3s::dto::{
|
||||
BucketVersioningStatus, Destination, ExistingObjectReplication, ExistingObjectReplicationStatus, ReplicationConfiguration,
|
||||
ReplicationRule, ReplicationRuleStatus, VersioningConfiguration,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::{Arc, Once, OnceLock},
|
||||
time::Duration,
|
||||
};
|
||||
use time::{Duration as TimeDuration, OffsetDateTime};
|
||||
use tokio::fs;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
const TEST_REPLICATION_TARGET_ARN: &str = "arn:aws:s3:::rustfs-lifecycle-replication-test";
|
||||
|
||||
fn init_tracing() {
|
||||
INIT.call_once(|| {
|
||||
@@ -159,6 +176,167 @@ async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str,
|
||||
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecordingReplicationPool {
|
||||
replica_tasks: Mutex<Vec<ReplicateObjectInfo>>,
|
||||
delete_tasks: Mutex<Vec<DeletedObjectReplicationInfo>>,
|
||||
}
|
||||
|
||||
impl RecordingReplicationPool {
|
||||
async fn take_replica_tasks(&self) -> Vec<ReplicateObjectInfo> {
|
||||
let mut guard = self.replica_tasks.lock().await;
|
||||
guard.drain(..).collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ReplicationPoolTrait for RecordingReplicationPool {
|
||||
async fn queue_replica_task(&self, ri: ReplicateObjectInfo) {
|
||||
self.replica_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn queue_replica_delete_task(&self, ri: DeletedObjectReplicationInfo) {
|
||||
self.delete_tasks.lock().await.push(ri);
|
||||
}
|
||||
|
||||
async fn resize(&self, _priority: ReplicationPriority, _max_workers: usize, _max_l_workers: usize) {}
|
||||
|
||||
async fn init_resync(
|
||||
self: Arc<Self>,
|
||||
_cancellation_token: CancellationToken,
|
||||
_buckets: Vec<String>,
|
||||
) -> Result<(), rustfs_ecstore::error::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_test_replication_pool() -> Arc<RecordingReplicationPool> {
|
||||
static POOL: OnceLock<Arc<RecordingReplicationPool>> = OnceLock::new();
|
||||
if let Some(existing) = POOL.get() {
|
||||
existing.replica_tasks.lock().await.clear();
|
||||
existing.delete_tasks.lock().await.clear();
|
||||
return existing.clone();
|
||||
}
|
||||
|
||||
let pool = Arc::new(RecordingReplicationPool::default());
|
||||
let dyn_pool: Arc<DynReplicationPool> = pool.clone();
|
||||
GLOBAL_REPLICATION_POOL
|
||||
.get_or_init(|| {
|
||||
let pool_clone = dyn_pool.clone();
|
||||
async move { pool_clone }
|
||||
})
|
||||
.await;
|
||||
let _ = POOL.set(pool.clone());
|
||||
pool
|
||||
}
|
||||
|
||||
async fn configure_bucket_replication(bucket: &str) {
|
||||
let meta = metadata_sys::get(bucket)
|
||||
.await
|
||||
.expect("bucket metadata should exist for replication configuration");
|
||||
let mut metadata = (*meta).clone();
|
||||
|
||||
let replication_rule = ReplicationRule {
|
||||
delete_marker_replication: None,
|
||||
delete_replication: None,
|
||||
destination: Destination {
|
||||
access_control_translation: None,
|
||||
account: None,
|
||||
bucket: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
encryption_configuration: None,
|
||||
metrics: None,
|
||||
replication_time: None,
|
||||
storage_class: None,
|
||||
},
|
||||
existing_object_replication: Some(ExistingObjectReplication {
|
||||
status: ExistingObjectReplicationStatus::from_static(ExistingObjectReplicationStatus::ENABLED),
|
||||
}),
|
||||
filter: None,
|
||||
id: Some("lifecycle-replication-rule".to_string()),
|
||||
prefix: Some(String::new()),
|
||||
priority: Some(1),
|
||||
source_selection_criteria: None,
|
||||
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
|
||||
};
|
||||
|
||||
let replication_cfg = ReplicationConfiguration {
|
||||
role: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
rules: vec![replication_rule],
|
||||
};
|
||||
|
||||
let bucket_targets = BucketTargets {
|
||||
targets: vec![BucketTarget {
|
||||
source_bucket: bucket.to_string(),
|
||||
endpoint: "replication.invalid".to_string(),
|
||||
target_bucket: "replication-target".to_string(),
|
||||
arn: TEST_REPLICATION_TARGET_ARN.to_string(),
|
||||
target_type: BucketTargetType::ReplicationService,
|
||||
..Default::default()
|
||||
}],
|
||||
};
|
||||
|
||||
metadata.replication_config = Some(replication_cfg.clone());
|
||||
metadata.replication_config_xml = serialize(&replication_cfg).expect("serialize replication config");
|
||||
metadata.bucket_target_config = Some(bucket_targets.clone());
|
||||
metadata.bucket_targets_config_json = serde_json::to_vec(&bucket_targets).expect("serialize bucket targets");
|
||||
|
||||
let versioning_cfg = VersioningConfiguration {
|
||||
status: Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
|
||||
..Default::default()
|
||||
};
|
||||
metadata.versioning_config = Some(versioning_cfg.clone());
|
||||
metadata.versioning_config_xml = serialize(&versioning_cfg).expect("serialize versioning config");
|
||||
|
||||
metadata_sys::set_bucket_metadata(bucket.to_string(), metadata)
|
||||
.await
|
||||
.expect("failed to persist bucket metadata with replication config");
|
||||
}
|
||||
|
||||
async fn upload_object_with_replication_status(
|
||||
ecstore: &Arc<ECStore>,
|
||||
bucket: &str,
|
||||
object: &str,
|
||||
status: ReplicationStatusType,
|
||||
) {
|
||||
let mut reader = PutObjReader::from_vec(b"replication-state".to_vec());
|
||||
let mut opts = ObjectOptions::default();
|
||||
opts.user_defined
|
||||
.insert(AMZ_BUCKET_REPLICATION_STATUS.to_string(), status.as_str().to_string());
|
||||
let internal_key = format!("{}replication-status", RESERVED_METADATA_PREFIX_LOWER);
|
||||
opts.user_defined
|
||||
.insert(internal_key, format!("{}={};", TEST_REPLICATION_TARGET_ARN, status.as_str()));
|
||||
|
||||
(**ecstore)
|
||||
.put_object(bucket, object, &mut reader, &opts)
|
||||
.await
|
||||
.expect("failed to upload replication test object");
|
||||
}
|
||||
|
||||
async fn upload_object_with_retention(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8], retain_for: Duration) {
|
||||
use s3s::header::{X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE};
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
|
||||
let mut reader = PutObjReader::from_vec(data.to_vec());
|
||||
let mut opts = ObjectOptions::default();
|
||||
let retain_duration = TimeDuration::try_from(retain_for).unwrap_or_else(|_| TimeDuration::seconds(0));
|
||||
let retain_until = OffsetDateTime::now_utc() + retain_duration;
|
||||
let retain_until_str = retain_until.format(&Rfc3339).expect("format retain date");
|
||||
let lock_mode_key = X_AMZ_OBJECT_LOCK_MODE.as_str().to_string();
|
||||
let lock_mode_lower = lock_mode_key.to_lowercase();
|
||||
opts.user_defined.insert(lock_mode_lower, "GOVERNANCE".to_string());
|
||||
opts.user_defined.insert(lock_mode_key, "GOVERNANCE".to_string());
|
||||
|
||||
let retain_key = X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string();
|
||||
let retain_key_lower = retain_key.to_lowercase();
|
||||
opts.user_defined.insert(retain_key_lower, retain_until_str.clone());
|
||||
opts.user_defined.insert(retain_key, retain_until_str);
|
||||
|
||||
(**ecstore)
|
||||
.put_object(bucket, object, &mut reader, &opts)
|
||||
.await
|
||||
.expect("Failed to upload retained object");
|
||||
}
|
||||
|
||||
/// Test helper: Set bucket lifecycle configuration
|
||||
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
|
||||
@@ -694,4 +872,127 @@ mod serial_tests {
|
||||
|
||||
println!("Lifecycle transition basic test completed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_respects_object_lock_retention() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
let suffix = uuid::Uuid::new_v4().simple().to_string();
|
||||
let bucket_name = format!("test-lc-lock-retention-{}", &suffix[..8]);
|
||||
let object_name = "test/locked-object.txt";
|
||||
let test_data = b"retained payload";
|
||||
|
||||
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
|
||||
upload_object_with_retention(&ecstore, bucket_name.as_str(), object_name, test_data, Duration::from_secs(3600)).await;
|
||||
|
||||
assert!(
|
||||
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
|
||||
"Object should exist before lifecycle processing"
|
||||
);
|
||||
|
||||
set_bucket_lifecycle(bucket_name.as_str())
|
||||
.await
|
||||
.expect("Failed to set lifecycle configuration");
|
||||
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 1,
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
scanner.start().await.expect("Failed to start scanner");
|
||||
|
||||
for _ in 0..3 {
|
||||
scanner.scan_cycle().await.expect("scan cycle should succeed");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
|
||||
assert!(
|
||||
object_exists(&ecstore, bucket_name.as_str(), object_name).await,
|
||||
"Object with active retention should not be deleted by lifecycle"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_triggers_replication_heal_for_lagging_and_failed_objects() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
let suffix = uuid::Uuid::new_v4().simple().to_string();
|
||||
let bucket_name = format!("lc-replication-{}", &suffix[..8]);
|
||||
create_test_bucket(&ecstore, bucket_name.as_str()).await;
|
||||
configure_bucket_replication(bucket_name.as_str()).await;
|
||||
let replication_pool = ensure_test_replication_pool().await;
|
||||
|
||||
upload_object_with_replication_status(
|
||||
&ecstore,
|
||||
bucket_name.as_str(),
|
||||
"test/lagging-pending",
|
||||
ReplicationStatusType::Pending,
|
||||
)
|
||||
.await;
|
||||
upload_object_with_replication_status(
|
||||
&ecstore,
|
||||
bucket_name.as_str(),
|
||||
"test/failed-object",
|
||||
ReplicationStatusType::Failed,
|
||||
)
|
||||
.await;
|
||||
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 2,
|
||||
replication_pending_grace: Duration::from_secs(0),
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
|
||||
scanner.scan_cycle().await.expect("scan cycle should complete");
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let replica_tasks = replication_pool.take_replica_tasks().await;
|
||||
assert!(
|
||||
replica_tasks.iter().any(|t| t.name == "test/lagging-pending"),
|
||||
"Pending object should be enqueued for replication heal: {:?}",
|
||||
replica_tasks
|
||||
);
|
||||
assert!(
|
||||
replica_tasks.iter().any(|t| t.name == "test/failed-object"),
|
||||
"Failed object should be enqueued for replication heal: {:?}",
|
||||
replica_tasks
|
||||
);
|
||||
|
||||
let metrics = scanner.get_metrics().await;
|
||||
assert_eq!(
|
||||
metrics.replication_tasks_queued,
|
||||
replica_tasks.len() as u64,
|
||||
"Replication tasks queued metric should match recorded tasks"
|
||||
);
|
||||
assert!(
|
||||
metrics.replication_pending_objects >= 1,
|
||||
"Pending replication metric should be incremented"
|
||||
);
|
||||
assert!(metrics.replication_failed_objects >= 1, "Failed replication metric should be incremented");
|
||||
assert!(
|
||||
metrics.replication_lagging_objects >= 1,
|
||||
"Lagging replication metric should track pending object beyond grace"
|
||||
);
|
||||
|
||||
let bucket_metrics = metrics
|
||||
.bucket_metrics
|
||||
.get(&bucket_name)
|
||||
.expect("bucket metrics should contain replication counters");
|
||||
assert!(
|
||||
bucket_metrics.replication_pending >= 1 && bucket_metrics.replication_failed >= 1,
|
||||
"Bucket-level replication metrics should reflect observed statuses"
|
||||
);
|
||||
assert_eq!(
|
||||
bucket_metrics.replication_tasks_queued,
|
||||
replica_tasks.len() as u64,
|
||||
"Bucket-level queued counter should match enqueued tasks"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
|
||||
let mut mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str().to_lowercase().as_str());
|
||||
if mode_str.is_none() {
|
||||
mode_str = Some(&meta[X_AMZ_OBJECT_LOCK_MODE.as_str()]);
|
||||
mode_str = meta.get(X_AMZ_OBJECT_LOCK_MODE.as_str());
|
||||
}
|
||||
let mode = if let Some(mode_str) = mode_str {
|
||||
parse_ret_mode(mode_str.as_str())
|
||||
@@ -50,7 +50,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
|
||||
let mut till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_lowercase().as_str());
|
||||
if till_str.is_none() {
|
||||
till_str = Some(&meta[X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str()]);
|
||||
till_str = meta.get(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str());
|
||||
}
|
||||
if let Some(till_str) = till_str {
|
||||
let t = OffsetDateTime::parse(till_str, &format_description::well_known::Iso8601::DEFAULT);
|
||||
@@ -67,7 +67,7 @@ pub fn get_object_retention_meta(meta: HashMap<String, String>) -> ObjectLockRet
|
||||
pub fn get_object_legalhold_meta(meta: HashMap<String, String>) -> ObjectLockLegalHold {
|
||||
let mut hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str().to_lowercase().as_str());
|
||||
if hold_str.is_none() {
|
||||
hold_str = Some(&meta[X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()]);
|
||||
hold_str = meta.get(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str());
|
||||
}
|
||||
if let Some(hold_str) = hold_str {
|
||||
return ObjectLockLegalHold {
|
||||
|
||||
@@ -34,8 +34,8 @@ use rustfs_madmin::heal_commands::HealResultItem;
|
||||
use rustfs_rio::Checksum;
|
||||
use rustfs_rio::{DecompressReader, HashReader, LimitReader, WarpReader};
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::http::AMZ_STORAGE_CLASS;
|
||||
use rustfs_utils::http::headers::{AMZ_OBJECT_TAGGING, RESERVED_METADATA_PREFIX_LOWER};
|
||||
use rustfs_utils::http::{AMZ_BUCKET_REPLICATION_STATUS, AMZ_STORAGE_CLASS};
|
||||
use rustfs_utils::path::decode_dir_object;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
@@ -745,7 +745,22 @@ impl ObjectInfo {
|
||||
let inlined = fi.inline_data();
|
||||
|
||||
// TODO:expires
|
||||
// TODO:ReplicationState
|
||||
|
||||
let mut replication_status_internal = None;
|
||||
let mut version_purge_status_internal = None;
|
||||
if let Some(replication_state) = fi.replication_state_internal.as_ref() {
|
||||
replication_status_internal = replication_state.replication_status_internal.clone();
|
||||
version_purge_status_internal = replication_state.version_purge_status_internal.clone();
|
||||
}
|
||||
let mut replication_status = fi.replication_status();
|
||||
if replication_status.is_empty()
|
||||
&& let Some(status) = fi.metadata.get(AMZ_BUCKET_REPLICATION_STATUS)
|
||||
&& status == ReplicationStatusType::Replica.as_str()
|
||||
{
|
||||
replication_status = ReplicationStatusType::Replica;
|
||||
}
|
||||
|
||||
let version_purge_status = fi.version_purge_status();
|
||||
|
||||
let transitioned_object = TransitionedObject {
|
||||
name: fi.transitioned_objname.clone(),
|
||||
@@ -810,6 +825,10 @@ impl ObjectInfo {
|
||||
transitioned_object,
|
||||
checksum: fi.checksum.clone(),
|
||||
storage_class,
|
||||
replication_status_internal,
|
||||
version_purge_status_internal,
|
||||
replication_status,
|
||||
version_purge_status,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,13 +314,15 @@ impl ECStore {
|
||||
|
||||
// contextCanceled
|
||||
|
||||
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(
|
||||
&list_result.entries.unwrap_or_default(),
|
||||
bucket,
|
||||
prefix,
|
||||
delimiter.clone(),
|
||||
)
|
||||
.await;
|
||||
let entries = list_result.entries.unwrap_or_default();
|
||||
for entry in entries.entries() {
|
||||
if entry.is_object() {
|
||||
let fi = entry.to_fileinfo(bucket).unwrap();
|
||||
tracing::warn!("list_objects_generic file_info: {:?}", fi);
|
||||
}
|
||||
}
|
||||
|
||||
let mut get_objects = ObjectInfo::from_meta_cache_entries_sorted_infos(&entries, bucket, prefix, delimiter.clone()).await;
|
||||
|
||||
let is_truncated = {
|
||||
if max_keys > 0 && get_objects.len() > max_keys as usize {
|
||||
|
||||
Reference in New Issue
Block a user