fix: #38 implement the basic storage functions of the bucket-metadata config using the s3s struct definitions

This commit is contained in:
weisd
2024-10-14 14:58:32 +08:00
parent 0f39887cdb
commit 12114dce16
40 changed files with 920 additions and 1234 deletions

398
Cargo.lock generated
View File

@@ -4,19 +4,13 @@ version = 3
[[package]]
name = "addr2line"
version = "0.22.0"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "adler2"
version = "2.0.0"
@@ -45,9 +39,9 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.6.14"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b"
checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -66,49 +60,49 @@ checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1"
[[package]]
name = "anstyle-parse"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4"
checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.0"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391"
checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a"
dependencies = [
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.3"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19"
checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8"
dependencies = [
"anstyle",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "anyhow"
version = "1.0.88"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "arrayvec"
version = "0.7.4"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-stream"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
@@ -117,9 +111,9 @@ dependencies = [
[[package]]
name = "async-stream-impl"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
@@ -154,15 +148,15 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "autocfg"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "axum"
version = "0.7.5"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae"
dependencies = [
"async-trait",
"axum-core",
@@ -180,16 +174,16 @@ dependencies = [
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower",
"tower 0.5.1",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.3"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
@@ -200,7 +194,7 @@ dependencies = [
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 0.1.2",
"sync_wrapper 1.0.1",
"tower-layer",
"tower-service",
]
@@ -218,17 +212,17 @@ dependencies = [
[[package]]
name = "backtrace"
version = "0.3.73"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide 0.7.4",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
@@ -297,9 +291,12 @@ dependencies = [
[[package]]
name = "cc"
version = "1.0.100"
version = "1.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c891175c3fb232128f48de6590095e59198bbeb8620c310be349bfc3afd12c7b"
checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945"
dependencies = [
"shlex",
]
[[package]]
name = "cfg-if"
@@ -352,15 +349,15 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "0.7.1"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70"
checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "colorchoice"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "common"
@@ -374,9 +371,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.2.12"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504"
checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0"
dependencies = [
"libc",
]
@@ -442,7 +439,7 @@ dependencies = [
"serde_json",
"tokio",
"tonic",
"tower",
"tower 0.4.13",
"url",
]
@@ -473,6 +470,7 @@ dependencies = [
"rmp",
"rmp-serde",
"s3s",
"s3s-policy",
"serde",
"serde_json",
"sha2",
@@ -483,7 +481,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tonic",
"tower",
"tower 0.4.13",
"tracing",
"tracing-error",
"transform-stream",
@@ -511,14 +509,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "fastrand"
version = "2.1.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "fixedbitset"
@@ -538,12 +536,12 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.0.32"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c0596c1eac1f9e04ed902702e9878208b336edc9d6fddc8a48387349bab3666"
checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0"
dependencies = [
"crc32fast",
"miniz_oxide 0.8.0",
"miniz_oxide",
]
[[package]]
@@ -688,9 +686,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.29.0"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "gloo-timers"
@@ -716,7 +714,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http",
"indexmap 2.2.6",
"indexmap 2.6.0",
"slab",
"tokio",
"tokio-util",
@@ -734,9 +732,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.14.5"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
[[package]]
name = "heck"
@@ -805,9 +803,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.9.4"
version = "1.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9"
checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946"
[[package]]
name = "httpdate"
@@ -890,12 +888,13 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.2.6"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
"hashbrown 0.14.5",
"hashbrown 0.15.0",
"serde",
]
[[package]]
@@ -909,15 +908,15 @@ dependencies = [
[[package]]
name = "is_terminal_polyfill"
version = "1.70.0"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.10.5"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
@@ -930,9 +929,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "js-sys"
version = "0.3.70"
version = "0.3.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a"
checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
dependencies = [
"wasm-bindgen",
]
@@ -945,9 +944,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.155"
version = "0.2.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5"
[[package]]
name = "libm"
@@ -1039,15 +1038,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08"
dependencies = [
"adler",
]
[[package]]
name = "miniz_oxide"
version = "0.8.0"
@@ -1066,14 +1056,14 @@ dependencies = [
"hermit-abi",
"libc",
"wasi",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "multimap"
version = "0.8.3"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "netif"
@@ -1147,18 +1137,18 @@ checksum = "cf70ee2d9b1737d1836c20d9f8f96ec3901b2bf92128439db13237ddce9173a5"
[[package]]
name = "object"
version = "0.36.0"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434"
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.19.0"
version = "1.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]]
name = "openssl"
@@ -1278,23 +1268,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.2.6",
"indexmap 2.6.0",
]
[[package]]
name = "pin-project"
version = "1.1.5"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
checksum = "baf123a161dde1e524adf36f90bc5d8d3462824a9c43553ad07a8183161189ec"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.5"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
checksum = "a4502d8515ca9f32f1fb543d987f63d95a14934883db45bdb48060b6b69257f8"
dependencies = [
"proc-macro2",
"quote",
@@ -1315,9 +1305,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.30"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2"
[[package]]
name = "powerfmt"
@@ -1327,9 +1317,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy",
]
[[package]]
name = "prettyplease"
@@ -1343,9 +1336,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.86"
version = "1.0.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a"
dependencies = [
"unicode-ident",
]
@@ -1435,14 +1428,14 @@ dependencies = [
"tokio",
"tonic",
"tonic-build",
"tower",
"tower 0.4.13",
]
[[package]]
name = "quick-xml"
version = "0.36.1"
version = "0.36.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
dependencies = [
"memchr",
"serde",
@@ -1450,9 +1443,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [
"proc-macro2",
]
@@ -1567,7 +1560,7 @@ dependencies = [
"libc",
"spin",
"untrusted",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
@@ -1600,9 +1593,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc_version"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
@@ -1642,7 +1635,7 @@ dependencies = [
"tonic",
"tonic-build",
"tonic-reflection",
"tower",
"tower 0.4.13",
"tracing",
"tracing-error",
"tracing-subscriber",
@@ -1652,22 +1645,22 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.34"
version = "0.38.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811"
dependencies = [
"bitflags 2.6.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
name = "rustls"
version = "0.23.12"
version = "0.23.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044"
checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8"
dependencies = [
"log",
"once_cell",
@@ -1680,25 +1673,24 @@ dependencies = [
[[package]]
name = "rustls-pemfile"
version = "2.1.3"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425"
checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
dependencies = [
"base64",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55"
[[package]]
name = "rustls-webpki"
version = "0.102.6"
version = "0.102.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
dependencies = [
"ring",
"rustls-pki-types",
@@ -1719,9 +1711,8 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "s3s"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa54e3b4b4791c8c62291516997866b4f265c3fcbfdbcdd0b8da62896fba8bfa"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f"
dependencies = [
"arrayvec",
"async-trait",
@@ -1757,13 +1748,24 @@ dependencies = [
"thiserror",
"time",
"tokio",
"tower",
"tower 0.5.1",
"tracing",
"transform-stream",
"urlencoding",
"zeroize",
]
[[package]]
name = "s3s-policy"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f"
dependencies = [
"indexmap 2.6.0",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1861,6 +1863,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@@ -1872,9 +1880,9 @@ dependencies = [
[[package]]
name = "simdutf8"
version = "0.1.4"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
[[package]]
name = "siphasher"
@@ -1904,7 +1912,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
@@ -1927,9 +1935,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.76"
version = "2.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
dependencies = [
"proc-macro2",
"quote",
@@ -1950,15 +1958,15 @@ checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
[[package]]
name = "tempfile"
version = "3.11.0"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53"
checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b"
dependencies = [
"cfg-if",
"fastrand",
"once_cell",
"rustix",
"windows-sys",
"windows-sys 0.59.0",
]
[[package]]
@@ -2024,9 +2032,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.6.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82"
checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938"
dependencies = [
"tinyvec_macros",
]
@@ -2051,7 +2059,7 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
"windows-sys 0.52.0",
]
[[package]]
@@ -2127,7 +2135,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tokio-stream",
"tower",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@@ -2180,6 +2188,20 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper 0.1.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.3"
@@ -2288,21 +2310,21 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
version = "0.3.15"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893"
[[package]]
name = "unicode-ident"
version = "1.0.12"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "unicode-normalization"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [
"tinyvec",
]
@@ -2373,9 +2395,9 @@ checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "vsimd"
@@ -2400,9 +2422,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.93"
version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5"
checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
dependencies = [
"cfg-if",
"once_cell",
@@ -2411,9 +2433,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.93"
version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b"
checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
dependencies = [
"bumpalo",
"log",
@@ -2426,9 +2448,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.93"
version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf"
checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -2436,9 +2458,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.93"
version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
dependencies = [
"proc-macro2",
"quote",
@@ -2449,9 +2471,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.93"
version = "0.2.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
[[package]]
name = "winapi"
@@ -2485,10 +2507,19 @@ dependencies = [
]
[[package]]
name = "windows-targets"
version = "0.52.5"
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
@@ -2502,51 +2533,51 @@ dependencies = [
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.5"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "xxhash-rust"
@@ -2554,6 +2585,27 @@ version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.8.1"

View File

@@ -46,7 +46,10 @@ prost-types = "0.13.3"
protobuf = "3.6"
protos = { path = "./common/protos" }
rand = "0.8.5"
s3s = { version = "0.10.1", default-features = true, features = ["tower"] }
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f", default-features = true, features = [
"tower",
] }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f" }
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
thiserror = "1.0.64"

View File

@@ -20,10 +20,11 @@ serde.workspace = true
time.workspace = true
serde_json.workspace = true
tracing-error.workspace = true
s3s.workspace = true
http.workspace = true
url.workspace = true
uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
reed-solomon-erasure = { version = "6.0.0", features = [ "simd-accel" ] }
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
transform-stream = "0.3.0"
lazy_static.workspace = true
lock.workspace = true
@@ -33,7 +34,6 @@ path-absolutize = "3.1.1"
protos.workspace = true
rmp-serde = "1.3.0"
tokio-util = { version = "0.7.12", features = ["io"] }
s3s = "0.10.0"
crc32fast = "1.4.2"
siphasher = "1.0.1"
base64-simd = "0.8.0"
@@ -48,6 +48,7 @@ rmp = "0.8.14"
byteorder = "1.5.0"
xxhash-rust = { version = "0.8.12", features = ["xxh64"] }
num_cpus = "1.16"
s3s-policy.workspace = true
[target.'cfg(not(windows))'.dependencies]
openssl = "0.10.66"

View File

@@ -1,59 +0,0 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Server-side-encryption algorithm selector. The wire forms accepted by
// the `FromStr` impl below are "AES256" and "aws:kms"; presumably these
// mirror the S3 `x-amz-server-side-encryption` header values — TODO confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Algorithm {
    AES256, // parsed from "AES256"
    AWSKms, // parsed from "aws:kms"
}
// Conversion from the wire-format string into an `Algorithm`.
impl std::str::FromStr for Algorithm {
    type Err = String;

    /// Parses "AES256" or "aws:kms"; any other input yields an error
    /// carrying the rejected string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "AES256" {
            return Ok(Algorithm::AES256);
        }
        if s == "aws:kms" {
            return Ok(Algorithm::AWSKms);
        }
        Err(format!("未知的 SSE 算法: {}", s))
    }
}
// Default encryption action carried by a rule: which algorithm to apply
// and, presumably only for `AWSKms`, which master key to use — TODO confirm.
// Fields are private; no constructor or setter is visible in this module,
// so values appear to arrive via serde deserialization.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct EncryptionAction {
    algorithm: Option<Algorithm>,    // None when the rule sets no algorithm
    master_key_id: Option<String>,   // KMS master key id, when applicable
}
// One SSE configuration rule; currently it only holds the default
// encryption action to apply.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
    default_encryption_action: EncryptionAction,
}
// Bucket-level server-side-encryption configuration.
// NOTE(review): the xml_* fields suggest this mirrors an XML document
// (S3 ServerSideEncryptionConfiguration) — confirm against the parser
// that populates it; nothing in this module reads them.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct BucketSSEConfig {
    xml_ns: String,   // XML namespace of the source document
    xml_name: String, // XML element name of the source document
    rules: Vec<Rule>, // the configured encryption rules
}
impl BucketSSEConfig {
    /// Serializes this config to MessagePack, encoding structs as
    /// field-name maps (`with_struct_map`), so the layout is
    /// self-describing and matches what `unmarshal` expects.
    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
        let mut out = Vec::new();
        let mut ser = rmpSerializer::new(&mut out).with_struct_map();
        self.serialize(&mut ser)?;
        Ok(out)
    }

    /// Reconstructs a config from bytes produced by [`Self::marshal_msg`].
    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
        Ok(rmp_serde::from_slice::<Self>(buf)?)
    }
}

View File

@@ -1,87 +0,0 @@
mod name;
use crate::error::Result;
use name::Name;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Fields shared by every notification target entry.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Common {
    pub id: String,
    // Object-key filter restricting which objects trigger events.
    pub filter: S3Key,
    // Event names this target subscribes to.
    pub events: Vec<Name>,
}
// Queue (SQS-style) notification target.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
struct Queue {
    pub common: Common,
    pub arn: Arn,
}
// ARN of a notification target, split into target id and region.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Arn {
    pub target_id: TargetID,
    pub region: String,
}
// Identifier of a notification target: unique id plus human-readable name.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TargetID {
    pub id: String,
    pub name: String,
}
// A single key filter rule (e.g. name = "prefix"/"suffix", value = pattern).
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRule {
    pub name: String,
    pub value: String,
}
// Ordered list of filter rules; all rules must match.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct FilterRuleList {
    pub rules: Vec<FilterRule>,
}
// S3 object-key filter wrapper, mirroring the S3 notification XML shape.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct S3Key {
    pub rule_list: FilterRuleList,
}
// Lambda notification target (ARN only).
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lambda {
    arn: String,
}
// Topic (SNS-style) notification target (ARN only).
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Topic {
    arn: String,
}
// Full bucket notification configuration: all queue/lambda/topic targets.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
    queue_list: Vec<Queue>,
    lambda_list: Vec<Lambda>,
    topic_list: Vec<Topic>,
}
impl Config {
    /// Encode as MessagePack with struct-map encoding (field names preserved).
    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
        Ok(buf)
    }
    /// Decode a MessagePack buffer previously produced by [`Self::marshal_msg`].
    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
        let t: Config = rmp_serde::from_slice(buf)?;
        Ok(t)
    }
}

View File

@@ -1,98 +0,0 @@
use std::fmt::Display;
use serde::{Deserialize, Serialize};
// Bucket-notification event names. Discriminants start at 1 so that 0 can act
// as an "unset" value; the wire strings live in the `Display` impl below.
// The trailing `*All`/`Everything` variants are wildcard groups, not single events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Name {
    ObjectAccessedGet = 1,
    ObjectAccessedGetRetention,
    ObjectAccessedGetLegalHold,
    ObjectAccessedHead,
    ObjectAccessedAttributes,
    ObjectCreatedCompleteMultipartUpload,
    ObjectCreatedCopy,
    ObjectCreatedPost,
    ObjectCreatedPut,
    ObjectCreatedPutRetention,
    ObjectCreatedPutLegalHold,
    ObjectCreatedPutTagging,
    ObjectCreatedDeleteTagging,
    ObjectRemovedDelete,
    ObjectRemovedDeleteMarkerCreated,
    ObjectRemovedDeleteAllVersions,
    ObjectRemovedNoOP,
    BucketCreated,
    BucketRemoved,
    ObjectReplicationFailed,
    ObjectReplicationComplete,
    ObjectReplicationMissedThreshold,
    ObjectReplicationReplicatedAfterThreshold,
    ObjectReplicationNotTracked,
    ObjectRestorePost,
    ObjectRestoreCompleted,
    ObjectTransitionFailed,
    ObjectTransitionComplete,
    ObjectManyVersions,
    ObjectLargeVersions,
    PrefixManyFolders,
    ILMDelMarkerExpirationDelete,
    // Wildcard groups matching every event of the given family.
    ObjectAccessedAll,
    ObjectCreatedAll,
    ObjectRemovedAll,
    ObjectReplicationAll,
    ObjectRestoreAll,
    ObjectTransitionAll,
    ObjectScannerAll,
    // Matches every event.
    Everything,
}
// Render each event name as its S3 wire string (e.g. "s3:ObjectCreated:Put").
// The match is exhaustive on purpose: adding a variant without a wire string
// becomes a compile error.
impl Display for Name {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}",
            match *self {
                Name::ObjectAccessedGet => "s3:ObjectAccessed:Get",
                Name::ObjectAccessedGetRetention => "s3:ObjectAccessed:GetRetention",
                Name::ObjectAccessedGetLegalHold => "s3:ObjectAccessed:GetLegalHold",
                Name::ObjectAccessedHead => "s3:ObjectAccessed:Head",
                Name::ObjectAccessedAttributes => "s3:ObjectAccessed:Attributes",
                Name::ObjectCreatedCompleteMultipartUpload => "s3:ObjectCreated:CompleteMultipartUpload",
                Name::ObjectCreatedCopy => "s3:ObjectCreated:Copy",
                Name::ObjectCreatedPost => "s3:ObjectCreated:Post",
                Name::ObjectCreatedPut => "s3:ObjectCreated:Put",
                Name::ObjectCreatedPutRetention => "s3:ObjectCreated:PutRetention",
                Name::ObjectCreatedPutLegalHold => "s3:ObjectCreated:PutLegalHold",
                Name::ObjectCreatedPutTagging => "s3:ObjectCreated:PutTagging",
                Name::ObjectCreatedDeleteTagging => "s3:ObjectCreated:DeleteTagging",
                Name::ObjectRemovedDelete => "s3:ObjectRemoved:Delete",
                Name::ObjectRemovedDeleteMarkerCreated => "s3:ObjectRemoved:DeleteMarkerCreated",
                Name::ObjectRemovedDeleteAllVersions => "s3:ObjectRemoved:DeleteAllVersions",
                Name::ObjectRemovedNoOP => "s3:ObjectRemoved:NoOP",
                Name::BucketCreated => "s3:BucketCreated:*",
                Name::BucketRemoved => "s3:BucketRemoved:*",
                Name::ObjectReplicationFailed => "s3:Replication:OperationFailedReplication",
                Name::ObjectReplicationComplete => "s3:Replication:OperationCompletedReplication",
                Name::ObjectReplicationMissedThreshold => "s3:Replication:OperationMissedThreshold",
                Name::ObjectReplicationReplicatedAfterThreshold => "s3:Replication:OperationReplicatedAfterThreshold",
                Name::ObjectReplicationNotTracked => "s3:Replication:OperationNotTracked",
                Name::ObjectRestorePost => "s3:ObjectRestore:Post",
                Name::ObjectRestoreCompleted => "s3:ObjectRestore:Completed",
                Name::ObjectTransitionFailed => "s3:ObjectTransition:Failed",
                Name::ObjectTransitionComplete => "s3:ObjectTransition:Complete",
                Name::ObjectManyVersions => "s3:Scanner:ManyVersions",
                Name::ObjectLargeVersions => "s3:Scanner:LargeVersions",
                Name::PrefixManyFolders => "s3:Scanner:BigPrefix",
                Name::ILMDelMarkerExpirationDelete => "s3:LifecycleDelMarkerExpiration:Delete",
                Name::ObjectAccessedAll => "s3:ObjectAccessed:*",
                Name::ObjectCreatedAll => "s3:ObjectCreated:*",
                Name::ObjectRemovedAll => "s3:ObjectRemoved:*",
                Name::ObjectReplicationAll => "s3:Replication:*",
                Name::ObjectRestoreAll => "s3:ObjectRestore:*",
                Name::ObjectTransitionAll => "s3:ObjectTransition:*",
                Name::ObjectScannerAll => "s3:Scanner:*",
                Name::Everything => "*",
            }
        )
    }
}

View File

@@ -1,9 +0,0 @@
use super::{prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
// Combined ("And") lifecycle filter: every listed condition must match.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct And {
    // Minimum object size in bytes for the rule to apply.
    pub object_size_greater_than: i64,
    // Maximum object size in bytes for the rule to apply.
    pub object_size_less_than: i64,
    pub prefix: Prefix,
    pub tags: Vec<Tag>,
}

View File

@@ -1,6 +0,0 @@
use serde::{Deserialize, Serialize};
// Expire delete markers after the given number of days.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DelMarkerExpiration {
    pub days: usize,
}

View File

@@ -1,27 +0,0 @@
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
// ExpirationDays is a type alias to unmarshal Days in Expiration
pub type ExpirationDays = usize;
// Absolute expiration date; `None` means no date was set.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExpirationDate(Option<OffsetDateTime>);
// Wraps the "expire delete marker" flag of an expiration action.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExpireDeleteMarker {
    pub marker: Boolean,
}
// A bool that also records whether it was explicitly set in the source
// document (`set`), to distinguish "false" from "absent".
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Boolean {
    pub val: bool,
    pub set: bool,
}
// Lifecycle expiration action: either relative days or an absolute date,
// plus delete-marker / delete-all handling.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Expiration {
    pub days: Option<ExpirationDays>,
    pub date: Option<ExpirationDate>,
    pub delete_marker: ExpireDeleteMarker,
    pub delete_all: Boolean,
    // Whether this expiration element was present at all in the source config.
    pub set: bool,
}

