From d987686c146b4b405477a425da77e2ca61b2ca29 Mon Sep 17 00:00:00 2001 From: guojidan <63799833+guojidan@users.noreply.github.com> Date: Thu, 7 Aug 2025 19:51:02 -0700 Subject: [PATCH] feat(lifecycle): Implement object lifecycle management functionality (#358) * feat(lifecycle): Implement object lifecycle management functionality Add a lifecycle module to automatically handle object expiration and transition during scanning Modify the file metadata cache module to be publicly visible to support lifecycle operations Adjust the scanning interval to a shorter time for testing lifecycle rules Implement the parsing and execution logic for S3 lifecycle configurations Add integration tests to verify the lifecycle expiration functionality Update dependencies to support the new lifecycle features Signed-off-by: junxiang Mu <1948535941@qq.com> * fix cargo dependencies Signed-off-by: junxiang Mu <1948535941@qq.com> * fix fmt Signed-off-by: junxiang Mu <1948535941@qq.com> --------- Signed-off-by: junxiang Mu <1948535941@qq.com> Co-authored-by: houseme --- Cargo.lock | 165 ++++++------ crates/ahm/Cargo.toml | 5 + crates/ahm/src/scanner/data_scanner.rs | 35 ++- crates/ahm/src/scanner/lifecycle.rs | 125 +++++++++ crates/ahm/src/scanner/mod.rs | 1 + .../ahm/tests/lifecycle_integration_test.rs | 243 ++++++++++++++++++ crates/e2e_test/src/reliant/lifecycle.rs | 133 ++++++++++ crates/e2e_test/src/reliant/mod.rs | 1 + .../bucket/lifecycle/bucket_lifecycle_ops.rs | 14 +- .../ecstore/src/bucket/lifecycle/lifecycle.rs | 58 +++-- crates/ecstore/src/set_disk.rs | 85 +++++- crates/filemeta/src/lib.rs | 2 +- 12 files changed, 756 insertions(+), 111 deletions(-) create mode 100644 crates/ahm/src/scanner/lifecycle.rs create mode 100644 crates/ahm/tests/lifecycle_integration_test.rs create mode 100644 crates/e2e_test/src/reliant/lifecycle.rs diff --git a/Cargo.lock b/Cargo.lock index 8a0f7329..2b315f47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,9 +119,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" dependencies = [ "anstyle", "anstyle-parse", @@ -149,22 +149,22 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.9" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -522,9 +522,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "3.4.0" +version = "3.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" dependencies = [ "event-listener", "event-listener-strategy", @@ -674,9 +674,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.3" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0baa720ebadea158c5bda642ac444a2af0cdf7bb66b46d1e4533de5d1f449d0" +checksum = "483020b893cdef3d89637e428d588650c71cfae7ea2e6ecbaee4de4ff99fb2dd" dependencies = [ "aws-credential-types", "aws-runtime", @@ -704,9 +704,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.4" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b68c2194a190e1efc999612792e25b1ab3abfefe4306494efaaabc25933c0cbe" +checksum = "1541072f81945fa1251f8795ef6c92c4282d74d59f88498ae7d4bf00f0ebdad9" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -739,9 +739,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.9" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2090e664216c78e766b6bac10fe74d2f451c02441d43484cd76ac9a295075f7" +checksum = "c034a1bc1d70e16e7f4e4caf7e9f7693e4c9c24cd91cf17c2a0b21abaebc7c8b" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -764,9 +764,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.100.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c5eafbdcd898114b839ba68ac628e31c4cfc3e11dfca38dc1b2de2f35bb6270" +checksum = "7b16efa59a199f5271bf21ab3e570c5297d819ce4f240e6cf0096d1dc0049c44" dependencies = [ "aws-credential-types", "aws-runtime", @@ -798,9 +798,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.78.0" +version = "1.79.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724" +checksum = "0a847168f15b46329fa32c7aca4e4f1a2e072f9b422f0adb19756f2e1457f111" dependencies = [ "aws-credential-types", "aws-runtime", @@ -820,9 +820,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.79.0" +version = "1.80.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca" +checksum = "b654dd24d65568738593e8239aef279a86a15374ec926ae8714e2d7245f34149" dependencies = [ "aws-credential-types", "aws-runtime", @@ -842,9 +842,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.80.0" +version = "1.81.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc" +checksum = "c92ea8a7602321c83615c82b408820ad54280fb026e92de0eeea937342fafa24" dependencies = [ "aws-credential-types", "aws-runtime", @@ -865,9 +865,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666" +checksum = "084c34162187d39e3740cb635acd73c4e3a551a36146ad6fe8883c929c9f876c" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -904,9 +904,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.63.5" +version = "0.63.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab9472f7a8ec259ddb5681d2ef1cb1cf16c0411890063e67cdc7b62562cc496" +checksum = "9054b4cc5eda331cde3096b1576dec45365c5cbbca61d1fffa5f236e251dfce7" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -935,9 +935,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.2" +version = "0.62.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c82ba4cab184ea61f6edaafc1072aad3c2a17dcf4c0fce19ac5694b90d8b5f" +checksum = "7c4dacf2d38996cf729f55e7a762b30918229917eca115de45dfa8dfb97796c9" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -964,7 +964,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "h2 0.3.27", - "h2 0.4.11", + "h2 0.4.12", "http 0.2.12", "http 1.3.1", "http-body 0.4.6", @@ -1013,9 +1013,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.5" +version = "1.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660f70d9d8af6876b4c9aa8dcb0dbaf0f89b04ee9a4455bea1b4ba03b15f26f6" +checksum = "9e107ce0783019dbff59b3a244aa0c114e4a8c9d93498af9162608cd5474e796" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.8.5" +version = "1.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c" +checksum = "75d52251ed4b9776a3e8487b2a01ac915f73b2da3af8fc1e77e0fce697a550d4" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1540,9 +1540,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.30" +version = "1.2.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" +checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2" dependencies = [ "jobserver", "libc", @@ -1697,9 +1697,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.42" +version = "4.5.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed87a9d530bb41a67537289bafcac159cb3ee28460e0a4571123d2a778a6a882" +checksum = "50fd97c9dc2399518aa331917ac6f274280ec5eb34e555dd291899745c48ec6f" dependencies = [ "clap_builder", "clap_derive", @@ -1707,9 +1707,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.42" +version = "4.5.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f4f3f3c77c94aff3c7e9aac9a2ca1974a5adf392a8bb751e827d6d127ab966" +checksum = "c35b5830294e1fa0462034af85cc95225a4cb07092c088c55bda3147cfcd8f65" dependencies = [ "anstream", "anstyle", @@ -2293,12 +2293,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a79c4acb1fd5fa3d9304be4c76e031c54d2e92d172a393e24b19a14fe8532fe9" +checksum = "d6b136475da5ef7b6ac596c0e956e37bad51b85b987ff3d5e230e964936736b2" dependencies = [ - "darling_core 0.21.0", - "darling_macro 0.21.0", + "darling_core 0.21.1", + "darling_macro 0.21.1", ] [[package]] @@ -2317,9 +2317,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74875de90daf30eb59609910b84d4d368103aaec4c924824c6799b28f77d6a1d" +checksum = "b44ad32f92b75fb438b04b68547e521a548be8acc339a6dacc4a7121488f53e6" dependencies = [ "fnv", "ident_case", @@ -2342,11 +2342,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79f8e61677d5df9167cd85265f8e5f64b215cdea3fb55eebc3e622e44c7a146" +checksum = "2b5be8a7a562d315a5b92a630c30cec6bcf663e6673f00fbb69cca66a6f521b9" dependencies = [ - "darling_core 0.21.0", + "darling_core 0.21.1", "quote", "syn 2.0.104", ] @@ -3738,9 +3738,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "5.4.0" +version = "5.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" dependencies = [ "concurrent-queue", "parking", @@ -4001,9 +4001,9 @@ checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" dependencies = [ "fastrand", "futures-core", @@ -4490,9 +4490,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ "atomic-waker", "bytes", @@ -4742,7 +4742,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -5251,7 +5251,7 @@ dependencies = [ "dbus-secret-service", "log", "security-framework 2.11.1", - "security-framework 3.2.0", + "security-framework 3.3.0", "windows-sys 0.60.2", "zeroize", ] @@ -5957,9 +5957,9 @@ dependencies = [ [[package]] name = "notify" -version = "8.1.0" +version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3163f59cd3fa0e9ef8c32f242966a7b9994fd7378366099593e0e73077cd8c97" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ "bitflags 2.9.1", "fsevent-sys", @@ -7026,9 +7026,9 @@ dependencies = [ [[package]] name = "polling" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee9b2fa7a4517d2c91ff5bc6c297a427a96749d15f98fcdbb22c05571a4d4b7" +checksum = "b5bd19146350fe804f7cb2669c851c03d69da628803dab0d98018142aaa5d829" dependencies = [ "cfg-if", "concurrent-queue", @@ -7320,9 +7320,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.38.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8927b0664f5c5a98265138b7e3f90aa19a6b21353182469ace36d4ac527b7b1b" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" dependencies = [ "memchr", "serde", @@ -7617,9 +7617,9 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78eaea1f52c56d57821be178b2d47e09ff26481a6042e8e042fcb0ced068b470" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.16", "libredox", @@ -7718,7 +7718,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7851,7 +7851,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4aebc912b8fa7d54999adc4e45601d1d95fe458f97eb0a1277eddcd6382cf4b1" dependencies = [ - "darling 0.21.0", + "darling 0.21.1", "proc-macro2", "quote", "serde_json", @@ -8110,10 +8110,14 @@ dependencies = [ "async-trait", "chrono", "futures", + "lazy_static", "rustfs-common", "rustfs-ecstore", "rustfs-filemeta", + "rustfs-lock", "rustfs-madmin", + "rustfs-utils", + "s3s", "serde", "serde_json", "serial_test", @@ -8123,6 +8127,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "url", "uuid", "walkdir", ] @@ -8225,7 +8230,7 @@ dependencies = [ "once_cell", "path-absolutize", "pin-project-lite", - "quick-xml 0.38.0", + "quick-xml 0.38.1", "rand 0.9.2", "reed-solomon-simd", "regex", @@ -8393,7 +8398,7 @@ dependencies = [ "form_urlencoded", "futures", "once_cell", - "quick-xml 0.38.0", + "quick-xml 0.38.1", "reqwest", "rumqttc", "rustfs-config", @@ -8741,7 +8746,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.2.0", + "security-framework 3.3.0", ] [[package]] @@ -8978,9 +8983,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" dependencies = [ "bitflags 2.9.1", "core-foundation 0.10.1", @@ -9387,9 +9392,9 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.4.5" +version = "1.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" dependencies = [ "libc", ] @@ -10294,9 +10299,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", @@ -10407,7 +10412,7 @@ dependencies = [ "base64 0.22.1", "bytes", "flate2", - "h2 0.4.11", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -12192,9 +12197,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.2" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b" dependencies = [ "yoke", "zerofrom", diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index affb423a..d94f32aa 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -17,6 +17,7 @@ rustfs-ecstore = { workspace = true } rustfs-common = { workspace = true } rustfs-filemeta = { workspace = true } rustfs-madmin = { workspace = true } +rustfs-utils = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } tracing = { workspace = true } @@ -27,6 +28,10 @@ uuid = { workspace = true, features = ["v4", "serde"] } anyhow = { workspace = true } async-trait = { workspace = true } futures = { workspace = true } +url = { workspace = true } +rustfs-lock = { workspace = true } +s3s = { workspace = true } +lazy_static = { workspace = true } chrono = { workspace = true } [dev-dependencies] diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 2b573cb8..3aa4b9df 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -30,15 +30,16 @@ use tracing::{debug, error, info, warn}; use super::metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics}; use crate::heal::HealManager; +use crate::scanner::lifecycle::ScannerItem; use crate::{ HealRequest, error::{Error, Result}, get_ahm_services_cancel_token, }; -use rustfs_common::{ - data_usage::DataUsageInfo, - metrics::{Metric, Metrics, globalMetrics}, -}; + +use rustfs_common::data_usage::DataUsageInfo; +use rustfs_common::metrics::{Metric, Metrics, globalMetrics}; +use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; use rustfs_ecstore::disk::RUSTFS_META_BUCKET; @@ -290,7 +291,7 @@ impl Scanner { /// Get global metrics from common crate pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics { - globalMetrics.report().await + (*globalMetrics).report().await } /// Perform a single scan cycle @@ -317,7 +318,7 @@ impl Scanner { cycle_completed: vec![chrono::Utc::now()], started: chrono::Utc::now(), }; - globalMetrics.set_cycle(Some(cycle_info)).await; + (*globalMetrics).set_cycle(Some(cycle_info)).await; self.metrics.set_current_cycle(self.state.read().await.current_cycle); self.metrics.increment_total_cycles(); @@ -1160,6 +1161,19 @@ impl Scanner { /// This method collects all objects from a disk for a specific bucket. /// It returns a map of object names to their metadata for later analysis. async fn scan_volume(&self, disk: &DiskStore, bucket: &str) -> Result> { + let ecstore = match rustfs_ecstore::new_object_layer_fn() { + Some(ecstore) => ecstore, + None => { + error!("ECStore not available"); + return Err(Error::Other("ECStore not available".to_string())); + } + }; + let bucket_info = ecstore.get_bucket_info(bucket, &Default::default()).await.ok(); + let versioning_config = bucket_info.map(|bi| Arc::new(VersioningConfig { enabled: bi.versioning })); + let lifecycle_config = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket) + .await + .ok() + .map(|(c, _)| Arc::new(c)); // Start global metrics collection for volume scan let stop_fn = Metrics::time(Metric::ScanObject); @@ -1247,6 +1261,15 @@ impl Scanner { } } } else { + // Apply lifecycle actions + if let Some(lifecycle_config) = &lifecycle_config { + let mut scanner_item = + ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone()); + if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await { + error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e); + } + } + // Store object metadata for later analysis object_metadata.insert(entry.name.clone(), file_meta.clone()); } diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs new file mode 100644 index 00000000..5d33399d --- /dev/null +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -0,0 +1,125 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use rustfs_common::metrics::IlmAction; +use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc; +use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle}; +use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config; +use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; +use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_filemeta::FileMetaVersion; +use rustfs_filemeta::metacache::MetaCacheEntry; +use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig; +use tracing::info; + +#[derive(Clone)] +pub struct ScannerItem { + bucket: String, + lifecycle: Option>, + versioning: Option>, +} + +impl ScannerItem { + pub fn new(bucket: String, lifecycle: Option>, versioning: Option>) -> Self { + Self { + bucket, + lifecycle, + versioning, + } + } + + pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> { + info!("apply_actions called for object: {}", object); + if self.lifecycle.is_none() { + info!("No lifecycle config for object: {}", object); + return Ok(()); + } + info!("Lifecycle config exists for object: {}", object); + + let file_meta = match meta.xl_meta() { + Ok(meta) => meta, + Err(e) => { + tracing::error!("Failed to get xl_meta for {}: {}", object, e); + return Ok(()); + } + }; + + let latest_version = file_meta.versions.first().cloned().unwrap_or_default(); + let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default(); + + let obj_info = ObjectInfo { + bucket: self.bucket.clone(), + name: object.to_string(), + version_id: latest_version.header.version_id, + mod_time: latest_version.header.mod_time, + size: file_meta_version.object.as_ref().map_or(0, |o| o.size), + user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(), + ..Default::default() + }; + + self.apply_lifecycle(&obj_info).await; + + Ok(()) + } + + async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) { + let size = oi.size; + if self.lifecycle.is_none() { + return (IlmAction::NoneAction, size); + } + + let (olcfg, rcfg) = if self.bucket != ".minio.sys" { + ( + get_object_lock_config(&self.bucket).await.ok(), + None, // FIXME: replication config + ) + } else { + (None, None) + }; + + let lc_evt = eval_action_from_lifecycle( + self.lifecycle.as_ref().unwrap(), + olcfg + .as_ref() + .and_then(|(c, _)| c.rule.as_ref().and_then(|r| r.default_retention.clone())), + rcfg.clone(), + oi, + ) + .await; + + info!("lifecycle: {} Initial scan: {}", oi.name, lc_evt.action); + + let mut new_size = size; + match lc_evt.action { + IlmAction::DeleteVersionAction | IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => { + new_size = 0; + } + IlmAction::DeleteAction => { + if let Some(vcfg) = &self.versioning { + if !vcfg.is_enabled() { + new_size = 0; + } + } else { + new_size = 0; + } + } + _ => (), + } + + apply_lifecycle_action(&lc_evt, &LcEventSrc::Scanner, oi).await; + (lc_evt.action, new_size) + } +} diff --git a/crates/ahm/src/scanner/mod.rs b/crates/ahm/src/scanner/mod.rs index d299c143..13019eb4 100644 --- a/crates/ahm/src/scanner/mod.rs +++ b/crates/ahm/src/scanner/mod.rs @@ -14,6 +14,7 @@ pub mod data_scanner; pub mod histogram; +pub mod lifecycle; pub mod metrics; pub use data_scanner::Scanner; diff --git a/crates/ahm/tests/lifecycle_integration_test.rs b/crates/ahm/tests/lifecycle_integration_test.rs new file mode 100644 index 00000000..88087f80 --- /dev/null +++ b/crates/ahm/tests/lifecycle_integration_test.rs @@ -0,0 +1,243 @@ +use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig}; +use rustfs_ecstore::{ + bucket::metadata::BUCKET_LIFECYCLE_CONFIG, + bucket::metadata_sys, + disk::endpoint::Endpoint, + endpoints::{EndpointServerPools, Endpoints, PoolEndpoints}, + store::ECStore, + store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI}, +}; +use serial_test::serial; +use std::sync::Once; +use std::sync::OnceLock; +use std::{path::PathBuf, sync::Arc, time::Duration}; +use tokio::fs; +use tracing::info; + +static GLOBAL_ENV: OnceLock<(Vec, Arc)> = OnceLock::new(); +static INIT: Once = Once::new(); + +fn init_tracing() { + INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} + +/// Test helper: Create test environment with ECStore +async fn setup_test_env() -> (Vec, Arc) { + init_tracing(); + + // Fast path: already initialized, just clone and return + if let Some((paths, ecstore)) = GLOBAL_ENV.get() { + return (paths.clone(), ecstore.clone()); + } + + // create temp dir as 4 disks with unique base dir + let test_base_dir = format!("/tmp/rustfs_ahm_lifecycle_test_{}", uuid::Uuid::new_v4()); + let temp_dir = std::path::PathBuf::from(&test_base_dir); + if temp_dir.exists() { + fs::remove_dir_all(&temp_dir).await.ok(); + } + fs::create_dir_all(&temp_dir).await.unwrap(); + + // create 4 disk dirs + let disk_paths = vec![ + temp_dir.join("disk1"), + temp_dir.join("disk2"), + temp_dir.join("disk3"), + temp_dir.join("disk4"), + ]; + + for disk_path in &disk_paths { + fs::create_dir_all(disk_path).await.unwrap(); + } + + // create EndpointServerPools + let mut endpoints = Vec::new(); + for (i, disk_path) in disk_paths.iter().enumerate() { + let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap(); + // set correct index + endpoint.set_pool_index(0); + endpoint.set_set_index(0); + endpoint.set_disk_index(i); + endpoints.push(endpoint); + } + + let pool_endpoints = PoolEndpoints { + legacy: false, + set_count: 1, + drives_per_set: 4, + endpoints: Endpoints::from(endpoints), + cmd_line: "test".to_string(), + platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH), + }; + + let endpoint_pools = EndpointServerPools(vec![pool_endpoints]); + + // format disks (only first time) + rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap(); + + // create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free + let port = 9002; // for simplicity + let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + let ecstore = ECStore::new(server_addr, endpoint_pools).await.unwrap(); + + // init bucket metadata system + let buckets_list = ecstore + .list_bucket(&rustfs_ecstore::store_api::BucketOptions { + no_metadata: true, + ..Default::default() + }) + .await + .unwrap(); + let buckets = buckets_list.into_iter().map(|v| v.name).collect(); + rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await; + + // Initialize background expiry workers + rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await; + + // Store in global once lock + let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone())); + + (disk_paths, ecstore) +} + +/// Test helper: Create a test bucket +async fn create_test_bucket(ecstore: &Arc, bucket_name: &str) { + (**ecstore) + .make_bucket(bucket_name, &Default::default()) + .await + .expect("Failed to create test bucket"); + info!("Created test bucket: {}", bucket_name); +} + +/// Test helper: Upload test object +async fn upload_test_object(ecstore: &Arc, bucket: &str, object: &str, data: &[u8]) { + let mut reader = PutObjReader::from_vec(data.to_vec()); + let object_info = (**ecstore) + .put_object(bucket, object, &mut reader, &ObjectOptions::default()) + .await + .expect("Failed to upload test object"); + + info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size); +} + +/// Test helper: Set bucket lifecycle configuration +async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box> { + // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing + let lifecycle_xml = r#" + + + test-rule + Enabled + + test/ + + + 0 + + +"#; + + metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?; + + Ok(()) +} + +/// Test helper: Check if object exists +async fn object_exists(ecstore: &Arc, bucket: &str, object: &str) -> bool { + ((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok() +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_lifecycle_expiry_basic() { + let (_disk_paths, ecstore) = setup_test_env().await; + + // Create test bucket and object + let bucket_name = "test-lifecycle-bucket"; + let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/" + let test_data = b"Hello, this is test data for lifecycle expiry!"; + + create_test_bucket(&ecstore, bucket_name).await; + upload_test_object(&ecstore, bucket_name, object_name, test_data).await; + + // Verify object exists initially + assert!(object_exists(&ecstore, bucket_name, object_name).await); + println!("✅ Object exists before lifecycle processing"); + + // Set lifecycle configuration with very short expiry (0 days = immediate expiry) + set_bucket_lifecycle(bucket_name) + .await + .expect("Failed to set lifecycle configuration"); + println!("✅ Lifecycle configuration set for bucket: {bucket_name}"); + + // Verify lifecycle configuration was set + match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await { + Ok(bucket_meta) => { + assert!(bucket_meta.lifecycle_config.is_some()); + println!("✅ Bucket metadata retrieved successfully"); + } + Err(e) => { + println!("❌ Error retrieving bucket metadata: {e:?}"); + } + } + + // Create scanner with very short intervals for testing + let scanner_config = ScannerConfig { + scan_interval: Duration::from_millis(100), + deep_scan_interval: Duration::from_millis(500), + max_concurrent_scans: 1, + ..Default::default() + }; + + let scanner = Scanner::new(Some(scanner_config), None); + + // Start scanner + scanner.start().await.expect("Failed to start scanner"); + println!("✅ Scanner started"); + + // Wait for scanner to process lifecycle rules + tokio::time::sleep(Duration::from_secs(2)).await; + + // Manually trigger a scan cycle to ensure lifecycle processing + scanner.scan_cycle().await.expect("Failed to trigger scan cycle"); + println!("✅ Manual scan cycle completed"); + + // Wait a bit more for background workers to process expiry tasks + tokio::time::sleep(Duration::from_secs(5)).await; + + // Check if object has been expired (deleted) + let object_still_exists = object_exists(&ecstore, bucket_name, object_name).await; + println!("Object exists after lifecycle processing: {object_still_exists}"); + + if object_still_exists { + println!("❌ Object was not deleted by lifecycle processing"); + // Let's try to get object info to see its details + match ecstore + .get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default()) + .await + { + Ok(obj_info) => { + println!( + "Object info: name={}, size={}, mod_time={:?}", + obj_info.name, obj_info.size, obj_info.mod_time + ); + } + Err(e) => { + println!("Error getting object info: {e:?}"); + } + } + } else { + println!("✅ Object was successfully deleted by lifecycle processing"); + } + + assert!(!object_still_exists); + println!("✅ Object successfully expired"); + + // Stop scanner + let _ = scanner.stop().await; + println!("✅ Scanner stopped"); + + println!("Lifecycle expiry basic test completed"); +} diff --git a/crates/e2e_test/src/reliant/lifecycle.rs b/crates/e2e_test/src/reliant/lifecycle.rs new file mode 100644 index 00000000..9f8b55de --- /dev/null +++ b/crates/e2e_test/src/reliant/lifecycle.rs @@ -0,0 +1,133 @@ +#![cfg(test)] +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::Client; +use aws_sdk_s3::config::{Credentials, Region}; +use bytes::Bytes; +use serial_test::serial; +use std::error::Error; +use tokio::time::sleep; + +const ENDPOINT: &str = "http://localhost:9000"; +const ACCESS_KEY: &str = "rustfsadmin"; +const SECRET_KEY: &str = "rustfsadmin"; +const BUCKET: &str = "test-basic-bucket"; + +async fn create_aws_s3_client() -> Result> { + let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1")); + let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(region_provider) + .credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static")) + .endpoint_url(ENDPOINT) + .load() + .await; + + let client = Client::from_conf( + aws_sdk_s3::Config::from(&shared_config) + .to_builder() + .force_path_style(true) + .build(), + ); + Ok(client) +} + +async fn setup_test_bucket(client: &Client) -> Result<(), Box> { + match client.create_bucket().bucket(BUCKET).send().await { + Ok(_) => {} + Err(e) => { + let error_str = e.to_string(); + if !error_str.contains("BucketAlreadyOwnedByYou") && !error_str.contains("BucketAlreadyExists") { + return Err(e.into()); + } + } + } + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[serial] +#[ignore = "requires running RustFS server at localhost:9000"] +async fn test_bucket_lifecycle_configuration() -> Result<(), Box> { + use aws_sdk_s3::types::{BucketLifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleFilter}; + use tokio::time::Duration; + + let client = create_aws_s3_client().await?; + setup_test_bucket(&client).await?; + + // Upload test object first + let test_content = "Test object for lifecycle expiration"; + let lifecycle_object_key = "lifecycle-test-object.txt"; + client + .put_object() + .bucket(BUCKET) + .key(lifecycle_object_key) + .body(Bytes::from(test_content.as_bytes()).into()) + .send() + .await?; + + // Verify object exists initially + let resp = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await?; + assert!(resp.content_length().unwrap_or(0) > 0); + + // Configure lifecycle rule: expire after current time + 3 seconds + let expiration = LifecycleExpiration::builder().days(0).build(); + let filter = LifecycleRuleFilter::builder().prefix(lifecycle_object_key).build(); + let rule = LifecycleRule::builder() + .id("expire-test-object") + .filter(filter) + .expiration(expiration) + .status(aws_sdk_s3::types::ExpirationStatus::Enabled) + .build()?; + let lifecycle = BucketLifecycleConfiguration::builder().rules(rule).build()?; + + client + .put_bucket_lifecycle_configuration() + .bucket(BUCKET) + .lifecycle_configuration(lifecycle) + .send() + .await?; + + // Verify lifecycle configuration was set + let resp = client.get_bucket_lifecycle_configuration().bucket(BUCKET).send().await?; + let rules = resp.rules(); + assert!(rules.iter().any(|r| r.id().unwrap_or("") == "expire-test-object")); + + // Wait for lifecycle processing (scanner runs every 1 second) + sleep(Duration::from_secs(3)).await; + + // After lifecycle processing, the object should be deleted by the lifecycle rule + let get_result = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await; + + match get_result { + Ok(_) => { + panic!("Expected object to be deleted by lifecycle rule, but it still exists"); + } + Err(e) => { + if let Some(service_error) = e.as_service_error() { + if service_error.is_no_such_key() { + println!("Lifecycle configuration test completed - object was successfully deleted by lifecycle rule"); + } else { + panic!("Expected NoSuchKey error, but got: {e:?}"); + } + } else { + panic!("Expected service error, but got: {e:?}"); + } + } + } + + println!("Lifecycle configuration test completed."); + Ok(()) +} diff --git a/crates/e2e_test/src/reliant/mod.rs b/crates/e2e_test/src/reliant/mod.rs index 00bf9d98..b8d05efd 100644 --- a/crates/e2e_test/src/reliant/mod.rs +++ b/crates/e2e_test/src/reliant/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod lifecycle; mod lock; mod node_interact_test; mod sql; diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index e2848821..6fe4c835 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -516,7 +516,7 @@ impl TransitionState { if let Err(err) = transition_object(api.clone(), &task.obj_info, LcAuditEvent::new(task.event.clone(), task.src.clone())).await { if !is_err_version_not_found(&err) && !is_err_object_not_found(&err) && !is_network_or_host_down(&err.to_string(), false) && !err.to_string().contains("use of closed network connection") { error!("Transition to {} failed for {}/{} version:{} with {}", - task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.expect("err"), err.to_string()); + task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.map(|v| v.to_string()).unwrap_or_default(), err.to_string()); } } else { let mut ts = TierStats { @@ -743,7 +743,7 @@ pub async fn transition_object(api: Arc, oi: &ObjectInfo, lae: LcAuditE ..Default::default() }, //lifecycle_audit_event: lae, - version_id: Some(oi.version_id.expect("err").to_string()), + version_id: oi.version_id.map(|v| v.to_string()), versioned: BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await, version_suspended: BucketVersioningSys::prefix_suspended(&oi.bucket, &oi.name).await, mod_time: oi.mod_time, @@ -808,7 +808,7 @@ impl LifecycleOps for ObjectInfo { lifecycle::ObjectOpts { name: self.name.clone(), user_tags: self.user_tags.clone(), - version_id: self.version_id.expect("err").to_string(), + version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(), mod_time: self.mod_time, size: self.size as usize, is_latest: self.is_latest, @@ -874,7 +874,11 @@ pub async fn eval_action_from_lifecycle( if lock_enabled && enforce_retention_for_deletion(oi) { //if serverDebugLog { if oi.version_id.is_some() { - info!("lifecycle: {} v({}) is locked, not deleting", oi.name, oi.version_id.expect("err")); + info!( + "lifecycle: {} v({}) is locked, not deleting", + oi.name, + oi.version_id.map(|v| v.to_string()).unwrap_or_default() + ); } else { info!("lifecycle: {} is locked, not deleting", oi.name); } @@ -928,7 +932,7 @@ pub async fn apply_expiry_on_non_transitioned_objects( }; if lc_event.action.delete_versioned() { - opts.version_id = Some(oi.version_id.expect("err").to_string()); + opts.version_id = oi.version_id.map(|v| v.to_string()); } opts.versioned = BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await; diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index 22caf860..f1d4a311 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -27,6 +27,7 @@ use std::env; use std::fmt::Display; use time::macros::{datetime, offset}; use time::{self, Duration, OffsetDateTime}; +use tracing::info; use crate::bucket::lifecycle::rule::TransitionOps; @@ -279,7 +280,12 @@ impl Lifecycle for BucketLifecycleConfiguration { async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event { let mut events = Vec::::new(); + info!( + "eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}", + obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker + ); if obj.mod_time.expect("err").unix_timestamp() == 0 { + info!("eval_inner: mod_time is 0, returning default event"); return Event::default(); } @@ -418,7 +424,16 @@ impl Lifecycle for BucketLifecycleConfiguration { } } - if obj.is_latest && !obj.delete_marker { + info!( + "eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}", + obj.is_latest, + obj.delete_marker, + obj.version_id, + (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker + ); + // Allow expiration for latest objects OR non-versioned objects (empty version_id) + if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker { + info!("eval_inner: entering expiration check"); if let Some(ref expiration) = rule.expiration { if let Some(ref date) = expiration.date { let date0 = OffsetDateTime::from(date.clone()); @@ -435,22 +450,29 @@ impl Lifecycle for BucketLifecycleConfiguration { }); } } else if let Some(days) = expiration.days { - if days != 0 { - let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days); - if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { - let mut event = Event { - action: IlmAction::DeleteAction, - rule_id: rule.id.clone().expect("err!"), - due: Some(expected_expiry), - noncurrent_days: 0, - newer_noncurrent_versions: 0, - storage_class: "".into(), - }; - /*if rule.expiration.expect("err!").delete_all.val { - event.action = IlmAction::DeleteAllVersionsAction - }*/ - events.push(event); - } + let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days); + info!( + "eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}", + days, + obj.mod_time.expect("err!"), + expected_expiry, + now, + now.unix_timestamp() > expected_expiry.unix_timestamp() + ); + if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() { + info!("eval_inner: object should expire, adding DeleteAction"); + let mut event = Event { + action: IlmAction::DeleteAction, + rule_id: rule.id.clone().expect("err!"), + due: Some(expected_expiry), + noncurrent_days: 0, + newer_noncurrent_versions: 0, + storage_class: "".into(), + }; + /*if rule.expiration.expect("err!").delete_all.val { + event.action = IlmAction::DeleteAllVersionsAction + }*/ + events.push(event); } } } @@ -598,7 +620,7 @@ impl LifecycleCalculate for Transition { pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTime { if days == 0 { - return mod_time; + return OffsetDateTime::UNIX_EPOCH; // Return epoch time to ensure immediate expiry } let t = mod_time .to_offset(offset!(-0:00:00)) diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 517acf6a..2a4d8409 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -3837,7 +3837,90 @@ impl StorageAPI for SetDisks { return Ok(ObjectInfo::default()); } - unimplemented!() + + // Create a single object deletion request + let mut vr = FileInfo { + name: object.to_string(), + version_id: opts.version_id.as_ref().and_then(|v| Uuid::parse_str(v).ok()), + ..Default::default() + }; + + // Handle versioning + let (suspended, versioned) = (opts.version_suspended, opts.versioned); + if opts.version_id.is_none() && (suspended || versioned) { + vr.mod_time = Some(OffsetDateTime::now_utc()); + vr.deleted = true; + if versioned { + vr.version_id = Some(Uuid::new_v4()); + } + } + + let vers = vec![FileInfoVersions { + name: vr.name.clone(), + versions: vec![vr.clone()], + ..Default::default() + }]; + + let disks = self.disks.read().await; + let disks = disks.clone(); + let write_quorum = disks.len() / 2 + 1; + + let mut futures = Vec::with_capacity(disks.len()); + let mut errs = Vec::with_capacity(disks.len()); + + for disk in disks.iter() { + let vers = vers.clone(); + futures.push(async move { + if let Some(disk) = disk { + disk.delete_versions(bucket, vers, DeleteOptions::default()).await + } else { + Err(DiskError::DiskNotFound) + } + }); + } + + let results = join_all(futures).await; + + for result in results { + match result { + Ok(disk_errs) => { + // Handle errors from disk operations + for err in disk_errs.iter().flatten() { + warn!("delete_object disk error: {:?}", err); + } + errs.push(None); + } + Err(e) => { + errs.push(Some(e)); + } + } + } + + // Check write quorum + if let Some(err) = reduce_write_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, write_quorum) { + return Err(to_object_err(err.into(), vec![bucket, object])); + } + + // Create result ObjectInfo + let result_info = if vr.deleted { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + delete_marker: true, + mod_time: vr.mod_time, + version_id: vr.version_id, + ..Default::default() + } + } else { + ObjectInfo { + bucket: bucket.to_string(), + name: object.to_string(), + version_id: vr.version_id, + ..Default::default() + } + }; + + Ok(result_info) } #[tracing::instrument(skip(self))] diff --git a/crates/filemeta/src/lib.rs b/crates/filemeta/src/lib.rs index 32719fdc..48b1bf16 100644 --- a/crates/filemeta/src/lib.rs +++ b/crates/filemeta/src/lib.rs @@ -17,7 +17,7 @@ pub mod fileinfo; mod filemeta; mod filemeta_inline; pub mod headers; -mod metacache; +pub mod metacache; pub mod test_data;