Feature up/ilm (#305)

* fix

* fix

* fix

* fix delete-marker expiration. add api_restore.

* fix

* retry timer for object upload

* lock file

* make fmt

* fix

* restore object

* fix

* fix

* serde-xml-rs -> quick-xml

* fix

* checksum

* fix

* fix

* fix

* fix

* fix

* fix

* fix
Author: likewu
Date: 2025-07-29 14:21:19 +08:00 (committed by GitHub)
Parent: 56fd8132e9
Commit: d56cee26db
35 changed files with 1281 additions and 374 deletions

Cargo.lock (generated, 124 lines changed)
View File

@@ -764,9 +764,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.99.0"
version = "1.100.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2d64d68c93000d5792b2a25fbeaafb90985fa80a1c8adfe93f24fb271296f5f"
checksum = "8c5eafbdcd898114b839ba68ac628e31c4cfc3e11dfca38dc1b2de2f35bb6270"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -798,9 +798,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "1.77.0"
version = "1.78.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f2f37fea82468fe3f5a059542c05392ef680c4f7f00e0db02df8b6e5c7d0c6"
checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -820,9 +820,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
version = "1.78.0"
version = "1.79.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecb4f6eada20e0193450cd48b12ed05e1e66baac86f39160191651b932f2b7d9"
checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -842,9 +842,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "1.79.0"
version = "1.80.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317377afba3498fca4948c5d32b399ef9a5ad35561a1e8a6f2ac7273dabf802d"
checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -1037,9 +1037,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "1.8.4"
version = "1.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38280ac228bc479f347fcfccf4bf4d22d68f3bb4629685cb591cabd856567bbc"
checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
@@ -2991,6 +2991,12 @@ dependencies = [
"syn 2.0.104",
]
[[package]]
name = "diff"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "digest"
version = "0.10.7"
@@ -5476,7 +5482,7 @@ checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0"
dependencies = [
"bitflags 2.9.1",
"libc",
"redox_syscall 0.5.15",
"redox_syscall 0.5.16",
]
[[package]]
@@ -6685,7 +6691,7 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.5.15",
"redox_syscall 0.5.16",
"smallvec",
"windows-targets 0.52.6",
]
@@ -7142,10 +7148,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "prettyplease"
version = "0.2.35"
name = "pretty_assertions"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061c1221631e079b26479d25bbf2275bfe5917ae8419cd7e34f13bfc2aa7539a"
checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d"
dependencies = [
"diff",
"yansi",
]
[[package]]
name = "prettyplease"
version = "0.2.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2"
dependencies = [
"proc-macro2",
"syn 2.0.104",
@@ -7591,9 +7607,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.5.15"
version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec"
checksum = "7251471db004e509f4e75a62cca9435365b5ec7bcdff530d612ac7c87c44a792"
dependencies = [
"bitflags 2.9.1",
]
@@ -8077,6 +8093,27 @@ dependencies = [
"serde_json",
]
[[package]]
name = "rustfs-checksums"
version = "0.0.5"
dependencies = [
"base64-simd",
"bytes",
"bytes-utils",
"crc-fast",
"hex",
"http 1.3.1",
"http-body 1.0.1",
"md-5",
"pin-project-lite",
"pretty_assertions",
"sha1 0.10.6",
"sha2 0.10.9",
"tokio",
"tracing",
"tracing-test",
]
[[package]]
name = "rustfs-common"
version = "0.0.5"
@@ -8154,12 +8191,14 @@ dependencies = [
"path-absolutize",
"path-clean",
"pin-project-lite",
"quick-xml 0.38.0",
"rand 0.9.2",
"reed-solomon-simd",
"regex",
"reqwest",
"rmp",
"rmp-serde",
"rustfs-checksums",
"rustfs-common",
"rustfs-config",
"rustfs-filemeta",
@@ -8175,8 +8214,8 @@ dependencies = [
"rustls 0.23.29",
"s3s",
"serde",
"serde-xml-rs 0.8.1",
"serde_json",
"sha1 0.10.6",
"sha2 0.10.9",
"shadow-rs",
"smallvec",
@@ -8430,7 +8469,7 @@ dependencies = [
"regex",
"reqwest",
"serde",
"serde-xml-rs 0.6.0",
"serde-xml-rs",
"sha2 0.10.9",
"urlencoding",
]
@@ -8985,18 +9024,6 @@ dependencies = [
"xml-rs",
]
[[package]]
name = "serde-xml-rs"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53630160a98edebde0123eb4dfd0fce6adff091b2305db3154a9e920206eb510"
dependencies = [
"log",
"serde",
"thiserror 1.0.69",
"xml-rs",
]
[[package]]
name = "serde_derive"
version = "1.0.219"
@@ -10082,9 +10109,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.46.1"
version = "1.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35"
dependencies = [
"backtrace",
"bytes",
@@ -10095,10 +10122,10 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2 0.5.10",
"socket2 0.6.0",
"tokio-macros",
"tracing",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -10474,6 +10501,27 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.104",
]
[[package]]
name = "tracing-wasm"
version = "0.2.1"
@@ -11813,6 +11861,12 @@ dependencies = [
"lzma-sys",
]
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yoke"
version = "0.8.0"

View File

@@ -33,6 +33,7 @@ members = [
"crates/s3select-api", # S3 Select API interface
"crates/s3select-query", # S3 Select query engine
"crates/signer", # client signer
"crates/checksums", # client checksums
"crates/utils", # Utility functions and helpers
"crates/workers", # Worker thread pools and task scheduling
"crates/zip", # ZIP file handling and compression
@@ -84,6 +85,7 @@ rustfs-utils = { path = "crates/utils", version = "0.0.5" }
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
rustfs-filemeta = { path = "crates/filemeta", version = "0.0.5" }
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
rustfs-checksums = { path = "crates/checksums", version = "0.0.5" }
rustfs-workers = { path = "crates/workers", version = "0.0.5" }
aes-gcm = { version = "0.10.3", features = ["std"] }
arc-swap = "1.7.1"
@@ -215,7 +217,6 @@ s3s = { version = "0.12.0-minio-preview.2" }
shadow-rs = { version = "1.2.0", default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.141", features = ["raw_value"] }
serde-xml-rs = "0.8.1"
serde_urlencoded = "0.7.1"
sha1 = "0.10.6"
sha2 = "0.10.9"

View File

@@ -371,7 +371,7 @@ impl ServiceManager {
StdCommand::new("taskkill")
.arg("/F")
.arg("/PID")
.arg(&service_pid.to_string())
.arg(service_pid.to_string())
.output()?;
}

View File

@@ -0,0 +1,48 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[package]
name = "rustfs-checksums"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
homepage.workspace = true
description = "Checksum calculation and verification callbacks for HTTP request and response bodies sent by service clients generated by RustFS, ensuring data integrity and authenticity."
keywords = ["checksum-calculation", "verification", "integrity", "authenticity", "rustfs"]
categories = ["web-programming", "development-tools"]
documentation = "https://docs.rs/rustfs-checksums/latest/rustfs_checksums/"
[dependencies]
bytes = { workspace = true }
crc-fast = "1.3.0"
hex = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
base64-simd = { workspace = true }
md-5 = { workspace = true }
pin-project-lite = { workspace = true }
sha1 = { workspace = true }
sha2 = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
bytes-utils = "0.1.2"
pretty_assertions = "1.3"
tracing-test = "0.2.1"
[dev-dependencies.tokio]
version = "1.23.1"
features = ["macros", "rt"]

View File

@@ -0,0 +1,3 @@
# rustfs-checksums
Checksum calculation and verification callbacks for HTTP request and response bodies sent by service clients generated by RustFS object storage.
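
A minimal usage sketch of the public API this commit introduces (`ChecksumAlgorithm` and the `HttpChecksum` trait from `crates/checksums/src/lib.rs` below); the input bytes and the asserted header are illustrative:

    use rustfs_checksums::ChecksumAlgorithm;

    fn main() {
        // Parse an algorithm name (case-insensitive) and stream data through it.
        let mut checksum = "crc32c".parse::<ChecksumAlgorithm>().unwrap().into_impl();
        checksum.update(b"test data");
        // headers() consumes the checksum and yields the x-amz-checksum-* header.
        let headers = checksum.headers();
        assert!(headers.contains_key("x-amz-checksum-crc32c"));
    }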

View File

@@ -0,0 +1,44 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use base64_simd::STANDARD;
use std::error::Error;
#[derive(Debug)]
pub(crate) struct DecodeError(base64_simd::Error);
impl Error for DecodeError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.0)
}
}
impl std::fmt::Display for DecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "failed to decode base64")
}
}
pub(crate) fn decode(input: impl AsRef<str>) -> Result<Vec<u8>, DecodeError> {
STANDARD.decode_to_vec(input.as_ref()).map_err(DecodeError)
}
pub(crate) fn encode(input: impl AsRef<[u8]>) -> String {
STANDARD.encode_to_string(input.as_ref())
}
pub(crate) fn encoded_length(length: usize) -> usize {
STANDARD.encoded_length(length)
}

View File

@@ -0,0 +1,45 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::error::Error;
use std::fmt;
#[derive(Debug)]
pub struct UnknownChecksumAlgorithmError {
checksum_algorithm: String,
}
impl UnknownChecksumAlgorithmError {
pub(crate) fn new(checksum_algorithm: impl Into<String>) -> Self {
Self {
checksum_algorithm: checksum_algorithm.into(),
}
}
pub fn checksum_algorithm(&self) -> &str {
&self.checksum_algorithm
}
}
impl fmt::Display for UnknownChecksumAlgorithmError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
r#"unknown checksum algorithm "{}", please pass a known algorithm name ("crc32", "crc32c", "sha1", "sha256", "md5")"#,
self.checksum_algorithm
)
}
}
impl Error for UnknownChecksumAlgorithmError {}

View File

@@ -0,0 +1,196 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::base64;
use http::header::{HeaderMap, HeaderValue};
use crate::Crc64Nvme;
use crate::{CRC_32_C_NAME, CRC_32_NAME, CRC_64_NVME_NAME, Checksum, Crc32, Crc32c, Md5, SHA_1_NAME, SHA_256_NAME, Sha1, Sha256};
pub const CRC_32_HEADER_NAME: &str = "x-amz-checksum-crc32";
pub const CRC_32_C_HEADER_NAME: &str = "x-amz-checksum-crc32c";
pub const SHA_1_HEADER_NAME: &str = "x-amz-checksum-sha1";
pub const SHA_256_HEADER_NAME: &str = "x-amz-checksum-sha256";
pub const CRC_64_NVME_HEADER_NAME: &str = "x-amz-checksum-crc64nvme";
pub(crate) static MD5_HEADER_NAME: &str = "content-md5";
pub const CHECKSUM_ALGORITHMS_IN_PRIORITY_ORDER: [&str; 5] =
[CRC_64_NVME_NAME, CRC_32_C_NAME, CRC_32_NAME, SHA_1_NAME, SHA_256_NAME];
pub trait HttpChecksum: Checksum + Send + Sync {
fn headers(self: Box<Self>) -> HeaderMap<HeaderValue> {
let mut header_map = HeaderMap::new();
header_map.insert(self.header_name(), self.header_value());
header_map
}
fn header_name(&self) -> &'static str;
fn header_value(self: Box<Self>) -> HeaderValue {
let hash = self.finalize();
HeaderValue::from_str(&base64::encode(&hash[..])).expect("base64 encoded bytes are always valid header values")
}
fn size(&self) -> u64 {
let trailer_name_size_in_bytes = self.header_name().len();
let base64_encoded_checksum_size_in_bytes = base64::encoded_length(Checksum::size(self) as usize);
let size = trailer_name_size_in_bytes + ":".len() + base64_encoded_checksum_size_in_bytes;
size as u64
}
}
impl HttpChecksum for Crc32 {
fn header_name(&self) -> &'static str {
CRC_32_HEADER_NAME
}
}
impl HttpChecksum for Crc32c {
fn header_name(&self) -> &'static str {
CRC_32_C_HEADER_NAME
}
}
impl HttpChecksum for Crc64Nvme {
fn header_name(&self) -> &'static str {
CRC_64_NVME_HEADER_NAME
}
}
impl HttpChecksum for Sha1 {
fn header_name(&self) -> &'static str {
SHA_1_HEADER_NAME
}
}
impl HttpChecksum for Sha256 {
fn header_name(&self) -> &'static str {
SHA_256_HEADER_NAME
}
}
impl HttpChecksum for Md5 {
fn header_name(&self) -> &'static str {
MD5_HEADER_NAME
}
}
#[cfg(test)]
mod tests {
use crate::base64;
use bytes::Bytes;
use crate::{CRC_32_C_NAME, CRC_32_NAME, CRC_64_NVME_NAME, ChecksumAlgorithm, SHA_1_NAME, SHA_256_NAME};
use super::HttpChecksum;
#[test]
fn test_trailer_length_of_crc32_checksum_body() {
let checksum = CRC_32_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_size = 29;
let actual_size = HttpChecksum::size(&*checksum);
assert_eq!(expected_size, actual_size)
}
#[test]
fn test_trailer_value_of_crc32_checksum_body() {
let checksum = CRC_32_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
// The CRC32 of an empty string is all zeroes
let expected_value = Bytes::from_static(b"\0\0\0\0");
let expected_value = base64::encode(&expected_value);
let actual_value = checksum.header_value();
assert_eq!(expected_value, actual_value)
}
#[test]
fn test_trailer_length_of_crc32c_checksum_body() {
let checksum = CRC_32_C_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_size = 30;
let actual_size = HttpChecksum::size(&*checksum);
assert_eq!(expected_size, actual_size)
}
#[test]
fn test_trailer_value_of_crc32c_checksum_body() {
let checksum = CRC_32_C_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
// The CRC32C of an empty string is all zeroes
let expected_value = Bytes::from_static(b"\0\0\0\0");
let expected_value = base64::encode(&expected_value);
let actual_value = checksum.header_value();
assert_eq!(expected_value, actual_value)
}
#[test]
fn test_trailer_length_of_crc64nvme_checksum_body() {
let checksum = CRC_64_NVME_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_size = 37;
let actual_size = HttpChecksum::size(&*checksum);
assert_eq!(expected_size, actual_size)
}
#[test]
fn test_trailer_value_of_crc64nvme_checksum_body() {
let checksum = CRC_64_NVME_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
// The CRC64NVME of an empty string is all zeroes
let expected_value = Bytes::from_static(b"\0\0\0\0\0\0\0\0");
let expected_value = base64::encode(&expected_value);
let actual_value = checksum.header_value();
assert_eq!(expected_value, actual_value)
}
#[test]
fn test_trailer_length_of_sha1_checksum_body() {
let checksum = SHA_1_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_size = 48;
let actual_size = HttpChecksum::size(&*checksum);
assert_eq!(expected_size, actual_size)
}
#[test]
fn test_trailer_value_of_sha1_checksum_body() {
let checksum = SHA_1_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
// The SHA1 of an empty string is da39a3ee5e6b4b0d3255bfef95601890afd80709
let expected_value = Bytes::from_static(&[
0xda, 0x39, 0xa3, 0xee, 0x5e, 0x6b, 0x4b, 0x0d, 0x32, 0x55, 0xbf, 0xef, 0x95, 0x60, 0x18, 0x90, 0xaf, 0xd8, 0x07,
0x09,
]);
let expected_value = base64::encode(&expected_value);
let actual_value = checksum.header_value();
assert_eq!(expected_value, actual_value)
}
#[test]
fn test_trailer_length_of_sha256_checksum_body() {
let checksum = SHA_256_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_size = 66;
let actual_size = HttpChecksum::size(&*checksum);
assert_eq!(expected_size, actual_size)
}
#[test]
fn test_trailer_value_of_sha256_checksum_body() {
let checksum = SHA_256_NAME.parse::<ChecksumAlgorithm>().unwrap().into_impl();
let expected_value = Bytes::from_static(&[
0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14, 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24, 0x27, 0xae, 0x41,
0xe4, 0x64, 0x9b, 0x93, 0x4c, 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55,
]);
let expected_value = base64::encode(&expected_value);
let actual_value = checksum.header_value();
assert_eq!(expected_value, actual_value)
}
}
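
The default size() above computes the trailer length: header name, a ":" separator, and the base64-encoded digest. A quick sanity check of the CRC32 case from the test (assuming standard padded base64):

    fn main() {
        let name = "x-amz-checksum-crc32"; // 20 bytes
        let digest_len = 4usize; // a CRC32 digest is 4 bytes
        let b64_len = (digest_len + 2) / 3 * 4; // padded base64 length = 8
        assert_eq!(name.len() + ":".len() + b64_len, 29); // matches expected_size
    }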

crates/checksums/src/lib.rs (new file, 446 lines)
View File

@@ -0,0 +1,446 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![allow(clippy::derive_partial_eq_without_eq)]
#![warn(
// missing_docs,
rustdoc::missing_crate_level_docs,
unreachable_pub,
rust_2018_idioms
)]
use crate::error::UnknownChecksumAlgorithmError;
use bytes::Bytes;
use std::{fmt::Debug, str::FromStr};
mod base64;
pub mod error;
pub mod http;
pub const CRC_32_NAME: &str = "crc32";
pub const CRC_32_C_NAME: &str = "crc32c";
pub const CRC_64_NVME_NAME: &str = "crc64nvme";
pub const SHA_1_NAME: &str = "sha1";
pub const SHA_256_NAME: &str = "sha256";
pub const MD5_NAME: &str = "md5";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ChecksumAlgorithm {
#[default]
Crc32,
Crc32c,
#[deprecated]
Md5,
Sha1,
Sha256,
Crc64Nvme,
}
impl FromStr for ChecksumAlgorithm {
type Err = UnknownChecksumAlgorithmError;
fn from_str(checksum_algorithm: &str) -> Result<Self, Self::Err> {
if checksum_algorithm.eq_ignore_ascii_case(CRC_32_NAME) {
Ok(Self::Crc32)
} else if checksum_algorithm.eq_ignore_ascii_case(CRC_32_C_NAME) {
Ok(Self::Crc32c)
} else if checksum_algorithm.eq_ignore_ascii_case(SHA_1_NAME) {
Ok(Self::Sha1)
} else if checksum_algorithm.eq_ignore_ascii_case(SHA_256_NAME) {
Ok(Self::Sha256)
} else if checksum_algorithm.eq_ignore_ascii_case(MD5_NAME) {
// MD5 is now an alias for the default Crc32 since it is deprecated
Ok(Self::Crc32)
} else if checksum_algorithm.eq_ignore_ascii_case(CRC_64_NVME_NAME) {
Ok(Self::Crc64Nvme)
} else {
Err(UnknownChecksumAlgorithmError::new(checksum_algorithm))
}
}
}
impl ChecksumAlgorithm {
pub fn into_impl(self) -> Box<dyn http::HttpChecksum> {
match self {
Self::Crc32 => Box::<Crc32>::default(),
Self::Crc32c => Box::<Crc32c>::default(),
Self::Crc64Nvme => Box::<Crc64Nvme>::default(),
#[allow(deprecated)]
Self::Md5 => Box::<Crc32>::default(),
Self::Sha1 => Box::<Sha1>::default(),
Self::Sha256 => Box::<Sha256>::default(),
}
}
pub fn as_str(&self) -> &'static str {
match self {
Self::Crc32 => CRC_32_NAME,
Self::Crc32c => CRC_32_C_NAME,
Self::Crc64Nvme => CRC_64_NVME_NAME,
#[allow(deprecated)]
Self::Md5 => MD5_NAME,
Self::Sha1 => SHA_1_NAME,
Self::Sha256 => SHA_256_NAME,
}
}
}
pub trait Checksum: Send + Sync {
fn update(&mut self, bytes: &[u8]);
fn finalize(self: Box<Self>) -> Bytes;
fn size(&self) -> u64;
}
#[derive(Debug)]
struct Crc32 {
hasher: crc_fast::Digest,
}
impl Default for Crc32 {
fn default() -> Self {
Self {
hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc32IsoHdlc),
}
}
}
impl Crc32 {
fn update(&mut self, bytes: &[u8]) {
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
let checksum = self.hasher.finalize() as u32;
Bytes::copy_from_slice(checksum.to_be_bytes().as_slice())
}
fn size() -> u64 {
4
}
}
impl Checksum for Crc32 {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes)
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[derive(Debug)]
struct Crc32c {
hasher: crc_fast::Digest,
}
impl Default for Crc32c {
fn default() -> Self {
Self {
hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc32Iscsi),
}
}
}
impl Crc32c {
fn update(&mut self, bytes: &[u8]) {
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
let checksum = self.hasher.finalize() as u32;
Bytes::copy_from_slice(checksum.to_be_bytes().as_slice())
}
fn size() -> u64 {
4
}
}
impl Checksum for Crc32c {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes)
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[derive(Debug)]
struct Crc64Nvme {
hasher: crc_fast::Digest,
}
impl Default for Crc64Nvme {
fn default() -> Self {
Self {
hasher: crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc64Nvme),
}
}
}
impl Crc64Nvme {
fn update(&mut self, bytes: &[u8]) {
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
Bytes::copy_from_slice(self.hasher.finalize().to_be_bytes().as_slice())
}
fn size() -> u64 {
8
}
}
impl Checksum for Crc64Nvme {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes)
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[derive(Debug, Default)]
struct Sha1 {
hasher: sha1::Sha1,
}
impl Sha1 {
fn update(&mut self, bytes: &[u8]) {
use sha1::Digest;
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
use sha1::Digest;
Bytes::copy_from_slice(self.hasher.finalize().as_slice())
}
fn size() -> u64 {
use sha1::Digest;
sha1::Sha1::output_size() as u64
}
}
impl Checksum for Sha1 {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes)
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[derive(Debug, Default)]
struct Sha256 {
hasher: sha2::Sha256,
}
impl Sha256 {
fn update(&mut self, bytes: &[u8]) {
use sha2::Digest;
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
use sha2::Digest;
Bytes::copy_from_slice(self.hasher.finalize().as_slice())
}
fn size() -> u64 {
use sha2::Digest;
sha2::Sha256::output_size() as u64
}
}
impl Checksum for Sha256 {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes);
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[derive(Debug, Default)]
struct Md5 {
hasher: md5::Md5,
}
impl Md5 {
fn update(&mut self, bytes: &[u8]) {
use md5::Digest;
self.hasher.update(bytes);
}
fn finalize(self) -> Bytes {
use md5::Digest;
Bytes::copy_from_slice(self.hasher.finalize().as_slice())
}
fn size() -> u64 {
use md5::Digest;
md5::Md5::output_size() as u64
}
}
impl Checksum for Md5 {
fn update(&mut self, bytes: &[u8]) {
Self::update(self, bytes)
}
fn finalize(self: Box<Self>) -> Bytes {
Self::finalize(*self)
}
fn size(&self) -> u64 {
Self::size()
}
}
#[cfg(test)]
mod tests {
use super::{
Crc32, Crc32c, Md5, Sha1, Sha256,
http::{CRC_32_C_HEADER_NAME, CRC_32_HEADER_NAME, MD5_HEADER_NAME, SHA_1_HEADER_NAME, SHA_256_HEADER_NAME},
};
use crate::ChecksumAlgorithm;
use crate::http::HttpChecksum;
use crate::base64;
use http::HeaderValue;
use pretty_assertions::assert_eq;
use std::fmt::Write;
const TEST_DATA: &str = r#"test data"#;
fn base64_encoded_checksum_to_hex_string(header_value: &HeaderValue) -> String {
let decoded_checksum = base64::decode(header_value.to_str().unwrap()).unwrap();
let decoded_checksum = decoded_checksum.into_iter().fold(String::new(), |mut acc, byte| {
write!(acc, "{byte:02X?}").expect("string will always be writeable");
acc
});
format!("0x{decoded_checksum}")
}
#[test]
fn test_crc32_checksum() {
let mut checksum = Crc32::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(CRC_32_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0xD308AEB2";
assert_eq!(decoded_checksum, expected_checksum);
}
#[cfg(not(any(target_arch = "powerpc", target_arch = "powerpc64")))]
#[test]
fn test_crc32c_checksum() {
let mut checksum = Crc32c::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(CRC_32_C_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0x3379B4CA";
assert_eq!(decoded_checksum, expected_checksum);
}
#[test]
fn test_crc64nvme_checksum() {
use crate::{Crc64Nvme, http::CRC_64_NVME_HEADER_NAME};
let mut checksum = Crc64Nvme::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(CRC_64_NVME_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0xAECAF3AF9C98A855";
assert_eq!(decoded_checksum, expected_checksum);
}
#[test]
fn test_sha1_checksum() {
let mut checksum = Sha1::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(SHA_1_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0xF48DD853820860816C75D54D0F584DC863327A7C";
assert_eq!(decoded_checksum, expected_checksum);
}
#[test]
fn test_sha256_checksum() {
let mut checksum = Sha256::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(SHA_256_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0x916F0027A575074CE72A331777C3478D6513F786A591BD892DA1A577BF2335F9";
assert_eq!(decoded_checksum, expected_checksum);
}
#[test]
fn test_md5_checksum() {
let mut checksum = Md5::default();
checksum.update(TEST_DATA.as_bytes());
let checksum_result = Box::new(checksum).headers();
let encoded_checksum = checksum_result.get(MD5_HEADER_NAME).unwrap();
let decoded_checksum = base64_encoded_checksum_to_hex_string(encoded_checksum);
let expected_checksum = "0xEB733A00C0C9D336E65691A37AB54293";
assert_eq!(decoded_checksum, expected_checksum);
}
#[test]
fn test_checksum_algorithm_returns_error_for_unknown() {
let error = "some invalid checksum algorithm"
.parse::<ChecksumAlgorithm>()
.expect_err("it should error");
assert_eq!("some invalid checksum algorithm", error.checksum_algorithm());
}
}
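
One behavior of the FromStr impl above worth noting: the deprecated "md5" name parses successfully but is aliased to the default Crc32, so the algorithm name does not round-trip. A small illustration:

    use rustfs_checksums::ChecksumAlgorithm;

    fn main() {
        // Parsing is case-insensitive; "md5" maps to the default Crc32.
        let algo: ChecksumAlgorithm = "MD5".parse().unwrap();
        assert_eq!(algo, ChecksumAlgorithm::Crc32);
        assert_eq!(algo.as_str(), "crc32");
    }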

View File

@@ -50,7 +50,7 @@ serde.workspace = true
time.workspace = true
bytesize.workspace = true
serde_json.workspace = true
serde-xml-rs.workspace = true
quick-xml.workspace = true
s3s.workspace = true
http.workspace = true
url.workspace = true
@@ -66,6 +66,7 @@ rmp-serde.workspace = true
tokio-util = { workspace = true, features = ["io", "compat"] }
base64 = { workspace = true }
hmac = { workspace = true }
sha1 = { workspace = true }
sha2 = { workspace = true }
hex-simd = { workspace = true }
path-clean = { workspace = true }
@@ -98,6 +99,7 @@ rustfs-filemeta.workspace = true
rustfs-utils = { workspace = true, features = ["full"] }
rustfs-rio.workspace = true
rustfs-signer.workspace = true
rustfs-checksums.workspace = true
futures-util.workspace = true
[target.'cfg(not(windows))'.dependencies]

View File

@@ -346,8 +346,12 @@ impl ExpiryState {
}
pub async fn worker(rx: &mut Receiver<Option<ExpiryOpType>>, api: Arc<ECStore>) {
//let cancel_token =
// get_background_services_cancel_token().ok_or_else(|| Error::other("Background services not initialized"))?;
loop {
select! {
//_ = cancel_token.cancelled() => {
_ = tokio::signal::ctrl_c() => {
info!("got ctrl+c, exits");
break;
@@ -811,8 +815,8 @@ impl LifecycleOps for ObjectInfo {
num_versions: self.num_versions,
delete_marker: self.delete_marker,
successor_mod_time: self.successor_mod_time,
//restore_ongoing: self.restore_ongoing,
//restore_expires: self.restore_expires,
restore_ongoing: self.restore_ongoing,
restore_expires: self.restore_expires,
transition_status: self.transitioned_object.status.clone(),
..Default::default()
}

View File

@@ -132,7 +132,7 @@ pub trait Lifecycle {
async fn has_transition(&self) -> bool;
fn has_expiry(&self) -> bool;
async fn has_active_rules(&self, prefix: &str) -> bool;
async fn validate(&self, lr_retention: bool) -> Result<(), std::io::Error>;
async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error>;
async fn filter_rules(&self, obj: &ObjectOpts) -> Option<Vec<LifecycleRule>>;
async fn eval(&self, obj: &ObjectOpts) -> Event;
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event;
@@ -213,7 +213,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
false
}
async fn validate(&self, lr_retention: bool) -> Result<(), std::io::Error> {
async fn validate(&self, lr: &ObjectLockConfiguration) -> Result<(), std::io::Error> {
if self.rules.len() > 1000 {
return Err(std::io::Error::other(ERR_LIFECYCLE_TOO_MANY_RULES));
}
@@ -223,13 +223,15 @@ impl Lifecycle for BucketLifecycleConfiguration {
for r in &self.rules {
r.validate()?;
if let Some(expiration) = r.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if lr_retention && (expired_object_delete_marker) {
return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED));
/*if let Some(object_lock_enabled) = lr.object_lock_enabled.as_ref() {
if let Some(expiration) = r.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if object_lock_enabled.as_str() == ObjectLockEnabled::ENABLED && (expired_object_delete_marker) {
return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED));
}
}
}
}
}
}*/
}
for (i, _) in self.rules.iter().enumerate() {
if i == self.rules.len() - 1 {
@@ -600,7 +602,7 @@ pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTi
}
let t = mod_time
.to_offset(offset!(-0:00:00))
.saturating_add(Duration::days(0 /*days as i64*/)); //debug
.saturating_add(Duration::days(days as i64));
let mut hour = 3600;
if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_HOUR") {
if let Ok(num_hour) = env_ilm_hour.parse::<usize>() {
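
The change above removes a leftover debug stub (Duration::days(0)) so the expiry actually advances by the rule's day count. A minimal check of that arithmetic with the time crate (the timestamp is arbitrary):

    use time::{Duration, OffsetDateTime};

    fn main() {
        let mod_time = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
        let expiry = mod_time.saturating_add(Duration::days(30));
        assert_eq!((expiry - mod_time).whole_days(), 30);
    }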

View File

@@ -20,7 +20,7 @@
#![allow(clippy::all)]
use lazy_static::lazy_static;
use rustfs_utils::HashAlgorithm;
use rustfs_checksums::ChecksumAlgorithm;
use std::collections::HashMap;
use crate::client::{api_put_object::PutObjectOptions, api_s3_datatypes::ObjectPart};
@@ -103,15 +103,34 @@ impl ChecksumMode {
}
pub fn can_composite(&self) -> bool {
todo!();
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
2_u8 => true,
4_u8 => true,
8_u8 => true,
16_u8 => true,
_ => false,
}
}
pub fn can_merge_crc(&self) -> bool {
todo!();
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
8_u8 => true,
16_u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn full_object_requested(&self) -> bool {
todo!();
let s = EnumSet::from(*self).intersection(*C_ChecksumMask);
match s.as_u8() {
//C_ChecksumFullObjectCRC32 as u8 => true,
//C_ChecksumFullObjectCRC32C as u8 => true,
32_u8 => true,
_ => false,
}
}
pub fn key_capitalized(&self) -> String {
@@ -123,33 +142,35 @@ impl ChecksumMode {
if u == ChecksumMode::ChecksumCRC32 as u8 || u == ChecksumMode::ChecksumCRC32C as u8 {
4
} else if u == ChecksumMode::ChecksumSHA1 as u8 {
4 //sha1.size
use sha1::Digest;
sha1::Sha1::output_size() as usize
} else if u == ChecksumMode::ChecksumSHA256 as u8 {
4 //sha256.size
use sha2::Digest;
sha2::Sha256::output_size() as usize
} else if u == ChecksumMode::ChecksumCRC64NVME as u8 {
4 //crc64.size
8
} else {
0
}
}
pub fn hasher(&self) -> Result<HashAlgorithm, std::io::Error> {
pub fn hasher(&self) -> Result<Box<dyn rustfs_checksums::http::HttpChecksum>, std::io::Error> {
match /*C_ChecksumMask & **/self {
/*ChecksumMode::ChecksumCRC32 => {
return Ok(Box::new(crc32fast::Hasher::new()));
}*/
/*ChecksumMode::ChecksumCRC32C => {
return Ok(Box::new(crc32::new(crc32.MakeTable(crc32.Castagnoli))));
ChecksumMode::ChecksumCRC32 => {
return Ok(ChecksumAlgorithm::Crc32.into_impl());
}
ChecksumMode::ChecksumCRC32C => {
return Ok(ChecksumAlgorithm::Crc32c.into_impl());
}
ChecksumMode::ChecksumSHA1 => {
return Ok(Box::new(sha1::new()));
}*/
ChecksumMode::ChecksumSHA256 => {
return Ok(HashAlgorithm::SHA256);
return Ok(ChecksumAlgorithm::Sha1.into_impl());
}
ChecksumMode::ChecksumSHA256 => {
return Ok(ChecksumAlgorithm::Sha256.into_impl());
}
ChecksumMode::ChecksumCRC64NVME => {
return Ok(ChecksumAlgorithm::Crc64Nvme.into_impl());
}
/*ChecksumMode::ChecksumCRC64NVME => {
return Ok(Box::new(crc64nvme.New());
}*/
_ => return Err(std::io::Error::other("unsupported checksum type")),
}
}
@@ -170,7 +191,8 @@ impl ChecksumMode {
return Ok("".to_string());
}
let mut h = self.hasher()?;
let hash = h.hash_encode(b);
h.update(b);
let hash = h.finalize();
Ok(base64_encode(hash.as_ref()))
}
@@ -227,7 +249,8 @@ impl ChecksumMode {
let c = self.base();
let crc_bytes = Vec::<u8>::with_capacity(p.len() * self.raw_byte_len() as usize);
let mut h = self.hasher()?;
let hash = h.hash_encode(crc_bytes.as_ref());
h.update(crc_bytes.as_ref());
let hash = h.finalize();
Ok(Checksum {
checksum_type: self.clone(),
r: hash.as_ref().to_vec(),
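
The hunks above replace the removed one-shot hash_encode with the streaming update/finalize pair from rustfs-checksums. A hedged sketch of the pattern as a helper (encode_checksum is hypothetical; import paths follow the diff):

    use crate::checksum::ChecksumMode;
    use rustfs_utils::crypto::base64_encode;

    // Compute the base64 trailer value for any ChecksumMode via the new API.
    fn encode_checksum(mode: &ChecksumMode, data: &[u8]) -> Result<String, std::io::Error> {
        let mut h = mode.hasher()?; // Box<dyn rustfs_checksums::http::HttpChecksum>
        h.update(data);
        let digest = h.finalize(); // consumes the boxed hasher, returns Bytes
        Ok(base64_encode(digest.as_ref()))
    }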

View File

@@ -63,7 +63,7 @@ impl TransitionClient {
//defer closeResponse(resp)
//if resp != nil {
if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
}
//}
Ok(())
@@ -98,7 +98,7 @@ impl TransitionClient {
//defer closeResponse(resp)
if resp.status() != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
}
Ok(())

View File

@@ -95,13 +95,13 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse {
}
pub fn http_resp_to_error_response(
resp: http::Response<Body>,
resp: &http::Response<Body>,
b: Vec<u8>,
bucket_name: &str,
object_name: &str,
) -> ErrorResponse {
let err_body = String::from_utf8(b).unwrap();
let err_resp_ = serde_xml_rs::from_str::<ErrorResponse>(&err_body);
let err_resp_ = quick_xml::de::from_str::<ErrorResponse>(&err_body);
let mut err_resp = ErrorResponse::default();
if err_resp_.is_err() {
match resp.status() {

View File

@@ -87,11 +87,11 @@ impl TransitionClient {
if resp.status() != http::StatusCode::OK {
let b = resp.body().bytes().expect("err").to_vec();
return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, object_name)));
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name)));
}
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let mut res = match serde_xml_rs::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -144,7 +144,7 @@ impl ObjectAttributes {
self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let mut response = match serde_xml_rs::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));
@@ -226,7 +226,7 @@ impl TransitionClient {
if resp.status() != http::StatusCode::OK {
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let err_body = String::from_utf8(b).unwrap();
let mut er = match serde_xml_rs::from_str::<AccessControlPolicy>(&err_body) {
let mut er = match quick_xml::de::from_str::<AccessControlPolicy>(&err_body) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -98,12 +98,12 @@ impl TransitionClient {
)
.await?;
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
}
//let mut list_bucket_result = ListBucketV2Result::default();
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let mut list_bucket_result = match serde_xml_rs::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -85,7 +85,7 @@ pub struct PutObjectOptions {
pub expires: OffsetDateTime,
pub mode: ObjectLockRetentionMode,
pub retain_until_date: OffsetDateTime,
//pub server_side_encryption: encrypt.ServerSide,
//pub server_side_encryption: encrypt::ServerSide,
pub num_threads: u64,
pub storage_class: String,
pub website_redirect_location: String,
@@ -135,7 +135,7 @@ impl Default for PutObjectOptions {
#[allow(dead_code)]
impl PutObjectOptions {
fn set_match_tag(&mut self, etag: &str) {
fn set_match_etag(&mut self, etag: &str) {
if etag == "*" {
self.custom_header
.insert("If-Match", HeaderValue::from_str("*").expect("err"));
@@ -145,7 +145,7 @@ impl PutObjectOptions {
}
}
fn set_match_tag_except(&mut self, etag: &str) {
fn set_match_etag_except(&mut self, etag: &str) {
if etag == "*" {
self.custom_header
.insert("If-None-Match", HeaderValue::from_str("*").expect("err"));
@@ -366,7 +366,8 @@ impl TransitionClient {
md5_base64 = base64_encode(hash.as_ref());
} else {
let mut crc = opts.auto_checksum.hasher()?;
let csum = crc.hash_encode(&buf[..length]);
crc.update(&buf[..length]);
let csum = crc.finalize();
if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) {
custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err"));

View File

@@ -12,7 +12,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]

View File

@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(unused_assignments)]
@@ -19,20 +18,14 @@
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use http::{HeaderMap, HeaderName, StatusCode};
use s3s::S3ErrorCode;
use std::io::Read;
use std::{collections::HashMap, sync::Arc};
use time::{OffsetDateTime, format_description};
use tokio_util::sync::CancellationToken;
use std::collections::HashMap;
use time::OffsetDateTime;
use tracing::warn;
use tracing::{error, info};
use url::form_urlencoded::Serializer;
use uuid::Uuid;
use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID};
use s3s::{Body, dto::StreamingBlob};
//use crate::disk::{Reader, BufferReader};
use crate::checksum::ChecksumMode;
use crate::client::{
api_error_response::{
err_entity_too_large, err_entity_too_small, err_invalid_argument, http_resp_to_error_response, to_error_response,
@@ -42,15 +35,11 @@ use crate::client::{
api_s3_datatypes::{
CompleteMultipartUpload, CompleteMultipartUploadResult, CompletePart, InitiateMultipartUploadResult, ObjectPart,
},
constants::{ABS_MIN_PART_SIZE, ISO8601_DATEFORMAT, MAX_PART_SIZE, MAX_SINGLE_PUT_OBJECT_SIZE},
constants::{ISO8601_DATEFORMAT, MAX_PART_SIZE, MAX_SINGLE_PUT_OBJECT_SIZE},
transition_api::{ReaderImpl, RequestMetadata, TransitionClient, UploadInfo},
};
use crate::{
checksum::ChecksumMode,
disk::DiskAPI,
store_api::{GetObjectReader, StorageAPI},
};
use rustfs_utils::{crypto::base64_encode, path::trim_etag};
use s3s::header::{X_AMZ_EXPIRATION, X_AMZ_VERSION_ID};
impl TransitionClient {
pub async fn put_object_multipart(
@@ -133,7 +122,8 @@ impl TransitionClient {
//}
if hash_sums.len() == 0 {
let mut crc = opts.auto_checksum.hasher()?;
let csum = crc.hash_encode(&buf[..length]);
crc.update(&buf[..length]);
let csum = crc.finalize();
if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) {
custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err"));
@@ -236,7 +226,12 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
//if resp.is_none() {
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name)));
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
vec![],
bucket_name,
object_name,
)));
}
//}
let initiate_multipart_upload_result = InitiateMultipartUploadResult::default();
@@ -293,7 +288,7 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp,
&resp,
vec![],
&p.bucket_name.clone(),
&p.object_name,

View File

@@ -156,7 +156,8 @@ impl TransitionClient {
md5_base64 = base64_encode(hash.as_ref());
} else {
let mut crc = opts.auto_checksum.hasher()?;
let csum = crc.hash_encode(&buf[..length]);
crc.update(&buf[..length]);
let csum = crc.finalize();
if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) {
custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err"));
@@ -303,7 +304,8 @@ impl TransitionClient {
let mut custom_header = HeaderMap::new();
if !opts.send_content_md5 {
let mut crc = opts.auto_checksum.hasher()?;
let csum = crc.hash_encode(&buf[..length]);
crc.update(&buf[..length]);
let csum = crc.finalize();
if let Ok(header_name) = HeaderName::from_bytes(opts.auto_checksum.key().as_bytes()) {
custom_header.insert(header_name, base64_encode(csum.as_ref()).parse().expect("err"));
@@ -477,7 +479,12 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name)));
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
vec![],
bucket_name,
object_name,
)));
}
let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) {

View File

@@ -425,7 +425,12 @@ impl TransitionClient {
};
}
_ => {
return Err(std::io::Error::other(http_resp_to_error_response(resp, vec![], bucket_name, object_name)));
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
vec![],
bucket_name,
object_name,
)));
}
}
return Err(std::io::Error::other(error_response));

View File

@@ -125,7 +125,7 @@ impl TransitionClient {
version_id: &str,
restore_req: &RestoreRequest,
) -> Result<(), std::io::Error> {
let restore_request = match serde_xml_rs::to_string(restore_req) {
let restore_request = match quick_xml::se::to_string(restore_req) {
Ok(buf) => buf,
Err(e) => {
return Err(std::io::Error::other(e));
@@ -165,7 +165,7 @@ impl TransitionClient {
let b = resp.body().bytes().expect("err").to_vec();
if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(resp, b, bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, "")));
}
Ok(())
}

View File

@@ -279,7 +279,7 @@ pub struct CompleteMultipartUpload {
impl CompleteMultipartUpload {
pub fn marshal_msg(&self) -> Result<String, std::io::Error> {
//let buf = serde_json::to_string(self)?;
let buf = match serde_xml_rs::to_string(self) {
let buf = match quick_xml::se::to_string(self) {
Ok(buf) => buf,
Err(e) => {
return Err(std::io::Error::other(e));
@@ -329,7 +329,7 @@ pub struct DeleteMultiObjects {
impl DeleteMultiObjects {
pub fn marshal_msg(&self) -> Result<String, std::io::Error> {
//let buf = serde_json::to_string(self)?;
let buf = match serde_xml_rs::to_string(self) {
let buf = match quick_xml::se::to_string(self) {
Ok(buf) => buf,
Err(e) => {
return Err(std::io::Error::other(e));

View File

@@ -59,7 +59,7 @@ impl TransitionClient {
if let Ok(resp) = resp {
let b = resp.body().bytes().expect("err").to_vec();
let resperr = http_resp_to_error_response(resp, b, bucket_name, "");
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
/*if to_error_response(resperr).code == "NoSuchBucket" {
return Ok(false);
}

View File

@@ -177,7 +177,7 @@ impl TransitionClient {
async fn process_bucket_location_response(mut resp: http::Response<Body>, bucket_name: &str) -> Result<String, std::io::Error> {
//if resp != nil {
if resp.status() != StatusCode::OK {
let err_resp = http_resp_to_error_response(resp, vec![], bucket_name, "");
let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, "");
match err_resp.code {
S3ErrorCode::NotImplemented => {
match err_resp.server.as_str() {
@@ -208,7 +208,7 @@ async fn process_bucket_location_response(mut resp: http::Response<Body>, bucket
//}
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let Document(location_constraint) = serde_xml_rs::from_str::<Document>(&String::from_utf8(b).unwrap()).unwrap();
let Document(location_constraint) = quick_xml::de::from_str::<Document>(&String::from_utf8(b).unwrap()).unwrap();
let mut location = location_constraint;
if location == "" {

View File

@@ -19,7 +19,7 @@
#![allow(clippy::all)]
use bytes::Bytes;
use futures::Future;
use futures::{Future, StreamExt};
use http::{HeaderMap, HeaderName};
use http::{
HeaderValue, Response, StatusCode,
@@ -65,7 +65,9 @@ use crate::{checksum::ChecksumMode, store_api::GetObjectReader};
use rustfs_rio::HashReader;
use rustfs_utils::{
net::get_endpoint_url,
retry::{MAX_RETRY, new_retry_timer},
retry::{
DEFAULT_RETRY_CAP, DEFAULT_RETRY_UNIT, MAX_JITTER, MAX_RETRY, RetryTimer, is_http_status_retryable, is_s3code_retryable,
},
};
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
@@ -186,6 +188,7 @@ impl TransitionClient {
clnt.trailing_header_support = opts.trailing_headers && clnt.override_signer_type == SignatureType::SignatureV4;
clnt.max_retries = MAX_RETRY;
if opts.max_retries > 0 {
clnt.max_retries = opts.max_retries;
}
@@ -313,12 +316,9 @@ impl TransitionClient {
}
//}
//let mut retry_timer = RetryTimer::new();
//while let Some(v) = retry_timer.next().await {
for _ in [1; 1]
/*new_retry_timer(req_retry, default_retry_unit, default_retry_cap, max_jitter)*/
{
let req = self.new_request(method, metadata).await?;
let mut retry_timer = RetryTimer::new(req_retry, DEFAULT_RETRY_UNIT, DEFAULT_RETRY_CAP, MAX_JITTER, self.random);
while let Some(v) = retry_timer.next().await {
let req = self.new_request(&method, metadata).await?;
resp = self.doit(req).await?;
@@ -329,7 +329,7 @@ impl TransitionClient {
}
let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
let err_response = http_resp_to_error_response(resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
let err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
if self.region == "" {
match err_response.code {
@@ -360,6 +360,14 @@ impl TransitionClient {
}
}
if is_s3code_retryable(err_response.code.as_str()) {
continue;
}
if is_http_status_retryable(&resp.status()) {
continue;
}
break;
}
@@ -368,7 +376,7 @@ impl TransitionClient {
async fn new_request(
&self,
method: http::Method,
method: &http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Request<Body>, std::io::Error> {
let location = metadata.bucket_location.clone();

View File

@@ -2014,6 +2014,8 @@ impl ReplicateObjectInfo {
version_id: Uuid::try_parse(&self.version_id).ok(),
delete_marker: self.delete_marker,
transitioned_object: TransitionedObject::default(),
restore_ongoing: false,
restore_expires: Some(OffsetDateTime::now_utc()),
user_tags: self.user_tags.clone(),
parts: Vec::new(),
is_latest: true,

View File

@@ -4056,11 +4056,9 @@ impl StorageAPI for SetDisks {
return to_object_err(err, vec![bucket, object]);
}
}*/
//let traceFn = GLOBAL_LifecycleSys.trace(fi.to_object_info(bucket, object, opts.Versioned || opts.VersionSuspended));
let dest_obj = gen_transition_objname(bucket);
if let Err(err) = dest_obj {
//traceFn(ILMTransition, nil, err)
return Err(to_object_err(err, vec![]));
}
let dest_obj = dest_obj.unwrap();
@@ -4068,8 +4066,6 @@ impl StorageAPI for SetDisks {
let oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
let (pr, mut pw) = tokio::io::duplex(fi.erasure.block_size);
//let h = HeaderMap::new();
//let reader = ReaderImpl::ObjectBody(GetObjectReader {stream: StreamingBlob::wrap(tokio_util::io::ReaderStream::new(pr)), object_info: oi});
let reader = ReaderImpl::ObjectBody(GetObjectReader {
stream: Box::new(pr),
object_info: oi,
@@ -4106,9 +4102,7 @@ impl StorageAPI for SetDisks {
m
})
.await;
//pr.CloseWithError(err);
if let Err(err) = rv {
//traceFn(ILMTransition, nil, err)
return Err(StorageError::Io(err));
}
let rv = rv.unwrap();
@@ -4172,7 +4166,6 @@ impl StorageAPI for SetDisks {
//if err != nil {
// return set_restore_header_fn(&mut oi, Some(toObjectErr(err, bucket, object)));
//}
//defer gr.Close()
let hash_reader = HashReader::new(gr, gr.obj_info.size, "", "", gr.obj_info.size);
let p_reader = PutObjReader::new(StreamingBlob::from(Box::pin(hash_reader)), hash_reader.size());
if let Err(err) = self.put_object(bucket, object, &mut p_reader, &ropts).await {

View File

@@ -387,6 +387,8 @@ pub struct ObjectInfo {
pub version_id: Option<Uuid>,
pub delete_marker: bool,
pub transitioned_object: TransitionedObject,
pub restore_ongoing: bool,
pub restore_expires: Option<OffsetDateTime>,
pub user_tags: String,
pub parts: Vec<ObjectPartInfo>,
pub is_latest: bool,
@@ -421,6 +423,8 @@ impl Clone for ObjectInfo {
version_id: self.version_id,
delete_marker: self.delete_marker,
transitioned_object: self.transitioned_object.clone(),
restore_ongoing: self.restore_ongoing,
restore_expires: self.restore_expires,
user_tags: self.user_tags.clone(),
parts: self.parts.clone(),
is_latest: self.is_latest,

View File

@@ -12,44 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use futures::Stream;
use hyper::http;
use std::{
pin::Pin,
sync::LazyLock,
task::{Context, Poll},
time::Duration,
};
use tokio::time::interval;
pub const MAX_RETRY: i64 = 10;
pub const MAX_JITTER: f64 = 1.0;
pub const NO_JITTER: f64 = 0.0;
/*
struct Delay {
when: Instant,
}
pub const DEFAULT_RETRY_UNIT: Duration = Duration::from_millis(200);
pub const DEFAULT_RETRY_CAP: Duration = Duration::from_secs(1);
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
struct RetryTimer {
rem: usize,
delay: Delay,
pub struct RetryTimer {
base_sleep: Duration,
max_sleep: Duration,
jitter: f64,
random: u64,
rem: i64,
}
impl RetryTimer {
fn new() -> Self {
pub fn new(max_retry: i64, base_sleep: Duration, max_sleep: Duration, jitter: f64, random: u64) -> Self {
Self {
rem: 3,
delay: Delay { when: Instant::now() }
base_sleep,
max_sleep,
jitter,
random,
rem: max_retry,
}
}
}
@@ -57,26 +52,87 @@ impl RetryTimer {
impl Stream for RetryTimer {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let jitter = self.jitter.clamp(NO_JITTER, MAX_JITTER);
let attempt = MAX_RETRY - self.rem;
let mut sleep = self.base_sleep * (1 << attempt);
if sleep > self.max_sleep {
sleep = self.max_sleep;
}
if (jitter - NO_JITTER).abs() > 1e-9 {
    // Shave a pseudo-random fraction of the jitter off the sleep ("full
    // jitter" backoff); self.random seeds the fraction in [0, 1). Casting
    // the f64 jitter to u32 would truncate it to 0 or 1, so scale with
    // mul_f64 instead.
    let rand_frac = (self.random % 1_000) as f64 / 1_000.0;
    sleep = sleep.mul_f64(1.0 - rand_frac * jitter);
}
if self.rem == 0 {
// No more delays
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
self.rem -= 1;
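// Note: a fresh Interval is created on every poll, and tokio's Interval
// yields its first tick immediately, so this currently returns Ready right
// away instead of enforcing the computed `sleep`; storing a Sleep future
// across polls would enforce the backoff.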
let mut t = interval(sleep);
match t.poll_tick(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Pending,
}
}
}*/
}
pub fn new_retry_timer(_max_retry: i32, _base_sleep: Duration, _max_sleep: Duration, _jitter: f64) -> Vec<i32> {
static RETRYABLE_S3CODES: LazyLock<Vec<String>> = LazyLock::new(|| {
vec![
"RequestError".to_string(),
"RequestTimeout".to_string(),
"Throttling".to_string(),
"ThrottlingException".to_string(),
"RequestLimitExceeded".to_string(),
"RequestThrottled".to_string(),
"InternalError".to_string(),
"ExpiredToken".to_string(),
"ExpiredTokenException".to_string(),
"SlowDown".to_string(),
]
});
static RETRYABLE_HTTP_STATUSCODES: LazyLock<Vec<http::StatusCode>> = LazyLock::new(|| {
vec![
http::StatusCode::REQUEST_TIMEOUT,
http::StatusCode::TOO_MANY_REQUESTS,
//499,
http::StatusCode::INTERNAL_SERVER_ERROR,
http::StatusCode::BAD_GATEWAY,
http::StatusCode::SERVICE_UNAVAILABLE,
http::StatusCode::GATEWAY_TIMEOUT,
//520,
]
});
pub fn is_s3code_retryable(s3code: &str) -> bool {
RETRYABLE_S3CODES.contains(&s3code.to_string())
}
pub fn is_http_status_retryable(http_statuscode: &http::StatusCode) -> bool {
RETRYABLE_HTTP_STATUSCODES.contains(http_statuscode)
}
pub fn is_request_error_retryable(_err: std::io::Error) -> bool {
    /* Port of the Go client logic, still to be done:
       - Canceled / DeadlineExceeded errors are retryable only when no inner error is attached;
       - a url.Error wrapping an x509.UnknownAuthorityError is not retryable;
       - "http: server gave HTTP response to HTTPS client" is not retryable;
       - anything else is considered retryable. */
    todo!();
}
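A hedged sketch of how the todo!() above could be filled in, using std::io::ErrorKind as a rough stand-in for the Go error taxonomy; the function name is illustrative:

use std::io::ErrorKind;

pub fn is_io_error_retryable_sketch(err: &std::io::Error) -> bool {
    match err.kind() {
        // Malformed input, bad data, and authorization failures will not heal on retry.
        ErrorKind::InvalidInput | ErrorKind::InvalidData | ErrorKind::PermissionDenied => false,
        // Timeouts, resets, and interrupted transfers are worth retrying.
        _ => true,
    }
}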

View File

@@ -88,7 +88,7 @@ impl UserAgent {
Some(version) => version,
// "Unknown" alone here; the format! below already adds the "Windows NT " prefix.
None => "Unknown".to_string(),
};
format!("Windows NT {version}")
}
#[cfg(not(windows))]

View File

@@ -465,182 +465,3 @@ impl Operation for ClearTier {
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header))
}
}
/* The ~160-line commented-out PostRestoreObject handler that previously sat here
   (a half-ported copy of the Go implementation) was removed; a near-identical
   commented sketch now lives inside FS::restore_object below. */

View File

@@ -486,6 +486,167 @@ impl S3 for FS {
Ok(S3Response::new(output))
}
async fn restore_object(&self, _req: S3Request<RestoreObjectInput>) -> S3Result<S3Response<RestoreObjectOutput>> {
Err(s3_error!(NotImplemented, "RestoreObject is not implemented yet"))
/* Half-ported reference implementation of the restore flow, kept for the eventual
   port. Go remnants (`w.Header()`, `:=`, type assertions) remain, so this is a
   sketch rather than valid Rust:
let bucket = params.bucket;
if let Err(e) = un_escape_path(params.object) {
warn!("post restore object failed, e: {:?}", e);
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}
if req.content_length <= 0 {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}
let Some(opts) = post_restore_opts(req, bucket, object) else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
let Some(obj_info) = store.get_object_info(bucket, object, opts) else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}
let mut api_err;
let Some(rreq) = parse_restore_request(req.body(), req.content_length) else {
let api_err = errorCodes.ToAPIErr(ErrMalformedXML);
api_err.description = err.Error()
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
};
let mut status_code = http::StatusCode::OK;
let mut already_restored = false;
if Err(err) = rreq.validate(store) {
api_err = errorCodes.ToAPIErr(ErrMalformedXML)
api_err.description = err.Error()
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
} else {
if obj_info.restore_ongoing && rreq.Type != "SELECT" {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()), "post restore object failed"));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unix_timestamp() != 0 {
status_code = http::StatusCode::Accepted;
already_restored = true;
}
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), rreq.days);
let mut metadata = clone_mss(obj_info.user_defined);
if rreq.type != "SELECT" {
obj_info.metadataOnly = true;
metadata[xhttp.AmzRestoreExpiryDays] = rreq.days;
metadata[xhttp.AmzRestoreRequestDate] = OffsetDateTime::now_utc().format(http::TimeFormat);
if already_restored {
metadata[AmzRestore] = completed_restore_obj(restore_expiry).String()
} else {
metadata[AmzRestore] = ongoing_restore_obj().to_string()
}
obj_info.user_defined = metadata;
if let Err(err) = store.copy_object(bucket, object, bucket, object, obj_info, ObjectOptions {
version_id: obj_info.version_id,
}, ObjectOptions {
version_id: obj_info.version_id,
m_time: obj_info.mod_time,
}) {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
if already_restored {
return Ok(());
}
}
let restore_object = must_get_uuid();
if rreq.output_location.s3.bucket_name != "" {
w.Header()[AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restore_object)}
}
w.WriteHeader(status_code)
send_event(EventArgs {
event_name: event::ObjectRestorePost,
bucket_name: bucket,
object: obj_info,
req_params: extract_req_params(r),
user_agent: req.user_agent(),
host: handlers::get_source_ip(r),
});
tokio::spawn(async move {
if !rreq.SelectParameters.IsEmpty() {
let actual_size = obj_info.get_actual_size();
if actual_size.is_err() {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
let object_rsc = s3select.NewObjectReadSeekCloser(
|offset int64| -> (io.ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
return getTransitionedObjectReader(bucket, object, rs, r.Header,
obj_info, ObjectOptions {version_id: obj_info.version_id});
},
actual_size.unwrap(),
);
if err = rreq.SelectParameters.Open(objectRSC); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
let encoded_error_response = encodeResponse(APIErrorResponse {
code: serr.ErrorCode(),
message: serr.ErrorMessage(),
bucket_name: bucket,
key: object,
resource: r.URL.Path,
request_id: w.Header().Get(xhttp.AmzRequestID),
host_id: globalDeploymentID(),
});
//writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
} else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
return Ok(());
}
let nr = httptest.NewRecorder();
let rw = xhttp.NewResponseRecorder(nr);
rw.log_err_body = true;
rw.log_all_body = true;
rreq.select_parameters.evaluate(rw);
rreq.select_parameters.Close();
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}
let opts = ObjectOptions {
transition: TransitionOptions {
restore_request: rreq,
restore_expiry: restore_expiry,
},
version_id: objInfo.version_id,
}
if Err(err) = store.restore_transitioned_object(bucket, object, opts) {
format!(format!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string()));
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}
send_event(EventArgs {
EventName: event.ObjectRestoreCompleted,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
});
});
*/
}
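Related sketch: since this change moves XML handling to quick-xml, the eventual body parsing for restore_object could look roughly like this; the element names follow the S3 RestoreRequest schema, and the struct here is illustrative rather than this crate's actual type:

use serde::Deserialize;

// Minimal subset of the S3 RestoreRequest body, e.g.
// <RestoreRequest><Days>7</Days><Type>SELECT</Type></RestoreRequest>
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct RestoreRequestSketch {
    days: Option<i32>,
    #[serde(rename = "Type")]
    request_type: Option<String>,
}

// Requires quick-xml's "serialize" feature.
fn parse_restore_request_sketch(body: &str) -> Option<RestoreRequestSketch> {
    quick_xml::de::from_str(body).ok()
}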
/// Delete a bucket
#[tracing::instrument(level = "debug", skip(self, req))]
async fn delete_bucket(&self, req: S3Request<DeleteBucketInput>) -> S3Result<S3Response<DeleteBucketOutput>> {
@@ -2097,27 +2258,14 @@ impl S3 for FS {
..
} = req.input;
//info!("lifecycle_configuration: {:?}", &lifecycle_configuration);
let Some(input_cfg) = lifecycle_configuration else { return Err(s3_error!(InvalidArgument)) };

// Validate the lifecycle config against the bucket's object-lock configuration, if one exists.
let rcfg = metadata_sys::get_object_lock_config(&bucket).await;
if let Ok(rcfg) = rcfg {
    if let Err(err) = input_cfg.validate(&rcfg.0).await {
        //return Err(S3Error::with_message(S3ErrorCode::Custom("BucketLockValidateFailed".into()), err.to_string()));
        return Err(S3Error::with_message(S3ErrorCode::Custom("ValidateFailed".into()), err.to_string()));
    }
}
if let Err(err) = validate_transition_tier(&input_cfg).await {