View File

@@ -1,23 +0,0 @@
use std::collections::HashMap;
use super::{and::And, prefix::Prefix, tag::Tag};
use serde::{Deserialize, Serialize};
// Lifecycle rule filter: either a single prefix/tag/size condition or a
// combined `And` condition. The `*_set` flags record which form was present
// in the parsed document.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Filter {
    pub set: bool,
    pub prefix: Prefix,
    pub object_size_greater_than: Option<i64>,
    pub object_size_less_than: Option<i64>,
    pub and_condition: And,
    pub and_set: bool,
    pub tag: Tag,
    pub tag_set: bool,
    // Cached tag key/value pairs for fast lookup during evaluation.
    pub cached_tags: HashMap<String, String>,
}

View File

@@ -1,26 +0,0 @@
use super::rule::Rule;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
// Bucket lifecycle configuration: the rule list plus the last time expiry
// rules were (re)computed.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Lifecycle {
    pub rules: Vec<Rule>,
    pub expiry_updated_at: Option<OffsetDateTime>,
}
impl Lifecycle {
    /// Encode as MessagePack with struct-map encoding (field names preserved).
    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
        Ok(buf)
    }
    /// Decode a MessagePack buffer previously produced by [`Self::marshal_msg`].
    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
        let t: Lifecycle = rmp_serde::from_slice(buf)?;
        Ok(t)
    }
}

