diff --git a/Cargo.lock b/Cargo.lock index 9f6e2a63..0dd4d78d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,9 +764,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.99.0" +version = "1.100.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2d64d68c93000d5792b2a25fbeaafb90985fa80a1c8adfe93f24fb271296f5f" +checksum = "8c5eafbdcd898114b839ba68ac628e31c4cfc3e11dfca38dc1b2de2f35bb6270" dependencies = [ "aws-credential-types", "aws-runtime", @@ -798,9 +798,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.77.0" +version = "1.78.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f2f37fea82468fe3f5a059542c05392ef680c4f7f00e0db02df8b6e5c7d0c6" +checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724" dependencies = [ "aws-credential-types", "aws-runtime", @@ -820,9 +820,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.78.0" +version = "1.79.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecb4f6eada20e0193450cd48b12ed05e1e66baac86f39160191651b932f2b7d9" +checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca" dependencies = [ "aws-credential-types", "aws-runtime", @@ -842,9 +842,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.79.0" +version = "1.80.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317377afba3498fca4948c5d32b399ef9a5ad35561a1e8a6f2ac7273dabf802d" +checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.8.4" +version = "1.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38280ac228bc479f347fcfccf4bf4d22d68f3bb4629685cb591cabd856567bbc" +checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -2991,6 +2991,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.10.7" @@ -5476,7 +5482,7 @@ checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0" dependencies = [ "bitflags 2.9.1", "libc", - "redox_syscall 0.5.15", + "redox_syscall 0.5.16", ] [[package]] @@ -6685,7 +6691,7 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.15", + "redox_syscall 0.5.16", "smallvec", "windows-targets 0.52.6", ] @@ -7142,10 +7148,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" [[package]] -name = "prettyplease" -version = "0.2.35" +name = "pretty_assertions" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061c1221631e079b26479d25bbf2275bfe5917ae8419cd7e34f13bfc2aa7539a" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + +[[package]] +name = "prettyplease" +version = "0.2.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2" dependencies = [ "proc-macro2", "syn 2.0.104", @@ -7591,9 +7607,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.15" +version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec" +checksum = "7251471db004e509f4e75a62cca9435365b5ec7bcdff530d612ac7c87c44a792" dependencies = [ "bitflags 2.9.1", ] @@ -8077,6 +8093,27 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustfs-checksums" +version = "0.0.5" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "crc-fast", + "hex", + "http 1.3.1", + "http-body 1.0.1", + "md-5", + "pin-project-lite", + "pretty_assertions", + "sha1 0.10.6", + "sha2 0.10.9", + "tokio", + "tracing", + "tracing-test", +] + [[package]] name = "rustfs-common" version = "0.0.5" @@ -8154,12 +8191,14 @@ dependencies = [ "path-absolutize", "path-clean", "pin-project-lite", + "quick-xml 0.38.0", "rand 0.9.2", "reed-solomon-simd", "regex", "reqwest", "rmp", "rmp-serde", + "rustfs-checksums", "rustfs-common", "rustfs-config", "rustfs-filemeta", @@ -8175,8 +8214,8 @@ dependencies = [ "rustls 0.23.29", "s3s", "serde", - "serde-xml-rs 0.8.1", "serde_json", + "sha1 0.10.6", "sha2 0.10.9", "shadow-rs", "smallvec", @@ -8430,7 +8469,7 @@ dependencies = [ "regex", "reqwest", "serde", - "serde-xml-rs 0.6.0", + "serde-xml-rs", "sha2 0.10.9", "urlencoding", ] @@ -8985,18 +9024,6 @@ dependencies = [ "xml-rs", ] -[[package]] -name = "serde-xml-rs" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53630160a98edebde0123eb4dfd0fce6adff091b2305db3154a9e920206eb510" -dependencies = [ - "log", - "serde", - "thiserror 1.0.69", - "xml-rs", -] - [[package]] name = "serde_derive" version = "1.0.219" @@ -10082,9 +10109,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.46.1" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" +checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35" dependencies = [ "backtrace", "bytes", @@ -10095,10 +10122,10 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "slab", - "socket2 0.5.10", + "socket2 0.6.0", "tokio-macros", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -10474,6 +10501,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.104", +] + [[package]] name = "tracing-wasm" version = "0.2.1" @@ -11813,6 +11861,12 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yoke" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 84e2e9c7..8800ceef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "crates/s3select-api", # S3 Select API interface "crates/s3select-query", # S3 Select query engine "crates/signer", # client signer + "crates/checksums", # client checksums "crates/utils", # Utility functions and helpers "crates/workers", # Worker thread pools and task scheduling "crates/zip", # ZIP file handling and compression @@ -84,6 +85,7 @@ rustfs-utils = { path = "crates/utils", version = "0.0.5" } rustfs-rio = { path = "crates/rio", version = "0.0.5" } rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" } rustfs-signer = { path = "crates/signer", version = "0.0.5" } +rustfs-checksums = { path = "crates/checksums", version = "0.0.5" } rustfs-workers = { path = "crates/workers", version = "0.0.5" } aes-gcm = { version = "0.10.3", features = ["std"] } arc-swap = "1.7.1" @@ -215,7 +217,6 @@ s3s = { version = "0.12.0-minio-preview.2" } shadow-rs = { version = "1.2.0", default-features = false } serde = { version = "1.0.219", features = ["derive"] } serde_json = { version = "1.0.141", features = ["raw_value"] } -serde-xml-rs = "0.8.1" serde_urlencoded = "0.7.1" sha1 = "0.10.6" sha2 = "0.10.9" diff --git a/cli/rustfs-gui/src/utils/helper.rs b/cli/rustfs-gui/src/utils/helper.rs index d13261c6..ec63f931 100644 --- a/cli/rustfs-gui/src/utils/helper.rs +++ b/cli/rustfs-gui/src/utils/helper.rs @@ -371,7 +371,7 @@ impl ServiceManager { StdCommand::new("taskkill") .arg("/F") .arg("/PID") - .arg(&service_pid.to_string()) + .arg(service_pid.to_string()) .output()?; } diff --git a/crates/checksums/Cargo.toml b/crates/checksums/Cargo.toml new file mode 100644 index 00000000..71a6e547 --- /dev/null +++ b/crates/checksums/Cargo.toml @@ -0,0 +1,48 @@ +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "rustfs-checksums" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +homepage.workspace = true +description = "Checksum calculation and verification callbacks for HTTP request and response bodies sent by service clients generated by RustFS, ensuring data integrity and authenticity." +keywords = ["checksum-calculation", "verification", "integrity", "authenticity", "rustfs", "Minio"] +categories = ["web-programming", "development-tools", "checksum"] +documentation = "https://docs.rs/rustfs-signer/latest/rustfs_checksum/" + +[dependencies] +bytes = { workspace = true } +crc-fast = "1.3.0" +hex = { workspace = true } +http = { workspace = true } +http-body = { workspace = true } +base64-simd = { workspace = true } +md-5 = { workspace = true } +pin-project-lite = { workspace = true } +sha1 = { workspace = true } +sha2 = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +bytes-utils = "0.1.2" +pretty_assertions = "1.3" +tracing-test = "0.2.1" + +[dev-dependencies.tokio] +version = "1.23.1" +features = ["macros", "rt"] diff --git a/crates/checksums/README.md b/crates/checksums/README.md new file mode 100644 index 00000000..bc74d854 --- /dev/null +++ b/crates/checksums/README.md @@ -0,0 +1,3 @@ +# rustfs-checksums + +Checksum calculation and verification callbacks for HTTP request and response bodies sent by service clients generated by RustFS object storage. diff --git a/crates/checksums/src/base64.rs b/crates/checksums/src/base64.rs new file mode 100644 index 00000000..d434a4f0 --- /dev/null +++ b/crates/checksums/src/base64.rs @@ -0,0 +1,44 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#![allow(dead_code)] + +use base64_simd::STANDARD; +use std::error::Error; + +#[derive(Debug)] +pub(crate) struct DecodeError(base64_simd::Error); + +impl Error for DecodeError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.0) + } +} + +impl std::fmt::Display for DecodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "failed to decode base64") + } +} + +pub(crate) fn decode(input: impl AsRef) -> Result, DecodeError> { + STANDARD.decode_to_vec(input.as_ref()).map_err(DecodeError) +} + +pub(crate) fn encode(input: impl AsRef<[u8]>) -> String { + STANDARD.encode_to_string(input.as_ref()) +} + +pub(crate) fn encoded_length(length: usize) -> usize { + STANDARD.encoded_length(length) +} diff --git a/crates/checksums/src/error.rs b/crates/checksums/src/error.rs new file mode 100644 index 00000000..fcdb6d8d --- /dev/null +++ b/crates/checksums/src/error.rs @@ -0,0 +1,45 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub struct UnknownChecksumAlgorithmError { + checksum_algorithm: String, +} + +impl UnknownChecksumAlgorithmError { + pub(crate) fn new(checksum_algorithm: impl Into) -> Self { + Self { + checksum_algorithm: checksum_algorithm.into(), + } + } + + pub fn checksum_algorithm(&self) -> &str { + &self.checksum_algorithm + } +} + +impl fmt::Display for UnknownChecksumAlgorithmError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + r#"unknown checksum algorithm "{}", please pass a known algorithm name ("crc32", "crc32c", "sha1", "sha256", "md5")"#, + self.checksum_algorithm + ) + } +} + +impl Error for UnknownChecksumAlgorithmError {} diff --git a/crates/checksums/src/http.rs b/crates/checksums/src/http.rs new file mode 100644 index 00000000..915dd976 --- /dev/null +++ b/crates/checksums/src/http.rs @@ -0,0 +1,196 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::base64; +use http::header::{HeaderMap, HeaderValue}; + +use crate::Crc64Nvme; +use crate::{CRC_32_C_NAME, CRC_32_NAME, CRC_64_NVME_NAME, Checksum, Crc32, Crc32c, Md5, SHA_1_NAME, SHA_256_NAME, Sha1, Sha256}; + +pub const CRC_32_HEADER_NAME: &str = "x-amz-checksum-crc32"; +pub const CRC_32_C_HEADER_NAME: &str = "x-amz-checksum-crc32c"; +pub const SHA_1_HEADER_NAME: &str = "x-amz-checksum-sha1"; +pub const SHA_256_HEADER_NAME: &str = "x-amz-checksum-sha256"; +pub const CRC_64_NVME_HEADER_NAME: &str = "x-amz-checksum-crc64nvme"; + +pub(crate) static MD5_HEADER_NAME: &str = "content-md5"; + +pub const CHECKSUM_ALGORITHMS_IN_PRIORITY_ORDER: [&str; 5] = + [CRC_64_NVME_NAME, CRC_32_C_NAME, CRC_32_NAME, SHA_1_NAME, SHA_256_NAME]; + +pub trait HttpChecksum: Checksum + Send + Sync { + fn headers(self: Box) -> HeaderMap { + let mut header_map = HeaderMap::new(); + header_map.insert(self.header_name(), self.header_value()); + + header_map + } + + fn header_name(&self) -> &'static str; + + fn header_value(self: Box) -> HeaderValue { + let hash = self.finalize(); + HeaderValue::from_str(&base64::encode(&hash[..])).expect("base64 encoded bytes are always valid header values") + } + + fn size(&self) -> u64 { + let trailer_name_size_in_bytes = self.header_name().len(); + let base64_encoded_checksum_size_in_bytes = base64::encoded_length(Checksum::size(self) as usize); + + let size = trailer_name_size_in_bytes + ":".len() + base64_encoded_checksum_size_in_bytes; + + size as u64 + } +} + +impl HttpChecksum for Crc32 { + fn header_name(&self) -> &'static str { + CRC_32_HEADER_NAME + } +} + +impl HttpChecksum for Crc32c { + fn header_name(&self) -> &'static str { + CRC_32_C_HEADER_NAME + } +} + +impl HttpChecksum for Crc64Nvme { + fn header_name(&self) -> &'static str { + CRC_64_NVME_HEADER_NAME + } +} + +impl HttpChecksum for Sha1 { + fn header_name(&self) -> &'static str { + SHA_1_HEADER_NAME + } +} + +impl HttpChecksum for Sha256 { + fn header_name(&self) -> &'static str { + SHA_256_HEADER_NAME + } +} + +impl HttpChecksum for Md5 { + fn header_name(&self) -> &'static str { + MD5_HEADER_NAME + } +} + +#[cfg(test)] +mod tests { + use crate::base64; + use bytes::Bytes; + + use crate::{CRC_32_C_NAME, CRC_32_NAME, CRC_64_NVME_NAME, ChecksumAlgorithm, SHA_1_NAME, SHA_256_NAME}; + + use super::HttpChecksum; + + #[test] + fn test_trailer_length_of_crc32_checksum_body() { + let checksum = CRC_32_NAME.parse::().unwrap().into_impl(); + let expected_size = 29; + let actual_size = HttpChecksum::size(&*checksum); + assert_eq!(expected_size, actual_size) + } + + #[test] + fn test_trailer_value_of_crc32_checksum_body() { + let checksum = CRC_32_NAME.parse::().unwrap().into_impl(); + // The CRC32 of an empty string is all zeroes + let expected_value = Bytes::from_static(b"\0\0\0\0"); + let expected_value = base64::encode(&expected_value); + let actual_value = checksum.header_value(); + assert_eq!(expected_value, actual_value) + } + + #[test] + fn test_trailer_length_of_crc32c_checksum_body() { + let checksum = CRC_32_C_NAME.parse::().unwrap().into_impl(); + let expected_size = 30; + let actual_size = HttpChecksum::size(&*checksum); + assert_eq!(expected_size, actual_size) + } + + #[test] + fn test_trailer_value_of_crc32c_checksum_body() { + let checksum = CRC_32_C_NAME.parse::().unwrap().into_impl(); + // The CRC32C of an empty string is all zeroes + let expected_value = Bytes::from_static(b"\0\0\0\0"); + let expected_value = base64::encode(&expected_value); + let actual_value = checksum.header_value(); + assert_eq!(expected_value, actual_value) + } + + #[test] + fn test_trailer_length_of_crc64nvme_checksum_body() { + let checksum = CRC_64_NVME_NAME.parse::().unwrap().into_impl(); + let expected_size = 37; + let actual_size = HttpChecksum::size(&*checksum); + assert_eq!(expected_size, actual_size) + } + + #[test] + fn test_trailer_value_of_crc64nvme_checksum_body() { + let checksum = CRC_64_NVME_NAME.parse::().unwrap().into_impl(); + // The CRC64NVME of an empty string is all zeroes + let expected_value = Bytes::from_static(b"\0\0\0\0\0\0\0\0"); + let expected_value = base64::encode(&expected_value); + let actual_value = checksum.header_value(); + assert_eq!(expected_value, actual_value) + } + + #[test] + fn test_trailer_length_of_sha1_checksum_body() { + let checksum = SHA_1_NAME.parse::().unwrap().into_impl(); + let expected_size = 48; + let actual_size = HttpChecksum::size(&*checksum); + assert_eq!(expected_size, actual_size) + } + + #[test] + fn test_trailer_value_of_sha1_checksum_body() { + let checksum = SHA_1_NAME.parse::().unwrap().into_impl(); + // The SHA1 of an empty string is da39a3ee5e6b4b0d3255bfef95601890afd80709 + let expected_value = Bytes::from_static(&[ + 0xda, 0x39, 0xa3, 0xee, 0x5e, 0x6b, 0x4b, 0x0d, 0x32, 0x55, 0xbf, 0xef, 0x95, 0x60, 0x18, 0x90, 0xaf, 0xd8, 0x07, + 0x09, + ]); + let expected_value = base64::encode(&expected_value); + let actual_value = checksum.header_value(); + assert_eq!(expected_value, actual_value) + } + + #[test] + fn test_trailer_length_of_sha256_checksum_body() { + let checksum = SHA_256_NAME.parse::().unwrap().into_impl(); + let expected_size = 66; + let actual_size = HttpChecksum::size(&*checksum); + assert_eq!(expected_size, actual_size) + } + + #[test] + fn test_trailer_value_of_sha256_checksum_body() { + let checksum = SHA_256_NAME.parse::().unwrap().into_impl(); + let expected_value = Bytes::from_static(&[ + 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41, + 0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55, + ]); + let expected_value = base64::encode(&expected_value); + let actual_value = checksum.header_value(); + assert_eq!(expected_value, actual_value) + } +} diff --git a/crates/checksums/src/lib.rs b/crates/checksums/src/lib.rs new file mode 100644 index 00000000..4501725a --- /dev/null +++ b/crates/checksums/src/lib.rs @@ -0,0 +1,446 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![allow(clippy::derive_partial_eq_without_eq)] +#![warn( + // missing_docs, + rustdoc::missing_crate_level_docs, + unreachable_pub, + rust_2018_idioms +)] + +use crate::error::UnknownChecksumAlgorithmError; + +use bytes::Bytes; +use std::{fmt::Debug, str::FromStr}; + +mod base64; +pub mod error; +pub mod http; + +pub const CRC_32_NAME: &str = "crc32"; +pub const CRC_32_C_NAME: &str = "crc32c"; +pub const CRC_64_NVME_NAME: &str = "crc64nvme"; +pub const SHA_1_NAME: &str = "sha1"; +pub const SHA_256_NAME: &str = "sha256"; +pub const MD5_NAME: &str = "md5"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[non_exhaustive] +pub enum ChecksumAlgorithm { + #[default] + Crc32, + Crc32c, + #[deprecated] + Md5, + Sha1, + Sha256, + Crc64Nvme, +} + +impl FromStr for ChecksumAlgorithm { + type Err = UnknownChecksumAlgorithmError; + + fn from_str(checksum_algorithm: &str) -> Result { + if checksum_algorithm.eq_ignore_ascii_case(CRC_32_NAME) { + Ok(Self::Crc32) + } else if checksum_algorithm.eq_ignore_ascii_case(CRC_32_C_NAME) { + Ok(Self::Crc32c) + } else if checksum_algorithm.eq_ignore_ascii_case(SHA_1_NAME) { + Ok(Self::Sha1) + } else if checksum_algorithm.eq_ignore_ascii_case(SHA_256_NAME) { + Ok(Self::Sha256) + } else if checksum_algorithm.eq_ignore_ascii_case(MD5_NAME) { + // MD5 is now an alias for the default Crc32 since it is deprecated + Ok(Self::Crc32) + } else if checksum_algorithm.eq_ignore_ascii_case(CRC_64_NVME_NAME) { + Ok(Self::Crc64Nvme) + } else { + Err(UnknownChecksumAlgorithmError::new(checksum_algorithm)) + } + } +} + +impl ChecksumAlgorithm { + pub fn into_impl(self) -> Box { + match self { + Self::Crc32 => Box::::default(), + Self::Crc32c => Box::::default(), + Self::Crc64Nvme => Box::::default(), + #[allow(deprecated)] + Self::Md5 => Box::::default(), + Self::Sha1 => Box::::default(), + Self::Sha256 => Box::::default(), + } + } + + pub fn as_str(&self) -> &'static str { + match self { + Self::Crc32 => CRC_32_NAME, + Self::Crc32c => CRC_32_C_NAME, + Self::Crc64Nvme => CRC_64_NVME_NAME, + #[allow(deprecated)] + Self::Md5 => MD5_NAME, + Self::Sha1 => SHA_1_NAME, + Self::Sha256 => SHA_256_NAME, + } + } +} + +pub trait Checksum: Send + Sync { + fn update(&mut self, bytes: &[u8]); + fn finalize(self: Box) -> Bytes; + fn size(&self) -> u64; +} + +#[derive(Debug)] +struct Crc32 { + hasher: crc_fast::Digest, +} + +impl Default for Crc32 { + fn default() -> Self { + Self { + hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc32IsoHdlc), + } + } +} + +impl Crc32 { + fn update(&mut self, bytes: &[u8]) { + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + let checksum = self.hasher.finalize() as u32; + + Bytes::copy_from_slice(checksum.to_be_bytes().as_slice()) + } + + fn size() -> u64 { + 4 + } +} + +impl Checksum for Crc32 { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes) + } + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[derive(Debug)] +struct Crc32c { + hasher: crc_fast::Digest, +} + +impl Default for Crc32c { + fn default() -> Self { + Self { + hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc32Iscsi), + } + } +} + +impl Crc32c { + fn update(&mut self, bytes: &[u8]) { + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + let checksum = self.hasher.finalize() as u32; + + Bytes::copy_from_slice(checksum.to_be_bytes().as_slice()) + } + + fn size() -> u64 { + 4 + } +} + +impl Checksum for Crc32c { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes) + } + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[derive(Debug)] +struct Crc64Nvme { + hasher: crc_fast::Digest, +} + +impl Default for Crc64Nvme { + fn default() -> Self { + Self { + hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc64Nvme), + } + } +} + +impl Crc64Nvme { + fn update(&mut self, bytes: &[u8]) { + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + Bytes::copy_from_slice(self.hasher.finalize().to_be_bytes().as_slice()) + } + + fn size() -> u64 { + 8 + } +} + +impl Checksum for Crc64Nvme { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes) + } + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[derive(Debug, Default)] +struct Sha1 { + hasher: sha1::Sha1, +} + +impl Sha1 { + fn update(&mut self, bytes: &[u8]) { + use sha1::Digest; + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + use sha1::Digest; + Bytes::copy_from_slice(self.hasher.finalize().as_slice()) + } + + fn size() -> u64 { + use sha1::Digest; + sha1::Sha1::output_size() as u64 + } +} + +impl Checksum for Sha1 { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes) + } + + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[derive(Debug, Default)] +struct Sha256 { + hasher: sha2::Sha256, +} + +impl Sha256 { + fn update(&mut self, bytes: &[u8]) { + use sha2::Digest; + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + use sha2::Digest; + Bytes::copy_from_slice(self.hasher.finalize().as_slice()) + } + + fn size() -> u64 { + use sha2::Digest; + sha2::Sha256::output_size() as u64 + } +} + +impl Checksum for Sha256 { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes); + } + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[derive(Debug, Default)] +struct Md5 { + hasher: md5::Md5, +} + +impl Md5 { + fn update(&mut self, bytes: &[u8]) { + use md5::Digest; + self.hasher.update(bytes); + } + + fn finalize(self) -> Bytes { + use md5::Digest; + Bytes::copy_from_slice(self.hasher.finalize().as_slice()) + } + + fn size() -> u64 { + use md5::Digest; + md5::Md5::output_size() as u64 + } +} + +impl Checksum for Md5 { + fn update(&mut self, bytes: &[u8]) { + Self::update(self, bytes) + } + fn finalize(self: Box) -> Bytes { + Self::finalize(*self) + } + fn size(&self) -> u64 { + Self::size() + } +} + +#[cfg(test)] +mod tests { + use super::{ + Crc32, Crc32c, Md5, Sha1, Sha256, + http::{CRC_32_C_HEADER_NAME, CRC_32_HEADER_NAME, MD5_HEADER_NAME, SHA_1_HEADER_NAME, SHA_256_HEADER_NAME}, + }; + + use crate::ChecksumAlgorithm; + use crate::http::HttpChecksum; + + use crate::base64; + use http::HeaderValue; + use pretty_assertions::assert_eq; + use std::fmt::Write; + + const TEST_DATA: &str = r#"test data"#; + + fn base64_encoded_checksum_to_hex_string(header_value: &HeaderValue) -> String { + let decoded_checksum = base64::decode(header_value.to_str().unwrap()).unwrap(); + let decoded_checksum = decoded_checksum.into_iter().fold(String::new(), |mut acc, byte| { + write!(acc, "{byte:02X?}").expect("string will always be writeable"); + acc + }); + + format!("0x{decoded_checksum}") + } + + #[test] + fn test_crc32_checksum() { + let mut checksum = Crc32::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(CRC_32_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0xD308AEB2"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[cfg(not(any(target_arch = "powerpc", target_arch = "powerpc64")))] + #[test] + fn test_crc32c_checksum() { + let mut checksum = Crc32c::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(CRC_32_C_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0x3379B4CA"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[test] + fn test_crc64nvme_checksum() { + use crate::{Crc64Nvme, http::CRC_64_NVME_HEADER_NAME}; + let mut checksum = Crc64Nvme::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(CRC_64_NVME_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0xAECAF3AF9C98A855"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[test] + fn test_sha1_checksum() { + let mut checksum = Sha1::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(SHA_1_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0xF48DD853820860816C75D54D0F584DC863327A7C"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[test] + fn test_sha256_checksum() { + let mut checksum = Sha256::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(SHA_256_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0x916F0027A575074CE72A331777C3478D6513F786A591BD892DA1A577BF2335F9"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[test] + fn test_md5_checksum() { + let mut checksum = Md5::default(); + checksum.update(TEST_DATA.as_bytes()); + let checksum_result = Box::new(checksum).headers(); + let encoded_checksum = checksum_result.get(MD5_HEADER_NAME).unwrap(); + let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum); + + let expected_checksum = "0xEB733A00C0C9D336E65691A37AB54293"; + + assert_eq!(decoded_checksum, expected_checksum); + } + + #[test] + fn test_checksum_algorithm_returns_error_for_unknown() { + let error = "some invalid checksum algorithm" + .parse::() + .expect_err("it should error"); + assert_eq!("some invalid checksum algorithm", error.checksum_algorithm()); + } +} diff --git a/crates/ecstore/Cargo.toml b/crates/ecstore/Cargo.toml index 0edb4b9a..0e12a846 100644 --- a/crates/ecstore/Cargo.toml +++ b/crates/ecstore/Cargo.toml @@ -50,7 +50,7 @@ serde.workspace = true time.workspace = true bytesize.workspace = true serde_json.workspace = true -serde-xml-rs.workspace = true +quick-xml.workspace = true s3s.workspace = true http.workspace = true url.workspace = true @@ -66,6 +66,7 @@ rmp-serde.workspace = true tokio-util = { workspace = true, features = ["io", "compat"] } base64 = { workspace = true } hmac = { workspace = true } +sha1 = { workspace = true } sha2 = { workspace = true } hex-simd = { workspace = true } path-clean = { workspace = true } @@ -98,6 +99,7 @@ rustfs-filemeta.workspace = true rustfs-utils = { workspace = true, features = ["full"] } rustfs-rio.workspace = true rustfs-signer.workspace = true +rustfs-checksums.workspace = true futures-util.workspace = true [target.'cfg(not(windows))'.dependencies] diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 29b2097f..e2848821 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -346,8 +346,12 @@ impl ExpiryState { } pub async fn worker(rx: &mut Receiver>, api: Arc) { + //let cancel_token = + // get_background_services_cancel_token().ok_or_else(|| Error::other("Background services not initialized"))?; + loop { select! { + //_ = cancel_token.cancelled() => { _ = tokio::signal::ctrl_c() => { info!("got ctrl+c, exits"); break; @@ -811,8 +815,8 @@ impl LifecycleOps for ObjectInfo { num_versions: self.num_versions, delete_marker: self.delete_marker, successor_mod_time: self.successor_mod_time, - //restore_ongoing: self.restore_ongoing, - //restore_expires: self.restore_expires, + restore_ongoing: self.restore_ongoing, + restore_expires: self.restore_expires, transition_status: self.transitioned_object.status.clone(), ..Default::default() } diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index 3d232363..22caf860 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -132,7 +132,7 @@ pub trait Lifecycle { async fn has_transition(&self) -> bool; fn has_expiry(&self) -> bool; async fn has_active_rules(&self, prefix: &str) -> bool; - async fn validate(&self, lr_retention: bool) -> Result<(), std::io::Error>; + async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error>; async fn filter_rules(&self, obj: &ObjectOpts) -> Option>; async fn eval(&self, obj: &ObjectOpts) -> Event; async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event; @@ -213,7 +213,7 @@ impl Lifecycle for BucketLifecycleConfiguration { false } - async fn validate(&self, lr_retention: bool) -> Result<(), std::io::Error> { + async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error> { if self.rules.len() > 1000 { return Err(std::io::Error::other(ERR_LIFECYCLE_TOO_MANY_RULES)); } @@ -223,13 +223,15 @@ impl Lifecycle for BucketLifecycleConfiguration { for r in &self.rules { r.validate()?; - if let Some(expiration) = r.expiration.as_ref() { - if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker { - if lr_retention && (expired_object_delete_marker) { - return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED)); + /*if let Some(object_lock_enabled) = lr.object_lock_enabled.as_ref() { + if let Some(expiration) = r.expiration.as_ref() { + if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker { + if object_lock_enabled.as_str() == ObjectLockEnabled::ENABLED && (expired_object_delete_marker) { + return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED)); + } } - } - } + } + }*/ } for (i, _) in self.rules.iter().enumerate() { if i == self.rules.len() - 1 { @@ -600,7 +602,7 @@ pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTi } let t = mod_time .to_offset(offset!(-0:00:00)) - .saturating_add(Duration::days(0 /*days as i64*/)); //debug + .saturating_add(Duration::days(days as i64)); let mut hour = 3600; if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_HOUR") { if let Ok(num_hour) = env_ilm_hour.parse::() { diff --git a/crates/ecstore/src/checksum.rs b/crates/ecstore/src/checksum.rs index e338b6b1..dd8be1e6 100644 --- a/crates/ecstore/src/checksum.rs +++ b/crates/ecstore/src/checksum.rs @@ -20,7 +20,7 @@ #![allow(clippy::all)] use lazy_static::lazy_static; -use rustfs_utils::HashAlgorithm; +use rustfs_checksums::ChecksumAlgorithm; use std::collections::HashMap; use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart}; @@ -103,15 +103,34 @@ impl ChecksumMode { } pub fn can_composite(&self) -> bool { - todo!(); + let s = EnumSet::from(*self).intersection(*C_ChecksumMask); + match s.as_u8() { + 2_u8 => true, + 4_u8 => true, + 8_u8 => true, + 16_u8 => true, + _ => false, + } } pub fn can_merge_crc(&self) -> bool { - todo!(); + let s = EnumSet::from(*self).intersection(*C_ChecksumMask); + match s.as_u8() { + 8_u8 => true, + 16_u8 => true, + 32_u8 => true, + _ => false, + } } pub fn full_object_requested(&self) -> bool { - todo!(); + let s = EnumSet::from(*self).intersection(*C_ChecksumMask); + match s.as_u8() { + //C_ChecksumFullObjectCRC32 as u8 => true, + //C_ChecksumFullObjectCRC32C as u8 => true, + 32_u8 => true, + _ => false, + } } pub fn key_capitalized(&self) -> String { @@ -123,33 +142,35 @@ impl ChecksumMode { if u == ChecksumMode::ChecksumCRC32 as u8 || u == ChecksumMode::ChecksumCRC32C as u8 { 4 } else if u == ChecksumMode::ChecksumSHA1 as u8 { - 4 //sha1.size + use sha1::Digest; + sha1::Sha1::output_size() as usize } else if u == ChecksumMode::ChecksumSHA256 as u8 { - 4 //sha256.size + use sha2::Digest; + sha2::Sha256::output_size() as usize } else if u == ChecksumMode::ChecksumCRC64NVME as u8 { - 4 //crc64.size + 8 } else { 0 } } - pub fn hasher(&self) -> Result { + pub fn hasher(&self) -> Result, std::io::Error> { match /*C_ChecksumMask & **/self { - /*ChecksumMode::ChecksumCRC32 => { - return Ok(Box::new(crc32fast::Hasher::new())); - }*/ - /*ChecksumMode::ChecksumCRC32C => { - return Ok(Box::new(crc32::new(crc32.MakeTable(crc32.Castagnoli)))); + ChecksumMode::ChecksumCRC32 => { + return Ok(ChecksumAlgorithm::Crc32.into_impl()); + } + ChecksumMode::ChecksumCRC32C => { + return Ok(ChecksumAlgorithm::Crc32c.into_impl()); } ChecksumMode::ChecksumSHA1 => { - return Ok(Box::new(sha1::new())); - }*/ - ChecksumMode::ChecksumSHA256 => { - return Ok(HashAlgorithm::SHA256); + return Ok(ChecksumAlgorithm::Sha1.into_impl()); + } + ChecksumMode::ChecksumSHA256 => { + return Ok(ChecksumAlgorithm::Sha256.into_impl()); + } + ChecksumMode::ChecksumCRC64NVME => { + return Ok(ChecksumAlgorithm::Crc64Nvme.into_impl()); } - /*ChecksumMode::ChecksumCRC64NVME => { - return Ok(Box::new(crc64nvme.New()); - }*/ _ => return Err(std::io::Error::other("unsupported checksum type")), } } @@ -170,7 +191,8 @@ impl ChecksumMode { return Ok("".to_string()); } let mut h = self.hasher()?; - let hash = h.hash_encode(b); + h.update(b); + let hash = h.finalize(); Ok(base64_encode(hash.as_ref())) } @@ -227,7 +249,8 @@ impl ChecksumMode { let c = self.base(); let crc_bytes = Vec::::with_capacity(p.len() * self.raw_byte_len() as usize); let mut h = self.hasher()?; - let hash = h.hash_encode(crc_bytes.as_ref()); + h.update(crc_bytes.as_ref()); + let hash = h.finalize(); Ok(Checksum { checksum_type: self.clone(), r: hash.as_ref().to_vec(), diff --git a/crates/ecstore/src/client/api_bucket_policy.rs b/crates/ecstore/src/client/api_bucket_policy.rs index 8ed9c606..9512e911 100644 --- a/crates/ecstore/src/client/api_bucket_policy.rs +++ b/crates/ecstore/src/client/api_bucket_policy.rs @@ -63,7 +63,7 @@ impl TransitionClient { //defer closeResponse(resp) //if resp != nil { if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, ""))); + return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, ""))); } //} Ok(()) @@ -98,7 +98,7 @@ impl TransitionClient { //defer closeResponse(resp) if resp.status() != StatusCode::NO_CONTENT { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, ""))); + return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, ""))); } Ok(()) diff --git a/crates/ecstore/src/client/api_error_response.rs b/crates/ecstore/src/client/api_error_response.rs index 31ca0645..402c34df 100644 --- a/crates/ecstore/src/client/api_error_response.rs +++ b/crates/ecstore/src/client/api_error_response.rs @@ -95,13 +95,13 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse { } pub fn http_resp_to_error_response( - resp: http::Response, + resp: &http::Response, b: Vec, bucket_name: &str, object_name: &str, ) -> ErrorResponse { let err_body = String::from_utf8(b).unwrap(); - let err_resp_ = serde_xml_rs::from_str::(&err_body); + let err_resp_ = quick_xml::de::from_str::(&err_body); let mut err_resp = ErrorResponse::default(); if err_resp_.is_err() { match resp.status() { diff --git a/crates/ecstore/src/client/api_get_object_acl.rs b/crates/ecstore/src/client/api_get_object_acl.rs index 0ee14a42..1e811512 100644 --- a/crates/ecstore/src/client/api_get_object_acl.rs +++ b/crates/ecstore/src/client/api_get_object_acl.rs @@ -87,11 +87,11 @@ impl TransitionClient { if resp.status() != http::StatusCode::OK { let b = resp.body().bytes().expect("err").to_vec(); - return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, object_name))); + return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name))); } let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); - let mut res = match serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()) { + let mut res = match quick_xml::de::from_str::(&String::from_utf8(b).unwrap()) { Ok(result) => result, Err(err) => { return Err(std::io::Error::other(err.to_string())); diff --git a/crates/ecstore/src/client/api_get_object_attributes.rs b/crates/ecstore/src/client/api_get_object_attributes.rs index f236118d..fd8015ad 100644 --- a/crates/ecstore/src/client/api_get_object_attributes.rs +++ b/crates/ecstore/src/client/api_get_object_attributes.rs @@ -144,7 +144,7 @@ impl ObjectAttributes { self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string(); let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); - let mut response = match serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()) { + let mut response = match quick_xml::de::from_str::(&String::from_utf8(b).unwrap()) { Ok(result) => result, Err(err) => { return Err(std::io::Error::other(err.to_string())); @@ -226,7 +226,7 @@ impl TransitionClient { if resp.status() != http::StatusCode::OK { let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); let err_body = String::from_utf8(b).unwrap(); - let mut er = match serde_xml_rs::from_str::(&err_body) { + let mut er = match quick_xml::de::from_str::(&err_body) { Ok(result) => result, Err(err) => { return Err(std::io::Error::other(err.to_string())); diff --git a/crates/ecstore/src/client/api_list.rs b/crates/ecstore/src/client/api_list.rs index f978f063..fdbffc68 100644 --- a/crates/ecstore/src/client/api_list.rs +++ b/crates/ecstore/src/client/api_list.rs @@ -98,12 +98,12 @@ impl TransitionClient { ) .await?; if resp.status() != StatusCode::OK { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, ""))); + return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, ""))); } //let mut list_bucket_result = ListBucketV2Result::default(); let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); - let mut list_bucket_result = match serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()) { + let mut list_bucket_result = match quick_xml::de::from_str::(&String::from_utf8(b).unwrap()) { Ok(result) => result, Err(err) => { return Err(std::io::Error::other(err.to_string())); diff --git a/crates/ecstore/src/client/api_put_object.rs b/crates/ecstore/src/client/api_put_object.rs index 6ef3733c..19f38895 100644 --- a/crates/ecstore/src/client/api_put_object.rs +++ b/crates/ecstore/src/client/api_put_object.rs @@ -85,7 +85,7 @@ pub struct PutObjectOptions { pub expires: OffsetDateTime, pub mode: ObjectLockRetentionMode, pub retain_until_date: OffsetDateTime, - //pub server_side_encryption: encrypt.ServerSide, + //pub server_side_encryption: encrypt::ServerSide, pub num_threads: u64, pub storage_class: String, pub website_redirect_location: String, @@ -135,7 +135,7 @@ impl Default for PutObjectOptions { #[allow(dead_code)] impl PutObjectOptions { - fn set_match_tag(&mut self, etag: &str) { + fn set_match_etag(&mut self, etag: &str) { if etag == "*" { self.custom_header .insert("If-Match", HeaderValue::from_str("*").expect("err")); @@ -145,7 +145,7 @@ impl PutObjectOptions { } } - fn set_match_tag_except(&mut self, etag: &str) { + fn set_match_etag_except(&mut self, etag: &str) { if etag == "*" { self.custom_header .insert("If-None-Match", HeaderValue::from_str("*").expect("err")); @@ -366,7 +366,8 @@ impl TransitionClient { md5_base64 = base64_encode(hash.as_ref()); } else { let mut crc = opts.auto_checksum.hasher()?; - let csum = crc.hash_encode(&buf[..length]); + crc.update(&buf[..length]); + let csum = crc.finalize(); if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) { custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err")); diff --git a/crates/ecstore/src/client/api_put_object_common.rs b/crates/ecstore/src/client/api_put_object_common.rs index 0fbec8f4..6652d223 100644 --- a/crates/ecstore/src/client/api_put_object_common.rs +++ b/crates/ecstore/src/client/api_put_object_common.rs @@ -12,7 +12,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] #![allow(unused_assignments)] diff --git a/crates/ecstore/src/client/api_put_object_multipart.rs b/crates/ecstore/src/client/api_put_object_multipart.rs index 67854c72..84ba5810 100644 --- a/crates/ecstore/src/client/api_put_object_multipart.rs +++ b/crates/ecstore/src/client/api_put_object_multipart.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] #![allow(unused_assignments)] @@ -19,20 +18,14 @@ #![allow(clippy::all)] use bytes::Bytes; -use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; +use http::{HeaderMap, HeaderName, StatusCode}; use s3s::S3ErrorCode; -use std::io::Read; -use std::{collections::HashMap, sync::Arc}; -use time::{OffsetDateTime, format_description}; -use tokio_util::sync::CancellationToken; +use std::collections::HashMap; +use time::OffsetDateTime; use tracing::warn; -use tracing::{error, info}; -use url::form_urlencoded::Serializer; use uuid::Uuid; -use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID}; -use s3s::{Body, dto::StreamingBlob}; -//use crate::disk::{Reader, BufferReader}; +use crate::checksum::ChecksumMode; use crate::client::{ api_error_response::{ err_entity_too_large, err_entity_too_small, err_invalid_argument, http_resp_to_error_response, to_error_response, @@ -42,15 +35,11 @@ use crate::client::{ api_s3_datatypes::{ CompleteMultipartUpload, CompleteMultipartUploadResult, CompletePart, InitiateMultipartUploadResult, ObjectPart, }, - constants::{ABS_MIN_PART_SIZE, ISO8601_DATEFORMAT, MAX_PART_SIZE, MAX_SINGLE_PUT_OBJECT_SIZE}, + constants::{ISO8601_DATEFORMAT, MAX_PART_SIZE, MAX_SINGLE_PUT_OBJECT_SIZE}, transition_api::{ReaderImpl, RequestMetadata, TransitionClient, UploadInfo}, }; -use crate::{ - checksum::ChecksumMode, - disk::DiskAPI, - store_api::{GetObjectReader, StorageAPI}, -}; use rustfs_utils::{crypto::base64_encode, path::trim_etag}; +use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID}; impl TransitionClient { pub async fn put_object_multipart( @@ -133,7 +122,8 @@ impl TransitionClient { //} if hash_sums.len() == 0 { let mut crc = opts.auto_checksum.hasher()?; - let csum = crc.hash_encode(&buf[..length]); + crc.update(&buf[..length]); + let csum = crc.finalize(); if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) { custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err")); @@ -236,7 +226,12 @@ impl TransitionClient { let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?; //if resp.is_none() { if resp.status() != StatusCode::OK { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name))); + return Err(std::io::Error::other(http_resp_to_error_response( + &resp, + vec![], + bucket_name, + object_name, + ))); } //} let initiate_multipart_upload_result = InitiateMultipartUploadResult::default(); @@ -293,7 +288,7 @@ impl TransitionClient { let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?; if resp.status() != StatusCode::OK { return Err(std::io::Error::other(http_resp_to_error_response( - resp, + &resp, vec![], &p.bucket_name.clone(), &p.object_name, diff --git a/crates/ecstore/src/client/api_put_object_streaming.rs b/crates/ecstore/src/client/api_put_object_streaming.rs index 2e56dcac..ca985eca 100644 --- a/crates/ecstore/src/client/api_put_object_streaming.rs +++ b/crates/ecstore/src/client/api_put_object_streaming.rs @@ -156,7 +156,8 @@ impl TransitionClient { md5_base64 = base64_encode(hash.as_ref()); } else { let mut crc = opts.auto_checksum.hasher()?; - let csum = crc.hash_encode(&buf[..length]); + crc.update(&buf[..length]); + let csum = crc.finalize(); if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) { custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err")); @@ -303,7 +304,8 @@ impl TransitionClient { let mut custom_header = HeaderMap::new(); if !opts.send_content_md5 { let mut crc = opts.auto_checksum.hasher()?; - let csum = crc.hash_encode(&buf[..length]); + crc.update(&buf[..length]); + let csum = crc.finalize(); if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) { custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err")); @@ -477,7 +479,12 @@ impl TransitionClient { let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?; if resp.status() != StatusCode::OK { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name))); + return Err(std::io::Error::other(http_resp_to_error_response( + &resp, + vec![], + bucket_name, + object_name, + ))); } let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) { diff --git a/crates/ecstore/src/client/api_remove.rs b/crates/ecstore/src/client/api_remove.rs index a6845229..9cb67d86 100644 --- a/crates/ecstore/src/client/api_remove.rs +++ b/crates/ecstore/src/client/api_remove.rs @@ -425,7 +425,12 @@ impl TransitionClient { }; } _ => { - return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name))); + return Err(std::io::Error::other(http_resp_to_error_response( + &resp, + vec![], + bucket_name, + object_name, + ))); } } return Err(std::io::Error::other(error_response)); diff --git a/crates/ecstore/src/client/api_restore.rs b/crates/ecstore/src/client/api_restore.rs index 9dc5fead..84b1ccb0 100644 --- a/crates/ecstore/src/client/api_restore.rs +++ b/crates/ecstore/src/client/api_restore.rs @@ -125,7 +125,7 @@ impl TransitionClient { version_id: &str, restore_req: &RestoreRequest, ) -> Result<(), std::io::Error> { - let restore_request = match serde_xml_rs::to_string(restore_req) { + let restore_request = match quick_xml::se::to_string(restore_req) { Ok(buf) => buf, Err(e) => { return Err(std::io::Error::other(e)); @@ -165,7 +165,7 @@ impl TransitionClient { let b = resp.body().bytes().expect("err").to_vec(); if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK { - return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, ""))); + return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, ""))); } Ok(()) } diff --git a/crates/ecstore/src/client/api_s3_datatypes.rs b/crates/ecstore/src/client/api_s3_datatypes.rs index ba26325e..cd92b3ec 100644 --- a/crates/ecstore/src/client/api_s3_datatypes.rs +++ b/crates/ecstore/src/client/api_s3_datatypes.rs @@ -279,7 +279,7 @@ pub struct CompleteMultipartUpload { impl CompleteMultipartUpload { pub fn marshal_msg(&self) -> Result { //let buf = serde_json::to_string(self)?; - let buf = match serde_xml_rs::to_string(self) { + let buf = match quick_xml::se::to_string(self) { Ok(buf) => buf, Err(e) => { return Err(std::io::Error::other(e)); @@ -329,7 +329,7 @@ pub struct DeleteMultiObjects { impl DeleteMultiObjects { pub fn marshal_msg(&self) -> Result { //let buf = serde_json::to_string(self)?; - let buf = match serde_xml_rs::to_string(self) { + let buf = match quick_xml::se::to_string(self) { Ok(buf) => buf, Err(e) => { return Err(std::io::Error::other(e)); diff --git a/crates/ecstore/src/client/api_stat.rs b/crates/ecstore/src/client/api_stat.rs index 99eed21f..8a064558 100644 --- a/crates/ecstore/src/client/api_stat.rs +++ b/crates/ecstore/src/client/api_stat.rs @@ -59,7 +59,7 @@ impl TransitionClient { if let Ok(resp) = resp { let b = resp.body().bytes().expect("err").to_vec(); - let resperr = http_resp_to_error_response(resp, b, bucket_name, ""); + let resperr = http_resp_to_error_response(&resp, b, bucket_name, ""); /*if to_error_response(resperr).code == "NoSuchBucket" { return Ok(false); } diff --git a/crates/ecstore/src/client/bucket_cache.rs b/crates/ecstore/src/client/bucket_cache.rs index b26aedb9..2ddf860e 100644 --- a/crates/ecstore/src/client/bucket_cache.rs +++ b/crates/ecstore/src/client/bucket_cache.rs @@ -177,7 +177,7 @@ impl TransitionClient { async fn process_bucket_location_response(mut resp: http::Response, bucket_name: &str) -> Result { //if resp != nil { if resp.status() != StatusCode::OK { - let err_resp = http_resp_to_error_response(resp, vec![], bucket_name, ""); + let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, ""); match err_resp.code { S3ErrorCode::NotImplemented => { match err_resp.server.as_str() { @@ -208,7 +208,7 @@ async fn process_bucket_location_response(mut resp: http::Response, bucket //} let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); - let Document(location_constraint) = serde_xml_rs::from_str::(&String::from_utf8(b).unwrap()).unwrap(); + let Document(location_constraint) = quick_xml::de::from_str::(&String::from_utf8(b).unwrap()).unwrap(); let mut location = location_constraint; if location == "" { diff --git a/crates/ecstore/src/client/transition_api.rs b/crates/ecstore/src/client/transition_api.rs index a91a7a7d..fba6eda2 100644 --- a/crates/ecstore/src/client/transition_api.rs +++ b/crates/ecstore/src/client/transition_api.rs @@ -19,7 +19,7 @@ #![allow(clippy::all)] use bytes::Bytes; -use futures::Future; +use futures::{Future, StreamExt}; use http::{HeaderMap, HeaderName}; use http::{ HeaderValue, Response, StatusCode, @@ -65,7 +65,9 @@ use crate::{checksum::ChecksumMode, store_api::GetObjectReader}; use rustfs_rio::HashReader; use rustfs_utils::{ net::get_endpoint_url, - retry::{MAX_RETRY, new_retry_timer}, + retry::{ + DEFAULT_RETRY_CAP, DEFAULT_RETRY_UNIT, MAX_JITTER, MAX_RETRY, RetryTimer, is_http_status_retryable, is_s3code_retryable, + }, }; use s3s::S3ErrorCode; use s3s::dto::ReplicationStatus; @@ -186,6 +188,7 @@ impl TransitionClient { clnt.trailing_header_support = opts.trailing_headers && clnt.override_signer_type == SignatureType::SignatureV4; + clnt.max_retries = MAX_RETRY; if opts.max_retries > 0 { clnt.max_retries = opts.max_retries; } @@ -313,12 +316,9 @@ impl TransitionClient { } //} - //let mut retry_timer = RetryTimer::new(); - //while let Some(v) = retry_timer.next().await { - for _ in [1; 1] - /*new_retry_timer(req_retry, default_retry_unit, default_retry_cap, max_jitter)*/ - { - let req = self.new_request(method, metadata).await?; + let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, self.random); + while let Some(v) = retry_timer.next().await { + let req = self.new_request(&method, metadata).await?; resp = self.doit(req).await?; @@ -329,7 +329,7 @@ impl TransitionClient { } let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec(); - let err_response = http_resp_to_error_response(resp, b.clone(), &metadata.bucket_name, &metadata.object_name); + let err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name); if self.region == "" { match err_response.code { @@ -360,6 +360,14 @@ impl TransitionClient { } } + if is_s3code_retryable(err_response.code.as_str()) { + continue; + } + + if is_http_status_retryable(&resp.status()) { + continue; + } + break; } @@ -368,7 +376,7 @@ impl TransitionClient { async fn new_request( &self, - method: http::Method, + method: &http::Method, metadata: &mut RequestMetadata, ) -> Result, std::io::Error> { let location = metadata.bucket_location.clone(); diff --git a/crates/ecstore/src/cmd/bucket_replication.rs b/crates/ecstore/src/cmd/bucket_replication.rs index 04465a22..05deb491 100644 --- a/crates/ecstore/src/cmd/bucket_replication.rs +++ b/crates/ecstore/src/cmd/bucket_replication.rs @@ -2014,6 +2014,8 @@ impl ReplicateObjectInfo { version_id: Uuid::try_parse(&self.version_id).ok(), delete_marker: self.delete_marker, transitioned_object: TransitionedObject::default(), + restore_ongoing: false, + restore_expires: Some(OffsetDateTime::now_utc()), user_tags: self.user_tags.clone(), parts: Vec::new(), is_latest: true, diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 4519d520..760ab2df 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -4056,11 +4056,9 @@ impl StorageAPI for SetDisks { return to_object_err(err, vec![bucket, object]); } }*/ - //let traceFn = GLOBAL_LifecycleSys.trace(fi.to_object_info(bucket, object, opts.Versioned || opts.VersionSuspended)); let dest_obj = gen_transition_objname(bucket); if let Err(err) = dest_obj { - //traceFn(ILMTransition, nil, err) return Err(to_object_err(err, vec![])); } let dest_obj = dest_obj.unwrap(); @@ -4068,8 +4066,6 @@ impl StorageAPI for SetDisks { let oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended); let (pr, mut pw) = tokio::io::duplex(fi.erasure.block_size); - //let h = HeaderMap::new(); - //let reader = ReaderImpl::ObjectBody(GetObjectReader {stream: StreamingBlob::wrap(tokio_util::io::ReaderStream::new(pr)), object_info: oi}); let reader = ReaderImpl::ObjectBody(GetObjectReader { stream: Box::new(pr), object_info: oi, @@ -4106,9 +4102,7 @@ impl StorageAPI for SetDisks { m }) .await; - //pr.CloseWithError(err); if let Err(err) = rv { - //traceFn(ILMTransition, nil, err) return Err(StorageError::Io(err)); } let rv = rv.unwrap(); @@ -4172,7 +4166,6 @@ impl StorageAPI for SetDisks { //if err != nil { // return set_restore_header_fn(&mut oi, Some(toObjectErr(err, bucket, object))); //} - //defer gr.Close() let hash_reader = HashReader::new(gr, gr.obj_info.size, "", "", gr.obj_info.size); let p_reader = PutObjReader::new(StreamingBlob::from(Box::pin(hash_reader)), hash_reader.size()); if let Err(err) = self.put_object(bucket, object, &mut p_reader, &ropts).await { diff --git a/crates/ecstore/src/store_api.rs b/crates/ecstore/src/store_api.rs index 5a582a52..7373734f 100644 --- a/crates/ecstore/src/store_api.rs +++ b/crates/ecstore/src/store_api.rs @@ -387,6 +387,8 @@ pub struct ObjectInfo { pub version_id: Option, pub delete_marker: bool, pub transitioned_object: TransitionedObject, + pub restore_ongoing: bool, + pub restore_expires: Option, pub user_tags: String, pub parts: Vec, pub is_latest: bool, @@ -421,6 +423,8 @@ impl Clone for ObjectInfo { version_id: self.version_id, delete_marker: self.delete_marker, transitioned_object: self.transitioned_object.clone(), + restore_ongoing: self.restore_ongoing, + restore_expires: self.restore_expires, user_tags: self.user_tags.clone(), parts: self.parts.clone(), is_latest: self.is_latest, diff --git a/crates/utils/src/retry.rs b/crates/utils/src/retry.rs index ce07253a..d8d69e03 100644 --- a/crates/utils/src/retry.rs +++ b/crates/utils/src/retry.rs @@ -12,44 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use futures::Stream; +use hyper::http; +use std::{ + pin::Pin, + sync::LazyLock, + task::{Context, Poll}, + time::Duration, +}; +use tokio::time::interval; pub const MAX_RETRY: i64 = 10; pub const MAX_JITTER: f64 = 1.0; pub const NO_JITTER: f64 = 0.0; -/* -struct Delay { - when: Instant, -} +pub const DEFAULT_RETRY_UNIT: Duration = Duration::from_millis(200); +pub const DEFAULT_RETRY_CAP: Duration = Duration::from_secs(1); -impl Future for Delay { - type Output = &'static str; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<&'static str> - { - if Instant::now() >= self.when { - println!("Hello world"); - Poll::Ready("done") - } else { - // Ignore this line for now. - cx.waker().wake_by_ref(); - Poll::Pending - } - } -} - -struct RetryTimer { - rem: usize, - delay: Delay, +pub struct RetryTimer { + base_sleep: Duration, + max_sleep: Duration, + jitter: f64, + random: u64, + rem: i64, } impl RetryTimer { - fn new() -> Self { + pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self { Self { - rem: 3, - delay: Delay { when: Instant::now() } + base_sleep, + max_sleep, + jitter, + random, + rem: max_retry, } } } @@ -57,26 +52,87 @@ impl RetryTimer { impl Stream for RetryTimer { type Item = (); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER); + + let attempt = MAX_RETRY - self.rem; + let mut sleep = self.base_sleep * (1 << attempt); + if sleep > self.max_sleep { + sleep = self.max_sleep; + } + if (jitter - NO_JITTER).abs() > 1e-9 { + sleep -= sleep * self.random as u32 * jitter as u32; + } + if self.rem == 0 { - // No more delays return Poll::Ready(None); } - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => { - let when = self.delay.when + Duration::from_millis(10); - self.delay = Delay { when }; - self.rem -= 1; - Poll::Ready(Some(())) - } + self.rem -= 1; + let mut t = interval(sleep); + match t.poll_tick(cx) { + Poll::Ready(_) => Poll::Ready(Some(())), Poll::Pending => Poll::Pending, } } -}*/ +} -pub fn new_retry_timer(_max_retry: i32, _base_sleep: Duration, _max_sleep: Duration, _jitter: f64) -> Vec { +static RETRYABLE_S3CODES: LazyLock> = LazyLock::new(|| { + vec![ + "RequestError".to_string(), + "RequestTimeout".to_string(), + "Throttling".to_string(), + "ThrottlingException".to_string(), + "RequestLimitExceeded".to_string(), + "RequestThrottled".to_string(), + "InternalError".to_string(), + "ExpiredToken".to_string(), + "ExpiredTokenException".to_string(), + "SlowDown".to_string(), + ] +}); + +static RETRYABLE_HTTP_STATUSCODES: LazyLock> = LazyLock::new(|| { + vec![ + http::StatusCode::REQUEST_TIMEOUT, + http::StatusCode::TOO_MANY_REQUESTS, + //499, + http::StatusCode::INTERNAL_SERVER_ERROR, + http::StatusCode::BAD_GATEWAY, + http::StatusCode::SERVICE_UNAVAILABLE, + http::StatusCode::GATEWAY_TIMEOUT, + //520, + ] +}); + +pub fn is_s3code_retryable(s3code: &str) -> bool { + RETRYABLE_S3CODES.contains(&s3code.to_string()) +} + +pub fn is_http_status_retryable(http_statuscode: &http::StatusCode) -> bool { + RETRYABLE_HTTP_STATUSCODES.contains(http_statuscode) +} + +pub fn is_request_error_retryable(_err: std::io::Error) -> bool { + /*if err == Err::Canceled || err == Err::DeadlineExceeded { + return err() == nil; + } + let uerr = err.(*url.Error); + if uerr.is_ok() { + let e = uerr.unwrap(); + return match e.type { + x509.UnknownAuthorityError => { + false + } + _ => true, + }; + return match e.error() { + "http: server gave HTTP response to HTTPS client" => { + false + } + _ => rue, + }; + } + true*/ todo!(); } diff --git a/crates/utils/src/sys/user_agent.rs b/crates/utils/src/sys/user_agent.rs index e50b6f65..fb549ad0 100644 --- a/crates/utils/src/sys/user_agent.rs +++ b/crates/utils/src/sys/user_agent.rs @@ -88,7 +88,7 @@ impl UserAgent { Some(version) => version, None => "Windows NT Unknown".to_string(), }; - format!("Windows NT {}", version) + format!("Windows NT {version}") } #[cfg(not(windows))] diff --git a/rustfs/src/admin/handlers/tier.rs b/rustfs/src/admin/handlers/tier.rs index d0c40246..e467e443 100644 --- a/rustfs/src/admin/handlers/tier.rs +++ b/rustfs/src/admin/handlers/tier.rs @@ -465,182 +465,3 @@ impl Operation for ClearTier { Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) } } - -/*pub struct PostRestoreObject {} -#[async_trait::async_trait] -impl Operation for PostRestoreObject { - async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { - let query = { - if let Some(query) = req.uri.query() { - let input: PostRestoreObject = - from_bytes(query.as_bytes()).map_err(|_e| s3_error!(InvalidArgument, "get query failed"))?; - input - } else { - PostRestoreObject::default() - } - }; - - let bucket = params.bucket; - if let Err(e) = un_escape_path(params.object) { - warn!("post restore object failed, e: {:?}", e); - return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); - } - - let Some(store) = new_object_layer_fn() else { - return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); - }; - - let get_object_info = store.get_object_info(); - - if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) { - return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); - } - - if req.content_length <= 0 { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - } - let Some(opts) = post_restore_opts(req, bucket, object) else { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - }; - - let Some(obj_info) = getObjectInfo(ctx, bucket, object, opts) else { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - }; - - if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - } - - let mut api_err; - let Some(rreq) = parsere_store_request(req.body(), req.content_length) else { - let api_err = errorCodes.ToAPIErr(ErrMalformedXML); - api_err.description = err.Error() - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - }; - let mut status_code = http::StatusCode::OK; - let mut already_restored = false; - if Err(err) = rreq.validate(store) { - api_err = errorCodes.ToAPIErr(ErrMalformedXML) - api_err.description = err.Error() - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); - } else { - if obj_info.restore_ongoing && rreq.Type != "SELECT" { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()), "post restore object failed")); - } - if !obj_info.restore_ongoing && !obj_info.restore_expires.unix_timestamp() == 0 { - status_code = http::StatusCode::Accepted; - already_restored = true; - } - } - let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), rreq.days); - let mut metadata = clone_mss(obj_info.user_defined); - - if rreq.type != "SELECT" { - obj_info.metadataOnly = true; - metadata[xhttp.AmzRestoreExpiryDays] = rreq.days; - metadata[xhttp.AmzRestoreRequestDate] = OffsetDateTime::now_utc().format(http::TimeFormat); - if already_restored { - metadata[xhttp.AmzRestore] = completedRestoreObj(restore_expiry).String() - } else { - metadata[xhttp.AmzRestore] = ongoingRestoreObj().String() - } - obj_info.user_defined = metadata; - if let Err(err) = store.copy_object(bucket, object, bucket, object, obj_info, ObjectOptions { - version_id: obj_info.version_id, - }, ObjectOptions { - version_id: obj_info.version_id, - m_time: obj_info.mod_time, - }) { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); - } - if already_restored { - return Ok(()); - } - } - - let restore_object = must_get_uuid(); - if rreq.output_location.s3.bucket_name != "" { - w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)} - } - w.WriteHeader(status_code) - send_event(EventArgs { - event_name: event::ObjectRestorePost, - bucket_name: bucket, - object: obj_info, - req_params: extract_req_params(r), - user_agent: req.user_agent(), - host: handlers::get_source_ip(r), - }); - tokio::spawn(async move { - if !rreq.SelectParameters.IsEmpty() { - let actual_size = obj_info.get_actual_size(); - if actual_size.is_err() { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); - } - - let object_rsc = s3select.NewObjectReadSeekCloser( - |offset int64| -> (io.ReadCloser, error) { - rs := &HTTPRangeSpec{ - IsSuffixLength: false, - Start: offset, - End: -1, - } - return getTransitionedObjectReader(bucket, object, rs, r.Header, - obj_info, ObjectOptions {version_id: obj_info.version_id}); - }, - actual_size.unwrap(), - ); - if err = rreq.SelectParameters.Open(objectRSC); err != nil { - if serr, ok := err.(s3select.SelectError); ok { - let encoded_error_response = encodeResponse(APIErrorResponse { - code: serr.ErrorCode(), - message: serr.ErrorMessage(), - bucket_name: bucket, - key: object, - resource: r.URL.Path, - request_id: w.Header().Get(xhttp.AmzRequestID), - host_id: globalDeploymentID(), - }); - //writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML) - Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); - } else { - return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); - } - return Ok(()); - } - let nr = httptest.NewRecorder(); - let rw = xhttp.NewResponseRecorder(nr); - rw.log_err_body = true; - rw.log_all_body = true; - rreq.select_parameters.evaluate(rw); - rreq.select_parameters.Close(); - return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); - } - let opts = ObjectOptions { - transition: TransitionOptions { - restore_request: rreq, - restore_expiry: restore_expiry, - }, - version_id: objInfo.version_id, - } - if Err(err) = store.restore_transitioned_object(bucket, object, opts) { - format!(format!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string())); - return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); - } - - send_event(EventArgs { - EventName: event.ObjectRestoreCompleted, - BucketName: bucket, - Object: objInfo, - ReqParams: extractReqParams(r), - UserAgent: r.UserAgent(), - Host: handlers.GetSourceIP(r), - }); - }); - - let mut header = HeaderMap::new(); - header.insert(CONTENT_TYPE, "application/json".parse().unwrap()); - header.insert(CONTENT_LENGTH, "0".parse().unwrap()); - Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)) - } -}*/ diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index 06500864..8281efe8 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -486,6 +486,167 @@ impl S3 for FS { Ok(S3Response::new(output)) } + async fn restore_object(&self, _req: S3Request) -> S3Result> { + Err(s3_error!(NotImplemented, "RestoreObject is not implemented yet")) + /* + let bucket = params.bucket; + if let Err(e) = un_escape_path(params.object) { + warn!("post restore object failed, e: {:?}", e); + return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); + } + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) { + return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed")); + } + + if req.content_length <= 0 { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } + let Some(opts) = post_restore_opts(req, bucket, object) else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + + let Some(obj_info) = store.get_object_info(bucket, object, opts) else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + + if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } + + let mut api_err; + let Some(rreq) = parse_restore_request(req.body(), req.content_length) else { + let api_err = errorCodes.ToAPIErr(ErrMalformedXML); + api_err.description = err.Error() + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + }; + let mut status_code = http::StatusCode::OK; + let mut already_restored = false; + if Err(err) = rreq.validate(store) { + api_err = errorCodes.ToAPIErr(ErrMalformedXML) + api_err.description = err.Error() + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed")); + } else { + if obj_info.restore_ongoing && rreq.Type != "SELECT" { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()), "post restore object failed")); + } + if !obj_info.restore_ongoing && !obj_info.restore_expires.unix_timestamp() == 0 { + status_code = http::StatusCode::Accepted; + already_restored = true; + } + } + let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), rreq.days); + let mut metadata = clone_mss(obj_info.user_defined); + + if rreq.type != "SELECT" { + obj_info.metadataOnly = true; + metadata[xhttp.AmzRestoreExpiryDays] = rreq.days; + metadata[xhttp.AmzRestoreRequestDate] = OffsetDateTime::now_utc().format(http::TimeFormat); + if already_restored { + metadata[AmzRestore] = completed_restore_obj(restore_expiry).String() + } else { + metadata[AmzRestore] = ongoing_restore_obj().to_string() + } + obj_info.user_defined = metadata; + if let Err(err) = store.copy_object(bucket, object, bucket, object, obj_info, ObjectOptions { + version_id: obj_info.version_id, + }, ObjectOptions { + version_id: obj_info.version_id, + m_time: obj_info.mod_time, + }) { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + if already_restored { + return Ok(()); + } + } + + let restore_object = must_get_uuid(); + if rreq.output_location.s3.bucket_name != "" { + w.Header()[AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restore_object)} + } + w.WriteHeader(status_code) + send_event(EventArgs { + event_name: event::ObjectRestorePost, + bucket_name: bucket, + object: obj_info, + req_params: extract_req_params(r), + user_agent: req.user_agent(), + host: handlers::get_source_ip(r), + }); + tokio::spawn(async move { + if !rreq.SelectParameters.IsEmpty() { + let actual_size = obj_info.get_actual_size(); + if actual_size.is_err() { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + + let object_rsc = s3select.NewObjectReadSeekCloser( + |offset int64| -> (io.ReadCloser, error) { + rs := &HTTPRangeSpec{ + IsSuffixLength: false, + Start: offset, + End: -1, + } + return getTransitionedObjectReader(bucket, object, rs, r.Header, + obj_info, ObjectOptions {version_id: obj_info.version_id}); + }, + actual_size.unwrap(), + ); + if err = rreq.SelectParameters.Open(objectRSC); err != nil { + if serr, ok := err.(s3select.SelectError); ok { + let encoded_error_response = encodeResponse(APIErrorResponse { + code: serr.ErrorCode(), + message: serr.ErrorMessage(), + bucket_name: bucket, + key: object, + resource: r.URL.Path, + request_id: w.Header().Get(xhttp.AmzRequestID), + host_id: globalDeploymentID(), + }); + //writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML) + Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } else { + return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed")); + } + return Ok(()); + } + let nr = httptest.NewRecorder(); + let rw = xhttp.NewResponseRecorder(nr); + rw.log_err_body = true; + rw.log_all_body = true; + rreq.select_parameters.evaluate(rw); + rreq.select_parameters.Close(); + return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } + let opts = ObjectOptions { + transition: TransitionOptions { + restore_request: rreq, + restore_expiry: restore_expiry, + }, + version_id: objInfo.version_id, + } + if Err(err) = store.restore_transitioned_object(bucket, object, opts) { + format!(format!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string())); + return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header)); + } + + send_event(EventArgs { + EventName: event.ObjectRestoreCompleted, + BucketName: bucket, + Object: objInfo, + ReqParams: extractReqParams(r), + UserAgent: r.UserAgent(), + Host: handlers.GetSourceIP(r), + }); + }); + */ + } + /// Delete a bucket #[tracing::instrument(level = "debug", skip(self, req))] async fn delete_bucket(&self, req: S3Request) -> S3Result> { @@ -2097,27 +2258,14 @@ impl S3 for FS { .. } = req.input; - let lr_retention = false; - /*let rcfg = metadata_sys::get_object_lock_config(&bucket).await; - if let Ok(rcfg) = rcfg { - if let Some(rule) = rcfg.0.rule { - if let Some(retention) = rule.default_retention { - if let Some(mode) = retention.mode { - //if mode == ObjectLockRetentionMode::from_static(ObjectLockRetentionMode::GOVERNANCE) { - lr_retention = true; - //} - } - } - } - }*/ - - //info!("lifecycle_configuration: {:?}", &lifecycle_configuration); - let Some(input_cfg) = lifecycle_configuration else { return Err(s3_error!(InvalidArgument)) }; - if let Err(err) = input_cfg.validate(lr_retention).await { - //return Err(S3Error::with_message(S3ErrorCode::Custom("BucketLockValidateFailed".into()), "bucket lock validate failed.")); - return Err(S3Error::with_message(S3ErrorCode::Custom("ValidateFailed".into()), err.to_string())); + let rcfg = metadata_sys::get_object_lock_config(&bucket).await; + if let Ok(rcfg) = rcfg { + if let Err(err) = input_cfg.validate(&rcfg.0).await { + //return Err(S3Error::with_message(S3ErrorCode::Custom("BucketLockValidateFailed".into()), err.to_string())); + return Err(S3Error::with_message(S3ErrorCode::Custom("ValidateFailed".into()), err.to_string())); + } } if let Err(err) = validate_transition_tier(&input_cfg).await {