diff --git a/Cargo.lock b/Cargo.lock index 8a0f7329..2b315f47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,9 +119,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" dependencies = [ "anstyle", "anstyle-parse", @@ -149,22 +149,22 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.9" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -522,9 +522,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.4.0" +version = "3.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" dependencies = [ "event-listener", "event-listener-strategy", @@ -674,9 +674,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.3" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0baa720ebadea158c5bda642ac444a2af0cdf7bb66b46d1e4533de5d1f449d0" +checksum = "483020b893cdef3d89637e428d588650c71cfae7ea2e6ecbaee4de4ff99fb2dd" dependencies = [ "aws-credential-types", "aws-runtime", @@ -704,9 +704,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.4" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b68c2194a190e1efc999612792e25b1ab3abfefe4306494efaaabc25933c0cbe" +checksum = "1541072f81945fa1251f8795ef6c92c4282d74d59f88498ae7d4bf00f0ebdad9" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -739,9 +739,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.9" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2090e664216c78e766b6bac10fe74d2f451c02441d43484cd76ac9a295075f7" +checksum = "c034a1bc1d70e16e7f4e4caf7e9f7693e4c9c24cd91cf17c2a0b21abaebc7c8b" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -764,9 +764,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.100.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c5eafbdcd898114b839ba68ac628e31c4cfc3e11dfca38dc1b2de2f35bb6270" +checksum = "7b16efa59a199f5271bf21ab3e570c5297d819ce4f240e6cf0096d1dc0049c44" dependencies = [ "aws-credential-types", "aws-runtime", @@ -798,9 +798,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.78.0" +version = "1.79.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724" +checksum = "0a847168f15b46329fa32c7aca4e4f1a2e072f9b422f0adb19756f2e1457f111" dependencies = [ "aws-credential-types", "aws-runtime", @@ -820,9 +820,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.79.0" +version = "1.80.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca" +checksum = "b654dd24d65568738593e8239aef279a86a15374ec926ae8714e2d7245f34149" dependencies = [ "aws-credential-types", "aws-runtime", @@ -842,9 +842,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.80.0" +version = "1.81.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc" +checksum = "c92ea8a7602321c83615c82b408820ad54280fb026e92de0eeea937342fafa24" dependencies = [ "aws-credential-types", "aws-runtime", @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666" +checksum = "084c34162187d39e3740cb635acd73c4e3a551a36146ad6fe8883c929c9f876c" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -904,9 +904,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.63.5" +version = "0.63.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab9472f7a8ec259ddb5681d2ef1cb1cf16c0411890063e67cdc7b62562cc496" +checksum = "9054b4cc5eda331cde3096b1576dec45365c5cbbca61d1fffa5f236e251dfce7" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -935,9 +935,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.2" +version = "0.62.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c82ba4cab184ea61f6edaafc1072aad3c2a17dcf4c0fce19ac5694b90d8b5f" +checksum = "7c4dacf2d38996cf729f55e7a762b30918229917eca115de45dfa8dfb97796c9" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -964,7 +964,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "h2 0.3.27", - "h2 0.4.11", + "h2 0.4.12", "http 0.2.12", "http 1.3.1", "http-body 0.4.6", @@ -1013,9 +1013,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.5" +version = "1.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660f70d9d8af6876b4c9aa8dcb0dbaf0f89b04ee9a4455bea1b4ba03b15f26f6" +checksum = "9e107ce0783019dbff59b3a244aa0c114e4a8c9d93498af9162608cd5474e796" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.8.5" +version = "1.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c" +checksum = "75d52251ed4b9776a3e8487b2a01ac915f73b2da3af8fc1e77e0fce697a550d4" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1540,9 +1540,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.30" +version = "1.2.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" +checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2" dependencies = [ "jobserver", "libc", @@ -1697,9 +1697,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.42" +version = "4.5.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed87a9d530bb41a67537289bafcac159cb3ee28460e0a4571123d2a778a6a882" +checksum = "50fd97c9dc2399518aa331917ac6f274280ec5eb34e555dd291899745c48ec6f" dependencies = [ "clap_builder", "clap_derive", @@ -1707,9 +1707,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.42" +version = "4.5.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f4f3f3c77c94aff3c7e9aac9a2ca1974a5adf392a8bb751e827d6d127ab966" +checksum = "c35b5830294e1fa0462034af85cc95225a4cb07092c088c55bda3147cfcd8f65" dependencies = [ "anstream", "anstyle", @@ -2293,12 +2293,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a79c4acb1fd5fa3d9304be4c76e031c54d2e92d172a393e24b19a14fe8532fe9" +checksum = "d6b136475da5ef7b6ac596c0e956e37bad51b85b987ff3d5e230e964936736b2" dependencies = [ - "darling_core 0.21.0", - "darling_macro 0.21.0", + "darling_core 0.21.1", + "darling_macro 0.21.1", ] [[package]] @@ -2317,9 +2317,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74875de90daf30eb59609910b84d4d368103aaec4c924824c6799b28f77d6a1d" +checksum = "b44ad32f92b75fb438b04b68547e521a548be8acc339a6dacc4a7121488f53e6" dependencies = [ "fnv", "ident_case", @@ -2342,11 +2342,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79f8e61677d5df9167cd85265f8e5f64b215cdea3fb55eebc3e622e44c7a146" +checksum = "2b5be8a7a562d315a5b92a630c30cec6bcf663e6673f00fbb69cca66a6f521b9" dependencies = [ - "darling_core 0.21.0", + "darling_core 0.21.1", "quote", "syn 2.0.104", ] @@ -3738,9 +3738,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "5.4.0" +version = "5.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" dependencies = [ "concurrent-queue", "parking", @@ -4001,9 +4001,9 @@ checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" dependencies = [ "fastrand", "futures-core", @@ -4490,9 +4490,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ "atomic-waker", "bytes", @@ -4742,7 +4742,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -5251,7 +5251,7 @@ dependencies = [ "dbus-secret-service", "log", "security-framework 2.11.1", - "security-framework 3.2.0", + "security-framework 3.3.0", "windows-sys 0.60.2", "zeroize", ] @@ -5957,9 +5957,9 @@ dependencies = [ [[package]] name = "notify" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3163f59cd3fa0e9ef8c32f242966a7b9994fd7378366099593e0e73077cd8c97" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ "bitflags 2.9.1", "fsevent-sys", @@ -7026,9 +7026,9 @@ dependencies = [ [[package]] name = "polling" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee9b2fa7a4517d2c91ff5bc6c297a427a96749d15f98fcdbb22c05571a4d4b7" +checksum = "b5bd19146350fe804f7cb2669c851c03d69da628803dab0d98018142aaa5d829" dependencies = [ "cfg-if", "concurrent-queue", @@ -7320,9 +7320,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.38.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8927b0664f5c5a98265138b7e3f90aa19a6b21353182469ace36d4ac527b7b1b" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" dependencies = [ "memchr", "serde", @@ -7617,9 +7617,9 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78eaea1f52c56d57821be178b2d47e09ff26481a6042e8e042fcb0ced068b470" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.16", "libredox", @@ -7718,7 +7718,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7851,7 +7851,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4aebc912b8fa7d54999adc4e45601d1d95fe458f97eb0a1277eddcd6382cf4b1" dependencies = [ - "darling 0.21.0", + "darling 0.21.1", "proc-macro2", "quote", "serde_json", @@ -8110,10 +8110,14 @@ dependencies = [ "async-trait", "chrono", "futures", + "lazy_static", "rustfs-common", "rustfs-ecstore", "rustfs-filemeta", + "rustfs-lock", "rustfs-madmin", + "rustfs-utils", + "s3s", "serde", "serde_json", "serial_test", @@ -8123,6 +8127,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "url", "uuid", "walkdir", ] @@ -8225,7 +8230,7 @@ dependencies = [ "once_cell", "path-absolutize", "pin-project-lite", - "quick-xml 0.38.0", + "quick-xml 0.38.1", "rand 0.9.2", "reed-solomon-simd", "regex", @@ -8393,7 +8398,7 @@ dependencies = [ "form_urlencoded", "futures", "once_cell", - "quick-xml 0.38.0", + "quick-xml 0.38.1", "reqwest", "rumqttc", "rustfs-config", @@ -8741,7 +8746,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.2.0", + "security-framework 3.3.0", ] [[package]] @@ -8978,9 +8983,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" dependencies = [ "bitflags 2.9.1", "core-foundation 0.10.1", @@ -9387,9 +9392,9 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.4.5" +version = "1.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" dependencies = [ "libc", ] @@ -10294,9 +10299,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", @@ -10407,7 +10412,7 @@ dependencies = [ "base64 0.22.1", "bytes", "flate2", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -12192,9 +12197,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.2" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" dependencies = [ "yoke", "zerofrom", diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index affb423a..d94f32aa 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -17,6 +17,7 @@ rustfs-ecstore = { workspace = true } rustfs-common = { workspace = true } rustfs-filemeta = { workspace = true } rustfs-madmin = { workspace = true } +rustfs-utils = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } tracing = { workspace = true } @@ -27,6 +28,10 @@ uuid = { workspace = true, features = ["v4", "serde"] } anyhow = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } +url = { workspace = true } +rustfs-lock = { workspace = true } +s3s = { workspace = true } +lazy_static = { workspace = true } chrono = { workspace = true } [dev-dependencies] diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 2b573cb8..3aa4b9df 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -30,15 +30,16 @@ use tracing::{debug, error, info, warn}; use super::metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics}; use crate::heal::HealManager; +use crate::scanner::lifecycle::ScannerItem; use crate::{ HealRequest, error::{Error, Result}, get_ahm_services_cancel_token, }; -use rustfs_common::{ - data_usage::DataUsageInfo, - metrics::{Metric, Metrics, globalMetrics}, -}; + +use rustfs_common::data_usage::DataUsageInfo; +use rustfs_common::metrics::{Metric, Metrics, globalMetrics}; +use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; use rustfs_ecstore::disk::RUSTFS_META_BUCKET; @@ -290,7 +291,7 @@ impl Scanner { /// Get global metrics from common crate pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics { - globalMetrics.report().await + (*globalMetrics).report().await } /// Perform a single scan cycle @@ -317,7 +318,7 @@ impl Scanner { cycle_completed: vec![chrono::Utc::now()], started: chrono::Utc::now(), }; - globalMetrics.set_cycle(Some(cycle_info)).await; + (*globalMetrics).set_cycle(Some(cycle_info)).await; self.metrics.set_current_cycle(self.state.read().await.current_cycle); self.metrics.increment_total_cycles(); @@ -1160,6 +1161,19 @@ impl Scanner { /// This method collects all objects from a disk for a specific bucket. /// It returns a map of object names to their metadata for later analysis. async fn scan_volume(&self, disk: &DiskStore, bucket: &str) -> Result> { + let ecstore = match rustfs_ecstore::new_object_layer_fn() { + Some(ecstore) => ecstore, + None => { + error!("ECStore not available"); + return Err(Error::Other("ECStore not available".to_string())); + } + }; + let bucket_info = ecstore.get_bucket_info(bucket, &Default::default()).await.ok(); + let versioning_config = bucket_info.map(|bi| Arc::new(VersioningConfig { enabled: bi.versioning })); + let lifecycle_config = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket) + .await + .ok() + .map(|(c, _)| Arc::new(c)); // Start global metrics collection for volume scan let stop_fn = Metrics::time(Metric::ScanObject); @@ -1247,6 +1261,15 @@ impl Scanner { } } } else { + // Apply lifecycle actions + if let Some(lifecycle_config) = &lifecycle_config { + let mut scanner_item = + ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone()); + if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await { + error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e); + } + } + // Store object metadata for later analysis object_metadata.insert(entry.name.clone(), file_meta.clone()); } diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs new file mode 100644 index 00000000..5d33399d --- /dev/null +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -0,0 +1,125 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use rustfs_common::metrics::IlmAction; +use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc; +use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle}; +use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config; +use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; +use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_filemeta::FileMetaVersion; +use rustfs_filemeta::metacache::MetaCacheEntry; +use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig; +use tracing::info; + +#[derive(Clone)] +pub struct ScannerItem { + bucket: String, + lifecycle: Option>, + versioning: Option>, +} + +impl ScannerItem { + pub fn new(bucket: String, lifecycle: Option>, versioning: Option>) -> Self { + Self { + bucket, + lifecycle, + versioning, + } + } + + pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> { + info!("apply_actions called for object: {}", object); + if self.lifecycle.is_none() { + info!("No lifecycle config for object: {}", object); + return Ok(()); + } + info!("Lifecycle config exists for object: {}", object); + + let file_meta = match meta.xl_meta() { + Ok(meta) => meta, + Err(e) => { + tracing::error!("Failed to get xl_meta for {}: {}", object, e); + return Ok(()); + } + }; + + let latest_version = file_meta.versions.first().cloned().unwrap_or_default(); + let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default(); + + let obj_info = ObjectInfo { + bucket: self.bucket.clone(), + name: object.to_string(), + version_id: latest_version.header.version_id, + mod_time: latest_version.header.mod_time, + size: file_meta_version.object.as_ref().map_or(0, |o| o.size), + user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(), + ..Default::default() + }; + + self.apply_lifecycle(&obj_info).await; + + Ok(()) + } + + async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) { + let size = oi.size; + if self.lifecycle.is_none() { + return (IlmAction::NoneAction, size); + } + + let (olcfg, rcfg) = if self.bucket != ".minio.sys" { + ( + get_object_lock_config(&self.bucket).await.ok(), + None, // FIXME: replication config + ) + } else { + (None, None) + }; + + let lc_evt = eval_action_from_lifecycle( + self.lifecycle.as_ref().unwrap(), + olcfg + .as_ref() + .and_then(|(c, _)| c.rule.as_ref().and_then(|r| r.default_retention.clone())), + rcfg.clone(), + oi, + ) + .await; + + info!("lifecycle: {} Initial scan: {}", oi.name, lc_evt.action); + + let mut new_size = size; + match lc_evt.action { + IlmAction::DeleteVersionAction | IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => { + new_size = 0; + } + IlmAction::DeleteAction => { + if let Some(vcfg) = &self.versioning { + if !vcfg.is_enabled() { + new_size = 0; + } + } else { + new_size = 0; + } + } + _ => (), + } + + apply_lifecycle_action(&lc_evt, &LcEventSrc::Scanner, oi).await; + (lc_evt.action, new_size) + } +} diff --git a/crates/ahm/src/scanner/mod.rs b/crates/ahm/src/scanner/mod.rs index d299c143..13019eb4 100644 --- a/crates/ahm/src/scanner/mod.rs +++ b/crates/ahm/src/scanner/mod.rs @@ -14,6 +14,7 @@ pub mod data_scanner; pub mod histogram; +pub mod lifecycle; pub mod metrics; pub use data_scanner::Scanner; diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs new file mode 100644 index 00000000..88087f80 --- /dev/null +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -0,0 +1,243 @@ +use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig}; +use rustfs_ecstore::{ + bucket::metadata::BUCKET_LIFECYCLE_CONFIG, + bucket::metadata_sys, + disk::endpoint::Endpoint, + endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + store::ECStore, + store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, +}; +use serial_test::serial; +use std::sync::Once; +use std::sync::OnceLock; +use std::{path::PathBuf, sync::Arc, time::Duration}; +use tokio::fs; +use tracing::info; + +static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); +static INIT: Once = Once::new(); + +fn init_tracing() { + INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} + +/// Test helper: Create test environment with ECStore +async fn setup_test_env() -> (Vec, Arc) { + init_tracing(); + + // Fast path: already initialized, just clone and return + if let Some((paths, ecstore)) = GLOBAL_ENV.get() { + return (paths.clone(), ecstore.clone()); + } + + // create temp dir as 4 disks with unique base dir + let test_base_dir = format!("/tmp/rustfs_ahm_lifecycle_test_{}", uuid::Uuid::new_v4()); + let temp_dir = std::path::PathBuf::from(&test_base_dir); + if temp_dir.exists() { + fs::remove_dir_all(&temp_dir).await.ok(); + } + fs::create_dir_all(&temp_dir).await.unwrap(); + + // create 4 disk dirs + let disk_paths = vec![ + temp_dir.join("disk1"), + temp_dir.join("disk2"), + temp_dir.join("disk3"), + temp_dir.join("disk4"), + ]; + + for disk_path in &disk_paths { + fs::create_dir_all(disk_path).await.unwrap(); + } + + // create EndpointServerPools + let mut endpoints = Vec::new(); + for (i, disk_path) in disk_paths.iter().enumerate() { + let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap(); + // set correct index + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(i); + endpoints.push(endpoint); + } + + let pool_endpoints = PoolEndpoints { + legacy: false, + set_count: 1, + drives_per_set: 4, + endpoints: Endpoints::from(endpoints), + cmd_line: "test".to_string(), + platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH), + }; + + let endpoint_pools = EndpointServerPools(vec![pool_endpoints]); + + // format disks (only first time) + rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap(); + + // create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free + let port = 9002; // for simplicity + let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + let ecstore = ECStore::new(server_addr, endpoint_pools).await.unwrap(); + + // init bucket metadata system + let buckets_list = ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions { + no_metadata: true, + ..Default::default() + }) + .await + .unwrap(); + let buckets = buckets_list.into_iter().map(|v| v.name).collect(); + rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await; + + // Initialize background expiry workers + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await; + + // Store in global once lock + let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone())); + + (disk_paths, ecstore) +} + +/// Test helper: Create a test bucket +async fn create_test_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket(bucket_name, &Default::default()) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Upload test object +async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, data: &[u8]) { + let mut reader = PutObjReader::from_vec(data.to_vec()); + let object_info = (**ecstore) + .put_object(bucket, object, &mut reader, &ObjectOptions::default()) + .await + .expect("Failed to upload test object"); + + info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); +} + +/// Test helper: Set bucket lifecycle configuration +async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box> { + // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing + let lifecycle_xml = r#" + + + test-rule + Enabled + + test/ + + + 0 + + +"#; + + metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?; + + Ok(()) +} + +/// Test helper: Check if object exists +async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bool { + ((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok() +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_lifecycle_expiry_basic() { + let (_disk_paths, ecstore) = setup_test_env().await; + + // Create test bucket and object + let bucket_name = "test-lifecycle-bucket"; + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; + + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name, object_name).await); + println!("✅ Object exists before lifecycle processing"); + + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + set_bucket_lifecycle(bucket_name) + .await + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); + } + Err(e) => { + println!("❌ Error retrieving bucket metadata: {e:?}"); + } + } + + // Create scanner with very short intervals for testing + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + + let scanner = Scanner::new(Some(scanner_config), None); + + // Start scanner + scanner.start().await.expect("Failed to start scanner"); + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(2)).await; + + // Manually trigger a scan cycle to ensure lifecycle processing + scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); + println!("✅ Manual scan cycle completed"); + + // Wait a bit more for background workers to process expiry tasks + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check if object has been expired (deleted) + let object_still_exists = object_exists(&ecstore, bucket_name, object_name).await; + println!("Object exists after lifecycle processing: {object_still_exists}"); + + if object_still_exists { + println!("❌ Object was not deleted by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } else { + println!("✅ Object was successfully deleted by lifecycle processing"); + } + + assert!(!object_still_exists); + println!("✅ Object successfully expired"); + + // Stop scanner + let _ = scanner.stop().await; + println!("✅ Scanner stopped"); + + println!("Lifecycle expiry basic test completed"); +} diff --git a/crates/e2e_test/src/reliant/lifecycle.rs b/crates/e2e_test/src/reliant/lifecycle.rs new file mode 100644 index 00000000..9f8b55de --- /dev/null +++ b/crates/e2e_test/src/reliant/lifecycle.rs @@ -0,0 +1,133 @@ +#![cfg(test)] +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::Client; +use aws_sdk_s3::config::{Credentials, Region}; +use bytes::Bytes; +use serial_test::serial; +use std::error::Error; +use tokio::time::sleep; + +const ENDPOINT: &str = "http://localhost:9000"; +const ACCESS_KEY: &str = "rustfsadmin"; +const SECRET_KEY: &str = "rustfsadmin"; +const BUCKET: &str = "test-basic-bucket"; + +async fn create_aws_s3_client() -> Result> { + let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1")); + let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region_provider) + .credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static")) + .endpoint_url(ENDPOINT) + .load() + .await; + + let client = Client::from_conf( + aws_sdk_s3::Config::from(&shared_config) + .to_builder() + .force_path_style(true) + .build(), + ); + Ok(client) +} + +async fn setup_test_bucket(client: &Client) -> Result<(), Box> { + match client.create_bucket().bucket(BUCKET).send().await { + Ok(_) => {} + Err(e) => { + let error_str = e.to_string(); + if !error_str.contains("BucketAlreadyOwnedByYou") && !error_str.contains("BucketAlreadyExists") { + return Err(e.into()); + } + } + } + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_bucket_lifecycle_configuration() -> Result<(), Box> { + use aws_sdk_s3::types::{BucketLifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleFilter}; + use tokio::time::Duration; + + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + // Upload test object first + let test_content = "Test object for lifecycle expiration"; + let lifecycle_object_key = "lifecycle-test-object.txt"; + client + .put_object() + .bucket(BUCKET) + .key(lifecycle_object_key) + .body(Bytes::from(test_content.as_bytes()).into()) + .send() + .await?; + + // Verify object exists initially + let resp = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await?; + assert!(resp.content_length().unwrap_or(0) > 0); + + // Configure lifecycle rule: expire after current time + 3 seconds + let expiration = LifecycleExpiration::builder().days(0).build(); + let filter = LifecycleRuleFilter::builder().prefix(lifecycle_object_key).build(); + let rule = LifecycleRule::builder() + .id("expire-test-object") + .filter(filter) + .expiration(expiration) + .status(aws_sdk_s3::types::ExpirationStatus::Enabled) + .build()?; + let lifecycle = BucketLifecycleConfiguration::builder().rules(rule).build()?; + + client + .put_bucket_lifecycle_configuration() + .bucket(BUCKET) + .lifecycle_configuration(lifecycle) + .send() + .await?; + + // Verify lifecycle configuration was set + let resp = client.get_bucket_lifecycle_configuration().bucket(BUCKET).send().await?; + let rules = resp.rules(); + assert!(rules.iter().any(|r| r.id().unwrap_or("") == "expire-test-object")); + + // Wait for lifecycle processing (scanner runs every 1 second) + sleep(Duration::from_secs(3)).await; + + // After lifecycle processing, the object should be deleted by the lifecycle rule + let get_result = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await; + + match get_result { + Ok(_) => { + panic!("Expected object to be deleted by lifecycle rule, but it still exists"); + } + Err(e) => { + if let Some(service_error) = e.as_service_error() { + if service_error.is_no_such_key() { + println!("Lifecycle configuration test completed - object was successfully deleted by lifecycle rule"); + } else { + panic!("Expected NoSuchKey error, but got: {e:?}"); + } + } else { + panic!("Expected service error, but got: {e:?}"); + } + } + } + + println!("Lifecycle configuration test completed."); + Ok(()) +} diff --git a/crates/e2e_test/src/reliant/mod.rs b/crates/e2e_test/src/reliant/mod.rs index 00bf9d98..b8d05efd 100644 --- a/crates/e2e_test/src/reliant/mod.rs +++ b/crates/e2e_test/src/reliant/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod lifecycle; mod lock; mod node_interact_test; mod sql; diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index e2848821..6fe4c835 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -516,7 +516,7 @@ impl TransitionState { if let Err(err) = transition_object(api.clone(), &task.obj_info, LcAuditEvent::new(task.event.clone(), task.src.clone())).await { if !is_err_version_not_found(&err) && !is_err_object_not_found(&err) && !is_network_or_host_down(&err.to_string(), false) && !err.to_string().contains("use of closed network connection") { error!("Transition to {} failed for {}/{} version:{} with {}", - task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.expect("err"), err.to_string()); + task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.map(|v| v.to_string()).unwrap_or_default(), err.to_string()); } } else { let mut ts = TierStats { @@ -743,7 +743,7 @@ pub async fn transition_object(api: Arc, oi: &ObjectInfo, lae: LcAuditE ..Default::default() }, //lifecycle_audit_event: lae, - version_id: Some(oi.version_id.expect("err").to_string()), + version_id: oi.version_id.map(|v| v.to_string()), versioned: BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await, version_suspended: BucketVersioningSys::prefix_suspended(&oi.bucket, &oi.name).await, mod_time: oi.mod_time, @@ -808,7 +808,7 @@ impl LifecycleOps for ObjectInfo { lifecycle::ObjectOpts { name: self.name.clone(), user_tags: self.user_tags.clone(), - version_id: self.version_id.expect("err").to_string(), + version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(), mod_time: self.mod_time, size: self.size as usize, is_latest: self.is_latest, @@ -874,7 +874,11 @@ pub async fn eval_action_from_lifecycle( if lock_enabled && enforce_retention_for_deletion(oi) { //if serverDebugLog { if oi.version_id.is_some() { - info!("lifecycle: {} v({}) is locked, not deleting", oi.name, oi.version_id.expect("err")); + info!( + "lifecycle: {} v({}) is locked, not deleting", + oi.name, + oi.version_id.map(|v| v.to_string()).unwrap_or_default() + ); } else { info!("lifecycle: {} is locked, not deleting", oi.name); } @@ -928,7 +932,7 @@ pub async fn apply_expiry_on_non_transitioned_objects( }; if lc_event.action.delete_versioned() { - opts.version_id = Some(oi.version_id.expect("err").to_string()); + opts.version_id = oi.version_id.map(|v| v.to_string()); } opts.versioned = BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await; diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index 22caf860..f1d4a311 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -27,6 +27,7 @@ use std::env; use std::fmt::Display; use time::macros::{datetime, offset}; use time::{self, Duration, OffsetDateTime}; +use tracing::info; use crate::bucket::lifecycle::rule::TransitionOps; @@ -279,7 +280,12 @@ impl Lifecycle for BucketLifecycleConfiguration { async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event { let mut events = Vec::::new(); + info!( + "eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}", + obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker + ); if obj.mod_time.expect("err").unix_timestamp() == 0 { + info!("eval_inner: mod_time is 0, returning default event"); return Event::default(); } @@ -418,7 +424,16 @@ impl Lifecycle for BucketLifecycleConfiguration { } } - if obj.is_latest && !obj.delete_marker { + info!( + "eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}", + obj.is_latest, + obj.delete_marker, + obj.version_id, + (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker + ); + // Allow expiration for latest objects OR non-versioned objects (empty version_id) + if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker { + info!("eval_inner: entering expiration check"); if let Some(ref expiration) = rule.expiration { if let Some(ref date) = expiration.date { let date0 = OffsetDateTime::from(date.clone()); @@ -435,22 +450,29 @@ impl Lifecycle for BucketLifecycleConfiguration { }); } } else if let Some(days) = expiration.days { - if days != 0 { - let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days); - if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { - let mut event = Event { - action: IlmAction::DeleteAction, - rule_id: rule.id.clone().expect("err!"), - due: Some(expected_expiry), - noncurrent_days: 0, - newer_noncurrent_versions: 0, - storage_class: "".into(), - }; - /*if rule.expiration.expect("err!").delete_all.val { - event.action = IlmAction::DeleteAllVersionsAction - }*/ - events.push(event); - } + let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days); + info!( + "eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}", + days, + obj.mod_time.expect("err!"), + expected_expiry, + now, + now.unix_timestamp() > expected_expiry.unix_timestamp() + ); + if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { + info!("eval_inner: object should expire, adding DeleteAction"); + let mut event = Event { + action: IlmAction::DeleteAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(expected_expiry), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }; + /*if rule.expiration.expect("err!").delete_all.val { + event.action = IlmAction::DeleteAllVersionsAction + }*/ + events.push(event); } } } @@ -598,7 +620,7 @@ impl LifecycleCalculate for Transition { pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTime { if days == 0 { - return mod_time; + return OffsetDateTime::UNIX_EPOCH; // Return epoch time to ensure immediate expiry } let t = mod_time .to_offset(offset!(-0:00:00)) diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 517acf6a..2a4d8409 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -3837,7 +3837,90 @@ impl StorageAPI for SetDisks { return Ok(ObjectInfo::default()); } - unimplemented!() + + // Create a single object deletion request + let mut vr = FileInfo { + name: object.to_string(), + version_id: opts.version_id.as_ref().and_then(|v| Uuid::parse_str(v).ok()), + ..Default::default() + }; + + // Handle versioning + let (suspended, versioned) = (opts.version_suspended, opts.versioned); + if opts.version_id.is_none() && (suspended || versioned) { + vr.mod_time = Some(OffsetDateTime::now_utc()); + vr.deleted = true; + if versioned { + vr.version_id = Some(Uuid::new_v4()); + } + } + + let vers = vec![FileInfoVersions { + name: vr.name.clone(), + versions: vec![vr.clone()], + ..Default::default() + }]; + + let disks = self.disks.read().await; + let disks = disks.clone(); + let write_quorum = disks.len() / 2 + 1; + + let mut futures = Vec::with_capacity(disks.len()); + let mut errs = Vec::with_capacity(disks.len()); + + for disk in disks.iter() { + let vers = vers.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.delete_versions(bucket, vers, DeleteOptions::default()).await + } else { + Err(DiskError::DiskNotFound) + } + }); + } + + let results = join_all(futures).await; + + for result in results { + match result { + Ok(disk_errs) => { + // Handle errors from disk operations + for err in disk_errs.iter().flatten() { + warn!("delete_object disk error: {:?}", err); + } + errs.push(None); + } + Err(e) => { + errs.push(Some(e)); + } + } + } + + // Check write quorum + if let Some(err) = reduce_write_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, write_quorum) { + return Err(to_object_err(err.into(), vec![bucket, object])); + } + + // Create result ObjectInfo + let result_info = if vr.deleted { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + delete_marker: true, + mod_time: vr.mod_time, + version_id: vr.version_id, + ..Default::default() + } + } else { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + version_id: vr.version_id, + ..Default::default() + } + }; + + Ok(result_info) } #[tracing::instrument(skip(self))] diff --git a/crates/filemeta/src/lib.rs b/crates/filemeta/src/lib.rs index 32719fdc..48b1bf16 100644 --- a/crates/filemeta/src/lib.rs +++ b/crates/filemeta/src/lib.rs @@ -17,7 +17,7 @@ pub mod fileinfo; mod filemeta; mod filemeta_inline; pub mod headers; -mod metacache; +pub mod metacache; pub mod test_data;