View File

@@ -1,10 +0,0 @@
// Lifecycle configuration submodules (bucket lifecycle rules and their parts).
mod and;
mod delmarker;
mod expiration;
// NOTE(review): module name looks like a typo of "filter"; renaming would
// require renaming the file and every `fileter::` path, so it is kept as-is.
mod fileter;
pub(crate) mod lifecycle;
mod noncurrentversion;
mod prefix;
mod rule;
mod tag;
mod transition;

View File

@@ -1,16 +0,0 @@
use super::{expiration::ExpirationDays, transition::TransitionDays};
use serde::{Deserialize, Serialize};
// Expire noncurrent object versions after `noncurrent_days`, optionally
// keeping the `newer_noncurrent_versions` most recent ones.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct NoncurrentVersionExpiration {
    pub noncurrent_days: ExpirationDays,
    pub newer_noncurrent_versions: usize,
    // Whether the element was present in the parsed document.
    set: bool,
}
// Transition noncurrent versions to `storage_class` after `noncurrent_days`.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct NoncurrentVersionTransition {
    pub noncurrent_days: TransitionDays,
    pub storage_class: String,
    // Whether the element was present in the parsed document.
    set: bool,
}

View File

@@ -1,7 +0,0 @@
use serde::{Deserialize, Serialize};
// Object-key prefix with a `set` flag distinguishing empty from absent.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Prefix {
    pub val: String,
    pub set: bool,
}

View File

@@ -1,29 +0,0 @@
use super::{
delmarker::DelMarkerExpiration,
expiration::Expiration,
fileter::Filter,
noncurrentversion::{NoncurrentVersionExpiration, NoncurrentVersionTransition},
prefix::Prefix,
transition::Transition,
};
use serde::{Deserialize, Serialize};
// Whether a lifecycle rule is active. Defaults to `Enabled`.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum Status {
    #[default]
    Enabled,
    Disabled,
}
// One lifecycle rule: filter + the set of actions it triggers.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
    pub id: String,
    pub status: Status,
    pub filter: Filter,
    pub prefix: Prefix,
    // NOTE(review): field name looks like a typo of "expiration"; it is kept
    // because renaming would change the serialized field name and all callers.
    pub pxpiration: Expiration,
    pub transition: Transition,
    pub del_marker_expiration: DelMarkerExpiration,
    pub noncurrent_version_expiration: NoncurrentVersionExpiration,
    pub noncurrent_version_transition: NoncurrentVersionTransition,
}

View File

@@ -1,7 +0,0 @@
use serde::{Deserialize, Serialize};
// A single key/value object tag used in lifecycle filters.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tag {
    pub key: String,
    pub value: String,
}

View File

@@ -1,16 +0,0 @@
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
// Number of days after which a transition applies.
pub type TransitionDays = usize;
// Absolute transition date; `None` means no date was set.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TransitionDate(Option<OffsetDateTime>);
// Lifecycle transition action: move objects to `storage_class` after either
// a relative number of days or at an absolute date.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Transition {
    pub days: Option<TransitionDays>,
    pub date: Option<TransitionDate>,
    pub storage_class: String,
    // Whether the element was present in the parsed document.
    pub set: bool,
}

View File

