feat(lifecycle): Implement object lifecycle management functionality (#358)

* feat(lifecycle): Implement object lifecycle management functionality

Add a lifecycle module to automatically handle object expiration and transition during scanning
Modify the file metadata cache module to be publicly visible to support lifecycle operations
Adjust the scanning interval to a shorter time for testing lifecycle rules
Implement the parsing and execution logic for S3 lifecycle configurations
Add integration tests to verify the lifecycle expiration functionality
Update dependencies to support the new lifecycle features

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix cargo dependencies

Signed-off-by: junxiang Mu <1948535941@qq.com>

* fix fmt

Signed-off-by: junxiang Mu <1948535941@qq.com>

---------

Signed-off-by: junxiang Mu <1948535941@qq.com>
Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
guojidan
2025-08-07 19:51:02 -07:00
committed by GitHub
parent 48a9707110
commit d987686c14
12 changed files with 756 additions and 111 deletions

165
Cargo.lock generated
View File

@@ -119,9 +119,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
version = "0.6.19"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933"
checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -149,22 +149,22 @@ dependencies = [
[[package]]
name = "anstyle-query"
version = "1.1.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9"
checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.60.2",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.9"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882"
checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.59.0",
"windows-sys 0.60.2",
]
[[package]]
@@ -522,9 +522,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "3.4.0"
version = "3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18"
checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc"
dependencies = [
"event-listener",
"event-listener-strategy",
@@ -674,9 +674,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "aws-config"
version = "1.8.3"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0baa720ebadea158c5bda642ac444a2af0cdf7bb66b46d1e4533de5d1f449d0"
checksum = "483020b893cdef3d89637e428d588650c71cfae7ea2e6ecbaee4de4ff99fb2dd"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -704,9 +704,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
version = "1.2.4"
version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b68c2194a190e1efc999612792e25b1ab3abfefe4306494efaaabc25933c0cbe"
checksum = "1541072f81945fa1251f8795ef6c92c4282d74d59f88498ae7d4bf00f0ebdad9"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@@ -739,9 +739,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.5.9"
version = "1.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2090e664216c78e766b6bac10fe74d2f451c02441d43484cd76ac9a295075f7"
checksum = "c034a1bc1d70e16e7f4e4caf7e9f7693e4c9c24cd91cf17c2a0b21abaebc7c8b"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@@ -764,9 +764,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.100.0"
version = "1.101.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c5eafbdcd898114b839ba68ac628e31c4cfc3e11dfca38dc1b2de2f35bb6270"
checksum = "7b16efa59a199f5271bf21ab3e570c5297d819ce4f240e6cf0096d1dc0049c44"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -798,9 +798,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "1.78.0"
version = "1.79.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724"
checksum = "0a847168f15b46329fa32c7aca4e4f1a2e072f9b422f0adb19756f2e1457f111"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -820,9 +820,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
version = "1.79.0"
version = "1.80.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca"
checksum = "b654dd24d65568738593e8239aef279a86a15374ec926ae8714e2d7245f34149"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -842,9 +842,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "1.80.0"
version = "1.81.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc"
checksum = "c92ea8a7602321c83615c82b408820ad54280fb026e92de0eeea937342fafa24"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -865,9 +865,9 @@ dependencies = [
[[package]]
name = "aws-sigv4"
version = "1.3.3"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666"
checksum = "084c34162187d39e3740cb635acd73c4e3a551a36146ad6fe8883c929c9f876c"
dependencies = [
"aws-credential-types",
"aws-smithy-eventstream",
@@ -904,9 +904,9 @@ dependencies = [
[[package]]
name = "aws-smithy-checksums"
version = "0.63.5"
version = "0.63.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab9472f7a8ec259ddb5681d2ef1cb1cf16c0411890063e67cdc7b62562cc496"
checksum = "9054b4cc5eda331cde3096b1576dec45365c5cbbca61d1fffa5f236e251dfce7"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
@@ -935,9 +935,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.62.2"
version = "0.62.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43c82ba4cab184ea61f6edaafc1072aad3c2a17dcf4c0fce19ac5694b90d8b5f"
checksum = "7c4dacf2d38996cf729f55e7a762b30918229917eca115de45dfa8dfb97796c9"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-runtime-api",
@@ -964,7 +964,7 @@ dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
"h2 0.3.27",
"h2 0.4.11",
"h2 0.4.12",
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
@@ -1013,9 +1013,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "1.8.5"
version = "1.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "660f70d9d8af6876b4c9aa8dcb0dbaf0f89b04ee9a4455bea1b4ba03b15f26f6"
checksum = "9e107ce0783019dbff59b3a244aa0c114e4a8c9d93498af9162608cd5474e796"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -1037,9 +1037,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "1.8.5"
version = "1.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c"
checksum = "75d52251ed4b9776a3e8487b2a01ac915f73b2da3af8fc1e77e0fce697a550d4"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
@@ -1540,9 +1540,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.2.30"
version = "1.2.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7"
checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2"
dependencies = [
"jobserver",
"libc",
@@ -1697,9 +1697,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.42"
version = "4.5.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed87a9d530bb41a67537289bafcac159cb3ee28460e0a4571123d2a778a6a882"
checksum = "50fd97c9dc2399518aa331917ac6f274280ec5eb34e555dd291899745c48ec6f"
dependencies = [
"clap_builder",
"clap_derive",
@@ -1707,9 +1707,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.42"
version = "4.5.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64f4f3f3c77c94aff3c7e9aac9a2ca1974a5adf392a8bb751e827d6d127ab966"
checksum = "c35b5830294e1fa0462034af85cc95225a4cb07092c088c55bda3147cfcd8f65"
dependencies = [
"anstream",
"anstyle",
@@ -2293,12 +2293,12 @@ dependencies = [
[[package]]
name = "darling"
version = "0.21.0"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a79c4acb1fd5fa3d9304be4c76e031c54d2e92d172a393e24b19a14fe8532fe9"
checksum = "d6b136475da5ef7b6ac596c0e956e37bad51b85b987ff3d5e230e964936736b2"
dependencies = [
"darling_core 0.21.0",
"darling_macro 0.21.0",
"darling_core 0.21.1",
"darling_macro 0.21.1",
]
[[package]]
@@ -2317,9 +2317,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.21.0"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74875de90daf30eb59609910b84d4d368103aaec4c924824c6799b28f77d6a1d"
checksum = "b44ad32f92b75fb438b04b68547e521a548be8acc339a6dacc4a7121488f53e6"
dependencies = [
"fnv",
"ident_case",
@@ -2342,11 +2342,11 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.21.0"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79f8e61677d5df9167cd85265f8e5f64b215cdea3fb55eebc3e622e44c7a146"
checksum = "2b5be8a7a562d315a5b92a630c30cec6bcf663e6673f00fbb69cca66a6f521b9"
dependencies = [
"darling_core 0.21.0",
"darling_core 0.21.1",
"quote",
"syn 2.0.104",
]
@@ -3738,9 +3738,9 @@ dependencies = [
[[package]]
name = "event-listener"
version = "5.4.0"
version = "5.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae"
checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab"
dependencies = [
"concurrent-queue",
"parking",
@@ -4001,9 +4001,9 @@ checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-lite"
version = "2.6.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
"fastrand",
"futures-core",
@@ -4490,9 +4490,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.11"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785"
checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386"
dependencies = [
"atomic-waker",
"bytes",
@@ -4742,7 +4742,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.11",
"h2 0.4.12",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
@@ -5251,7 +5251,7 @@ dependencies = [
"dbus-secret-service",
"log",
"security-framework 2.11.1",
"security-framework 3.2.0",
"security-framework 3.3.0",
"windows-sys 0.60.2",
"zeroize",
]
@@ -5957,9 +5957,9 @@ dependencies = [
[[package]]
name = "notify"
version = "8.1.0"
version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3163f59cd3fa0e9ef8c32f242966a7b9994fd7378366099593e0e73077cd8c97"
checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3"
dependencies = [
"bitflags 2.9.1",
"fsevent-sys",
@@ -7026,9 +7026,9 @@ dependencies = [
[[package]]
name = "polling"
version = "3.9.0"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee9b2fa7a4517d2c91ff5bc6c297a427a96749d15f98fcdbb22c05571a4d4b7"
checksum = "b5bd19146350fe804f7cb2669c851c03d69da628803dab0d98018142aaa5d829"
dependencies = [
"cfg-if",
"concurrent-queue",
@@ -7320,9 +7320,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.38.0"
version = "0.38.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8927b0664f5c5a98265138b7e3f90aa19a6b21353182469ace36d4ac527b7b1b"
checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4"
dependencies = [
"memchr",
"serde",
@@ -7617,9 +7617,9 @@ dependencies = [
[[package]]
name = "redox_users"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78eaea1f52c56d57821be178b2d47e09ff26481a6042e8e042fcb0ced068b470"
checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac"
dependencies = [
"getrandom 0.2.16",
"libredox",
@@ -7718,7 +7718,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.11",
"h2 0.4.12",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@@ -7851,7 +7851,7 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4aebc912b8fa7d54999adc4e45601d1d95fe458f97eb0a1277eddcd6382cf4b1"
dependencies = [
"darling 0.21.0",
"darling 0.21.1",
"proc-macro2",
"quote",
"serde_json",
@@ -8110,10 +8110,14 @@ dependencies = [
"async-trait",
"chrono",
"futures",
"lazy_static",
"rustfs-common",
"rustfs-ecstore",
"rustfs-filemeta",
"rustfs-lock",
"rustfs-madmin",
"rustfs-utils",
"s3s",
"serde",
"serde_json",
"serial_test",
@@ -8123,6 +8127,7 @@ dependencies = [
"tokio-util",
"tracing",
"tracing-subscriber",
"url",
"uuid",
"walkdir",
]
@@ -8225,7 +8230,7 @@ dependencies = [
"once_cell",
"path-absolutize",
"pin-project-lite",
"quick-xml 0.38.0",
"quick-xml 0.38.1",
"rand 0.9.2",
"reed-solomon-simd",
"regex",
@@ -8393,7 +8398,7 @@ dependencies = [
"form_urlencoded",
"futures",
"once_cell",
"quick-xml 0.38.0",
"quick-xml 0.38.1",
"reqwest",
"rumqttc",
"rustfs-config",
@@ -8741,7 +8746,7 @@ dependencies = [
"openssl-probe",
"rustls-pki-types",
"schannel",
"security-framework 3.2.0",
"security-framework 3.3.0",
]
[[package]]
@@ -8978,9 +8983,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "3.2.0"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316"
checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c"
dependencies = [
"bitflags 2.9.1",
"core-foundation 0.10.1",
@@ -9387,9 +9392,9 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
version = "1.4.5"
version = "1.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b"
dependencies = [
"libc",
]
@@ -10294,9 +10299,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.15"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
dependencies = [
"bytes",
"futures-core",
@@ -10407,7 +10412,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"flate2",
"h2 0.4.11",
"h2 0.4.12",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@@ -12192,9 +12197,9 @@ dependencies = [
[[package]]
name = "zerovec"
version = "0.11.2"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428"
checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b"
dependencies = [
"yoke",
"zerofrom",

View File

@@ -17,6 +17,7 @@ rustfs-ecstore = { workspace = true }
rustfs-common = { workspace = true }
rustfs-filemeta = { workspace = true }
rustfs-madmin = { workspace = true }
rustfs-utils = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
@@ -27,6 +28,10 @@ uuid = { workspace = true, features = ["v4", "serde"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
url = { workspace = true }
rustfs-lock = { workspace = true }
s3s = { workspace = true }
lazy_static = { workspace = true }
chrono = { workspace = true }
[dev-dependencies]

View File

@@ -30,15 +30,16 @@ use tracing::{debug, error, info, warn};
use super::metrics::{BucketMetrics, DiskMetrics, MetricsCollector, ScannerMetrics};
use crate::heal::HealManager;
use crate::scanner::lifecycle::ScannerItem;
use crate::{
HealRequest,
error::{Error, Result},
get_ahm_services_cancel_token,
};
use rustfs_common::{
data_usage::DataUsageInfo,
metrics::{Metric, Metrics, globalMetrics},
};
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
@@ -290,7 +291,7 @@ impl Scanner {
/// Get global metrics from common crate
pub async fn get_global_metrics(&self) -> rustfs_madmin::metrics::ScannerMetrics {
globalMetrics.report().await
(*globalMetrics).report().await
}
/// Perform a single scan cycle
@@ -317,7 +318,7 @@ impl Scanner {
cycle_completed: vec![chrono::Utc::now()],
started: chrono::Utc::now(),
};
globalMetrics.set_cycle(Some(cycle_info)).await;
(*globalMetrics).set_cycle(Some(cycle_info)).await;
self.metrics.set_current_cycle(self.state.read().await.current_cycle);
self.metrics.increment_total_cycles();
@@ -1160,6 +1161,19 @@ impl Scanner {
/// This method collects all objects from a disk for a specific bucket.
/// It returns a map of object names to their metadata for later analysis.
async fn scan_volume(&self, disk: &DiskStore, bucket: &str) -> Result<HashMap<String, rustfs_filemeta::FileMeta>> {
let ecstore = match rustfs_ecstore::new_object_layer_fn() {
Some(ecstore) => ecstore,
None => {
error!("ECStore not available");
return Err(Error::Other("ECStore not available".to_string()));
}
};
let bucket_info = ecstore.get_bucket_info(bucket, &Default::default()).await.ok();
let versioning_config = bucket_info.map(|bi| Arc::new(VersioningConfig { enabled: bi.versioning }));
let lifecycle_config = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket)
.await
.ok()
.map(|(c, _)| Arc::new(c));
// Start global metrics collection for volume scan
let stop_fn = Metrics::time(Metric::ScanObject);
@@ -1247,6 +1261,15 @@ impl Scanner {
}
}
} else {
// Apply lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config {
let mut scanner_item =
ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await {
error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e);
}
}
// Store object metadata for later analysis
object_metadata.insert(entry.name.clone(), file_meta.clone());
}

View File

@@ -0,0 +1,125 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use rustfs_common::metrics::IlmAction;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle};
use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config;
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::FileMetaVersion;
use rustfs_filemeta::metacache::MetaCacheEntry;
use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig;
use tracing::info;
#[derive(Clone)]
pub struct ScannerItem {
bucket: String,
lifecycle: Option<Arc<LifecycleConfig>>,
versioning: Option<Arc<VersioningConfig>>,
}
impl ScannerItem {
pub fn new(bucket: String, lifecycle: Option<Arc<LifecycleConfig>>, versioning: Option<Arc<VersioningConfig>>) -> Self {
Self {
bucket,
lifecycle,
versioning,
}
}
pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> {
info!("apply_actions called for object: {}", object);
if self.lifecycle.is_none() {
info!("No lifecycle config for object: {}", object);
return Ok(());
}
info!("Lifecycle config exists for object: {}", object);
let file_meta = match meta.xl_meta() {
Ok(meta) => meta,
Err(e) => {
tracing::error!("Failed to get xl_meta for {}: {}", object, e);
return Ok(());
}
};
let latest_version = file_meta.versions.first().cloned().unwrap_or_default();
let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default();
let obj_info = ObjectInfo {
bucket: self.bucket.clone(),
name: object.to_string(),
version_id: latest_version.header.version_id,
mod_time: latest_version.header.mod_time,
size: file_meta_version.object.as_ref().map_or(0, |o| o.size),
user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(),
..Default::default()
};
self.apply_lifecycle(&obj_info).await;
Ok(())
}
async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) {
let size = oi.size;
if self.lifecycle.is_none() {
return (IlmAction::NoneAction, size);
}
let (olcfg, rcfg) = if self.bucket != ".minio.sys" {
(
get_object_lock_config(&self.bucket).await.ok(),
None, // FIXME: replication config
)
} else {
(None, None)
};
let lc_evt = eval_action_from_lifecycle(
self.lifecycle.as_ref().unwrap(),
olcfg
.as_ref()
.and_then(|(c, _)| c.rule.as_ref().and_then(|r| r.default_retention.clone())),
rcfg.clone(),
oi,
)
.await;
info!("lifecycle: {} Initial scan: {}", oi.name, lc_evt.action);
let mut new_size = size;
match lc_evt.action {
IlmAction::DeleteVersionAction | IlmAction::DeleteAllVersionsAction | IlmAction::DelMarkerDeleteAllVersionsAction => {
new_size = 0;
}
IlmAction::DeleteAction => {
if let Some(vcfg) = &self.versioning {
if !vcfg.is_enabled() {
new_size = 0;
}
} else {
new_size = 0;
}
}
_ => (),
}
apply_lifecycle_action(&lc_evt, &LcEventSrc::Scanner, oi).await;
(lc_evt.action, new_size)
}
}

View File

@@ -14,6 +14,7 @@
pub mod data_scanner;
pub mod histogram;
pub mod lifecycle;
pub mod metrics;
pub use data_scanner::Scanner;

View File

@@ -0,0 +1,243 @@
use rustfs_ahm::scanner::{Scanner, data_scanner::ScannerConfig};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
};
use serial_test::serial;
use std::sync::Once;
use std::sync::OnceLock;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::fs;
use tracing::info;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_ahm_lifecycle_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}
// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools).await.unwrap();
// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
// Initialize background expiry workers
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await;
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_lifecycle_expiry_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
// Create test bucket and object
let bucket_name = "test-lifecycle-bucket";
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_bucket(&ecstore, bucket_name).await;
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name, object_name).await);
println!("✅ Object exists before lifecycle processing");
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
set_bucket_lifecycle(bucket_name)
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}
// Create scanner with very short intervals for testing
let scanner_config = ScannerConfig {
scan_interval: Duration::from_millis(100),
deep_scan_interval: Duration::from_millis(500),
max_concurrent_scans: 1,
..Default::default()
};
let scanner = Scanner::new(Some(scanner_config), None);
// Start scanner
scanner.start().await.expect("Failed to start scanner");
println!("✅ Scanner started");
// Wait for scanner to process lifecycle rules
tokio::time::sleep(Duration::from_secs(2)).await;
// Manually trigger a scan cycle to ensure lifecycle processing
scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
println!("✅ Manual scan cycle completed");
// Wait a bit more for background workers to process expiry tasks
tokio::time::sleep(Duration::from_secs(5)).await;
// Check if object has been expired (deleted)
let object_still_exists = object_exists(&ecstore, bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {object_still_exists}");
if object_still_exists {
println!("❌ Object was not deleted by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("✅ Object was successfully deleted by lifecycle processing");
}
assert!(!object_still_exists);
println!("✅ Object successfully expired");
// Stop scanner
let _ = scanner.stop().await;
println!("✅ Scanner stopped");
println!("Lifecycle expiry basic test completed");
}

View File

@@ -0,0 +1,133 @@
#![cfg(test)]
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use bytes::Bytes;
use serial_test::serial;
use std::error::Error;
use tokio::time::sleep;
const ENDPOINT: &str = "http://localhost:9000";
const ACCESS_KEY: &str = "rustfsadmin";
const SECRET_KEY: &str = "rustfsadmin";
const BUCKET: &str = "test-basic-bucket";
async fn create_aws_s3_client() -> Result<Client, Box<dyn Error>> {
let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1"));
let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.region(region_provider)
.credentials_provider(Credentials::new(ACCESS_KEY, SECRET_KEY, None, None, "static"))
.endpoint_url(ENDPOINT)
.load()
.await;
let client = Client::from_conf(
aws_sdk_s3::Config::from(&shared_config)
.to_builder()
.force_path_style(true)
.build(),
);
Ok(client)
}
async fn setup_test_bucket(client: &Client) -> Result<(), Box<dyn Error>> {
match client.create_bucket().bucket(BUCKET).send().await {
Ok(_) => {}
Err(e) => {
let error_str = e.to_string();
if !error_str.contains("BucketAlreadyOwnedByYou") && !error_str.contains("BucketAlreadyExists") {
return Err(e.into());
}
}
}
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
#[ignore = "requires running RustFS server at localhost:9000"]
async fn test_bucket_lifecycle_configuration() -> Result<(), Box<dyn std::error::Error>> {
use aws_sdk_s3::types::{BucketLifecycleConfiguration, LifecycleExpiration, LifecycleRule, LifecycleRuleFilter};
use tokio::time::Duration;
let client = create_aws_s3_client().await?;
setup_test_bucket(&client).await?;
// Upload test object first
let test_content = "Test object for lifecycle expiration";
let lifecycle_object_key = "lifecycle-test-object.txt";
client
.put_object()
.bucket(BUCKET)
.key(lifecycle_object_key)
.body(Bytes::from(test_content.as_bytes()).into())
.send()
.await?;
// Verify object exists initially
let resp = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await?;
assert!(resp.content_length().unwrap_or(0) > 0);
// Configure lifecycle rule: expire after current time + 3 seconds
let expiration = LifecycleExpiration::builder().days(0).build();
let filter = LifecycleRuleFilter::builder().prefix(lifecycle_object_key).build();
let rule = LifecycleRule::builder()
.id("expire-test-object")
.filter(filter)
.expiration(expiration)
.status(aws_sdk_s3::types::ExpirationStatus::Enabled)
.build()?;
let lifecycle = BucketLifecycleConfiguration::builder().rules(rule).build()?;
client
.put_bucket_lifecycle_configuration()
.bucket(BUCKET)
.lifecycle_configuration(lifecycle)
.send()
.await?;
// Verify lifecycle configuration was set
let resp = client.get_bucket_lifecycle_configuration().bucket(BUCKET).send().await?;
let rules = resp.rules();
assert!(rules.iter().any(|r| r.id().unwrap_or("") == "expire-test-object"));
// Wait for lifecycle processing (scanner runs every 1 second)
sleep(Duration::from_secs(3)).await;
// After lifecycle processing, the object should be deleted by the lifecycle rule
let get_result = client.get_object().bucket(BUCKET).key(lifecycle_object_key).send().await;
match get_result {
Ok(_) => {
panic!("Expected object to be deleted by lifecycle rule, but it still exists");
}
Err(e) => {
if let Some(service_error) = e.as_service_error() {
if service_error.is_no_such_key() {
println!("Lifecycle configuration test completed - object was successfully deleted by lifecycle rule");
} else {
panic!("Expected NoSuchKey error, but got: {e:?}");
}
} else {
panic!("Expected service error, but got: {e:?}");
}
}
}
println!("Lifecycle configuration test completed.");
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod lifecycle;
mod lock;
mod node_interact_test;
mod sql;

View File

@@ -516,7 +516,7 @@ impl TransitionState {
if let Err(err) = transition_object(api.clone(), &task.obj_info, LcAuditEvent::new(task.event.clone(), task.src.clone())).await {
if !is_err_version_not_found(&err) && !is_err_object_not_found(&err) && !is_network_or_host_down(&err.to_string(), false) && !err.to_string().contains("use of closed network connection") {
error!("Transition to {} failed for {}/{} version:{} with {}",
task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.expect("err"), err.to_string());
task.event.storage_class, task.obj_info.bucket, task.obj_info.name, task.obj_info.version_id.map(|v| v.to_string()).unwrap_or_default(), err.to_string());
}
} else {
let mut ts = TierStats {
@@ -743,7 +743,7 @@ pub async fn transition_object(api: Arc<ECStore>, oi: &ObjectInfo, lae: LcAuditE
..Default::default()
},
//lifecycle_audit_event: lae,
version_id: Some(oi.version_id.expect("err").to_string()),
version_id: oi.version_id.map(|v| v.to_string()),
versioned: BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await,
version_suspended: BucketVersioningSys::prefix_suspended(&oi.bucket, &oi.name).await,
mod_time: oi.mod_time,
@@ -808,7 +808,7 @@ impl LifecycleOps for ObjectInfo {
lifecycle::ObjectOpts {
name: self.name.clone(),
user_tags: self.user_tags.clone(),
version_id: self.version_id.expect("err").to_string(),
version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(),
mod_time: self.mod_time,
size: self.size as usize,
is_latest: self.is_latest,
@@ -874,7 +874,11 @@ pub async fn eval_action_from_lifecycle(
if lock_enabled && enforce_retention_for_deletion(oi) {
//if serverDebugLog {
if oi.version_id.is_some() {
info!("lifecycle: {} v({}) is locked, not deleting", oi.name, oi.version_id.expect("err"));
info!(
"lifecycle: {} v({}) is locked, not deleting",
oi.name,
oi.version_id.map(|v| v.to_string()).unwrap_or_default()
);
} else {
info!("lifecycle: {} is locked, not deleting", oi.name);
}
@@ -928,7 +932,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
};
if lc_event.action.delete_versioned() {
opts.version_id = Some(oi.version_id.expect("err").to_string());
opts.version_id = oi.version_id.map(|v| v.to_string());
}
opts.versioned = BucketVersioningSys::prefix_enabled(&oi.bucket, &oi.name).await;

View File

@@ -27,6 +27,7 @@ use std::env;
use std::fmt::Display;
use time::macros::{datetime, offset};
use time::{self, Duration, OffsetDateTime};
use tracing::info;
use crate::bucket::lifecycle::rule::TransitionOps;
@@ -279,7 +280,12 @@ impl Lifecycle for BucketLifecycleConfiguration {
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
@@ -418,7 +424,16 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
}
if obj.is_latest && !obj.delete_marker {
info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
@@ -435,22 +450,29 @@ impl Lifecycle for BucketLifecycleConfiguration {
});
}
} else if let Some(days) = expiration.days {
if days != 0 {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.expect("err!"), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
}
}
@@ -598,7 +620,7 @@ impl LifecycleCalculate for Transition {
pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTime {
if days == 0 {
return mod_time;
return OffsetDateTime::UNIX_EPOCH; // Return epoch time to ensure immediate expiry
}
let t = mod_time
.to_offset(offset!(-0:00:00))

View File

@@ -3837,7 +3837,90 @@ impl StorageAPI for SetDisks {
return Ok(ObjectInfo::default());
}
unimplemented!()
// Create a single object deletion request
let mut vr = FileInfo {
name: object.to_string(),
version_id: opts.version_id.as_ref().and_then(|v| Uuid::parse_str(v).ok()),
..Default::default()
};
// Handle versioning
let (suspended, versioned) = (opts.version_suspended, opts.versioned);
if opts.version_id.is_none() && (suspended || versioned) {
vr.mod_time = Some(OffsetDateTime::now_utc());
vr.deleted = true;
if versioned {
vr.version_id = Some(Uuid::new_v4());
}
}
let vers = vec![FileInfoVersions {
name: vr.name.clone(),
versions: vec![vr.clone()],
..Default::default()
}];
let disks = self.disks.read().await;
let disks = disks.clone();
let write_quorum = disks.len() / 2 + 1;
let mut futures = Vec::with_capacity(disks.len());
let mut errs = Vec::with_capacity(disks.len());
for disk in disks.iter() {
let vers = vers.clone();
futures.push(async move {
if let Some(disk) = disk {
disk.delete_versions(bucket, vers, DeleteOptions::default()).await
} else {
Err(DiskError::DiskNotFound)
}
});
}
let results = join_all(futures).await;
for result in results {
match result {
Ok(disk_errs) => {
// Handle errors from disk operations
for err in disk_errs.iter().flatten() {
warn!("delete_object disk error: {:?}", err);
}
errs.push(None);
}
Err(e) => {
errs.push(Some(e));
}
}
}
// Check write quorum
if let Some(err) = reduce_write_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, write_quorum) {
return Err(to_object_err(err.into(), vec![bucket, object]));
}
// Create result ObjectInfo
let result_info = if vr.deleted {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
delete_marker: true,
mod_time: vr.mod_time,
version_id: vr.version_id,
..Default::default()
}
} else {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
version_id: vr.version_id,
..Default::default()
}
};
Ok(result_info)
}
#[tracing::instrument(skip(self))]

View File

@@ -17,7 +17,7 @@ pub mod fileinfo;
mod filemeta;
mod filemeta_inline;
pub mod headers;
mod metacache;
pub mod metacache;
pub mod test_data;