fix(lifecycle): lifecycle fixes (#1625)

Signed-off-by: likewu <likewu@126.com>
Co-authored-by: loverustfs <hello@rustfs.com>
This commit is contained in:
likewu
2026-01-31 19:14:47 +08:00
committed by GitHub
parent 6fc35e442c
commit 087f58b7c8
32 changed files with 1549 additions and 383 deletions

34
.vscode/launch.json vendored
View File

@@ -99,7 +99,17 @@
"name": "Debug executable target/debug/rustfs",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/target/debug/rustfs",
"cargo": {
"args": [
"run",
"--bin",
"rustfs",
"-j",
"1",
"--profile",
"dev"
]
},
"args": [],
"cwd": "${workspaceFolder}",
//"stopAtEntry": false,
@@ -107,7 +117,7 @@
"env": {
"RUSTFS_ACCESS_KEY": "rustfsadmin",
"RUSTFS_SECRET_KEY": "rustfsadmin",
"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
//"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
"RUSTFS_ADDRESS": ":9000",
"RUSTFS_CONSOLE_ENABLE": "true",
// "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jeager otlp http endpoint
@@ -116,11 +126,31 @@
// "RUSTFS_COMPRESS_ENABLE": "true",
"RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001",
"RUSTFS_OBS_LOG_DIRECTORY": "./target/logs",
"RUST_LOG":"rustfs=debug,ecstore=debug,s3s=debug,iam=debug",
},
"sourceLanguages": [
"rust"
],
},
{
"type": "lldb",
"request": "launch",
"name": "Debug test_lifecycle_transition_basic",
"cargo": {
"args": [
"test",
"-p",
"rustfs-scanner",
"--test",
"lifecycle_integration_test",
"serial_tests::test_lifecycle_transition_basic",
"-j",
"1"
]
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug executable target/debug/test",
"type": "lldb",

128
Cargo.lock generated
View File

@@ -1225,6 +1225,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -1529,7 +1538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3"
dependencies = [
"chrono",
"phf",
"phf 0.12.1",
]
[[package]]
@@ -3196,6 +3205,15 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "780955b8b195a21ab8e4ac6b60dd1dbdcec1dc6c51c0617964b08c81785e12c9"
[[package]]
name = "doxygen-rs"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9"
dependencies = [
"phf 0.11.3",
]
[[package]]
name = "dunce"
version = "1.0.5"
@@ -4283,6 +4301,44 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "heed"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393"
dependencies = [
"bitflags 2.10.0",
"byteorder",
"heed-traits",
"heed-types",
"libc",
"lmdb-master-sys",
"once_cell",
"page_size",
"serde",
"synchronoise",
"url",
]
[[package]]
name = "heed-traits"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
[[package]]
name = "heed-types"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d"
dependencies = [
"bincode",
"byteorder",
"heed-traits",
"serde",
"serde_json",
]
[[package]]
name = "hermit-abi"
version = "0.5.2"
@@ -5292,6 +5348,17 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
name = "lmdb-master-sys"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df"
dependencies = [
"cc",
"doxygen-rs",
"libc",
]
[[package]]
name = "local-ip-address"
version = "0.6.9"
@@ -6350,13 +6417,55 @@ dependencies = [
"subtle",
]
[[package]]
name = "phf"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
dependencies = [
"phf_macros",
"phf_shared 0.11.3",
]
[[package]]
name = "phf"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7"
dependencies = [
"phf_shared",
"phf_shared 0.12.1",
]
[[package]]
name = "phf_generator"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [
"phf_shared 0.11.3",
"rand 0.8.5",
]
[[package]]
name = "phf_macros"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216"
dependencies = [
"phf_generator",
"phf_shared 0.11.3",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "phf_shared"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher",
]
[[package]]
@@ -7817,12 +7926,15 @@ dependencies = [
"faster-hex",
"flatbuffers",
"futures",
"futures-util",
"glob",
"google-cloud-auth",
"google-cloud-storage",
"hex-simd",
"hmac 0.13.0-rc.3",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper-util",
@@ -8201,9 +8313,11 @@ dependencies = [
name = "rustfs-scanner"
version = "0.0.5"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"heed",
"http 1.4.0",
"path-clean",
"rand 0.10.0-rc.6",
@@ -8222,6 +8336,7 @@ dependencies = [
"tokio",
"tokio-util",
"tracing",
"uuid",
]
[[package]]
@@ -9556,6 +9671,15 @@ dependencies = [
"futures-core",
]
[[package]]
name = "synchronoise"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2"
dependencies = [
"crossbeam-queue",
]
[[package]]
name = "synstructure"
version = "0.12.6"

View File

@@ -52,6 +52,7 @@ glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true
@@ -60,6 +61,8 @@ serde_json.workspace = true
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
s3s.workspace = true
http.workspace = true
http-body = { workspace = true }
http-body-util.workspace = true
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-simd = { workspace = true }

View File

@@ -18,12 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use crate::bucket::lifecycle::rule::TransitionOps;
use crate::store_api::ObjectInfo;
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition, TransitionStorageClass,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -35,6 +33,9 @@ use time::{self, Duration, OffsetDateTime};
use tracing::info;
use uuid::Uuid;
use crate::bucket::lifecycle::rule::TransitionOps;
use crate::store_api::ObjectInfo;
pub const TRANSITION_COMPLETE: &str = "complete";
pub const TRANSITION_PENDING: &str = "pending";
const ERR_LIFECYCLE_NO_RULE: &str = "Lifecycle configuration should have at least one rule";
@@ -171,44 +172,51 @@ impl Lifecycle for BucketLifecycleConfiguration {
continue;
}
let rule_prefix = rule.prefix.as_ref().expect("err!");
let rule_prefix = &rule.prefix.clone().unwrap_or_default();
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
{
continue;
}
let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!");
if rule_noncurrent_version_expiration.noncurrent_days.expect("err!") > 0 {
if let Some(rule_noncurrent_version_expiration) = &rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = rule_noncurrent_version_expiration.noncurrent_days {
if noncurrent_days > 0 {
return true;
}
}
if let Some(newer_noncurrent_versions) = rule_noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
return true;
}
}
}
if rule.noncurrent_version_transitions.is_some() {
return true;
}
if rule_noncurrent_version_expiration.newer_noncurrent_versions.expect("err!") > 0 {
return true;
if let Some(rule_expiration) = &rule.expiration {
if let Some(date1) = rule_expiration.date.clone() {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
if rule_expiration.date.is_some() {
return true;
}
if let Some(expired_object_delete_marker) = rule_expiration.expired_object_delete_marker
&& expired_object_delete_marker
{
return true;
}
}
if !rule.noncurrent_version_transitions.is_none() {
return true;
if let Some(rule_transitions) = &rule.transitions {
let rule_transitions_0 = rule_transitions[0].clone();
if let Some(date1) = rule_transitions_0.date {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
}
let rule_expiration = rule.expiration.as_ref().expect("err!");
if !rule_expiration.date.is_none()
&& OffsetDateTime::from(rule_expiration.date.clone().expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule_expiration.date.is_none() {
return true;
}
if rule_expiration.expired_object_delete_marker.expect("err!") {
return true;
}
let rule_transitions: &[Transition] = &rule.transitions.as_ref().expect("err!");
let rule_transitions_0 = rule_transitions[0].clone();
if !rule_transitions_0.date.is_none()
&& OffsetDateTime::from(rule_transitions_0.date.expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule.transitions.is_none() {
if rule.transitions.is_some() {
return true;
}
}
@@ -232,7 +240,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
return Err(std::io::Error::other(ERR_LIFECYCLE_BUCKET_LOCKED));
}
}
}
}
}*/
}
for (i, _) in self.rules.iter().enumerate() {
@@ -325,7 +333,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -339,7 +347,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -360,7 +368,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -393,7 +401,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -416,7 +424,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
@@ -450,7 +458,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -471,7 +479,7 @@ impl Lifecycle for BucketLifecycleConfiguration {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
@@ -496,9 +504,14 @@ impl Lifecycle for BucketLifecycleConfiguration {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
rule_id: rule.id.clone().unwrap_or_default(),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
storage_class: transitions[0]
.storage_class
.clone()
.unwrap_or(TransitionStorageClass::from_static(""))
.as_str()
.to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
@@ -512,9 +525,10 @@ impl Lifecycle for BucketLifecycleConfiguration {
if events.len() > 0 {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
if now.unix_timestamp() > a.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
&& now.unix_timestamp() > b.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
|| a.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
== b.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
@@ -537,7 +551,9 @@ impl Lifecycle for BucketLifecycleConfiguration {
return Ordering::Less;
}
if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
if a.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
< b.due.unwrap_or_else(|| OffsetDateTime::UNIX_EPOCH).unix_timestamp()
{
return Ordering::Less;
}
return Ordering::Greater;
@@ -558,8 +574,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
}
return Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err"),
noncurrent_days: noncurrent_version_expiration.noncurrent_days.expect("noncurrent_days err.") as u32,
rule_id: rule.id.clone().unwrap_or_default(),
noncurrent_days: noncurrent_version_expiration.noncurrent_days.unwrap_or(0) as u32,
newer_noncurrent_versions: newer_noncurrent_versions as usize,
due: Some(OffsetDateTime::UNIX_EPOCH),
storage_class: "".into(),
@@ -567,8 +583,8 @@ impl Lifecycle for BucketLifecycleConfiguration {
} else {
return Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err"),
noncurrent_days: noncurrent_version_expiration.noncurrent_days.expect("noncurrent_days err.") as u32,
rule_id: rule.id.clone().unwrap_or_default(),
noncurrent_days: noncurrent_version_expiration.noncurrent_days.unwrap_or(0) as u32,
newer_noncurrent_versions: 0,
due: Some(OffsetDateTime::UNIX_EPOCH),
storage_class: "".into(),

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use std::collections::HashMap;
use crate::client::{
@@ -61,9 +63,19 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
//defer closeResponse(resp)
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp != nil {
if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
if resp_status != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//}
Ok(())
@@ -97,8 +109,17 @@ impl TransitionClient {
.await?;
//defer closeResponse(resp)
if resp.status() != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
if resp_status != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
Ok(())
@@ -136,7 +157,15 @@ impl TransitionClient {
)
.await?;
let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let policy = String::from_utf8_lossy(&body_vec).to_string();
Ok(policy)
}
}

View File

@@ -18,12 +18,11 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::Deserializer, ser::Serializer};
use std::fmt::Display;
use s3s::Body;
use s3s::S3ErrorCode;
const _REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
@@ -95,20 +94,31 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse {
}
pub fn http_resp_to_error_response(
resp: &http::Response<Body>,
resp_status: StatusCode,
h: &HeaderMap,
b: Vec<u8>,
bucket_name: &str,
object_name: &str,
) -> ErrorResponse {
let err_body = String::from_utf8(b).unwrap();
if h.is_empty() || resp_status.is_client_error() || resp_status.is_server_error() {
return ErrorResponse {
status_code: resp_status,
code: S3ErrorCode::ResponseInterrupted,
message: "Invalid HTTP response.".to_string(),
bucket_name: bucket_name.to_string(),
key: object_name.to_string(),
..Default::default()
};
}
let err_resp_ = quick_xml::de::from_str::<ErrorResponse>(&err_body);
let mut err_resp = ErrorResponse::default();
if err_resp_.is_err() {
match resp.status() {
match resp_status {
StatusCode::NOT_FOUND => {
if object_name == "" {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchBucket,
message: "The specified bucket does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -116,7 +126,7 @@ pub fn http_resp_to_error_response(
};
} else {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchKey,
message: "The specified key does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -127,7 +137,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::FORBIDDEN => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::AccessDenied,
message: "Access Denied.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -137,7 +147,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::CONFLICT => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::BucketNotEmpty,
message: "Bucket not empty.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -146,7 +156,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::PRECONDITION_FAILED => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::PreconditionFailed,
message: "Pre condition failed.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -155,13 +165,13 @@ pub fn http_resp_to_error_response(
};
}
_ => {
let mut msg = resp.status().to_string();
let mut msg = resp_status.to_string();
if err_body.len() > 0 {
msg = err_body;
}
err_resp = ErrorResponse {
status_code: resp.status(),
code: S3ErrorCode::Custom(resp.status().to_string().into()),
status_code: resp_status,
code: S3ErrorCode::Custom(resp_status.to_string().into()),
message: msg,
bucket_name: bucket_name.to_string(),
..Default::default()
@@ -171,32 +181,32 @@ pub fn http_resp_to_error_response(
} else {
err_resp = err_resp_.unwrap();
}
err_resp.status_code = resp.status();
if let Some(server_name) = resp.headers().get("Server") {
err_resp.status_code = resp_status;
if let Some(server_name) = h.get("Server") {
err_resp.server = server_name.to_str().expect("err").to_string();
}
let code = resp.headers().get("x-minio-error-code");
let code = h.get("x-minio-error-code");
if code.is_some() {
err_resp.code = S3ErrorCode::Custom(code.expect("err").to_str().expect("err").into());
}
let desc = resp.headers().get("x-minio-error-desc");
let desc = h.get("x-minio-error-desc");
if desc.is_some() {
err_resp.message = desc.expect("err").to_str().expect("err").trim_matches('"').to_string();
}
if err_resp.request_id == "" {
if let Some(x_amz_request_id) = resp.headers().get("x-amz-request-id") {
if let Some(x_amz_request_id) = h.get("x-amz-request-id") {
err_resp.request_id = x_amz_request_id.to_str().expect("err").to_string();
}
}
if err_resp.host_id == "" {
if let Some(x_amz_id_2) = resp.headers().get("x-amz-id-2") {
if let Some(x_amz_id_2) = h.get("x-amz-id-2") {
err_resp.host_id = x_amz_id_2.to_str().expect("err").to_string();
}
}
if err_resp.region == "" {
if let Some(x_amz_bucket_region) = resp.headers().get("x-amz-bucket-region") {
if let Some(x_amz_bucket_region) = h.get("x-amz-bucket-region") {
err_resp.region = x_amz_bucket_region.to_str().expect("err").to_string();
}
}

View File

@@ -19,17 +19,25 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use futures_util::ready;
use http::HeaderMap;
use std::io::Cursor;
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;
use crate::client::{
api_error_response::err_invalid_argument,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use futures_util::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use tokio_util::io::ReaderStream;
impl TransitionClient {
pub fn get_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<Object, std::io::Error> {
@@ -65,11 +73,19 @@ impl TransitionClient {
)
.await?;
let resp = &resp;
let object_stat = to_object_info(bucket_name, object_name, resp.headers())?;
let b = resp.body().bytes().expect("err").to_vec();
Ok((object_stat, resp.headers().clone(), BufReader::new(Cursor::new(b))))
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
Ok((object_stat, h, BufReader::new(Cursor::new(body_vec))))
}
}

View File

@@ -25,6 +25,7 @@ use crate::client::{
};
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::dto::Owner;
@@ -83,18 +84,29 @@ impl TransitionClient {
)
.await?;
if resp.status() != http::StatusCode::OK {
let b = resp.body().bytes().expect("err").to_vec();
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name)));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
if resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
object_name,
)));
}
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -18,7 +18,6 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -28,9 +27,12 @@ use crate::client::{
api_get_object_acl::AccessControlPolicy,
transition_api::{ReaderImpl, RequestMetadata, TransitionClient},
};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};
pub struct ObjectAttributesOptions {
@@ -130,19 +132,12 @@ struct ObjectAttributePart {
}
impl ObjectAttributes {
pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
let h = resp.headers();
pub async fn parse_response(&mut self, h: &HeaderMap, body_vec: Vec<u8>) -> Result<(), std::io::Error> {
let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
self.last_modified = mod_time;
self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));
@@ -213,7 +208,8 @@ impl TransitionClient {
)
.await?;
let h = resp.headers();
let resp_status = resp.status();
let h = resp.headers().clone();
let has_etag = h.get("ETag").unwrap().to_str().unwrap();
if !has_etag.is_empty() {
return Err(std::io::Error::other(
@@ -221,14 +217,17 @@ impl TransitionClient {
));
}
if resp.status() != http::StatusCode::OK {
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let err_body = String::from_utf8(b).unwrap();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::OK {
let err_body = String::from_utf8(body_vec).unwrap();
let mut er = match quick_xml::de::from_str::<AccessControlPolicy>(&err_body) {
Ok(result) => result,
Err(err) => {
@@ -240,7 +239,7 @@ impl TransitionClient {
}
let mut oa = ObjectAttributes::new();
oa.parse_response(&mut resp).await?;
oa.parse_response(&h, body_vec).await?;
Ok(oa)
}

View File

@@ -27,8 +27,10 @@ use crate::client::{
transition_api::{ReaderImpl, RequestMetadata, TransitionClient},
};
use crate::store_api::BucketInfo;
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use std::collections::HashMap;
@@ -97,18 +99,30 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//let mut list_bucket_result = ListBucketV2Result::default();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -17,8 +17,8 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use hyper::body::Bytes;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -225,10 +225,15 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_none() {
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,
@@ -287,9 +292,14 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
&p.bucket_name.clone(),
&p.object_name,
@@ -370,7 +380,8 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let b = resp.body().bytes().expect("err").to_vec();
let h = resp.headers().clone();
let complete_multipart_upload_result: CompleteMultipartUploadResult = CompleteMultipartUploadResult::default();
let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) {
@@ -382,7 +393,6 @@ impl TransitionClient {
(OffsetDateTime::now_utc(), "".to_string())
};
let h = resp.headers();
Ok(UploadInfo {
bucket: complete_multipart_upload_result.bucket,
key: complete_multipart_upload_result.key,

View File

@@ -479,9 +479,13 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::HashAlgorithm;
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
@@ -344,8 +346,15 @@ impl TransitionClient {
)
.await?;
let body_bytes: Vec<u8> = resp.body().bytes().expect("err").to_vec();
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_bytes)), result_tx.clone());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_vec)), result_tx.clone());
}
Ok(())
}
@@ -390,6 +399,10 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_some() {
if resp.status() != StatusCode::NO_CONTENT {
let error_response: ErrorResponse;
@@ -426,7 +439,8 @@ impl TransitionClient {
}
_ => {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -24,8 +24,10 @@ use crate::client::{
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use bytes::Bytes;
use http::HeaderMap;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use s3s::dto::RestoreRequest;
use std::collections::HashMap;
use std::io::Cursor;
@@ -107,9 +109,25 @@ impl TransitionClient {
)
.await?;
let b = resp.body().bytes().expect("err").to_vec();
if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::ACCEPTED && resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
"",
)));
}
Ok(())
}

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
@@ -66,10 +68,20 @@ impl TransitionClient {
return Ok(false);
}
let b = resp.body().bytes().expect("err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("bucket exists, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("bucket exists, resperr: {:?}", resperr);
/*if to_error_response(resperr).code == "NoSuchBucket" {
return Ok(false);
}
@@ -108,10 +120,20 @@ impl TransitionClient {
match resp {
Ok(resp) => {
let b = resp.body().bytes().expect("get bucket versioning err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("get bucket versioning, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("get bucket versioning, resperr: {:?}", resperr);
Ok(VersioningConfiguration::default())
}

View File

@@ -25,10 +25,13 @@ use crate::client::{
transition_api::{CreateBucketConfiguration, LocationConstraint, TransitionClient},
};
use http::Request;
use http_body_util::BodyExt;
use hyper::StatusCode;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::S3ErrorCode;
use std::collections::HashMap;
@@ -86,7 +89,7 @@ impl TransitionClient {
Ok(location)
}
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<Body>, std::io::Error> {
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut url_values = HashMap::new();
url_values.insert("location".to_string(), "".to_string());
@@ -120,7 +123,11 @@ impl TransitionClient {
url_str = target_url.to_string();
}
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(Body::empty()) else {
let Ok(mut req) = Request::builder()
.method(http::Method::GET)
.uri(url_str)
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -172,13 +179,16 @@ impl TransitionClient {
}
async fn process_bucket_location_response(
mut resp: http::Response<Body>,
mut resp: http::Response<Incoming>,
bucket_name: &str,
tier_type: &str,
) -> Result<String, std::io::Error> {
//if resp != nil {
if resp.status() != StatusCode::OK {
let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
let err_resp = http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "");
match err_resp.code {
S3ErrorCode::NotImplemented => {
match err_resp.server.as_str() {
@@ -208,18 +218,22 @@ async fn process_bucket_location_response(
}
//}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut location = "".to_string();
if tier_type == "huaweicloud" {
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(b).unwrap()).unwrap();
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(body_vec).unwrap()).unwrap();
location = d.location_constraint;
} else {
if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(b).unwrap()) {
if let Ok(LocationConstraint { field }) =
quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap())
{
location = field;
}
}

View File

@@ -118,38 +118,51 @@ pub fn new_getobjectreader<'a>(
let mut is_encrypted = false;
let is_compressed = false; //oi.is_compressed_ok();
let mut rs_ = None;
let rs_;
if rs.is_none() && opts.part_number.is_some() && opts.part_number.unwrap() > 0 {
rs_ = part_number_to_rangespec(oi.clone(), opts.part_number.unwrap());
} else {
rs_ = rs.clone();
}
let mut get_fn: ObjReaderFn;
let (off, length) = match rs_.unwrap().get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
if let Some(rs_) = rs_ {
let (off, length) = match rs_.get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
r
//})
});
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
};
r
//})
});
Ok((get_fn, off as i64, length as i64))
return Ok((get_fn, off as i64, length as i64));
}
Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: "Invalid range".to_string(),
key: Some(oi.name.clone()),
bucket_name: Some(oi.bucket.clone()),
region: Some("".to_string()),
request_id: None,
host_id: "".to_string(),
})
}
/// Convert a raw stored ETag into the strongly-typed `s3s::dto::ETag`.

View File

@@ -20,6 +20,7 @@
use crate::client::bucket_cache::BucketLocationCache;
use crate::client::{
api_error_response::ErrorResponse,
api_error_response::{err_invalid_argument, http_resp_to_error_response, to_error_response},
api_get_options::GetObjectOptions,
api_put_object::PutObjectOptions,
@@ -32,13 +33,16 @@ use crate::client::{
credentials::{CredContext, Credentials, SignatureType, Static},
};
use crate::{client::checksum::ChecksumMode, store_api::GetObjectReader};
use bytes::Bytes;
use futures::{Future, StreamExt};
use http::{HeaderMap, HeaderName};
use http::{
HeaderValue, Response, StatusCode,
request::{Builder, Request},
};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{client::legacy::Client, client::legacy::connect::HttpConnector, rt::TokioExecutor};
use md5::Digest;
@@ -54,8 +58,8 @@ use rustfs_utils::{
},
};
use s3s::S3ErrorCode;
use s3s::dto::Owner;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::io::Cursor;
@@ -95,7 +99,7 @@ pub struct TransitionClient {
pub creds_provider: Arc<Mutex<Credentials<Static>>>,
pub override_signer_type: SignatureType,
pub secure: bool,
pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
pub http_client: Client<HttpsConnector<HttpConnector>, s3s::Body>,
pub bucket_loc_cache: Arc<Mutex<BucketLocationCache>>,
pub is_trace_enabled: Arc<Mutex<bool>>,
pub trace_errors_only: Arc<Mutex<bool>>,
@@ -271,7 +275,7 @@ impl TransitionClient {
todo!();
}
fn dump_http(&self, req: &http::Request<Body>, resp: &http::Response<Body>) -> Result<(), std::io::Error> {
fn dump_http(&self, req: &http::Request<s3s::Body>, resp: &http::Response<Incoming>) -> Result<(), std::io::Error> {
let mut resp_trace: Vec<u8>;
//info!("{}{}", self.trace_output, "---------BEGIN-HTTP---------");
@@ -280,7 +284,7 @@ impl TransitionClient {
Ok(())
}
pub async fn doit(&self, req: http::Request<Body>) -> Result<http::Response<Body>, std::io::Error> {
pub async fn doit(&self, req: http::Request<s3s::Body>) -> Result<http::Response<Incoming>, std::io::Error> {
let req_method;
let req_uri;
let req_headers;
@@ -295,9 +299,7 @@ impl TransitionClient {
debug!("endpoint_url: {}", self.endpoint_url.as_str().to_string());
resp = http_client.request(req);
}
let resp = resp
.await /*.map_err(Into::into)*/
.map(|res| res.map(Body::from));
let resp = resp.await;
debug!("http_client url: {} {}", req_method, req_uri);
debug!("http_client headers: {:?}", req_headers);
if let Err(err) = resp {
@@ -305,7 +307,7 @@ impl TransitionClient {
return Err(std::io::Error::other(err));
}
let mut resp = resp.unwrap();
let resp = resp.unwrap();
debug!("http_resp: {:?}", resp);
//let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
@@ -314,23 +316,27 @@ impl TransitionClient {
//if self.is_trace_enabled && !(self.trace_errors_only && resp.status() == StatusCode::OK) {
if resp.status() != StatusCode::OK {
//self.dump_http(&cloned_req, &resp)?;
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
warn!("err_body: {}", String::from_utf8(b).unwrap());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let body_str = String::from_utf8_lossy(&body_vec);
warn!("err_body: {}", body_str);
Err(std::io::Error::other(format!("http_client call error: {}", body_str)))
} else {
Ok(resp)
}
Ok(resp)
}
pub async fn execute_method(
&self,
method: http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Response<Body>, std::io::Error> {
) -> Result<http::Response<Incoming>, std::io::Error> {
if self.is_offline() {
let mut s = self.endpoint_url.to_string();
s.push_str(" is offline.");
@@ -340,7 +346,7 @@ impl TransitionClient {
let retryable: bool;
//let mut body_seeker: BufferReader;
let mut req_retry = self.max_retries;
let mut resp: http::Response<Body>;
let mut resp: http::Response<Incoming>;
//if metadata.content_body != nil {
//body_seeker = BufferReader::new(metadata.content_body.read_all().await?);
@@ -362,13 +368,19 @@ impl TransitionClient {
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut err_response =
http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
err_response.message = format!("remote tier error: {}", err_response.message);
if self.region == "" {
@@ -404,7 +416,7 @@ impl TransitionClient {
continue;
}
if is_http_status_retryable(&resp.status()) {
if is_http_status_retryable(&resp_status) {
continue;
}
@@ -418,7 +430,7 @@ impl TransitionClient {
&self,
method: &http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Request<Body>, std::io::Error> {
) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut location = metadata.bucket_location.clone();
if location == "" && metadata.bucket_name != "" {
location = self.get_bucket_location(&metadata.bucket_name).await?;
@@ -438,7 +450,7 @@ impl TransitionClient {
let Ok(mut req) = Request::builder()
.method(method)
.uri(target_url.to_string())
.body(Body::empty())
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -550,10 +562,10 @@ impl TransitionClient {
if metadata.content_length > 0 {
match &mut metadata.content_body {
ReaderImpl::Body(content_body) => {
*req.body_mut() = Body::from(content_body.clone());
*req.body_mut() = s3s::Body::from(content_body.clone());
}
ReaderImpl::ObjectBody(content_body) => {
*req.body_mut() = Body::from(content_body.read_all().await?);
*req.body_mut() = s3s::Body::from(content_body.read_all().await?);
}
}
}
@@ -561,7 +573,7 @@ impl TransitionClient {
Ok(req)
}
pub fn set_user_agent(&self, req: &mut Request<Body>) {
pub fn set_user_agent(&self, req: &mut Request<s3s::Body>) {
let headers = req.headers_mut();
headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
}
@@ -999,25 +1011,217 @@ impl Default for UploadInfo {
}
}
/// Convert HTTP headers to ObjectInfo struct
/// This function parses various S3 response headers to construct an ObjectInfo struct
/// containing metadata about an S3 object.
pub fn to_object_info(bucket_name: &str, object_name: &str, h: &HeaderMap) -> Result<ObjectInfo, std::io::Error> {
todo!()
// Helper function to get header value as string
let get_header = |name: &str| -> String { h.get(name).and_then(|val| val.to_str().ok()).unwrap_or("").to_string() };
// Get and process the ETag
let etag = {
let etag_raw = get_header("ETag");
// Remove surrounding quotes if present (trimming ETag)
let trimmed = etag_raw.trim_start_matches('"').trim_end_matches('"');
Some(trimmed.to_string())
};
// Parse content length if it exists
let size = {
let content_length_str = get_header("Content-Length");
if !content_length_str.is_empty() {
content_length_str
.parse::<i64>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Content-Length is not an integer"))?
} else {
-1
}
};
// Parse Last-Modified time
let mod_time = {
let last_modified_str = get_header("Last-Modified");
if !last_modified_str.is_empty() {
// Parse HTTP date format (RFC 7231)
// Using time crate to parse HTTP dates
let parsed_time = OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Last-Modified time format is invalid"))?;
Some(parsed_time)
} else {
Some(OffsetDateTime::now_utc())
}
};
// Get content type
let content_type = {
let content_type_raw = get_header("Content-Type");
let content_type_trimmed = content_type_raw.trim();
if content_type_trimmed.is_empty() {
Some("application/octet-stream".to_string())
} else {
Some(content_type_trimmed.to_string())
}
};
// Parse Expires time
let expiration = {
let expiry_str = get_header("Expires");
if !expiry_str.is_empty() {
OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "'Expires' is not in supported format"))?
} else {
OffsetDateTime::now_utc()
}
};
// Extract user metadata (headers prefixed with "X-Amz-Meta-")
let user_metadata = {
let mut meta = HashMap::new();
for (name, value) in h.iter() {
let header_name = name.as_str().to_lowercase();
if header_name.starts_with("x-amz-meta-") {
let key = header_name.strip_prefix("x-amz-meta-").unwrap().to_string();
if let Ok(value_str) = value.to_str() {
meta.insert(key, value_str.to_string());
}
}
}
meta
};
let user_tag = {
let user_tag_str = get_header("X-Amz-Tagging");
user_tag_str
};
// Extract user tags count
let user_tag_count = {
let count_str = get_header("x-amz-tagging-count");
if !count_str.is_empty() {
count_str
.parse::<usize>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "x-amz-tagging-count is not an integer"))?
} else {
0
}
};
// Handle restore info
let restore = {
let restore_hdr = get_header("x-amz-restore");
if !restore_hdr.is_empty() {
// Simplified restore header parsing - in real implementation, this would parse the specific format
// "ongoing-request=\"true\"" or "ongoing-request=\"false\", expiry-date=\"..."
let ongoing_restore = restore_hdr.contains("ongoing-request=\"true\"");
RestoreInfo {
ongoing_restore,
expiry_time: if ongoing_restore {
OffsetDateTime::now_utc()
} else {
// Try to extract expiry date from the header
// This is simplified - real parsing would be more complex
OffsetDateTime::now_utc()
},
}
} else {
RestoreInfo::default()
}
};
// Extract version ID
let version_id = {
let version_id_str = get_header("x-amz-version-id");
if !version_id_str.is_empty() {
Some(Uuid::parse_str(&version_id_str).unwrap_or_else(|_| Uuid::nil()))
} else {
None
}
};
// Check if it's a delete marker
let is_delete_marker = get_header("x-amz-delete-marker") == "true";
// Get replication status
let replication_status = {
let status_str = get_header("x-amz-replication-status");
ReplicationStatus::from_static(match status_str.as_str() {
"COMPLETE" => ReplicationStatus::COMPLETE,
"PENDING" => ReplicationStatus::PENDING,
"FAILED" => ReplicationStatus::FAILED,
"REPLICA" => ReplicationStatus::REPLICA,
_ => ReplicationStatus::PENDING,
})
};
// Extract expiration rule ID and time (simplified)
let (expiration_time, expiration_rule_id) = {
// In a real implementation, this would parse the x-amz-expiration header
// which typically has format: "expiry-date="Fri, 11 Dec 2020 00:00:00 GMT", rule-id="myrule""
let exp_header = get_header("x-amz-expiration");
if !exp_header.is_empty() {
// Simplified parsing - real implementation would be more thorough
(OffsetDateTime::now_utc(), exp_header) // Placeholder
} else {
(OffsetDateTime::now_utc(), "".to_string())
}
};
// Extract checksums
let checksum_crc32 = get_header("x-amz-checksum-crc32");
let checksum_crc32c = get_header("x-amz-checksum-crc32c");
let checksum_sha1 = get_header("x-amz-checksum-sha1");
let checksum_sha256 = get_header("x-amz-checksum-sha256");
let checksum_crc64nvme = get_header("x-amz-checksum-crc64nvme");
let checksum_mode = get_header("x-amz-checksum-mode");
// Build and return the ObjectInfo struct
Ok(ObjectInfo {
etag,
name: object_name.to_string(),
mod_time,
size,
content_type,
metadata: h.clone(),
user_metadata,
user_tags: "".to_string(), // Tags would need separate parsing
user_tag_count,
owner: Owner::default(),
storage_class: get_header("x-amz-storage-class"),
is_latest: true, // Would be determined by versioning settings
is_delete_marker,
version_id,
replication_status,
replication_ready: false, // Would be computed based on status
expiration: expiration_time,
expiration_rule_id,
num_versions: 1, // Would be determined by versioning
restore,
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
checksum_crc64nvme,
checksum_mode,
})
}
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
//#[derive(Clone)]
pub struct SendRequest {
inner: hyper::client::conn::http1::SendRequest<Body>,
inner: hyper::client::conn::http1::SendRequest<s3s::Body>,
}
impl From<hyper::client::conn::http1::SendRequest<Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<Body>) -> Self {
impl From<hyper::client::conn::http1::SendRequest<s3s::Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<s3s::Body>) -> Self {
Self { inner }
}
}
impl tower::Service<Request<Body>> for SendRequest {
type Response = Response<Body>;
impl tower::Service<Request<s3s::Body>> for SendRequest {
type Response = Response<Incoming>;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@@ -1025,13 +1229,13 @@ impl tower::Service<Request<Body>> for SendRequest {
self.inner.poll_ready(cx).map_err(std::io::Error::other)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<s3s::Body>) -> Self::Future {
//let req = hyper::Request::builder().uri("/").body(http_body_util::Empty::<Bytes>::new()).unwrap();
//let req = hyper::Request::builder().uri("/").body(Body::empty()).unwrap();
let fut = self.inner.send_request(req);
Box::pin(async move { fut.await.map_err(std::io::Error::other).map(|res| res.map(Body::from)) })
Box::pin(async move { fut.await.map_err(std::io::Error::other) })
}
}

View File

@@ -3672,7 +3672,11 @@ impl ObjectIO for SetDisks {
}
if object_info.is_remote() {
let gr = get_transitioned_object_reader(bucket, object, &range, &h, &object_info, opts).await?;
let mut opts = opts.clone();
if object_info.parts.len() == 1 {
opts.part_number = Some(1);
}
let gr = get_transitioned_object_reader(bucket, object, &range, &h, &object_info, &opts).await?;
return Ok(gr);
}
@@ -4794,10 +4798,18 @@ impl StorageAPI for SetDisks {
// Normalize ETags by removing quotes before comparison (PR #592 compatibility)
let transition_etag = rustfs_utils::path::trim_etag(&opts.transition.etag);
let stored_etag = rustfs_utils::path::trim_etag(&get_raw_etag(&fi.metadata));
if opts.mod_time.expect("err").unix_timestamp() != fi.mod_time.as_ref().expect("err").unix_timestamp()
|| transition_etag != stored_etag
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
if let Some(mod_time1) = opts.mod_time {
if let Some(mod_time2) = fi.mod_time.as_ref() {
if mod_time1.unix_timestamp() != mod_time2.unix_timestamp()
/*|| transition_etag != stored_etag*/
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
}
} else {
return Err(Error::other("mod_time 2 error.".to_string()));
}
} else {
return Err(Error::other("mod_time 1 error.".to_string()));
}
if fi.transition_status == TRANSITION_COMPLETE {
return Ok(());
@@ -4927,8 +4939,10 @@ impl StorageAPI for SetDisks {
oi = ObjectInfo::from_file_info(&actual_fi, bucket, object, opts.versioned || opts.version_suspended);
let ropts = put_restore_opts(bucket, object, &opts.transition.restore_request, &oi).await?;
if oi.parts.len() == 1 {
let mut opts = opts.clone();
opts.part_number = Some(1);
let rs: Option<HTTPRangeSpec> = None;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, opts).await;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, &opts).await;
if let Err(err) = gr {
return set_restore_header_fn(&mut oi, Some(to_object_err(err.into(), vec![bucket, object]))).await;
}

View File

@@ -807,12 +807,11 @@ impl ObjectInfo {
let mut restore_ongoing = false;
let mut restore_expires = None;
if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned() {
//
if let Ok(restore_status) = parse_restore_obj_status(&restore_status) {
restore_ongoing = restore_status.on_going();
restore_expires = restore_status.expiry();
}
if let Some(restore_status) = fi.metadata.get(AMZ_RESTORE).cloned()
&& let Ok(restore_status) = parse_restore_obj_status(&restore_status)
{
restore_ongoing = restore_status.on_going();
restore_expires = restore_status.expiry();
}
// Convert parts from rustfs_filemeta::ObjectPartInfo to store_api::ObjectPartInfo

View File

@@ -30,6 +30,8 @@ use crate::client::{
transition_api::{Options, TransitionClient, TransitionCore},
transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::ErrorResponse;
use crate::error::error_resp_to_object_err;
use crate::tier::{
tier_config::TierS3,
warm_backend::{WarmBackend, WarmBackendGetOpts},

View File

@@ -27,7 +27,14 @@ use aws_sdk_s3::Client;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::primitives::ByteStream;
use crate::client::transition_api::{ReadCloser, ReaderImpl};
use crate::client::{
api_get_options::GetObjectOptions,
api_put_object::PutObjectOptions,
api_remove::RemoveObjectOptions,
transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::ErrorResponse;
use crate::error::error_resp_to_object_err;
use crate::tier::{
tier_config::TierS3,
warm_backend::{WarmBackend, WarmBackendGetOpts},

View File

@@ -22,6 +22,7 @@ use s3s::header::X_AMZ_RESTORE;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use time::{format_description::FormatItem, macros::format_description};
use uuid::Uuid;
pub const ERASURE_ALGORITHM: &str = "rs-vandermonde";
@@ -37,6 +38,9 @@ pub const TIER_SKIP_FV_ID: &str = "tier-skip-fvid";
const ERR_RESTORE_HDR_MALFORMED: &str = "x-amz-restore header malformed";
const RFC1123: &[FormatItem<'_>] =
format_description!("[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT");
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ObjectPartInfo {
pub etag: String,
@@ -580,6 +584,7 @@ pub trait RestoreStatusOps {
fn on_going(&self) -> bool;
fn on_disk(&self) -> bool;
fn to_string(&self) -> String;
fn to_string2(&self) -> String;
}
impl RestoreStatusOps for RestoreStatus {
@@ -618,6 +623,18 @@ impl RestoreStatusOps for RestoreStatus {
.unwrap()
)
}
fn to_string2(&self) -> String {
if self.on_going() {
return "ongoing-request=\"true\"".to_string();
}
format!(
"ongoing-request=\"false\", expiry-date=\"{}\"",
OffsetDateTime::from(self.restore_expiry_date.clone().unwrap())
.format(&RFC1123)
.unwrap()
)
}
}
pub fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
@@ -650,10 +667,8 @@ pub fn parse_restore_obj_status(restore_hdr: &str) -> Result<RestoreStatus> {
if expiry_tokens[0].trim() != "expiry-date" {
return Err(Error::other(ERR_RESTORE_HDR_MALFORMED));
}
let expiry = OffsetDateTime::parse(expiry_tokens[1].trim_matches('"'), &Rfc3339).unwrap();
/*if err != nil {
return Err(Error::other(ERR_RESTORE_HDR_MALFORMED));
}*/
let expiry = OffsetDateTime::parse(expiry_tokens[1].trim_matches('"'), &Rfc3339)
.map_err(|_| Error::other(ERR_RESTORE_HDR_MALFORMED))?;
return Ok(RestoreStatus {
is_restore_in_progress: Some(false),
restore_expiry_date: Some(Timestamp::from(expiry)),

View File

@@ -14,8 +14,8 @@
use crate::{
ErasureAlgo, ErasureInfo, Error, FileInfo, FileInfoVersions, InlineData, ObjectPartInfo, RawFileInfo, ReplicationState,
ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, replication_statuses_map,
version_purge_statuses_map,
ReplicationStatusType, Result, TIER_FV_ID, TIER_FV_MARKER, VersionPurgeStatusType, is_restored_object_on_disk,
replication_statuses_map, version_purge_statuses_map,
};
use byteorder::ByteOrder;
use bytes::Bytes;
@@ -462,7 +462,7 @@ impl FileMeta {
self.versions
.iter()
.filter(|v| {
v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.user_data_dir()
v.header.version_type == VersionType::Object && v.header.version_id != Some(vid) && v.header.uses_data_dir()
})
.map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).unwrap_or_default())
.filter(|v| v == data_dir)
@@ -1201,10 +1201,11 @@ impl FileMeta {
.filter(|v| {
v.header.version_type == VersionType::Object
&& v.header.version_id != Some(version_id)
&& v.header.user_data_dir()
&& v.header.uses_data_dir()
})
.filter_map(|v| FileMetaVersion::decode_data_dir_from_meta(&v.meta).ok())
.filter(|&dir| dir == data_dir)
.filter(|&dir| dir.is_none() || dir != data_dir)
//.filter(|&dir| dir != data_dir)
.count()
}
@@ -1588,7 +1589,7 @@ impl FileMetaVersionHeader {
false
}
pub fn user_data_dir(&self) -> bool {
pub fn uses_data_dir(&self) -> bool {
self.flags & Flags::UsesDataDir as u8 != 0
}
@@ -1961,7 +1962,15 @@ impl MetaObject {
}
pub fn uses_data_dir(&self) -> bool {
!self.inlinedata()
if let Some(status) = self
.meta_sys
.get(&format!("{RESERVED_METADATA_PREFIX_LOWER}{TRANSITION_STATUS}"))
&& *status == TRANSITION_COMPLETE.as_bytes().to_vec()
{
return false;
}
is_restored_object_on_disk(&self.meta_user)
}
pub fn inlinedata(&self) -> bool {

View File

@@ -38,6 +38,8 @@ tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
time = { workspace = true }
@@ -53,3 +55,8 @@ rand = { workspace = true }
s3s = { workspace = true }
[dev-dependencies]
tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
serial_test = { workspace = true }
heed = { version = "0.22.0" }

View File

@@ -0,0 +1,446 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
global::GLOBAL_TierConfigMgr,
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_scanner::scanner::init_data_scanner;
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
/// Install the global tracing subscriber at most once for the whole test
/// binary; subsequent calls are no-ops.
fn init_tracing() {
    INIT.call_once(|| {
        // try_init errors if a subscriber was already registered elsewhere;
        // that is harmless in tests, so the result is discarded.
        drop(tracing_subscriber::fmt::try_init());
    });
}
/// Test helper: Create test environment with ECStore.
///
/// Builds (once per process) a 4-disk single-pool erasure-coded store under a
/// unique `/tmp` directory, initializes the bucket-metadata subsystem and the
/// background expiry workers, and caches the result in `GLOBAL_ENV` so that
/// every `#[serial]` test shares the same store instance.
///
/// Returns the four disk paths and the shared `ECStore`.
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
    init_tracing();

    // Fast path: already initialized, just clone and return
    if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
        return (paths.clone(), ecstore.clone());
    }

    // create temp dir as 4 disks with unique base dir
    let test_base_dir = format!("/tmp/rustfs_scanner_lifecycle_test_{}", uuid::Uuid::new_v4());
    let temp_dir = std::path::PathBuf::from(&test_base_dir);
    if temp_dir.exists() {
        // Best-effort cleanup of a stale run; errors are ignored on purpose.
        fs::remove_dir_all(&temp_dir).await.ok();
    }
    fs::create_dir_all(&temp_dir).await.unwrap();

    // create 4 disk dirs
    let disk_paths = vec![
        temp_dir.join("disk1"),
        temp_dir.join("disk2"),
        temp_dir.join("disk3"),
        temp_dir.join("disk4"),
    ];
    for disk_path in &disk_paths {
        fs::create_dir_all(disk_path).await.unwrap();
    }

    // create EndpointServerPools: one pool, one set, four drives
    let mut endpoints = Vec::new();
    for (i, disk_path) in disk_paths.iter().enumerate() {
        let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
        // set correct index so each drive maps into pool 0 / set 0 / slot i
        endpoint.set_pool_index(0);
        endpoint.set_set_index(0);
        endpoint.set_disk_index(i);
        endpoints.push(endpoint);
    }

    let pool_endpoints = PoolEndpoints {
        legacy: false,
        set_count: 1,
        drives_per_set: 4,
        endpoints: Endpoints::from(endpoints),
        cmd_line: "test".to_string(),
        platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
    };
    let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);

    // format disks (only first time)
    rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();

    // create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
    let port = 9002; // for simplicity
    let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
    let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
        .await
        .unwrap();

    // init bucket metadata system with whatever buckets already exist on disk
    let buckets_list = ecstore
        .list_bucket(&rustfs_ecstore::store_api::BucketOptions {
            no_metadata: true,
            ..Default::default()
        })
        .await
        .unwrap();
    let buckets = buckets_list.into_iter().map(|v| v.name).collect();
    rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;

    // Initialize background expiry workers
    rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await;

    // Store in global once lock. NOTE(review): between the fast-path check
    // above and this `set`, a concurrent caller could build a second
    // environment; the `#[serial]` tests make that impossible in practice.
    let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));

    (disk_paths, ecstore)
}
/// Test helper: create a bucket with all-default options.
#[allow(dead_code)]
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
    let opts = Default::default();
    ecstore
        .as_ref()
        .make_bucket(bucket_name, &opts)
        .await
        .expect("Failed to create test bucket");
    info!("Created test bucket: {}", bucket_name);
}
/// Test helper: create a bucket with object lock and versioning enabled,
/// as required by the transition/restore lifecycle tests.
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
    let opts = MakeBucketOptions {
        lock_enabled: true,
        versioning_enabled: true,
        ..Default::default()
    };
    ecstore
        .as_ref()
        .make_bucket(bucket_name, &opts)
        .await
        .expect("Failed to create test bucket");
    info!("Created test bucket: {}", bucket_name);
}
/// Test helper: put `data` at `bucket`/`object` with default options and log
/// the stored size.
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
    let mut reader = PutObjReader::from_vec(data.to_vec());
    let opts = ObjectOptions::default();
    let stored = ecstore
        .as_ref()
        .put_object(bucket, object, &mut reader, &opts)
        .await
        .expect("Failed to upload test object");
    info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, stored.size);
}
/// Test helper: Set bucket lifecycle configuration.
///
/// Installs a single rule that expires objects under the `test/` prefix
/// after 0 days — i.e. they become eligible for expiry immediately, so the
/// scanner can act on them within one test run.
#[allow(dead_code)]
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
    let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
    <Rule>
        <ID>test-rule</ID>
        <Status>Enabled</Status>
        <Filter>
            <Prefix>test/</Prefix>
        </Filter>
        <Expiration>
            <Days>0</Days>
        </Expiration>
    </Rule>
</LifecycleConfiguration>"#;

    // Persist through the bucket-metadata subsystem so the scanner sees it.
    metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
    Ok(())
}
/// Test helper: Set bucket lifecycle configuration with delete-marker expiry.
///
/// Like `set_bucket_lifecycle`, but additionally enables
/// `ExpiredObjectDeleteMarker` so lone delete markers under `test/` are
/// removed once all noncurrent versions are gone.
#[allow(dead_code)]
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
    let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
    <Rule>
        <ID>test-rule</ID>
        <Status>Enabled</Status>
        <Filter>
            <Prefix>test/</Prefix>
        </Filter>
        <Expiration>
            <Days>0</Days>
            <ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
        </Expiration>
    </Rule>
</LifecycleConfiguration>"#;

    // Persist through the bucket-metadata subsystem so the scanner sees it.
    metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
    Ok(())
}
/// Test helper: set a lifecycle configuration that transitions objects under
/// the `test/` prefix to the remote tier `COLDTIER44` after 0 days.
///
/// The second rule (noncurrent-version transition) is deliberately
/// `Disabled`; only the current-version transition should fire in tests.
#[allow(dead_code)]
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
    let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
    <Rule>
        <ID>test-rule</ID>
        <Status>Enabled</Status>
        <Filter>
            <Prefix>test/</Prefix>
        </Filter>
        <Transition>
            <Days>0</Days>
            <StorageClass>COLDTIER44</StorageClass>
        </Transition>
    </Rule>
    <Rule>
        <ID>test-rule2</ID>
        <Status>Disabled</Status>
        <Filter>
            <Prefix>test/</Prefix>
        </Filter>
        <NoncurrentVersionTransition>
            <NoncurrentDays>0</NoncurrentDays>
            <StorageClass>COLDTIER44</StorageClass>
        </NoncurrentVersionTransition>
    </Rule>
</LifecycleConfiguration>"#;

    // Persist through the bucket-metadata subsystem so the scanner sees it.
    metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
    Ok(())
}
/// Test helper: register a MinIO-backed remote tier named "COLDTIER44" in the
/// global tier configuration manager and persist it.
///
/// `server` selects the backend endpoint:
/// - `1`: local MinIO at `http://127.0.0.1:9000`, bucket `hello`
/// - `2`: host from `$TEST_MINIO_SERVER` (default `localhost:9000`), bucket `mblock2`
/// - anything else: `http://127.0.0.1:9020`, bucket `mblock2`
///
/// Panics (failing the test) if the tier cannot be added or saved.
#[allow(dead_code)]
async fn create_test_tier(server: u32) {
    // Resolve endpoint/bucket for the requested backend; everything else in
    // the MinIO tier config is identical across the three variants.
    let (endpoint, bucket) = match server {
        1 => ("http://127.0.0.1:9000".to_string(), "hello".to_string()),
        2 => {
            let test_minio_server = std::env::var("TEST_MINIO_SERVER").unwrap_or_else(|_| "localhost:9000".to_string());
            (format!("http://{}", test_minio_server), "mblock2".to_string())
        }
        _ => ("http://127.0.0.1:9020".to_string(), "mblock2".to_string()),
    };

    let args = TierConfig {
        version: "v1".to_string(),
        tier_type: TierType::MinIO,
        name: "COLDTIER44".to_string(),
        s3: None,
        aliyun: None,
        tencent: None,
        huaweicloud: None,
        azure: None,
        gcs: None,
        r2: None,
        rustfs: None,
        minio: Some(TierMinIO {
            access_key: "minioadmin".to_string(),
            secret_key: "minioadmin".to_string(),
            bucket,
            endpoint,
            // Unique prefix per run so repeated tests do not collide.
            prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
            region: "".to_string(),
            ..Default::default()
        }),
    };

    let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
    if let Err(err) = tier_config_mgr.add(args, false).await {
        println!("tier_config_mgr add failed, e: {err:?}");
        panic!("tier add failed. {err}");
    }
    // Persist so components that re-read the tier config can see the tier.
    // Include the error in the panic message (previously it was dropped).
    if let Err(e) = tier_config_mgr.save().await {
        println!("tier_config_mgr save failed, e: {e:?}");
        panic!("tier save failed. {e}");
    }
    println!("Created test tier: COLDTIER44");
}
/// Test helper: true when the object can be stat'ed and its current version
/// is not a delete marker; false on any lookup error.
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
    ecstore
        .as_ref()
        .get_object_info(bucket, object, &ObjectOptions::default())
        .await
        .map(|info| !info.delete_marker)
        .unwrap_or(false)
}
/// Test helper: stat the object and report whether its current version is a
/// delete marker. Panics (failing the test) when the lookup itself errors.
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
    let lookup = ecstore
        .as_ref()
        .get_object_info(bucket, object, &ObjectOptions::default())
        .await;
    match lookup {
        Ok(oi) => {
            println!("oi: {oi:?}");
            oi.delete_marker
        }
        Err(_) => {
            println!("object_is_delete_marker is error");
            panic!("object_is_delete_marker is error");
        }
    }
}
/// Test helper: stat the object and report whether it carries a non-empty
/// transition status (i.e. has been moved to a remote tier). Panics when the
/// lookup itself errors.
#[allow(dead_code)]
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
    let lookup = ecstore
        .as_ref()
        .get_object_info(bucket, object, &ObjectOptions::default())
        .await;
    match lookup {
        Ok(oi) => {
            println!("oi: {oi:?}");
            !oi.transitioned_object.status.is_empty()
        }
        Err(_) => {
            println!("object_is_transitioned is error");
            panic!("object_is_transitioned is error");
        }
    }
}
/// Test helper: poll every 200 ms until the object no longer exists or
/// `timeout` elapses. Returns true when the object disappeared in time.
#[allow(dead_code)]
async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &str, timeout: Duration) -> bool {
    let deadline = tokio::time::Instant::now() + timeout;
    while object_exists(ecstore, bucket, object).await {
        if tokio::time::Instant::now() >= deadline {
            return false;
        }
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
    true
}
mod serial_tests {
    use super::*;

    /// End-to-end check that the data scanner transitions an object to a
    /// remote MinIO tier according to the bucket's lifecycle configuration.
    ///
    /// Requires a reachable MinIO server (see `create_test_tier(2)` /
    /// `$TEST_MINIO_SERVER`), hence `#[ignore]` — run explicitly via the
    /// launch config / `cargo test -- --ignored`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[serial]
    #[ignore]
    async fn test_lifecycle_transition_basic() {
        let (_disk_paths, ecstore) = setup_test_env().await;
        // Tier backed by the MinIO server named by $TEST_MINIO_SERVER.
        create_test_tier(2).await;

        // Create test bucket and object
        let suffix = uuid::Uuid::new_v4().simple().to_string();
        let bucket_name = format!("test-lc-transition-{}", &suffix[..8]);
        let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
        let test_data = b"Hello, this is test data for lifecycle expiry!";

        create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
        // Upload twice: the bucket is versioned, so the first upload becomes a
        // noncurrent version and the second becomes the current one.
        upload_test_object(
            &ecstore,
            bucket_name.as_str(),
            object_name,
            b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
        )
        .await;
        //create_test_bucket(&ecstore, bucket_name.as_str()).await;
        upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;

        // Verify object exists initially
        assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
        println!("✅ Object exists before lifecycle processing");

        // Set lifecycle configuration with very short expiry (0 days = immediate expiry)
        set_bucket_lifecycle_transition(bucket_name.as_str())
            .await
            .expect("Failed to set lifecycle configuration");
        println!("✅ Lifecycle configuration set for bucket: {bucket_name}");

        // Verify lifecycle configuration was set
        match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
            Ok(bucket_meta) => {
                assert!(bucket_meta.lifecycle_config.is_some());
                println!("✅ Bucket metadata retrieved successfully");
            }
            Err(e) => {
                println!("❌ Error retrieving bucket metadata: {e:?}");
            }
        }

        let ctx = CancellationToken::new();
        // Start scanner
        init_data_scanner(ctx.clone(), ecstore.clone()).await;
        println!("✅ Scanner started");

        // Wait for scanner to process lifecycle rules.
        // NOTE(review): 1200 s matches the scanner's slow cycle cadence; this
        // is why the test is #[ignore]d. Consider polling with
        // wait_for_object_absence-style logic to shorten the happy path.
        tokio::time::sleep(Duration::from_secs(1200)).await;

        // Check if object has been transitioned to the remote tier
        let check_result = object_is_transitioned(&ecstore, &bucket_name, object_name).await;
        println!("Object exists after lifecycle processing: {check_result}");
        if check_result {
            println!("✅ Object was transitioned by lifecycle processing");
            // Let's try to get object info to see its details
            match ecstore
                .get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
                .await
            {
                Ok(obj_info) => {
                    println!(
                        "Object info: name={}, size={}, mod_time={:?}",
                        obj_info.name, obj_info.size, obj_info.mod_time
                    );
                    println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
                }
                Err(e) => {
                    println!("Error getting object info: {e:?}");
                }
            }
        } else {
            println!("❌ Object was not transitioned by lifecycle processing");
        }
        assert!(check_result);
        println!("✅ Object successfully transitioned");

        // Stop scanner
        ctx.cancel();
        println!("✅ Scanner stopped");
        println!("Lifecycle transition basic test completed");
    }
}

View File

@@ -38,7 +38,7 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
Some(&mut total_number_of_bytes),
Some(&mut total_number_of_free_bytes),
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
.map_err(|e| Error::from_raw_os_error(e.code().0))?;
}
let total = total_number_of_bytes;
@@ -67,7 +67,7 @@ pub fn get_info(p: impl AsRef<Path>) -> std::io::Result<DiskInfo> {
Some(&mut number_of_free_clusters),
Some(&mut total_number_of_clusters),
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
.map_err(|e| Error::from_raw_os_error(e.code().0))?;
}
Ok(DiskInfo {
@@ -103,7 +103,7 @@ fn get_windows_fs_type(p: &[u16]) -> std::io::Result<String> {
Some(&mut file_system_flags),
Some(&mut file_system_name_buffer),
)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
.map_err(|e| Error::from_raw_os_error(e.code().0))?;
}
Ok(utf16_to_string(&file_system_name_buffer))
@@ -119,7 +119,7 @@ fn get_volume_name(v: &[u16]) -> std::io::Result<Vec<u16>> {
// Note: GetVolumePathNameW documentation says "The buffer should be large enough to hold the path". MAX_PATH is generally safe for volume roots.
unsafe {
GetVolumePathNameW(windows::core::PCWSTR::from_raw(v.as_ptr()), &mut volume_name_buffer)
.map_err(|e| Error::from_raw_os_error(e.code().0 as i32))?;
.map_err(|e| Error::from_raw_os_error(e.code().0))?;
}
let len = volume_name_buffer

View File

@@ -24,6 +24,7 @@ use crate::storage::helper::OperationHelper;
use crate::storage::options::{filter_object_metadata, get_content_sha256};
use crate::storage::{
access::{ReqInfo, authorize_request, has_bypass_governance_header},
ecfs_extend::RFC1123,
options::{
copy_dst_opts, copy_src_opts, del_opts, extract_metadata, extract_metadata_from_mime_with_object_name,
get_complete_multipart_upload_opts, get_opts, parse_copy_source_range, put_opts,
@@ -93,8 +94,8 @@ use rustfs_ecstore::{
},
};
use rustfs_filemeta::REPLICATE_INCOMING_DELETE;
use rustfs_filemeta::RestoreStatusOps;
use rustfs_filemeta::{ReplicationStatusType, ReplicationType, VersionPurgeStatusType};
use rustfs_filemeta::{RestoreStatusOps, parse_restore_obj_status};
use rustfs_kms::DataKey;
use rustfs_notify::{EventArgsBuilder, notifier_global};
use rustfs_policy::policy::{
@@ -3665,8 +3666,44 @@ impl S3 for FS {
{
response.headers.insert(header_name, header_value);
}
if let Some(amz_restore) = metadata_map.get(X_AMZ_RESTORE.as_str()) {
let Ok(restore_status) = parse_restore_obj_status(amz_restore) else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrMeta".into()), "parse amz_restore failed."));
};
if let Ok(header_value) = HeaderValue::from_str(restore_status.to_string2().as_str()) {
response.headers.insert(X_AMZ_RESTORE, header_value);
}
}
if let Some(amz_restore_request_date) = metadata_map.get(AMZ_RESTORE_REQUEST_DATE)
&& let Ok(header_name) = http::HeaderName::from_bytes(AMZ_RESTORE_REQUEST_DATE.as_bytes())
{
let Ok(amz_restore_request_date) = OffsetDateTime::parse(amz_restore_request_date, &Rfc3339) else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrMeta".into()),
"parse amz_restore_request_date failed.",
));
};
let Ok(amz_restore_request_date) = amz_restore_request_date.format(&RFC1123) else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrMeta".into()),
"format amz_restore_request_date failed.",
));
};
if let Ok(header_value) = HeaderValue::from_str(&amz_restore_request_date) {
response.headers.insert(header_name, header_value);
}
}
if let Some(amz_restore_expiry_days) = metadata_map.get(AMZ_RESTORE_EXPIRY_DAYS)
&& let Ok(header_name) = http::HeaderName::from_bytes(AMZ_RESTORE_EXPIRY_DAYS.as_bytes())
&& let Ok(header_value) = HeaderValue::from_str(amz_restore_expiry_days)
{
response.headers.insert(header_name, header_value);
}
let result = Ok(response);
let _ = helper.complete(&result);
result
}
@@ -5238,6 +5275,7 @@ impl S3 for FS {
histogram!("rustfs.object_tagging.operation.duration.seconds", "operation" => "put").record(duration.as_secs_f64());
result
}
async fn restore_object(&self, req: S3Request<RestoreObjectInput>) -> S3Result<S3Response<RestoreObjectOutput>> {
let RestoreObjectInput {
bucket,
@@ -5246,76 +5284,66 @@ impl S3 for FS {
version_id,
..
} = req.input.clone();
let rreq = rreq.unwrap();
/*if let Err(e) = un_escape_path(object) {
warn!("post restore object failed, e: {:?}", e);
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let rreq = rreq.ok_or_else(|| {
S3Error::with_message(S3ErrorCode::Custom("ErrValidRestoreObject".into()), "restore request is required")
})?;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
/*if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let version_id_str = version_id.clone().unwrap_or_default();
let opts = post_restore_opts(&version_id_str, &bucket, &object)
.await
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrPostRestoreOpts".into()), "restore object failed."))?;
/*if req.content_length <= 0 {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}*/
let Ok(opts) = post_restore_opts(&version_id.unwrap(), &bucket, &object).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
));
};
let Ok(mut obj_info) = store.get_object_info(&bucket, &object, &opts).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
));
};
let mut obj_info = store
.get_object_info(&bucket, &object, &opts)
.await
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "restore object failed."))?;
// Check if object is in a transitioned state
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrInvalidTransitionedState".into()),
"restore object failed.",
));
}
//let mut api_err;
let mut _status_code = StatusCode::OK;
let mut already_restored = false;
if let Err(_err) = rreq.validate(store.clone()) {
//api_err = to_api_err(ErrMalformedXML);
//api_err.description = err.to_string();
// Validate restore request
if let Err(e) = rreq.validate(store.clone()) {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrValidRestoreObject".into()),
format!("Restore object validation failed: {}", e),
));
} else {
if obj_info.restore_ongoing && (rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT") {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"post restore object failed",
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
_status_code = StatusCode::ACCEPTED;
already_restored = true;
}
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap());
// Check if restore is already in progress
if obj_info.restore_ongoing && (rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT")) {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"restore object failed.",
));
}
let mut already_restored = false;
if let Some(restore_expires) = obj_info.restore_expires
&& !obj_info.restore_ongoing
&& restore_expires.unix_timestamp() != 0
{
already_restored = true;
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap_or(&1));
let mut metadata = obj_info.user_defined.clone();
let mut header = HeaderMap::new();
let obj_info_ = obj_info.clone();
if rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT" {
if rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT") {
obj_info.metadata_only = true;
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap().to_string());
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap_or(1).to_string());
metadata.insert(AMZ_RESTORE_REQUEST_DATE.to_string(), OffsetDateTime::now_utc().format(&Rfc3339).unwrap());
if already_restored {
metadata.insert(
@@ -5337,7 +5365,8 @@ impl S3 for FS {
);
}
obj_info.user_defined = metadata;
if let Err(_err) = store
store
.clone()
.copy_object(
&bucket,
@@ -5346,22 +5375,18 @@ impl S3 for FS {
&object,
&mut obj_info,
&ObjectOptions {
version_id: obj_info_.version_id.map(|e| e.to_string()),
version_id: obj_info_.version_id.map(|v| v.to_string()),
..Default::default()
},
&ObjectOptions {
version_id: obj_info_.version_id.map(|e| e.to_string()),
version_id: obj_info_.version_id.map(|v| v.to_string()),
mod_time: obj_info_.mod_time,
..Default::default()
},
)
.await
{
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"post restore object failed",
));
}
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrCopyObject".into()), "restore object failed."))?;
if already_restored {
let output = RestoreObjectOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
@@ -5371,97 +5396,51 @@ impl S3 for FS {
}
}
let restore_object = Uuid::new_v4().to_string();
//if let Some(rreq) = rreq {
// Handle output location for SELECT requests
if let Some(output_location) = &rreq.output_location
&& let Some(s3) = &output_location.s3
&& !s3.bucket_name.is_empty()
{
let restore_object = Uuid::new_v4().to_string();
header.insert(
X_AMZ_RESTORE_OUTPUT_PATH,
format!("{}{}{}", s3.bucket_name, s3.prefix, restore_object).parse().unwrap(),
);
}
//}
/*send_event(EventArgs {
event_name: event::ObjectRestorePost,
bucket_name: bucket,
object: obj_info,
req_params: extract_req_params(r),
user_agent: req.user_agent(),
host: handlers::get_source_ip(r),
});*/
tokio::spawn(async move {
/*if rreq.select_parameters.is_some() {
let actual_size = obj_info_.get_actual_size();
if actual_size.is_err() {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
let object_rsc = s3select.new_object_read_seek_closer(
|offset: i64| -> (ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
return get_transitioned_object_reader(bucket, object, rs, r.Header,
obj_info, ObjectOptions {version_id: obj_info_.version_id});
},
actual_size.unwrap(),
);
if err = rreq.select_parameters.open(object_rsc); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
let encoded_error_response = encodeResponse(APIErrorResponse {
code: serr.ErrorCode(),
message: serr.ErrorMessage(),
bucket_name: bucket,
key: object,
resource: r.URL.Path,
request_id: w.Header().Get(xhttp.AmzRequestID),
host_id: globalDeploymentID(),
});
//writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
} else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
return Ok(());
}
let nr = httptest.NewRecorder();
let rw = xhttp.NewResponseRecorder(nr);
rw.log_err_body = true;
rw.log_all_body = true;
rreq.select_parameters.evaluate(rw);
rreq.select_parameters.Close();
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}*/
// Spawn restoration task in the background
let store_clone = store.clone();
let bucket_clone = bucket.clone();
let object_clone = object.clone();
let rreq_clone = rreq.clone();
let version_id_clone = version_id.clone();
tokio::spawn(async move {
let opts = ObjectOptions {
transition: TransitionOptions {
restore_request: rreq,
restore_request: rreq_clone,
restore_expiry,
..Default::default()
},
version_id: obj_info_.version_id.map(|e| e.to_string()),
version_id: version_id_clone,
..Default::default()
};
if let Err(err) = store.clone().restore_transitioned_object(&bucket, &object, &opts).await {
warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string());
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrRestoreTransitionedObject".into()),
format!("unable to restore transitioned bucket/object {bucket}/{object}: {err}"),
));
}
/*send_event(EventArgs {
EventName: event.ObjectRestoreCompleted,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
});*/
Ok(())
if let Err(err) = store_clone
.restore_transitioned_object(&bucket_clone, &object_clone, &opts)
.await
{
warn!(
"unable to restore transitioned bucket/object {}/{}: {}",
bucket_clone,
object_clone,
err.to_string()
);
// Note: Errors from background tasks cannot be returned to client
// Consider adding to monitoring/metrics system
} else {
info!("successfully restored transitioned object: {}/{}", bucket_clone, object_clone);
}
});
let output = RestoreObjectOutput {

View File

@@ -52,7 +52,7 @@ use time::{format_description::FormatItem, macros::format_description};
use tokio::io::AsyncRead;
use tracing::{debug, warn};
const RFC1123: &[FormatItem<'_>] =
pub const RFC1123: &[FormatItem<'_>] =
format_description!("[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT");
/// =======================

View File

@@ -0,0 +1 @@
ansible-playbook rustfs/tests/lifecycle_ansible_test.yml

View File

@@ -0,0 +1,15 @@
---
# Ansible smoke test: apply a 1-day expiration lifecycle rule to the
# "mblock" bucket on a RustFS endpoint via community.aws.s3_lifecycle.
- hosts: webservers
  remote_user: leafcolor
  vars:
    # NOTE(review): plaintext SSH password committed for throwaway local
    # testing only — use ansible-vault or an env lookup for anything real.
    ansible_ssh_pass: "123456"
  tasks:
    - name: Configure S3 bucket lifecycle
      community.aws.s3_lifecycle:
        endpoint_url: "http://www.example.com:9000"
        access_key: "rustfsadmin"
        secret_key: "rustfsadmin"
        name: "mblock"
        rule_id: "rule1"
        state: present
        # NOTE(review): module documents expiration_days as an integer;
        # the quoted string works via implicit coercion — confirm.
        expiration_days: "1"

View File

@@ -0,0 +1,120 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
import os
from collections import namedtuple
# Minimal read-only settings bag for the test; mirrors the shape of a
# typical app settings object without pulling in extra dependencies.
Settings = namedtuple("Settings", ['oss_endpoint',
                                   'oss_access_key',
                                   'oss_secret_key',
                                   'oss_bucket_name',
                                   'oss_lifecycle_days',
                                   'oss_secure',
                                   'oss_region'])

# Endpoint is overridable via TEST_RUSTFS_SERVER; credentials match the
# default RustFS dev credentials used throughout the repo's tests.
settings = Settings(
    oss_endpoint=os.getenv("TEST_RUSTFS_SERVER", "localhost:9000"),
    oss_access_key = "rustfsadmin",
    oss_secret_key = "rustfsadmin",
    oss_bucket_name = "mblock99",
    oss_lifecycle_days = 1,
    oss_secure = False,
    oss_region = ""
)
class OSS:
    """Thin wrapper around a MinIO client used to exercise RustFS
    bucket-lifecycle (expiration) behavior from the Python SDK side."""

    def __init__(self):
        # Connection parameters come from the module-level `settings`.
        self.bucket_name = settings.oss_bucket_name
        self.lifecycle_days = settings.oss_lifecycle_days
        self.client = Minio(
            endpoint=settings.oss_endpoint,
            access_key=settings.oss_access_key,
            secret_key=settings.oss_secret_key,
            secure=settings.oss_secure,
            region=settings.oss_region,
        )

    def new_uuid(self):
        """Return a fresh random UUID as a string."""
        return str(uuid.uuid4())

    def create_bucket(self):
        """Create the configured bucket (idempotent) and attach the
        day-based expiration rule when the bucket is newly created."""
        found = self.client.bucket_exists(self.bucket_name)
        if not found:
            try:
                self.client.make_bucket(self.bucket_name)
                self.set_lifecycle_expiration(days=self.lifecycle_days)
                print(f"Bucket {self.bucket_name} created successfully")
            except S3Error as exc:
                # Another writer may have created the bucket between the
                # existence check and make_bucket; treat that as success.
                if exc.code == "BucketAlreadyOwnedByYou":
                    print(f"Bucket {self.bucket_name} already owned by you; continuing")
                else:
                    raise
        else:
            print(f"Bucket {self.bucket_name} already exists")

    def set_lifecycle_expiration(self, days: int = 1, prefix: str = "") -> None:
        """Configure automatic expiry-by-age on the bucket.

        MinIO evaluates lifecycle rules once per day and values below one
        day do not take effect.
        """
        rule_filter = Filter(prefix=prefix or "")
        rule = Rule(
            rule_id=f"expire-{prefix or 'all'}-{days}d",
            status="Enabled",
            rule_filter=rule_filter,
            expiration=Expiration(days=int(days)),
        )
        cfg = LifecycleConfig([rule])
        self.client.set_bucket_lifecycle(self.bucket_name, cfg)

    def upload_file(self, file: UploadFile):
        """Upload `file` under a random UUID-based name and return that name."""
        ext = os.path.splitext(file.filename)[1]
        # Renamed from `uuid` to avoid shadowing the imported `uuid` module.
        file_uuid = self.new_uuid()
        filename = f'{file_uuid}{ext}'
        file.file.seek(0)
        self.client.put_object(
            self.bucket_name,
            filename,
            file.file,
            length=-1,
            part_size=10*1024*1024,
        )
        return filename

    def get_presigned_url(self, filename: str):
        """Return a presigned GET URL for `filename`, valid for
        `lifecycle_days` days."""
        return self.client.presigned_get_object(
            self.bucket_name, filename, expires=timedelta(days=self.lifecycle_days)
        )
def get_oss():
    """Build and return a fresh OSS helper instance."""
    return OSS()
# Manual smoke test: create the bucket (and its lifecycle rule) against the
# server configured via TEST_RUSTFS_SERVER.
if __name__ == "__main__":
    oss = get_oss()
    oss.create_bucket()