@@ -1,17 +1,19 @@
use super::{
encryption::BucketSSEConfig, event, lifecycle::lifecycle::Lifecycle, objectlock, policy::bucket_policy::BucketPolicy,
quota::BucketQuota, replication, tags::Tags, target::BucketTargets, versioning::Versioning,
};
use super::{quota::BucketQuota, target::BucketTargets};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use rmp_serde::Serializer as rmpSerializer;
use s3s::dto::{
BucketLifecycleConfiguration, NotificationConfiguration, ObjectLockConfiguration, ReplicationConfiguration,
ServerSideEncryptionConfiguration, Tagging, VersioningConfiguration,
};
use s3s::xml;
use s3s_policy::model::Policy;
use serde::Serializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use time::OffsetDateTime;
use tracing::{error, warn};
use crate::bucket::tags;
use crate::config;
use crate::config::common::{read_config, save_config};
use crate::error::{Error, Result};
@@ -34,7 +36,7 @@ pub const BUCKET_VERSIONING_CONFIG: &str = "versioning.xml";
pub const BUCKET_REPLICATION_CONFIG: &str = "replication.xml";
pub const BUCKET_TARGETS_FILE: &str = "bucket-targets.json";
#[derive(Debug, Deserialize, Serialize, Clone)]
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase", default)]
pub struct BucketMetadata {
pub name: String,
@@ -68,23 +70,23 @@ pub struct BucketMetadata {
pub new_field_updated_at: OffsetDateTime,
#[serde(skip)]
pub policy_config: Option<BucketPolicy>,
pub policy_config: Option<Policy>,
#[serde(skip)]
pub notification_config: Option<event::Config>,
pub notification_config: Option<NotificationConfiguration>,
#[serde(skip)]
pub lifecycle_config: Option<Lifecycle>,
pub lifecycle_config: Option<BucketLifecycleConfiguration>,
#[serde(skip)]
pub object_lock_config: Option<objectlock::Config>,
pub object_lock_config: Option<ObjectLockConfiguration>,
#[serde(skip)]
pub versioning_config: Option<Versioning>,
pub versioning_config: Option<VersioningConfiguration>,
#[serde(skip)]
pub sse_config: Option<BucketSSEConfig>,
pub sse_config: Option<ServerSideEncryptionConfiguration>,
#[serde(skip)]
pub tagging_config: Option<Tags>,
pub tagging_config: Option<Tagging>,
#[serde(skip)]
pub quota_config: Option<BucketQuota>,
#[serde(skip)]
pub replication_config: Option<replication::Config>,
pub replication_config: Option<ReplicationConfiguration>,
#[serde(skip)]
pub bucket_target_config: Option<BucketTargets>,
#[serde(skip)]
@@ -296,32 +298,32 @@ impl BucketMetadata {
fn parse_all_configs(&mut self, _api: &ECStore) -> Result<()> {
if !self.policy_config_json.is_empty() {
self.policy_config = Some(BucketPolicy::unmarshal(&self.policy_config_json)?);
self.policy_config = Some(serde_json::from_slice(&self.policy_config_json)?);
}
if !self.notification_config_xml.is_empty() {
self.notification_config = Some(event::Config::unmarshal(&self.notification_config_xml)?);
self.notification_config = Some(deserialize::<NotificationConfiguration>(&self.notification_config_xml)?);
}
if !self.lifecycle_config_xml.is_empty() {
self.lifecycle_config = Some(Lifecycle::unmarshal(&self.lifecycle_config_xml)?);
self.lifecycle_config = Some(deserialize::<BucketLifecycleConfiguration>(&self.lifecycle_config_xml)?);
}
if !self.object_lock_config_xml.is_empty() {
self.object_lock_config = Some(objectlock::Config::unmarshal(&self.object_lock_config_xml)?);
self.object_lock_config = Some(deserialize::<ObjectLockConfiguration>(&self.object_lock_config_xml)?);
}
if !self.versioning_config_xml.is_empty() {
self.versioning_config = Some(Versioning::unmarshal(&self.versioning_config_xml)?);
self.versioning_config = Some(deserialize::<VersioningConfiguration>(&self.versioning_config_xml)?);
}
if !self.encryption_config_xml.is_empty() {
self.sse_config = Some(BucketSSEConfig::unmarshal(&self.encryption_config_xml)?);
self.sse_config = Some(deserialize::<ServerSideEncryptionConfiguration>(&self.encryption_config_xml)?);
}
if !self.tagging_config_xml.is_empty() {
self.tagging_config = Some(tags::Tags::unmarshal(&self.tagging_config_xml)?);
self.tagging_config = Some(deserialize::<Tagging>(&self.tagging_config_xml)?);
}
if !self.quota_config_json.is_empty() {
self.quota_config = Some(BucketQuota::unmarshal(&self.quota_config_json)?);
}
if !self.replication_config_xml.is_empty() {
self.replication_config = Some(replication::Config::unmarshal(&self.replication_config_xml)?);
self.replication_config = Some(deserialize::<ReplicationConfiguration>(&self.replication_config_xml)?);
}
if !self.bucket_targets_config_json.is_empty() {
self.bucket_target_config = Some(BucketTargets::unmarshal(&self.bucket_targets_config_json)?);
@@ -413,3 +415,31 @@ mod test {
assert_eq!(bm.name, new.name);
}
}
/// Deserialize an S3 XML document into `T`, requiring the whole input to be
/// consumed (`expect_eof`) so that trailing garbage is rejected as an error.
pub fn deserialize<T>(input: &[u8]) -> xml::DeResult<T>
where
    T: for<'xml> xml::Deserialize<'xml>,
{
    let mut d = xml::Deserializer::new(input);
    let ans = T::deserialize(&mut d)?;
    d.expect_eof()?;
    Ok(ans)
}
/// Serialize only the XML *content* of `val` (no outer element wrapper) and
/// return it as a `String`.
pub fn serialize_content<T: xml::SerializeContent>(val: &T) -> xml::SerResult<String> {
    let mut buf = Vec::with_capacity(256);
    {
        // Scope the serializer so its mutable borrow of `buf` ends before the move below.
        let mut ser = xml::Serializer::new(&mut buf);
        val.serialize_content(&mut ser)?;
    }
    // The unwrap assumes the XML serializer only emits valid UTF-8 —
    // NOTE(review): confirm against s3s's serializer guarantees.
    Ok(String::from_utf8(buf).unwrap())
}
/// Serialize `val` as a complete XML document and return the raw bytes.
pub fn serialize<T: xml::Serialize>(val: &T) -> xml::SerResult<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    {
        // Scope the serializer so its mutable borrow of `buf` ends before the move below.
        let mut ser = xml::Serializer::new(&mut buf);
        val.serialize(&mut ser)?;
    }
    Ok(buf)
}

View File

@@ -11,18 +11,20 @@ use crate::error::{Error, Result};
use crate::global::{is_dist_erasure, is_erasure, new_object_layer_fn, GLOBAL_Endpoints};
use crate::store::ECStore;
use futures::future::join_all;
use lazy_static::lazy_static;
use s3s::dto::{
BucketLifecycleConfiguration, NotificationConfiguration, ObjectLockConfiguration, ReplicationConfiguration,
ServerSideEncryptionConfiguration, Tagging, VersioningConfiguration,
};
use s3s_policy::model::Policy;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tracing::{error, warn};
use super::encryption::BucketSSEConfig;
use super::lifecycle::lifecycle::Lifecycle;
use super::metadata::{load_bucket_metadata, BucketMetadata};
use super::policy::bucket_policy::BucketPolicy;
use super::metadata::{deserialize, load_bucket_metadata, BucketMetadata};
use super::quota::BucketQuota;
use super::target::BucketTargets;
use super::{event, objectlock, replication, tags, versioning};
use lazy_static::lazy_static;
lazy_static! {
static ref GLOBAL_BucketMetadataSys: Arc<RwLock<BucketMetadataSys>> = Arc::new(RwLock::new(BucketMetadataSys::new()));
@@ -32,24 +34,95 @@ pub async fn init_bucket_metadata_sys(api: ECStore, buckets: Vec<String>) {
let mut sys = GLOBAL_BucketMetadataSys.write().await;
sys.init(api, buckets).await
}
pub async fn get_bucket_metadata_sys() -> Arc<RwLock<BucketMetadataSys>> {
pub(super) async fn get_bucket_metadata_sys() -> Arc<RwLock<BucketMetadataSys>> {
GLOBAL_BucketMetadataSys.clone()
}
pub async fn bucket_metadata_sys_set(bucket: String, bm: BucketMetadata) {
pub(crate) async fn set_bucket_metadata(bucket: String, bm: BucketMetadata) {
let sys = GLOBAL_BucketMetadataSys.write().await;
sys.set(bucket, bm).await
sys.set(bucket, Arc::new(bm)).await
}
/// Module-level convenience wrapper: write-lock the global bucket metadata
/// system and store `data` under `config_file` for `bucket`, returning the
/// new updated-at timestamp.
pub async fn update(bucket: &str, config_file: &str, data: Vec<u8>) -> Result<OffsetDateTime> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
    bucket_meta_sys.update(bucket, config_file, data).await
}
/// Module-level convenience wrapper: write-lock the global bucket metadata
/// system and remove `config_file` from `bucket`'s metadata, returning the
/// updated-at timestamp recorded for the deletion.
pub async fn delete(bucket: &str, config_file: &str) -> Result<OffsetDateTime> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
    // `bucket` is already `&str`; pass it directly instead of `&bucket`
    // (fixes clippy::needless_borrow).
    bucket_meta_sys.delete(bucket, config_file).await
}
/// Read-lock the global metadata system and fetch the bucket's tagging
/// configuration with its last-update timestamp.
pub async fn get_tagging_config(bucket: &str) -> Result<(Tagging, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_tagging_config(bucket).await
}
/// Fetch the bucket's lifecycle configuration with its last-update timestamp.
pub async fn get_lifecycle_config(bucket: &str) -> Result<(BucketLifecycleConfiguration, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_lifecycle_config(bucket).await
}
/// Fetch the bucket's server-side-encryption configuration with its
/// last-update timestamp.
pub async fn get_sse_config(bucket: &str) -> Result<(ServerSideEncryptionConfiguration, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_sse_config(bucket).await
}
/// Fetch the bucket's object-lock configuration with its last-update timestamp.
pub async fn get_object_lock_config(bucket: &str) -> Result<(ObjectLockConfiguration, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_object_lock_config(bucket).await
}
/// Fetch the bucket's replication configuration with its last-update timestamp.
pub async fn get_replication_config(bucket: &str) -> Result<(ReplicationConfiguration, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_replication_config(bucket).await
}
/// Fetch the bucket's notification configuration; `Ok(None)` when none is set.
pub async fn get_notification_config(bucket: &str) -> Result<Option<NotificationConfiguration>> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_notification_config(bucket).await
}
/// Fetch the bucket's versioning configuration with its last-update timestamp.
pub async fn get_versioning_config(bucket: &str) -> Result<(VersioningConfiguration, OffsetDateTime)> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_versioning_config(bucket).await
}
/// Load the bucket's full metadata directly from disk, bypassing the cache.
pub async fn get_config_from_disk(bucket: &str) -> Result<BucketMetadata> {
    let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
    let bucket_meta_sys = bucket_meta_sys_lock.read().await;
    bucket_meta_sys.get_config_from_disk(bucket).await
}
#[derive(Debug, Default)]
pub struct BucketMetadataSys {
metadata_map: RwLock<HashMap<String, BucketMetadata>>,
metadata_map: RwLock<HashMap<String, Arc<BucketMetadata>>>,
api: Option<ECStore>,
initialized: RwLock<bool>,
}
impl BucketMetadataSys {
fn new() -> Self {
pub fn new() -> Self {
Self::default()
}
@@ -108,7 +181,7 @@ impl BucketMetadataSys {
match res {
Ok(res) => {
if let Some(bucket) = buckets.get(idx) {
mp.insert(bucket.clone(), res);
mp.insert(bucket.clone(), Arc::new(res));
}
}
Err(e) => {
@@ -123,7 +196,7 @@ impl BucketMetadataSys {
}
}
pub async fn get(&self, bucket: &str) -> Result<BucketMetadata> {
pub async fn get(&self, bucket: &str) -> Result<Arc<BucketMetadata>> {
if is_meta_bucketname(bucket) {
return Err(Error::new(ConfigError::NotFound));
}
@@ -136,7 +209,7 @@ impl BucketMetadataSys {
}
}
pub async fn set(&self, bucket: String, bm: BucketMetadata) {
pub async fn set(&self, bucket: String, bm: Arc<BucketMetadata>) {
if !is_meta_bucketname(&bucket) {
let mut map = self.metadata_map.write().await;
map.insert(bucket, bm);
@@ -166,7 +239,7 @@ impl BucketMetadataSys {
};
if !meta.lifecycle_config_xml.is_empty() {
let cfg = Lifecycle::unmarshal(&meta.lifecycle_config_xml)?;
let cfg = deserialize::<BucketLifecycleConfiguration>(&meta.lifecycle_config_xml)?;
// TODO: FIXME:
// for _v in cfg.rules.iter() {
// break;
@@ -205,12 +278,12 @@ impl BucketMetadataSys {
let updated = bm.update_config(config_file, data)?;
self.save(&mut bm).await?;
self.save(bm).await?;
Ok(updated)
}
async fn save(&self, bm: &mut BucketMetadata) -> Result<()> {
async fn save(&self, bm: BucketMetadata) -> Result<()> {
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
@@ -222,9 +295,11 @@ impl BucketMetadataSys {
return Err(Error::msg("errInvalidArgument"));
}
let mut bm = bm;
bm.save(store).await?;
self.set(bm.name.clone(), bm.clone()).await;
self.set(bm.name.clone(), Arc::new(bm)).await;
Ok(())
}
@@ -245,7 +320,7 @@ impl BucketMetadataSys {
}
}
pub async fn get_config(&self, bucket: &str) -> Result<(BucketMetadata, bool)> {
pub async fn get_config(&self, bucket: &str) -> Result<(Arc<BucketMetadata>, bool)> {
if let Some(api) = self.api.as_ref() {
let has_bm = {
let map = self.metadata_map.read().await;
@@ -268,6 +343,7 @@ impl BucketMetadataSys {
let mut map = self.metadata_map.write().await;
let bm = Arc::new(bm);
map.insert(bucket.to_string(), bm.clone());
Ok((bm, true))
@@ -277,23 +353,27 @@ impl BucketMetadataSys {
}
}
pub async fn get_versioning_config(&self, bucket: &str) -> Result<(versioning::Versioning, OffsetDateTime)> {
pub async fn get_versioning_config(&self, bucket: &str) -> Result<(VersioningConfiguration, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
warn!("get_versioning_config err {:?}", &err);
if config::error::is_not_found(&err) {
return Ok((versioning::Versioning::default(), OffsetDateTime::UNIX_EPOCH));
return Ok((VersioningConfiguration::default(), OffsetDateTime::UNIX_EPOCH));
} else {
return Err(err);
}
}
};
Ok((bm.versioning_config.unwrap_or_default(), bm.versioning_config_updated_at))
if let Some(config) = &bm.versioning_config {
Ok((config.clone(), bm.versioning_config_updated_at))
} else {
Ok((VersioningConfiguration::default(), bm.versioning_config_updated_at))
}
}
pub async fn get_bucket_policy(&self, bucket: &str) -> Result<(BucketPolicy, OffsetDateTime)> {
pub async fn get_bucket_policy(&self, bucket: &str) -> Result<(Policy, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
@@ -306,14 +386,14 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.policy_config {
Ok((config, bm.policy_config_updated_at))
if let Some(config) = &bm.policy_config {
Ok((config.clone(), bm.policy_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketPolicyNotFound))
}
}
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(tags::Tags, OffsetDateTime)> {
pub async fn get_tagging_config(&self, bucket: &str) -> Result<(Tagging, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
@@ -326,14 +406,14 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.tagging_config {
Ok((config, bm.tagging_config_updated_at))
if let Some(config) = &bm.tagging_config {
Ok((config.clone(), bm.tagging_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::TaggingNotFound))
}
}
pub async fn get_object_lock_config(&self, bucket: &str) -> Result<(objectlock::Config, OffsetDateTime)> {
pub async fn get_object_lock_config(&self, bucket: &str) -> Result<(ObjectLockConfiguration, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
@@ -346,14 +426,14 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.object_lock_config {
Ok((config, bm.object_lock_config_updated_at))
if let Some(config) = &bm.object_lock_config {
Ok((config.clone(), bm.object_lock_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketObjectLockConfigNotFound))
}
}
pub async fn get_lifecycle_config(&self, bucket: &str) -> Result<(Lifecycle, OffsetDateTime)> {
pub async fn get_lifecycle_config(&self, bucket: &str) -> Result<(BucketLifecycleConfiguration, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
@@ -366,20 +446,20 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.lifecycle_config {
if let Some(config) = &bm.lifecycle_config {
if config.rules.is_empty() {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
} else {
Ok((config, bm.lifecycle_config_updated_at))
Ok((config.clone(), bm.lifecycle_config_updated_at))
}
} else {
Err(Error::new(BucketMetadataError::BucketLifecycleNotFound))
}
}
pub async fn get_notification_config(&self, bucket: &str) -> Result<Option<event::Config>> {
pub async fn get_notification_config(&self, bucket: &str) -> Result<Option<NotificationConfiguration>> {
let bm = match self.get_config(bucket).await {
Ok((bm, _)) => bm.notification_config,
Ok((bm, _)) => bm.notification_config.clone(),
Err(err) => {
warn!("get_notification_config err {:?}", &err);
if config::error::is_not_found(&err) {
@@ -393,7 +473,7 @@ impl BucketMetadataSys {
Ok(bm)
}
pub async fn get_sse_config(&self, bucket: &str) -> Result<(BucketSSEConfig, OffsetDateTime)> {
pub async fn get_sse_config(&self, bucket: &str) -> Result<(ServerSideEncryptionConfiguration, OffsetDateTime)> {
let bm = match self.get_config(bucket).await {
Ok((res, _)) => res,
Err(err) => {
@@ -406,8 +486,8 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.sse_config {
Ok((config, bm.encryption_config_updated_at))
if let Some(config) = &bm.sse_config {
Ok((config.clone(), bm.encryption_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketSSEConfigNotFound))
}
@@ -437,14 +517,14 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.quota_config {
Ok((config, bm.quota_config_updated_at))
if let Some(config) = &bm.quota_config {
Ok((config.clone(), bm.quota_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketQuotaConfigNotFound))
}
}
pub async fn get_replication_config(&self, bucket: &str) -> Result<(replication::Config, OffsetDateTime)> {
pub async fn get_replication_config(&self, bucket: &str) -> Result<(ReplicationConfiguration, OffsetDateTime)> {
let (bm, reload) = match self.get_config(bucket).await {
Ok(res) => res,
Err(err) => {
@@ -457,12 +537,12 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.replication_config {
if let Some(config) = &bm.replication_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok((config, bm.replication_config_updated_at))
Ok((config.clone(), bm.replication_config_updated_at))
} else {
Err(Error::new(BucketMetadataError::BucketReplicationConfigNotFound))
}
@@ -481,12 +561,12 @@ impl BucketMetadataSys {
}
};
if let Some(config) = bm.bucket_target_config {
if let Some(config) = &bm.bucket_target_config {
if reload {
// TODO: globalBucketTargetSys
}
Ok(config)
Ok(config.clone())
} else {
Err(Error::new(BucketMetadataError::BucketRemoteTargetNotFound))
}

View File

@@ -1,18 +1,9 @@
mod encryption;
pub mod error;
mod event;
mod lifecycle;
pub mod metadata;
mod metadata_sys;
mod objectlock;
pub mod metadata_sys;
pub mod policy;
pub mod policy_sys;
mod quota;
mod replication;
pub mod tags;
mod target;
pub mod utils;
pub mod versioning;
pub mod versioning_sys;
pub use metadata_sys::{bucket_metadata_sys_set, get_bucket_metadata_sys, init_bucket_metadata_sys};

View File

@@ -1,56 +0,0 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
// Object-lock retention mode.
// NOTE(review): "Govenance" looks like a typo of "Governance"; the variant
// name is part of the serde wire format, so it is kept as-is here.
#[derive(Debug, Deserialize, Serialize, Default, PartialEq, Eq, Hash, Clone)]
pub enum RetMode {
    #[default]
    Govenance,
    Compliance,
}
// Implement FromStr for RetMode so instances can be created from the S3 wire
// strings ("GOVERNANCE" / "COMPLIANCE").
impl std::str::FromStr for RetMode {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "GOVERNANCE" => Ok(RetMode::Govenance),
            "COMPLIANCE" => Ok(RetMode::Compliance),
            _ => Err(format!("Invalid RetMode: {}", s)),
        }
    }
}
// Default retention applied to new objects: mode plus either days or years.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DefaultRetention {
    pub mode: RetMode,
    pub days: Option<usize>,
    pub years: Option<usize>,
}
// One object-lock rule wrapping the default retention.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
    pub default_retention: DefaultRetention,
}
// Bucket object-lock configuration, mirroring the S3 XML document.
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
    // "Enabled" when object lock is active on the bucket.
    pub object_lock_enabled: String,
    pub rule: Option<Rule>,
}
impl Config {
    /// Encode as MessagePack with struct-map encoding (field names preserved).
    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
        let mut buf = Vec::new();
        self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
        Ok(buf)
    }
    /// Decode a MessagePack buffer previously produced by [`Self::marshal_msg`].
    pub fn unmarshal(buf: &[u8]) -> Result<Self> {
        let t: Config = rmp_serde::from_slice(buf)?;
        Ok(t)
    }
}

View File

@@ -126,7 +126,7 @@ impl BPStatement {
let mut resource = args.bucket_name.clone();
if !args.object_name.is_empty() {
if !args.object_name.starts_with("/") {
if !args.object_name.starts_with('/') {
resource.push('/');
}

View File

@@ -61,7 +61,7 @@ impl Resource {
if self.rtype == ResourceARNType::UnknownARN {
return false;
}
if self.is_s3() && self.pattern.starts_with("/") {
if self.is_s3() && self.pattern.starts_with('/') {
return false;
}
if self.is_kms() && self.pattern.as_bytes().iter().any(|&v| v == b'/' || v == b'\\' || v == b'.') {
@@ -77,10 +77,10 @@ impl Resource {
self.rtype == ResourceARNType::ResourceARNKMS
}
pub fn is_bucket_pattern(&self) -> bool {
!self.pattern.contains("/") || self.pattern.eq("*")
!self.pattern.contains('/') || self.pattern.eq("*")
}
pub fn is_object_pattern(&self) -> bool {
self.pattern.contains("/") || self.pattern.contains("*")
self.pattern.contains('/') || self.pattern.contains('*')
}
pub fn is_match(&self, res: &str, condition_values: &HashMap<String, Vec<String>>) -> bool {
let mut pattern = res.to_string();

View File

@@ -1,27 +1,23 @@
use super::{
error::BucketMetadataError,
get_bucket_metadata_sys,
policy::bucket_policy::{BucketPolicy, BucketPolicyArgs},
};
use super::metadata_sys::get_bucket_metadata_sys;
use crate::error::Result;
use tracing::warn;
use s3s_policy::model::Policy;
pub struct PolicySys {}
impl PolicySys {
pub async fn is_allowed(args: &BucketPolicyArgs) -> bool {
match Self::get(&args.bucket_name).await {
Ok(cfg) => return cfg.is_allowed(args),
Err(err) => {
if !BucketMetadataError::BucketPolicyNotFound.is(&err) {
warn!("config get err {:?}", err);
}
}
}
// pub async fn is_allowed(args: &BucketPolicyArgs) -> bool {
// match Self::get(&args.bucket_name).await {
// Ok(cfg) => return cfg.is_allowed(args),
// Err(err) => {
// if !BucketMetadataError::BucketPolicyNotFound.is(&err) {
// warn!("config get err {:?}", err);
// }
// }
// }
args.is_owner
}
pub async fn get(bucket: &str) -> Result<BucketPolicy> {
// args.is_owner
// }
pub async fn get(bucket: &str) -> Result<Policy> {
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let bucket_meta_sys = bucket_meta_sys_lock.write().await;
@@ -30,3 +26,13 @@ impl PolicySys {
Ok(cfg)
}
}
// trait PolicyApi {
// fn is_allowed(&self) -> bool;
// }
// impl PolicyApi for Policy {
// fn is_allowed(&self) -> bool {
// todo!()
// }
// }

View File

@@ -1,9 +0,0 @@
use super::tag::Tag;
use serde::{Deserialize, Serialize};
// 定义And结构体
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct And {
prefix: Option<String>,
tags: Option<Vec<Tag>>,
}

View File

@@ -1,12 +0,0 @@
use super::and::And;
use super::tag::Tag;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Filter {
prefix: String,
and: And,
tag: Tag,
cached_tags: HashMap<String, String>,
}

View File

@@ -1,30 +0,0 @@
mod and;
mod filter;
mod rule;
mod tag;
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use rule::Rule;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
rules: Vec<Rule>,
role_arn: String,
}
impl Config {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Config = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,56 +0,0 @@
use super::filter::Filter;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub enum Status {
#[default]
Enabled,
Disabled,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DeleteMarkerReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct DeleteReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExistingObjectReplication {
pub status: Status,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Destination {
pub bucket: String,
pub storage_class: String,
pub arn: String,
}
// 定义ReplicaModifications结构体
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ReplicaModifications {
status: Status,
}
// 定义SourceSelectionCriteria结构体
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct SourceSelectionCriteria {
replica_modifications: ReplicaModifications,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Rule {
pub id: String,
pub status: Status,
pub priority: usize,
pub delete_marker_replication: DeleteMarkerReplication,
pub delete_replication: DeleteReplication,
pub destination: Destination,
pub source_selection_criteria: SourceSelectionCriteria,
pub filter: Filter,
pub existing_object_replication: ExistingObjectReplication,
}

View File

@@ -1,7 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tag {
pub key: Option<String>,
pub value: Option<String>,
}

View File

@@ -1,37 +0,0 @@
use crate::error::Result;
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// 定义tagSet结构体
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct TagSet {
pub tag_map: HashMap<String, String>,
pub is_object: bool,
}
// 定义tagging结构体
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Tags {
pub tag_set: TagSet,
}
impl Tags {
pub fn new(tag_map: HashMap<String, String>, is_object: bool) -> Self {
Self {
tag_set: TagSet { tag_map, is_object },
}
}
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Tags = rmp_serde::from_slice(buf)?;
Ok(t)
}
}

View File

@@ -1,140 +0,0 @@
use crate::{
error::{Error, Result},
utils,
};
use rmp_serde::Serializer as rmpSerializer;
use serde::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error)]
pub enum VersioningErr {
#[error("too many excluded prefixes")]
TooManyExcludedPrefixes,
#[error("excluded prefixes extension supported only when versioning is enabled")]
ExcludedPrefixNotSupported,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Deserialize, Serialize)]
pub enum State {
#[default]
Suspended,
Enabled,
}
// 实现Display trait用于打印
impl std::fmt::Display for State {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}",
match *self {
State::Enabled => "Enabled",
State::Suspended => "Suspended",
}
)
}
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct ExcludedPrefix {
pub prefix: String,
}
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Versioning {
pub status: State,
pub excluded_prefixes: Vec<ExcludedPrefix>,
pub exclude_folders: bool,
}
impl Versioning {
pub fn marshal_msg(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
self.serialize(&mut rmpSerializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
pub fn unmarshal(buf: &[u8]) -> Result<Self> {
let t: Versioning = rmp_serde::from_slice(buf)?;
Ok(t)
}
pub fn validate(&self) -> Result<()> {
match self.status {
State::Suspended => {
if !self.excluded_prefixes.is_empty() {
return Err(Error::new(VersioningErr::ExcludedPrefixNotSupported));
}
}
State::Enabled => {
if self.excluded_prefixes.len() > 10 {
return Err(Error::new(VersioningErr::TooManyExcludedPrefixes));
}
}
}
Ok(())
}
pub fn enabled(&self) -> bool {
self.status == State::Enabled
}
pub fn versioned(&self, prefix: &str) -> bool {
self.prefix_enabled(prefix) || self.prefix_suspended(prefix)
}
pub fn prefix_enabled(&self, prefix: &str) -> bool {
if self.status != State::Enabled {
return false;
}
if prefix.is_empty() {
return true;
}
if self.exclude_folders && prefix.ends_with("/") {
return false;
}
for sprefix in self.excluded_prefixes.iter() {
let full_prefix = format!("{}*", sprefix.prefix);
if utils::wildcard::match_simple(&full_prefix, prefix) {
return false;
}
}
true
}
pub fn suspended(&self) -> bool {
self.status == State::Suspended
}
pub fn prefix_suspended(&self, prefix: &str) -> bool {
if self.status == State::Suspended {
return true;
}
if self.status == State::Enabled {
if prefix.is_empty() {
return false;
}
if self.exclude_folders && prefix.starts_with("/") {
return true;
}
for sprefix in self.excluded_prefixes.iter() {
let full_prefix = format!("{}*", sprefix.prefix);
if utils::wildcard::match_simple(&full_prefix, prefix) {
return true;
}
}
}
false
}
pub fn prefixes_excluded(&self) -> bool {
!self.excluded_prefixes.is_empty() || self.exclude_folders
}
}

View File

@@ -1,7 +1,7 @@
use super::get_bucket_metadata_sys;
use super::versioning::Versioning;
use super::metadata_sys::get_bucket_metadata_sys;
use crate::disk::RUSTFS_META_BUCKET;
use crate::error::Result;
use s3s::dto::{BucketVersioningStatus, VersioningConfiguration};
use tracing::warn;
pub struct BucketVersioningSys {}
@@ -17,39 +17,39 @@ impl BucketVersioningSys {
}
}
pub async fn prefix_enabled(bucket: &str, prefix: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.prefix_enabled(prefix),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
// pub async fn prefix_enabled(bucket: &str, prefix: &str) -> bool {
// match Self::get(bucket).await {
// Ok(res) => res.prefix_enabled(prefix),
// Err(err) => {
// warn!("{:?}", err);
// false
// }
// }
// }
pub async fn suspended(bucket: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.suspended(),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
// pub async fn suspended(bucket: &str) -> bool {
// match Self::get(bucket).await {
// Ok(res) => res.suspended(),
// Err(err) => {
// warn!("{:?}", err);
// false
// }
// }
// }
pub async fn prefix_suspended(bucket: &str, prefix: &str) -> bool {
match Self::get(bucket).await {
Ok(res) => res.prefix_suspended(prefix),
Err(err) => {
warn!("{:?}", err);
false
}
}
}
// pub async fn prefix_suspended(bucket: &str, prefix: &str) -> bool {
// match Self::get(bucket).await {
// Ok(res) => res.prefix_suspended(prefix),
// Err(err) => {
// warn!("{:?}", err);
// false
// }
// }
// }
pub async fn get(bucket: &str) -> Result<Versioning> {
pub async fn get(bucket: &str) -> Result<VersioningConfiguration> {
if bucket == RUSTFS_META_BUCKET || bucket.starts_with(RUSTFS_META_BUCKET) {
return Ok(Versioning::default());
return Ok(VersioningConfiguration::default());
}
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
@@ -60,3 +60,16 @@ impl BucketVersioningSys {
Ok(cfg)
}
}
trait VersioningApi {
fn enabled(&self) -> bool;
}
impl VersioningApi for VersioningConfiguration {
fn enabled(&self) -> bool {
self.status
.as_ref()
.map(|v| v.as_str() == BucketVersioningStatus::ENABLED)
.is_some_and(|v| v)
}
}

View File

@@ -485,7 +485,7 @@ impl LocalDisk {
async fn write_all_public(&self, volume: &str, path: &str, data: Vec<u8>) -> Result<()> {
if volume == super::RUSTFS_META_BUCKET && path == super::FORMAT_CONFIG_FILE {
let mut format_info = self.format_info.write().await;
format_info.data = data.clone();
format_info.data.clone_from(&data);
}
let volume_dir = self.get_bucket_path(volume)?;

View File

@@ -222,7 +222,7 @@ impl MetaCacheEntry {
Ok(wr)
}
pub fn is_dir(&self) -> bool {
self.metadata.is_empty() && self.name.ends_with("/")
self.metadata.is_empty() && self.name.ends_with('/')
}
pub fn is_object(&self) -> bool {
!self.metadata.is_empty()

View File

@@ -74,7 +74,7 @@ fn reduce_errs(errs: &[Option<Error>], ignored_errs: &[Box<dyn CheckErrorFn>]) -
for (err, &count) in error_counts.iter() {
if count > max || (count == max && *err == nil) {
max = count;
max_err = err.clone();
max_err.clone_from(err);
}
}

View File

@@ -1,5 +1,7 @@
#![allow(clippy::map_entry)]
use crate::bucket::bucket_metadata_sys_set;
use crate::bucket::metadata;
use crate::bucket::metadata_sys::set_bucket_metadata;
use crate::disk::endpoint::EndpointType;
use crate::global::{is_dist_erasure, set_object_layer, GLOBAL_LOCAL_DISK_MAP, GLOBAL_LOCAL_DISK_SET_DRIVES};
use crate::store_api::ObjectIO;
@@ -22,6 +24,7 @@ use backon::{ExponentialBuilder, Retryable};
use common::globals::{GLOBAL_Local_Node_Name, GLOBAL_Rustfs_Host, GLOBAL_Rustfs_Port};
use futures::future::join_all;
use http::HeaderMap;
use s3s::dto::{ObjectLockConfiguration, ObjectLockEnabled};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
@@ -31,7 +34,7 @@ use time::OffsetDateTime;
use tokio::fs;
use tokio::sync::Semaphore;
use tracing::{debug, info};
use tracing::{debug, info, warn};
use uuid::Uuid;
#[derive(Debug, Clone)]
@@ -508,9 +511,24 @@ impl StorageAPI for ECStore {
self.peer_sys.make_bucket(bucket, opts).await?;
let mut meta = BucketMetadata::new(bucket);
warn!("make bucket opsts {:?}", &opts);
if opts.lock_enabled {
let cfg = ObjectLockConfiguration {
object_lock_enabled: Some(ObjectLockEnabled::from_static(ObjectLockEnabled::ENABLED)),
..Default::default()
};
meta.object_lock_config_xml = metadata::serialize::<ObjectLockConfiguration>(&cfg)?;
warn!("make bucket add object_lock_config_xml {:?}", &meta.object_lock_config_xml);
// FIXME: version config
}
meta.save(self).await?;
bucket_metadata_sys_set(bucket.to_string(), meta).await;
set_bucket_metadata(bucket.to_string(), meta).await;
// TODO: toObjectErr

View File

@@ -156,7 +156,7 @@ impl FileInfo {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
is_dir: object.starts_with("/"),
is_dir: object.starts_with('/'),
parity_blocks: self.erasure.parity_blocks,
data_blocks: self.erasure.data_blocks,
version_id: self.version_id,
@@ -258,7 +258,11 @@ pub enum BitrotAlgorithm {
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MakeBucketOptions {
pub force_create: bool,
pub lock_enabled: bool,
pub versioning_enabled: bool,
pub force_create: bool, // Create buckets even if they are already created.
pub created_at: Option<OffsetDateTime>, // only for site replication
pub no_lock: bool,
}
pub struct DeleteBucketOptions {
@@ -554,6 +558,7 @@ pub trait StorageAPI: ObjectIO {
objects: Vec<ObjectToDelete>,
opts: ObjectOptions,
) -> Result<(Vec<DeletedObject>, Vec<Option<Error>>)>;
#[warn(clippy::too_many_arguments)]
async fn list_objects_v2(
&self,
bucket: &str,

View File

@@ -59,7 +59,7 @@ futures.workspace = true
futures-util.workspace = true
# uuid = { version = "1.8.0", features = ["v4", "fast-rng", "serde"] }
ecstore = { path = "../ecstore" }
s3s = "0.10.0"
s3s.workspace = true
clap = { version = "4.5.20", features = ["derive"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] }
hyper-util = { version = "0.1.9", features = [

View File

@@ -6,7 +6,7 @@ mod storage;
use clap::Parser;
use common::error::{Error, Result};
use ecstore::{
bucket::init_bucket_metadata_sys,
bucket::metadata_sys::init_bucket_metadata_sys,
endpoints::EndpointServerPools,
set_global_endpoints,
store::{init_local_disks, ECStore},
@@ -128,11 +128,11 @@ async fn run(opt: config::Opt) -> Result<()> {
info!("authentication is enabled {}, {}", &access_key, &secret_key);
b.set_auth(SimpleAuth::from_single(access_key, secret_key));
// Enable parsing virtual-hosted-style requests
if let Some(dm) = opt.domain_name {
info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
b.set_base_domain(dm);
}
// // Enable parsing virtual-hosted-style requests
// if let Some(dm) = opt.domain_name {
// info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
// b.set_base_domain(dm);
// }
// if domain_name.is_some() {
// info!(

View File

@@ -1,14 +1,17 @@
use bytes::Bytes;
use ecstore::bucket::error::BucketMetadataError;
use ecstore::bucket::get_bucket_metadata_sys;
use ecstore::bucket::metadata;
use ecstore::bucket::metadata::BUCKET_LIFECYCLE_CONFIG;
use ecstore::bucket::metadata::BUCKET_NOTIFICATION_CONFIG;
use ecstore::bucket::metadata::BUCKET_POLICY_CONFIG;
use ecstore::bucket::metadata::BUCKET_REPLICATION_CONFIG;
use ecstore::bucket::metadata::BUCKET_SSECONFIG;
use ecstore::bucket::metadata::BUCKET_TAGGING_CONFIG;
use ecstore::bucket::metadata::BUCKET_VERSIONING_CONFIG;
use ecstore::bucket::metadata::OBJECT_LOCK_CONFIG;
use ecstore::bucket::metadata_sys;
use ecstore::bucket::policy::bucket_policy::BucketPolicy;
use ecstore::bucket::policy_sys::PolicySys;
use ecstore::bucket::tags::Tags;
use ecstore::bucket::versioning::State as VersioningState;
use ecstore::bucket::versioning::Versioning;
use ecstore::bucket::versioning_sys::BucketVersioningSys;
use ecstore::disk::error::DiskError;
use ecstore::new_object_layer_fn;
@@ -34,9 +37,9 @@ use s3s::S3ErrorCode;
use s3s::S3Result;
use s3s::S3;
use s3s::{S3Request, S3Response};
use std::collections::HashMap;
use std::fmt::Debug;
use std::str::FromStr;
use tracing::info;
use transform_stream::AsyncTryStream;
use uuid::Uuid;
@@ -73,7 +76,11 @@ impl S3 for FS {
fields(start_time=?time::OffsetDateTime::now_utc())
)]
async fn create_bucket(&self, req: S3Request<CreateBucketInput>) -> S3Result<S3Response<CreateBucketOutput>> {
let input = req.input;
let CreateBucketInput {
bucket,
object_lock_enabled_for_bucket,
..
} = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
@@ -84,7 +91,14 @@ impl S3 for FS {
try_!(
store
.make_bucket(&input.bucket, &MakeBucketOptions { force_create: true })
.make_bucket(
&bucket,
&MakeBucketOptions {
force_create: true,
lock_enabled: object_lock_enabled_for_bucket.is_some_and(|v| v),
..Default::default()
}
)
.await
);
@@ -690,36 +704,6 @@ impl S3 for FS {
Ok(S3Response::new(AbortMultipartUploadOutput { ..Default::default() }))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
log::debug!("bucket: {bucket}, tagging: {tagging:?}");
// check bucket exists.
let _bucket = self
.head_bucket(S3Request::new(HeadBucketInput {
bucket: bucket.clone(),
expected_bucket_owner: None,
}))
.await?;
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
let mut tag_map = HashMap::new();
for tag in tagging.tag_set.iter() {
tag_map.insert(tag.key.clone(), tag.value.clone());
}
let tags = Tags::new(tag_map, false);
let data = try_!(tags.marshal_msg());
let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);
Ok(S3Response::new(Default::default()))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn get_bucket_tagging(&self, req: S3Request<GetBucketTaggingInput>) -> S3Result<S3Response<GetBucketTaggingOutput>> {
let GetBucketTaggingInput { bucket, .. } = req.input;
@@ -731,49 +715,53 @@ impl S3 for FS {
}))
.await?;
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let bucket_meta_sys = bucket_meta_sys_lock.read().await;
let tag_set: Vec<Tag> = match bucket_meta_sys.get_tagging_config(&bucket).await {
Ok((tags, _)) => tags
.tag_set
.tag_map
.into_iter()
.map(|(key, value)| Tag { key, value })
.collect(),
let Tagging { tag_set } = match metadata_sys::get_tagging_config(&bucket).await {
Ok((tags, _)) => tags,
Err(err) => {
warn!("get_tagging_config err {:?}", &err);
// TODO: check not found
Vec::new()
Tagging::default()
}
};
Ok(S3Response::new(GetBucketTaggingOutput { tag_set }))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_bucket_tagging(&self, req: S3Request<PutBucketTaggingInput>) -> S3Result<S3Response<PutBucketTaggingOutput>> {
let PutBucketTaggingInput { bucket, tagging, .. } = req.input;
log::debug!("bucket: {bucket}, tagging: {tagging:?}");
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = match lock.as_ref() {
Some(s) => s,
None => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())),
};
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let data = try_!(metadata::serialize(&tagging));
try_!(metadata_sys::update(&bucket, BUCKET_TAGGING_CONFIG, data).await);
Ok(S3Response::new(Default::default()))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_bucket_tagging(
&self,
req: S3Request<DeleteBucketTaggingInput>,
) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
let DeleteBucketTaggingInput { bucket, .. } = req.input;
// check bucket exists.
let _bucket = self
.head_bucket(S3Request::new(HeadBucketInput {
bucket: bucket.clone(),
expected_bucket_owner: None,
}))
.await?;
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
let tag_map = HashMap::new();
let tags = Tags::new(tag_map, false);
let data = try_!(tags.marshal_msg());
let _updated = try_!(bucket_meta_sys.update(&bucket, BUCKET_TAGGING_CONFIG, data).await);
try_!(metadata_sys::delete(&bucket, BUCKET_TAGGING_CONFIG).await);
Ok(S3Response::new(DeleteBucketTaggingOutput {}))
}
@@ -872,16 +860,11 @@ impl S3 for FS {
}
}
let cfg = try_!(BucketVersioningSys::get(&bucket).await);
let status = match cfg.status {
VersioningState::Enabled => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::ENABLED)),
VersioningState::Suspended => Some(BucketVersioningStatus::from_static(BucketVersioningStatus::SUSPENDED)),
};
let VersioningConfiguration { status, .. } = try_!(BucketVersioningSys::get(&bucket).await);
Ok(S3Response::new(GetBucketVersioningOutput {
mfa_delete: None,
status,
..Default::default()
}))
}
@@ -901,28 +884,9 @@ impl S3 for FS {
// check bucket object lock enable
// check replication suspended
let mut cfg = match BucketVersioningSys::get(&bucket).await {
Ok(res) => res,
Err(err) => {
warn!("BucketVersioningSys::get err {:?}", err);
Versioning::default()
}
};
let data = try_!(metadata::serialize(&versioning_configuration));
if let Some(verstatus) = versioning_configuration.status {
cfg.status = match verstatus.as_str() {
BucketVersioningStatus::ENABLED => VersioningState::Enabled,
BucketVersioningStatus::SUSPENDED => VersioningState::Suspended,
_ => return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init")),
}
}
let data = try_!(cfg.marshal_msg());
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
try_!(bucket_meta_sys.update(&bucket, BUCKET_VERSIONING_CONFIG, data).await);
try_!(metadata_sys::update(&bucket, BUCKET_VERSIONING_CONFIG, data).await);
// TODO: globalSiteReplicationSys.BucketMetaHook
@@ -963,7 +927,7 @@ impl S3 for FS {
}
};
let policys = try_!(cfg.marshal_msg());
let policys = try_!(serde_json::to_string(&cfg));
Ok(S3Response::new(GetBucketPolicyOutput { policy: Some(policys) }))
}
@@ -986,20 +950,15 @@ impl S3 for FS {
}
let cfg = try_!(BucketPolicy::unmarshal(policy.as_bytes()));
warn!("put_bucket_policy struct {:?}", &cfg);
if let Err(err) = cfg.validate(&bucket) {
warn!("put_bucket_policy input {:?}", &policy);
warn!("cfg.validate err {:?}", err);
if let Err(err) = cfg.validate(&bucket) {
warn!("put_bucket_policy err input {:?}, {:?}", &policy, err);
return Err(s3_error!(InvalidPolicyDocument));
}
let data = try_!(cfg.marshal_msg());
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
try_!(bucket_meta_sys.update(&bucket, BUCKET_POLICY_CONFIG, data.into()).await);
try_!(metadata_sys::update(&bucket, BUCKET_POLICY_CONFIG, data.into()).await);
Ok(S3Response::new(PutBucketPolicyOutput {}))
}
@@ -1024,10 +983,7 @@ impl S3 for FS {
}
}
let bucket_meta_sys_lock = get_bucket_metadata_sys().await;
let mut bucket_meta_sys = bucket_meta_sys_lock.write().await;
try_!(bucket_meta_sys.delete(&bucket, BUCKET_POLICY_CONFIG).await);
try_!(metadata_sys::delete(&bucket, BUCKET_POLICY_CONFIG).await);
Ok(S3Response::new(DeleteBucketPolicyOutput {}))
}
@@ -1035,99 +991,409 @@ impl S3 for FS {
#[tracing::instrument(level = "debug", skip(self))]
async fn get_bucket_lifecycle_configuration(
&self,
_req: S3Request<GetBucketLifecycleConfigurationInput>,
req: S3Request<GetBucketLifecycleConfigurationInput>,
) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
Err(s3_error!(NotImplemented, "GetBucketLifecycleConfiguration is not implemented yet"))
let GetBucketLifecycleConfigurationInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let rules = match metadata_sys::get_lifecycle_config(&bucket).await {
Ok((cfg, _)) => Some(cfg.rules),
Err(_err) => {
// if BucketMetadataError::BucketLifecycleNotFound.is(&err) {
// return Err(s3_error!(NoSuchLifecycleConfiguration));
// }
// warn!("get_lifecycle_config err {:?}", err);
None
}
};
Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
rules,
..Default::default()
}))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_bucket_lifecycle_configuration(
&self,
_req: S3Request<PutBucketLifecycleConfigurationInput>,
req: S3Request<PutBucketLifecycleConfigurationInput>,
) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
Err(s3_error!(NotImplemented, "PutBucketLifecycleConfiguration is not implemented yet"))
let PutBucketLifecycleConfigurationInput {
bucket,
lifecycle_configuration,
..
} = req.input;
warn!("lifecycle_configuration {:?}", &lifecycle_configuration);
// TODO: objcetLock
let Some(input_cfg) = lifecycle_configuration else { return Err(s3_error!(InvalidArgument)) };
let data = try_!(metadata::serialize(&input_cfg));
try_!(metadata_sys::update(&bucket, BUCKET_LIFECYCLE_CONFIG, data).await);
Ok(S3Response::new(PutBucketLifecycleConfigurationOutput::default()))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_bucket_lifecycle(
&self,
_req: S3Request<DeleteBucketLifecycleInput>,
req: S3Request<DeleteBucketLifecycleInput>,
) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
Err(s3_error!(NotImplemented, "DeleteBucketLifecycle is not implemented yet"))
let DeleteBucketLifecycleInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
try_!(metadata_sys::delete(&bucket, BUCKET_LIFECYCLE_CONFIG).await);
Ok(S3Response::new(DeleteBucketLifecycleOutput::default()))
}
async fn get_bucket_encryption(
&self,
_req: S3Request<GetBucketEncryptionInput>,
req: S3Request<GetBucketEncryptionInput>,
) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
Err(s3_error!(NotImplemented, "GetBucketEncryption is not implemented yet"))
let GetBucketEncryptionInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let server_side_encryption_configuration = match metadata_sys::get_sse_config(&bucket).await {
Ok((cfg, _)) => Some(cfg),
Err(err) => {
// if BucketMetadataError::BucketLifecycleNotFound.is(&err) {
// return Err(s3_error!(ErrNoSuchBucketSSEConfig));
// }
warn!("get_sse_config err {:?}", err);
None
}
};
Ok(S3Response::new(GetBucketEncryptionOutput {
server_side_encryption_configuration,
}))
}
async fn put_bucket_encryption(
&self,
_req: S3Request<PutBucketEncryptionInput>,
req: S3Request<PutBucketEncryptionInput>,
) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
Err(s3_error!(NotImplemented, "PutBucketEncryption is not implemented yet"))
let PutBucketEncryptionInput {
bucket,
server_side_encryption_configuration,
..
} = req.input;
info!("sse_config {:?}", &server_side_encryption_configuration);
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
// TODO: check kms
let data = try_!(metadata::serialize(&server_side_encryption_configuration));
try_!(metadata_sys::update(&bucket, BUCKET_SSECONFIG, data).await);
Ok(S3Response::new(PutBucketEncryptionOutput::default()))
}
async fn delete_bucket_encryption(
&self,
_req: S3Request<DeleteBucketEncryptionInput>,
req: S3Request<DeleteBucketEncryptionInput>,
) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
Err(s3_error!(NotImplemented, "DeleteBucketEncryption is not implemented yet"))
let DeleteBucketEncryptionInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
try_!(metadata_sys::delete(&bucket, BUCKET_SSECONFIG).await);
Ok(S3Response::new(DeleteBucketEncryptionOutput::default()))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn get_object_lock_configuration(
&self,
_req: S3Request<GetObjectLockConfigurationInput>,
req: S3Request<GetObjectLockConfigurationInput>,
) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
// mc cp step 1
let output = GetObjectLockConfigurationOutput::default();
Ok(S3Response::new(output))
let GetObjectLockConfigurationInput { bucket, .. } = req.input;
let object_lock_configuration = match metadata_sys::get_object_lock_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
warn!("get_object_lock_config err {:?}", err);
None
}
};
warn!("object_lock_configuration {:?}", &object_lock_configuration);
Ok(S3Response::new(GetObjectLockConfigurationOutput {
object_lock_configuration,
}))
}
#[tracing::instrument(level = "debug", skip(self))]
async fn put_object_lock_configuration(
&self,
_req: S3Request<PutObjectLockConfigurationInput>,
req: S3Request<PutObjectLockConfigurationInput>,
) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
Err(s3_error!(NotImplemented, "PutObjectLockConfiguration is not implemented yet"))
let PutObjectLockConfigurationInput {
bucket,
object_lock_configuration,
..
} = req.input;
let Some(input_cfg) = object_lock_configuration else { return Err(s3_error!(InvalidArgument)) };
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let data = try_!(metadata::serialize(&input_cfg));
try_!(metadata_sys::update(&bucket, OBJECT_LOCK_CONFIG, data).await);
Ok(S3Response::new(PutObjectLockConfigurationOutput::default()))
}
async fn get_bucket_replication(
&self,
_req: S3Request<GetBucketReplicationInput>,
req: S3Request<GetBucketReplicationInput>,
) -> S3Result<S3Response<GetBucketReplicationOutput>> {
Err(s3_error!(NotImplemented, "GetBucketReplication is not implemented yet"))
let GetBucketReplicationInput { bucket, .. } = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
let replication_configuration = match metadata_sys::get_replication_config(&bucket).await {
Ok((cfg, _created)) => Some(cfg),
Err(err) => {
warn!("get_object_lock_config err {:?}", err);
None
}
};
Ok(S3Response::new(GetBucketReplicationOutput {
replication_configuration,
}))
}
async fn put_bucket_replication(
&self,
_req: S3Request<PutBucketReplicationInput>,
req: S3Request<PutBucketReplicationInput>,
) -> S3Result<S3Response<PutBucketReplicationOutput>> {
Err(s3_error!(NotImplemented, "PutBucketReplication is not implemented yet"))
let PutBucketReplicationInput {
bucket,
replication_configuration,
..
} = req.input;
let layer = new_object_layer_fn();
let lock = layer.read().await;
let store = lock
.as_ref()
.ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;
if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
if DiskError::VolumeNotFound.is(&e) {
return Err(s3_error!(NoSuchBucket));
} else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
}
}
// TODO: check enable, versioning enable
let data = try_!(metadata::serialize(&replication_configuration));
try_!(metadata_sys::update(&bucket, BUCKET_REPLICATION_CONFIG, data).await);
Ok(S3Response::new(PutBucketReplicationOutput::default()))
}
/// Removes the bucket's replication configuration from bucket metadata.
///
/// Errors:
/// - `NoSuchBucket` when the bucket does not exist.
/// - `InternalError` when the object layer is not initialized or the
///   bucket lookup fails for any other reason.
async fn delete_bucket_replication(
    &self,
    req: S3Request<DeleteBucketReplicationInput>,
) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
    let DeleteBucketReplicationInput { bucket, .. } = req.input;

    let layer = new_object_layer_fn();
    let lock = layer.read().await;
    let store = lock
        .as_ref()
        .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;

    // Reject the request up front if the bucket does not exist.
    if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
        if DiskError::VolumeNotFound.is(&e) {
            return Err(s3_error!(NoSuchBucket));
        } else {
            return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
        }
    }

    try_!(metadata_sys::delete(&bucket, BUCKET_REPLICATION_CONFIG).await);
    // TODO: remove targets

    Ok(S3Response::new(DeleteBucketReplicationOutput::default()))
}
/// Returns the bucket's notification configuration from bucket metadata.
///
/// A missing or unreadable configuration is not an error: the lookup
/// failure is logged and an empty (default) configuration is returned.
///
/// Errors:
/// - `NoSuchBucket` when the bucket does not exist.
/// - `InternalError` when the object layer is not initialized or the
///   bucket lookup fails for any other reason.
async fn get_bucket_notification_configuration(
    &self,
    req: S3Request<GetBucketNotificationConfigurationInput>,
) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
    let GetBucketNotificationConfigurationInput { bucket, .. } = req.input;

    let layer = new_object_layer_fn();
    let lock = layer.read().await;
    let store = lock
        .as_ref()
        .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;

    // Reject the request up front if the bucket does not exist.
    if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
        if DiskError::VolumeNotFound.is(&e) {
            return Err(s3_error!(NoSuchBucket));
        } else {
            return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
        }
    }

    // Best-effort read: treat a metadata lookup failure as "no config".
    let notification_config = match metadata_sys::get_notification_config(&bucket).await {
        Ok(cfg) => cfg,
        Err(err) => {
            warn!("get_notification_config err {:?}", err);
            None
        }
    };

    // TODO: valid target list
    if let Some(NotificationConfiguration {
        event_bridge_configuration,
        lambda_function_configurations,
        queue_configurations,
        topic_configurations,
    }) = notification_config
    {
        Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
            event_bridge_configuration,
            lambda_function_configurations,
            queue_configurations,
            topic_configurations,
        }))
    } else {
        Ok(S3Response::new(GetBucketNotificationConfigurationOutput::default()))
    }
}
/// Stores the bucket's notification configuration in bucket metadata.
///
/// Errors:
/// - `NoSuchBucket` when the bucket does not exist.
/// - `InternalError` when the object layer is not initialized or the
///   bucket lookup fails for any other reason.
async fn put_bucket_notification_configuration(
    &self,
    req: S3Request<PutBucketNotificationConfigurationInput>,
) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
    let PutBucketNotificationConfigurationInput {
        bucket,
        notification_configuration,
        ..
    } = req.input;

    let layer = new_object_layer_fn();
    let lock = layer.read().await;
    let store = lock
        .as_ref()
        .ok_or_else(|| S3Error::with_message(S3ErrorCode::InternalError, "Not init"))?;

    // Reject the request up front if the bucket does not exist.
    if let Err(e) = store.get_bucket_info(&bucket, &BucketOptions::default()).await {
        if DiskError::VolumeNotFound.is(&e) {
            return Err(s3_error!(NoSuchBucket));
        } else {
            return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{}", e)));
        }
    }

    let data = try_!(metadata::serialize(&notification_configuration));
    try_!(metadata_sys::update(&bucket, BUCKET_NOTIFICATION_CONFIG, data).await);
    // TODO: event notice add rule

    Ok(S3Response::new(PutBucketNotificationConfigurationOutput::default()))
}
}
@@ -1151,15 +1417,3 @@ where
Ok(())
})
}
// Consumes this body object to return a bytes stream.
// pub fn into_bytes_stream(mut body: StreamingBlob) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static {
// futures_util::stream::poll_fn(move |ctx| loop {
// match Pin::new(&mut body).poll_next(ctx) {
// Poll::Ready(Some(Ok(data))) => return Poll::Ready(Some(Ok(data))),
// Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(std::io::Error::new(std::io::ErrorKind::Other, err)))),
// Poll::Ready(None) => return Poll::Ready(None),
// Poll::Pending => return Poll::Pending,
// }
// })
// }