Compare commits

...

16 Commits

Author SHA1 Message Date
likewu
e8f3a7d11e fix 2026-01-17 21:39:58 +08:00
likewu
82fcb7227a fmt 2026-01-17 21:27:40 +08:00
likewu
f383ea1175 Merge branch 'fix/lifecycle' of https://github.com/rustfs/rustfs into fix/lifecycle
# Conflicts:
#	rustfs/src/storage/ecfs.rs
2026-01-17 21:26:35 +08:00
likewu
4caad5096a fmt 2026-01-17 21:17:32 +08:00
likewu
e68f791b1a fix 2026-01-17 21:16:20 +08:00
likewu@126.com
0a4438e896 change s3s::Body to Incoming 2026-01-17 13:06:18 +00:00
likewu
e15c619ed5 test 2026-01-10 10:19:11 +08:00
likewu
ecf7a2e344 Merge branch 'fix/lifecycle' of https://github.com/rustfs/rustfs into fix/lifecycle
# Conflicts:
#	crates/ecstore/src/bucket/lifecycle/lifecycle.rs
2026-01-07 12:02:48 +08:00
likewu
e393add5ee lifecycle test 2026-01-07 12:00:48 +08:00
likewu
64c0430b84 lifecycle test 2026-01-07 11:58:35 +08:00
likewu
081616b81b test 2026-01-06 14:26:09 +08:00
likewu
d9125937b0 Merge branch 'weisd/scan' of https://github.com/rustfs/rustfs into fix/lifecycle 2026-01-04 11:38:30 +08:00
likewu
2534087551 add python MinIO SDK lifecycle config test 2025-12-10 23:28:05 +08:00
likewu
5f03a1cbe0 add ansible bucket lifecycle config test 2025-12-10 23:27:09 +08:00
likewu
f9af9fe5da Merge branch 'main' of https://github.com/rustfs/rustfs into fix/lifecycle 2025-12-10 18:58:27 +08:00
likewu
59b8aa14d7 fix 2025-12-05 12:32:48 +08:00
31 changed files with 2369 additions and 338 deletions

34
.vscode/launch.json vendored
View File

@@ -99,7 +99,17 @@
"name": "Debug executable target/debug/rustfs",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/target/debug/rustfs",
"cargo": {
"args": [
"run",
"--bin",
"rustfs",
"-j",
"1",
"--profile",
"dev"
]
},
"args": [],
"cwd": "${workspaceFolder}",
//"stopAtEntry": false,
@@ -107,7 +117,7 @@
"env": {
"RUSTFS_ACCESS_KEY": "rustfsadmin",
"RUSTFS_SECRET_KEY": "rustfsadmin",
"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
//"RUSTFS_VOLUMES": "./target/volume/test{1...4}",
"RUSTFS_ADDRESS": ":9000",
"RUSTFS_CONSOLE_ENABLE": "true",
// "RUSTFS_OBS_TRACE_ENDPOINT": "http://127.0.0.1:4318/v1/traces", // jeager otlp http endpoint
@@ -116,11 +126,31 @@
// "RUSTFS_COMPRESS_ENABLE": "true",
"RUSTFS_CONSOLE_ADDRESS": "127.0.0.1:9001",
"RUSTFS_OBS_LOG_DIRECTORY": "./target/logs",
"RUST_LOG":"rustfs=debug,ecstore=debug,s3s=debug,iam=debug",
},
"sourceLanguages": [
"rust"
],
},
{
"type": "lldb",
"request": "launch",
"name": "Debug test_lifecycle_transition_basic",
"cargo": {
"args": [
"test",
"-p",
"rustfs-scanner",
"--test",
"lifecycle_integration_test",
"serial_tests::test_lifecycle_transition_basic",
"-j",
"1"
]
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug executable target/debug/test",
"type": "lldb",

5
Cargo.lock generated
View File

@@ -7251,12 +7251,15 @@ dependencies = [
"faster-hex",
"flatbuffers",
"futures",
"futures-util",
"glob",
"google-cloud-auth",
"google-cloud-storage",
"hex-simd",
"hmac 0.13.0-rc.3",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.8.1",
"hyper-rustls 0.27.7",
"hyper-util",
@@ -7614,10 +7617,12 @@ dependencies = [
"async-trait",
"chrono",
"futures",
"heed",
"http 1.4.0",
"path-clean",
"rand 0.10.0-rc.5",
"rmp-serde",
"rustfs-ahm",
"rustfs-common",
"rustfs-config",
"rustfs-ecstore",

View File

@@ -495,6 +495,26 @@ mod serial_tests {
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
//eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
eval_inner(
&lifecycle::ObjectOpts {
name: oi.name.clone(),
user_tags: oi.user_tags.clone(),
version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(),
mod_time: oi.mod_time,
size: oi.size as usize,
is_latest: oi.is_latest,
num_versions: oi.num_versions,
delete_marker: oi.delete_marker,
successor_mod_time: oi.successor_mod_time,
restore_ongoing: oi.restore_ongoing,
restore_expires: oi.restore_expires,
transition_status: oi.transitioned_object.status.clone(),
..Default::default()
},
OffsetDateTime::now_utc(),
)
.await;
}
println!("row:{row:?}");
}
@@ -506,3 +526,261 @@ mod serial_tests {
println!("Lifecycle cache test completed");
}
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
if let Some(restore_expires) = obj.restore_expires {
if !restore_expires.unix_timestamp() == 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
let mut action = IlmAction::DeleteRestoredAction;
if !obj.is_latest {
action = IlmAction::DeleteRestoredVersionAction;
}
events.push(Event {
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
}
}
}
}
}
info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
} else {
info!("eval_inner: expiration.days is None");
}
} else {
info!("eval_inner: rule.expiration is None");
}
if obj.transition_status != TRANSITION_COMPLETE {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
}
}
}
}
}
}
}
if events.len() > 0 {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Less;
}
_ => (),
}
match b.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Greater;
}
_ => (),
}
return Ordering::Less;
}
if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
return Ordering::Less;
}
return Ordering::Greater;
});
return events[0].clone();
}
Event::default()
}

View File

@@ -263,6 +263,16 @@ async fn create_test_tier(server: u32) {
region: "".to_string(),
..Default::default()
})
} else if server == 2 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
@@ -602,7 +612,7 @@ mod serial_tests {
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
create_test_tier(1).await;
create_test_tier(2).await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
@@ -610,8 +620,15 @@ mod serial_tests {
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
//create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
create_test_bucket(&ecstore, bucket_name.as_str()).await;
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(
&ecstore,
bucket_name.as_str(),
object_name,
b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
)
.await;
//create_test_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially

View File

@@ -45,6 +45,7 @@ glob = { workspace = true }
thiserror.workspace = true
flatbuffers.workspace = true
futures.workspace = true
futures-util.workspace = true
tracing.workspace = true
serde.workspace = true
time.workspace = true
@@ -53,6 +54,8 @@ serde_json.workspace = true
quick-xml = { workspace = true, features = ["serialize", "async-tokio"] }
s3s.workspace = true
http.workspace = true
http-body = { workspace = true }
http-body-util.workspace = true
url.workspace = true
uuid = { workspace = true, features = ["v4", "fast-rng", "serde"] }
reed-solomon-simd = { workspace = true }

View File

@@ -21,7 +21,7 @@
use rustfs_filemeta::{ReplicationStatusType, VersionPurgeStatusType};
use s3s::dto::{
BucketLifecycleConfiguration, ExpirationStatus, LifecycleExpiration, LifecycleRule, NoncurrentVersionTransition,
ObjectLockConfiguration, ObjectLockEnabled, RestoreRequest, Transition,
ObjectLockConfiguration, ObjectLockEnabled, Prefix, RestoreRequest, Transition,
};
use std::cmp::Ordering;
use std::collections::HashMap;
@@ -173,44 +173,51 @@ impl Lifecycle for BucketLifecycleConfiguration {
continue;
}
let rule_prefix = rule.prefix.as_ref().expect("err!");
let rule_prefix = &rule.prefix.clone().unwrap_or_default();
if prefix.len() > 0 && rule_prefix.len() > 0 && !prefix.starts_with(rule_prefix) && !rule_prefix.starts_with(&prefix)
{
continue;
}
let rule_noncurrent_version_expiration = rule.noncurrent_version_expiration.as_ref().expect("err!");
if rule_noncurrent_version_expiration.noncurrent_days.expect("err!") > 0 {
if let Some(rule_noncurrent_version_expiration) = &rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = rule_noncurrent_version_expiration.noncurrent_days {
if noncurrent_days > 0 {
return true;
}
}
if let Some(newer_noncurrent_versions) = rule_noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
return true;
}
}
}
if rule.noncurrent_version_transitions.is_some() {
return true;
}
if rule_noncurrent_version_expiration.newer_noncurrent_versions.expect("err!") > 0 {
return true;
if let Some(rule_expiration) = &rule.expiration {
if let Some(date1) = rule_expiration.date.clone() {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
if rule_expiration.date.is_some() {
return true;
}
if let Some(expired_object_delete_marker) = rule_expiration.expired_object_delete_marker
&& expired_object_delete_marker
{
return true;
}
}
if !rule.noncurrent_version_transitions.is_none() {
return true;
if let Some(rule_transitions) = &rule.transitions {
let rule_transitions_0 = rule_transitions[0].clone();
if let Some(date1) = rule_transitions_0.date {
if OffsetDateTime::from(date1).unix_timestamp() < OffsetDateTime::now_utc().unix_timestamp() {
return true;
}
}
}
let rule_expiration = rule.expiration.as_ref().expect("err!");
if !rule_expiration.date.is_none()
&& OffsetDateTime::from(rule_expiration.date.clone().expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule_expiration.date.is_none() {
return true;
}
if rule_expiration.expired_object_delete_marker.expect("err!") {
return true;
}
let rule_transitions: &[Transition] = &rule.transitions.as_ref().expect("err!");
let rule_transitions_0 = rule_transitions[0].clone();
if !rule_transitions_0.date.is_none()
&& OffsetDateTime::from(rule_transitions_0.date.expect("err!")).unix_timestamp()
< OffsetDateTime::now_utc().unix_timestamp()
{
return true;
}
if !rule.transitions.is_none() {
if rule.transitions.is_some() {
return true;
}
}

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use std::collections::HashMap;
use crate::client::{
@@ -61,9 +63,19 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
//defer closeResponse(resp)
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp != nil {
if resp.status() != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
if resp_status != StatusCode::NO_CONTENT && resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//}
Ok(())
@@ -97,8 +109,17 @@ impl TransitionClient {
.await?;
//defer closeResponse(resp)
if resp.status() != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
if resp_status != StatusCode::NO_CONTENT {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
Ok(())
@@ -136,7 +157,15 @@ impl TransitionClient {
)
.await?;
let policy = String::from_utf8_lossy(&resp.body().bytes().expect("err").to_vec()).to_string();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let policy = String::from_utf8_lossy(&body_vec).to_string();
Ok(policy)
}
}

View File

@@ -18,12 +18,12 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use http::StatusCode;
use http::{HeaderMap, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::Deserializer, ser::Serializer};
use std::fmt::Display;
use s3s::Body;
//use s3s::Body;
use s3s::S3ErrorCode;
const _REPORT_ISSUE: &str = "Please report this issue at https://github.com/rustfs/rustfs/issues.";
@@ -95,7 +95,8 @@ pub fn to_error_response(err: &std::io::Error) -> ErrorResponse {
}
pub fn http_resp_to_error_response(
resp: &http::Response<Body>,
resp_status: StatusCode,
h: &HeaderMap,
b: Vec<u8>,
bucket_name: &str,
object_name: &str,
@@ -104,11 +105,11 @@ pub fn http_resp_to_error_response(
let err_resp_ = quick_xml::de::from_str::<ErrorResponse>(&err_body);
let mut err_resp = ErrorResponse::default();
if err_resp_.is_err() {
match resp.status() {
match resp_status {
StatusCode::NOT_FOUND => {
if object_name == "" {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchBucket,
message: "The specified bucket does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -116,7 +117,7 @@ pub fn http_resp_to_error_response(
};
} else {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::NoSuchKey,
message: "The specified key does not exist.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -127,7 +128,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::FORBIDDEN => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::AccessDenied,
message: "Access Denied.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -137,7 +138,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::CONFLICT => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::BucketNotEmpty,
message: "Bucket not empty.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -146,7 +147,7 @@ pub fn http_resp_to_error_response(
}
StatusCode::PRECONDITION_FAILED => {
err_resp = ErrorResponse {
status_code: resp.status(),
status_code: resp_status,
code: S3ErrorCode::PreconditionFailed,
message: "Pre condition failed.".to_string(),
bucket_name: bucket_name.to_string(),
@@ -155,13 +156,13 @@ pub fn http_resp_to_error_response(
};
}
_ => {
let mut msg = resp.status().to_string();
let mut msg = resp_status.to_string();
if err_body.len() > 0 {
msg = err_body;
}
err_resp = ErrorResponse {
status_code: resp.status(),
code: S3ErrorCode::Custom(resp.status().to_string().into()),
status_code: resp_status,
code: S3ErrorCode::Custom(resp_status.to_string().into()),
message: msg,
bucket_name: bucket_name.to_string(),
..Default::default()
@@ -171,32 +172,32 @@ pub fn http_resp_to_error_response(
} else {
err_resp = err_resp_.unwrap();
}
err_resp.status_code = resp.status();
if let Some(server_name) = resp.headers().get("Server") {
err_resp.status_code = resp_status;
if let Some(server_name) = h.get("Server") {
err_resp.server = server_name.to_str().expect("err").to_string();
}
let code = resp.headers().get("x-minio-error-code");
let code = h.get("x-minio-error-code");
if code.is_some() {
err_resp.code = S3ErrorCode::Custom(code.expect("err").to_str().expect("err").into());
}
let desc = resp.headers().get("x-minio-error-desc");
let desc = h.get("x-minio-error-desc");
if desc.is_some() {
err_resp.message = desc.expect("err").to_str().expect("err").trim_matches('"').to_string();
}
if err_resp.request_id == "" {
if let Some(x_amz_request_id) = resp.headers().get("x-amz-request-id") {
if let Some(x_amz_request_id) = h.get("x-amz-request-id") {
err_resp.request_id = x_amz_request_id.to_str().expect("err").to_string();
}
}
if err_resp.host_id == "" {
if let Some(x_amz_id_2) = resp.headers().get("x-amz-id-2") {
if let Some(x_amz_id_2) = h.get("x-amz-id-2") {
err_resp.host_id = x_amz_id_2.to_str().expect("err").to_string();
}
}
if err_resp.region == "" {
if let Some(x_amz_bucket_region) = resp.headers().get("x-amz-bucket-region") {
if let Some(x_amz_bucket_region) = h.get("x-amz-bucket-region") {
err_resp.region = x_amz_bucket_region.to_str().expect("err").to_string();
}
}

View File

@@ -19,17 +19,26 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
//use bytes::Bytes;
use futures_util::ready;
use http::HeaderMap;
use std::io::Cursor;
use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;
use crate::client::{
api_error_response::err_invalid_argument,
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use futures_util::StreamExt;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use tokio_util::io::ReaderStream;
impl TransitionClient {
pub fn get_object(&self, bucket_name: &str, object_name: &str, opts: &GetObjectOptions) -> Result<Object, std::io::Error> {
@@ -65,11 +74,19 @@ impl TransitionClient {
)
.await?;
let resp = &resp;
let object_stat = to_object_info(bucket_name, object_name, resp.headers())?;
let b = resp.body().bytes().expect("err").to_vec();
Ok((object_stat, resp.headers().clone(), BufReader::new(Cursor::new(b))))
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
Ok((object_stat, h, BufReader::new(Cursor::new(body_vec))))
}
}

View File

@@ -25,6 +25,7 @@ use crate::client::{
};
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::dto::Owner;
@@ -83,18 +84,29 @@ impl TransitionClient {
)
.await?;
if resp.status() != http::StatusCode::OK {
let b = resp.body().bytes().expect("err").to_vec();
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, object_name)));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(b).unwrap()) {
if resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
object_name,
)));
}
let mut res = match quick_xml::de::from_str::<AccessControlPolicy>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -18,7 +18,6 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -30,7 +29,11 @@ use crate::client::{
};
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
//use s3s::Body;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use s3s::header::{X_AMZ_MAX_PARTS, X_AMZ_OBJECT_ATTRIBUTES, X_AMZ_PART_NUMBER_MARKER, X_AMZ_VERSION_ID};
pub struct ObjectAttributesOptions {
@@ -130,19 +133,12 @@ struct ObjectAttributePart {
}
impl ObjectAttributes {
pub async fn parse_response(&mut self, resp: &mut http::Response<Body>) -> Result<(), std::io::Error> {
let h = resp.headers();
pub async fn parse_response(&mut self, h: &HeaderMap, body_vec: Vec<u8>) -> Result<(), std::io::Error> {
let mod_time = OffsetDateTime::parse(h.get("Last-Modified").unwrap().to_str().unwrap(), ISO8601_DATEFORMAT).unwrap(); //RFC7231Time
self.last_modified = mod_time;
self.version_id = h.get(X_AMZ_VERSION_ID).unwrap().to_str().unwrap().to_string();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(b).unwrap()) {
let mut response = match quick_xml::de::from_str::<ObjectAttributesResponse>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));
@@ -213,7 +209,8 @@ impl TransitionClient {
)
.await?;
let h = resp.headers();
let resp_status = resp.status();
let h = resp.headers().clone();
let has_etag = h.get("ETag").unwrap().to_str().unwrap();
if !has_etag.is_empty() {
return Err(std::io::Error::other(
@@ -221,14 +218,17 @@ impl TransitionClient {
));
}
if resp.status() != http::StatusCode::OK {
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let err_body = String::from_utf8(b).unwrap();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::OK {
let err_body = String::from_utf8(body_vec).unwrap();
let mut er = match quick_xml::de::from_str::<AccessControlPolicy>(&err_body) {
Ok(result) => result,
Err(err) => {
@@ -240,7 +240,7 @@ impl TransitionClient {
}
let mut oa = ObjectAttributes::new();
oa.parse_response(&mut resp).await?;
oa.parse_response(&h, body_vec).await?;
Ok(oa)
}

View File

@@ -27,8 +27,11 @@ use crate::client::{
transition_api::{ReaderImpl, RequestMetadata, TransitionClient},
};
use crate::store_api::BucketInfo;
use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use std::collections::HashMap;
@@ -97,18 +100,30 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, vec![], bucket_name, "")));
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
vec![],
bucket_name,
"",
)));
}
//let mut list_bucket_result = ListBucketV2Result::default();
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(b).unwrap()) {
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut list_bucket_result = match quick_xml::de::from_str::<ListBucketV2Result>(&String::from_utf8(body_vec).unwrap()) {
Ok(result) => result,
Err(err) => {
return Err(std::io::Error::other(err.to_string()));

View File

@@ -17,8 +17,9 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
//use bytes::Bytes;
use http::{HeaderMap, HeaderName, StatusCode};
use hyper::body::Bytes;
use s3s::S3ErrorCode;
use std::collections::HashMap;
use time::OffsetDateTime;
@@ -225,10 +226,15 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_none() {
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,
@@ -287,9 +293,14 @@ impl TransitionClient {
};
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
&p.bucket_name.clone(),
&p.object_name,
@@ -370,7 +381,8 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::POST, &mut req_metadata).await?;
let b = resp.body().bytes().expect("err").to_vec();
let h = resp.headers().clone();
let complete_multipart_upload_result: CompleteMultipartUploadResult = CompleteMultipartUploadResult::default();
let (exp_time, rule_id) = if let Some(h_x_amz_expiration) = resp.headers().get(X_AMZ_EXPIRATION) {
@@ -382,7 +394,6 @@ impl TransitionClient {
(OffsetDateTime::now_utc(), "".to_string())
};
let h = resp.headers();
Ok(UploadInfo {
bucket: complete_multipart_upload_result.bucket,
key: complete_multipart_upload_result.key,

View File

@@ -479,9 +479,13 @@ impl TransitionClient {
let resp = self.execute_method(http::Method::PUT, &mut req_metadata).await?;
let resp_status = resp.status();
let h = resp.headers().clone();
if resp.status() != StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Method, StatusCode};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::HashAlgorithm;
use s3s::S3ErrorCode;
use s3s::dto::ReplicationStatus;
@@ -344,8 +346,15 @@ impl TransitionClient {
)
.await?;
let body_bytes: Vec<u8> = resp.body().bytes().expect("err").to_vec();
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_bytes)), result_tx.clone());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
process_remove_multi_objects_response(ReaderImpl::Body(Bytes::from(body_vec)), result_tx.clone());
}
Ok(())
}
@@ -390,6 +399,10 @@ impl TransitionClient {
},
)
.await?;
let resp_status = resp.status();
let h = resp.headers().clone();
//if resp.is_some() {
if resp.status() != StatusCode::NO_CONTENT {
let error_response: ErrorResponse;
@@ -426,7 +439,8 @@ impl TransitionClient {
}
_ => {
return Err(std::io::Error::other(http_resp_to_error_response(
&resp,
resp_status,
&h,
vec![],
bucket_name,
object_name,

View File

@@ -24,8 +24,10 @@ use crate::client::{
api_get_options::GetObjectOptions,
transition_api::{ObjectInfo, ReadCloser, ReaderImpl, RequestMetadata, TransitionClient, to_object_info},
};
use bytes::Bytes;
use http::HeaderMap;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use s3s::dto::RestoreRequest;
use std::collections::HashMap;
use std::io::Cursor;
@@ -107,9 +109,25 @@ impl TransitionClient {
)
.await?;
let b = resp.body().bytes().expect("err").to_vec();
if resp.status() != http::StatusCode::ACCEPTED && resp.status() != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(&resp, b, bucket_name, "")));
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
if resp_status != http::StatusCode::ACCEPTED && resp_status != http::StatusCode::OK {
return Err(std::io::Error::other(http_resp_to_error_response(
resp_status,
&h,
body_vec,
bucket_name,
"",
)));
}
Ok(())
}

View File

@@ -18,8 +18,10 @@
#![allow(unused_must_use)]
#![allow(clippy::all)]
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Bytes;
use rustfs_utils::EMPTY_STRING_SHA256_HASH;
use std::{collections::HashMap, str::FromStr};
use tokio::io::BufReader;
@@ -66,10 +68,20 @@ impl TransitionClient {
return Ok(false);
}
let b = resp.body().bytes().expect("err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("bucket exists, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("bucket exists, resperr: {:?}", resperr);
/*if to_error_response(resperr).code == "NoSuchBucket" {
return Ok(false);
}
@@ -108,10 +120,20 @@ impl TransitionClient {
match resp {
Ok(resp) => {
let b = resp.body().bytes().expect("get bucket versioning err").to_vec();
let resperr = http_resp_to_error_response(&resp, b, bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
warn!("get bucket versioning, resp: {:?}, resperr: {:?}", resp, resperr);
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let resperr = http_resp_to_error_response(resp_status, &h, body_vec, bucket_name, "");
warn!("get bucket versioning, resperr: {:?}", resperr);
Ok(VersioningConfiguration::default())
}

View File

@@ -25,10 +25,13 @@ use crate::client::{
transition_api::{CreateBucketConfiguration, LocationConstraint, TransitionClient},
};
use http::Request;
use http_body_util::BodyExt;
use hyper::StatusCode;
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::body::Incoming;
use rustfs_config::MAX_S3_CLIENT_RESPONSE_SIZE;
use rustfs_utils::hash::EMPTY_STRING_SHA256_HASH;
use s3s::Body;
use s3s::S3ErrorCode;
use std::collections::HashMap;
@@ -86,7 +89,7 @@ impl TransitionClient {
Ok(location)
}
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<Body>, std::io::Error> {
fn get_bucket_location_request(&self, bucket_name: &str) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut url_values = HashMap::new();
url_values.insert("location".to_string(), "".to_string());
@@ -120,7 +123,11 @@ impl TransitionClient {
url_str = target_url.to_string();
}
let Ok(mut req) = Request::builder().method(http::Method::GET).uri(url_str).body(Body::empty()) else {
let Ok(mut req) = Request::builder()
.method(http::Method::GET)
.uri(url_str)
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -172,13 +179,16 @@ impl TransitionClient {
}
async fn process_bucket_location_response(
mut resp: http::Response<Body>,
mut resp: http::Response<Incoming>,
bucket_name: &str,
tier_type: &str,
) -> Result<String, std::io::Error> {
//if resp != nil {
if resp.status() != StatusCode::OK {
let err_resp = http_resp_to_error_response(&resp, vec![], bucket_name, "");
let resp_status = resp.status();
let h = resp.headers().clone();
let err_resp = http_resp_to_error_response(resp_status, &h, vec![], bucket_name, "");
match err_resp.code {
S3ErrorCode::NotImplemented => {
match err_resp.server.as_str() {
@@ -208,18 +218,22 @@ async fn process_bucket_location_response(
}
//}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut location = "".to_string();
if tier_type == "huaweicloud" {
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(b).unwrap()).unwrap();
let d = quick_xml::de::from_str::<CreateBucketConfiguration>(&String::from_utf8(body_vec).unwrap()).unwrap();
location = d.location_constraint;
} else {
if let Ok(LocationConstraint { field }) = quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(b).unwrap()) {
if let Ok(LocationConstraint { field }) =
quick_xml::de::from_str::<LocationConstraint>(&String::from_utf8(body_vec).unwrap())
{
location = field;
}
}

View File

@@ -118,38 +118,51 @@ pub fn new_getobjectreader<'a>(
let mut is_encrypted = false;
let is_compressed = false; //oi.is_compressed_ok();
let mut rs_ = None;
let rs_;
if rs.is_none() && opts.part_number.is_some() && opts.part_number.unwrap() > 0 {
rs_ = part_number_to_rangespec(oi.clone(), opts.part_number.unwrap());
} else {
rs_ = rs.clone();
}
let mut get_fn: ObjReaderFn;
let (off, length) = match rs_.unwrap().get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
if let Some(rs_) = rs_ {
let (off, length) = match rs_.get_offset_length(oi.size) {
Ok(x) => x,
Err(err) => {
return Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: err.to_string(),
key: None,
bucket_name: None,
region: None,
request_id: None,
host_id: "".to_string(),
});
}
};
r
//})
});
get_fn = Arc::new(move |input_reader: BufReader<Cursor<Vec<u8>>>, _: HeaderMap| {
//Box::pin({
let r = GetObjectReader {
object_info: oi.clone(),
stream: Box::new(input_reader),
};
r
//})
});
Ok((get_fn, off as i64, length as i64))
return Ok((get_fn, off as i64, length as i64));
}
Err(ErrorResponse {
code: S3ErrorCode::InvalidRange,
message: "Invalid range".to_string(),
key: Some(oi.name.clone()),
bucket_name: Some(oi.bucket.clone()),
region: Some("".to_string()),
request_id: None,
host_id: "".to_string(),
})
}
/// Convert a raw stored ETag into the strongly-typed `s3s::dto::ETag`.

View File

@@ -20,6 +20,7 @@
use crate::client::bucket_cache::BucketLocationCache;
use crate::client::{
api_error_response::ErrorResponse,
api_error_response::{err_invalid_argument, http_resp_to_error_response, to_error_response},
api_get_options::GetObjectOptions,
api_put_object::PutObjectOptions,
@@ -32,13 +33,17 @@ use crate::client::{
credentials::{CredContext, Credentials, SignatureType, Static},
};
use crate::{client::checksum::ChecksumMode, store_api::GetObjectReader};
use bytes::Bytes;
//use bytes::Bytes;
use futures::{Future, StreamExt};
use http::{HeaderMap, HeaderName};
use http::{
HeaderValue, Response, StatusCode,
request::{Builder, Request},
};
use http_body::Body;
use http_body_util::BodyExt;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::{client::legacy::Client, client::legacy::connect::HttpConnector, rt::TokioExecutor};
use md5::Digest;
@@ -54,8 +59,8 @@ use rustfs_utils::{
},
};
use s3s::S3ErrorCode;
use s3s::dto::Owner;
use s3s::dto::ReplicationStatus;
use s3s::{Body, dto::Owner};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::io::Cursor;
@@ -95,7 +100,7 @@ pub struct TransitionClient {
pub creds_provider: Arc<Mutex<Credentials<Static>>>,
pub override_signer_type: SignatureType,
pub secure: bool,
pub http_client: Client<HttpsConnector<HttpConnector>, Body>,
pub http_client: Client<HttpsConnector<HttpConnector>, s3s::Body>,
pub bucket_loc_cache: Arc<Mutex<BucketLocationCache>>,
pub is_trace_enabled: Arc<Mutex<bool>>,
pub trace_errors_only: Arc<Mutex<bool>>,
@@ -248,7 +253,7 @@ impl TransitionClient {
todo!();
}
fn dump_http(&self, req: &http::Request<Body>, resp: &http::Response<Body>) -> Result<(), std::io::Error> {
fn dump_http(&self, req: &http::Request<s3s::Body>, resp: &http::Response<Incoming>) -> Result<(), std::io::Error> {
let mut resp_trace: Vec<u8>;
//info!("{}{}", self.trace_output, "---------BEGIN-HTTP---------");
@@ -257,7 +262,7 @@ impl TransitionClient {
Ok(())
}
pub async fn doit(&self, req: http::Request<Body>) -> Result<http::Response<Body>, std::io::Error> {
pub async fn doit(&self, req: http::Request<s3s::Body>) -> Result<http::Response<Incoming>, std::io::Error> {
let req_method;
let req_uri;
let req_headers;
@@ -272,9 +277,7 @@ impl TransitionClient {
debug!("endpoint_url: {}", self.endpoint_url.as_str().to_string());
resp = http_client.request(req);
}
let resp = resp
.await /*.map_err(Into::into)*/
.map(|res| res.map(Body::from));
let resp = resp.await;
debug!("http_client url: {} {}", req_method, req_uri);
debug!("http_client headers: {:?}", req_headers);
if let Err(err) = resp {
@@ -282,7 +285,7 @@ impl TransitionClient {
return Err(std::io::Error::other(err));
}
let mut resp = resp.unwrap();
let resp = resp.unwrap();
debug!("http_resp: {:?}", resp);
//let b = resp.body_mut().store_all_unlimited().await.unwrap().to_vec();
@@ -291,23 +294,26 @@ impl TransitionClient {
//if self.is_trace_enabled && !(self.trace_errors_only && resp.status() == StatusCode::OK) {
if resp.status() != StatusCode::OK {
//self.dump_http(&cloned_req, &resp)?;
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
warn!("err_body: {}", String::from_utf8(b).unwrap());
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
warn!("err_body: {}", String::from_utf8(body_vec).unwrap());
Err(std::io::Error::other("http_client call error."))
} else {
Ok(resp)
}
Ok(resp)
}
pub async fn execute_method(
&self,
method: http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Response<Body>, std::io::Error> {
) -> Result<http::Response<Incoming>, std::io::Error> {
if self.is_offline() {
let mut s = self.endpoint_url.to_string();
s.push_str(" is offline.");
@@ -317,7 +323,7 @@ impl TransitionClient {
let retryable: bool;
//let mut body_seeker: BufferReader;
let mut req_retry = self.max_retries;
let mut resp: http::Response<Body>;
let mut resp: http::Response<Incoming>;
//if metadata.content_body != nil {
//body_seeker = BufferReader::new(metadata.content_body.read_all().await?);
@@ -339,13 +345,19 @@ impl TransitionClient {
}
}
let b = resp
.body_mut()
.store_all_limited(MAX_S3_CLIENT_RESPONSE_SIZE)
.await
.unwrap()
.to_vec();
let mut err_response = http_resp_to_error_response(&resp, b.clone(), &metadata.bucket_name, &metadata.object_name);
let resp_status = resp.status();
let h = resp.headers().clone();
let mut body_vec = Vec::new();
let mut body = resp.into_body();
while let Some(frame) = body.frame().await {
let frame = frame.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
if let Some(data) = frame.data_ref() {
body_vec.extend_from_slice(data);
}
}
let mut err_response =
http_resp_to_error_response(resp_status, &h, body_vec.clone(), &metadata.bucket_name, &metadata.object_name);
err_response.message = format!("remote tier error: {}", err_response.message);
if self.region == "" {
@@ -381,7 +393,7 @@ impl TransitionClient {
continue;
}
if is_http_status_retryable(&resp.status()) {
if is_http_status_retryable(&resp_status) {
continue;
}
@@ -395,7 +407,7 @@ impl TransitionClient {
&self,
method: &http::Method,
metadata: &mut RequestMetadata,
) -> Result<http::Request<Body>, std::io::Error> {
) -> Result<http::Request<s3s::Body>, std::io::Error> {
let mut location = metadata.bucket_location.clone();
if location == "" && metadata.bucket_name != "" {
location = self.get_bucket_location(&metadata.bucket_name).await?;
@@ -415,7 +427,7 @@ impl TransitionClient {
let Ok(mut req) = Request::builder()
.method(method)
.uri(target_url.to_string())
.body(Body::empty())
.body(s3s::Body::empty())
else {
return Err(std::io::Error::other("create request error"));
};
@@ -527,10 +539,10 @@ impl TransitionClient {
if metadata.content_length > 0 {
match &mut metadata.content_body {
ReaderImpl::Body(content_body) => {
*req.body_mut() = Body::from(content_body.clone());
*req.body_mut() = s3s::Body::from(content_body.clone());
}
ReaderImpl::ObjectBody(content_body) => {
*req.body_mut() = Body::from(content_body.read_all().await?);
*req.body_mut() = s3s::Body::from(content_body.read_all().await?);
}
}
}
@@ -538,7 +550,7 @@ impl TransitionClient {
Ok(req)
}
pub fn set_user_agent(&self, req: &mut Request<Body>) {
pub fn set_user_agent(&self, req: &mut Request<s3s::Body>) {
let headers = req.headers_mut();
headers.insert("User-Agent", C_USER_AGENT.parse().expect("err"));
}
@@ -976,25 +988,217 @@ impl Default for UploadInfo {
}
}
/// Convert HTTP headers to ObjectInfo struct
/// This function parses various S3 response headers to construct an ObjectInfo struct
/// containing metadata about an S3 object.
pub fn to_object_info(bucket_name: &str, object_name: &str, h: &HeaderMap) -> Result<ObjectInfo, std::io::Error> {
todo!()
// Helper function to get header value as string
let get_header = |name: &str| -> String { h.get(name).and_then(|val| val.to_str().ok()).unwrap_or("").to_string() };
// Get and process the ETag
let etag = {
let etag_raw = get_header("ETag");
// Remove surrounding quotes if present (trimming ETag)
let trimmed = etag_raw.trim_start_matches('"').trim_end_matches('"');
Some(trimmed.to_string())
};
// Parse content length if it exists
let size = {
let content_length_str = get_header("Content-Length");
if !content_length_str.is_empty() {
content_length_str
.parse::<i64>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Content-Length is not an integer"))?
} else {
-1
}
};
// Parse Last-Modified time
let mod_time = {
let last_modified_str = get_header("Last-Modified");
if !last_modified_str.is_empty() {
// Parse HTTP date format (RFC 7231)
// Using time crate to parse HTTP dates
let parsed_time = OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&last_modified_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Last-Modified time format is invalid"))?;
Some(parsed_time)
} else {
Some(OffsetDateTime::now_utc())
}
};
// Get content type
let content_type = {
let content_type_raw = get_header("Content-Type");
let content_type_trimmed = content_type_raw.trim();
if content_type_trimmed.is_empty() {
Some("application/octet-stream".to_string())
} else {
Some(content_type_trimmed.to_string())
}
};
// Parse Expires time
let expiration = {
let expiry_str = get_header("Expires");
if !expiry_str.is_empty() {
OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc2822)
.or_else(|_| OffsetDateTime::parse(&expiry_str, &time::format_description::well_known::Rfc3339))
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "'Expires' is not in supported format"))?
} else {
OffsetDateTime::now_utc()
}
};
// Extract user metadata (headers prefixed with "X-Amz-Meta-")
let user_metadata = {
let mut meta = HashMap::new();
for (name, value) in h.iter() {
let header_name = name.as_str().to_lowercase();
if header_name.starts_with("x-amz-meta-") {
let key = header_name.strip_prefix("x-amz-meta-").unwrap().to_string();
if let Ok(value_str) = value.to_str() {
meta.insert(key, value_str.to_string());
}
}
}
meta
};
let user_tag = {
let user_tag_str = get_header("X-Amz-Tagging");
user_tag_str
};
// Extract user tags count
let user_tag_count = {
let count_str = get_header("x-amz-tagging-count");
if !count_str.is_empty() {
count_str
.parse::<usize>()
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "x-amz-tagging-count is not an integer"))?
} else {
0
}
};
// Handle restore info
let restore = {
let restore_hdr = get_header("x-amz-restore");
if !restore_hdr.is_empty() {
// Simplified restore header parsing - in real implementation, this would parse the specific format
// "ongoing-request=\"true\"" or "ongoing-request=\"false\", expiry-date=\"..."
let ongoing_restore = restore_hdr.contains("ongoing-request=\"true\"");
RestoreInfo {
ongoing_restore,
expiry_time: if ongoing_restore {
OffsetDateTime::now_utc()
} else {
// Try to extract expiry date from the header
// This is simplified - real parsing would be more complex
OffsetDateTime::now_utc()
},
}
} else {
RestoreInfo::default()
}
};
// Extract version ID
let version_id = {
let version_id_str = get_header("x-amz-version-id");
if !version_id_str.is_empty() {
Some(Uuid::parse_str(&version_id_str).unwrap_or_else(|_| Uuid::nil()))
} else {
None
}
};
// Check if it's a delete marker
let is_delete_marker = get_header("x-amz-delete-marker") == "true";
// Get replication status
let replication_status = {
let status_str = get_header("x-amz-replication-status");
ReplicationStatus::from_static(match status_str.as_str() {
"COMPLETE" => ReplicationStatus::COMPLETE,
"PENDING" => ReplicationStatus::PENDING,
"FAILED" => ReplicationStatus::FAILED,
"REPLICA" => ReplicationStatus::REPLICA,
_ => ReplicationStatus::PENDING,
})
};
// Extract expiration rule ID and time (simplified)
let (expiration_time, expiration_rule_id) = {
// In a real implementation, this would parse the x-amz-expiration header
// which typically has format: "expiry-date="Fri, 11 Dec 2020 00:00:00 GMT", rule-id="myrule""
let exp_header = get_header("x-amz-expiration");
if !exp_header.is_empty() {
// Simplified parsing - real implementation would be more thorough
(OffsetDateTime::now_utc(), exp_header) // Placeholder
} else {
(OffsetDateTime::now_utc(), "".to_string())
}
};
// Extract checksums
let checksum_crc32 = get_header("x-amz-checksum-crc32");
let checksum_crc32c = get_header("x-amz-checksum-crc32c");
let checksum_sha1 = get_header("x-amz-checksum-sha1");
let checksum_sha256 = get_header("x-amz-checksum-sha256");
let checksum_crc64nvme = get_header("x-amz-checksum-crc64nvme");
let checksum_mode = get_header("x-amz-checksum-mode");
// Build and return the ObjectInfo struct
Ok(ObjectInfo {
etag,
name: object_name.to_string(),
mod_time,
size,
content_type,
metadata: h.clone(),
user_metadata,
user_tags: "".to_string(), // Tags would need separate parsing
user_tag_count,
owner: Owner::default(),
storage_class: get_header("x-amz-storage-class"),
is_latest: true, // Would be determined by versioning settings
is_delete_marker,
version_id,
replication_status,
replication_ready: false, // Would be computed based on status
expiration: expiration_time,
expiration_rule_id,
num_versions: 1, // Would be determined by versioning
restore,
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
checksum_crc64nvme,
checksum_mode,
})
}
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
//#[derive(Clone)]
pub struct SendRequest {
inner: hyper::client::conn::http1::SendRequest<Body>,
inner: hyper::client::conn::http1::SendRequest<s3s::Body>,
}
impl From<hyper::client::conn::http1::SendRequest<Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<Body>) -> Self {
impl From<hyper::client::conn::http1::SendRequest<s3s::Body>> for SendRequest {
fn from(inner: hyper::client::conn::http1::SendRequest<s3s::Body>) -> Self {
Self { inner }
}
}
impl tower::Service<Request<Body>> for SendRequest {
type Response = Response<Body>;
impl tower::Service<Request<s3s::Body>> for SendRequest {
type Response = Response<Incoming>;
type Error = std::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
@@ -1002,13 +1206,13 @@ impl tower::Service<Request<Body>> for SendRequest {
self.inner.poll_ready(cx).map_err(std::io::Error::other)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
fn call(&mut self, req: Request<s3s::Body>) -> Self::Future {
//let req = hyper::Request::builder().uri("/").body(http_body_util::Empty::<Bytes>::new()).unwrap();
//let req = hyper::Request::builder().uri("/").body(Body::empty()).unwrap();
let fut = self.inner.send_request(req);
Box::pin(async move { fut.await.map_err(std::io::Error::other).map(|res| res.map(Body::from)) })
Box::pin(async move { fut.await.map_err(std::io::Error::other) })
}
}

View File

@@ -452,6 +452,8 @@ pub fn path2_bucket_object_with_base_path(base_path: &str, path: &str) -> (Strin
.strip_prefix(SLASH_SEPARATOR)
.unwrap_or(path);
// Find the position of the first '/'
#[cfg(windows)]
let trimmed_path = trimmed_path.replace('\\', "/");
let Some(pos) = trimmed_path.find(SLASH_SEPARATOR) else {
return (trimmed_path.to_string(), "".to_string());
};

View File

@@ -4852,10 +4852,18 @@ impl StorageAPI for SetDisks {
// Normalize ETags by removing quotes before comparison (PR #592 compatibility)
let transition_etag = rustfs_utils::path::trim_etag(&opts.transition.etag);
let stored_etag = rustfs_utils::path::trim_etag(&get_raw_etag(&fi.metadata));
if opts.mod_time.expect("err").unix_timestamp() != fi.mod_time.as_ref().expect("err").unix_timestamp()
|| transition_etag != stored_etag
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
if let Some(mod_time1) = opts.mod_time {
if let Some(mod_time2) = fi.mod_time.as_ref() {
if mod_time1.unix_timestamp() != mod_time2.unix_timestamp()
/*|| transition_etag != stored_etag*/
{
return Err(to_object_err(Error::other(DiskError::FileNotFound), vec![bucket, object]));
}
} else {
return Err(Error::other("mod_time 2 error.".to_string()));
}
} else {
return Err(Error::other("mod_time 1 error.".to_string()));
}
if fi.transition_status == TRANSITION_COMPLETE {
return Ok(());
@@ -4985,8 +4993,10 @@ impl StorageAPI for SetDisks {
oi = ObjectInfo::from_file_info(&actual_fi, bucket, object, opts.versioned || opts.version_suspended);
let ropts = put_restore_opts(bucket, object, &opts.transition.restore_request, &oi).await?;
if oi.parts.len() == 1 {
let mut opts = opts.clone();
opts.part_number = Some(1);
let rs: Option<HTTPRangeSpec> = None;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, opts).await;
let gr = get_transitioned_object_reader(bucket, object, &rs, &HeaderMap::new(), &oi, &opts).await;
if let Err(err) = gr {
return set_restore_header_fn(&mut oi, Some(to_object_err(err.into(), vec![bucket, object]))).await;
}

View File

@@ -50,6 +50,7 @@ rustfs-filemeta = { workspace = true }
rustfs-madmin = { workspace = true }
tokio-util = { workspace = true }
rustfs-ecstore = { workspace = true }
rustfs-ahm = { workspace = true }
http = { workspace = true }
rand = { workspace = true }
s3s = { workspace = true }
@@ -59,3 +60,4 @@ tokio-test = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
serial_test = { workspace = true }
heed = { workspace = true }

View File

@@ -554,9 +554,9 @@ impl FolderScanner {
let file_path = entry.path().to_string_lossy().to_string();
let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path);
//let trim_dir_name = file_path.strip_prefix(&dir_path).unwrap_or(&file_path);
let entry_name = path_join_buf(&[&folder.name, trim_dir_name]);
let entry_name = path_join_buf(&[&folder.name, &file_name]);
if entry_name.is_empty() || entry_name == folder.name {
debug!("scan_folder: done for now entry_name is empty or equals folder name");

View File

@@ -272,7 +272,7 @@ impl ScannerIOCache for SetDisks {
let store_clone = self.clone();
let ctx_clone = ctx.clone();
let send_update_fut = tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(30 + rand::random::<u64>() % 10));
let mut ticker = tokio::time::interval(Duration::from_secs(3 + rand::random::<u64>() % 10));
let mut last_update = None;

View File

@@ -0,0 +1,786 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use heed::byteorder::BigEndian;
use heed::types::*;
use heed::{BoxedError, BytesDecode, BytesEncode, Database, DatabaseFlags, Env, EnvOpenOptions};
use rustfs_ecstore::{
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectInfo, ObjectOptions, PutObjReader, StorageAPI},
};
use rustfs_scanner::scanner::local_scan::{self, LocalObjectRecord, LocalScanOutcome};
use serial_test::serial;
use std::{
borrow::Cow,
path::PathBuf,
sync::{Arc, Once, OnceLock},
};
//use heed_traits::Comparator;
use time::OffsetDateTime;
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use uuid::Uuid;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
static _LIFECYCLE_EXPIRY_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_EXPIRY_NONCURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_CURRENT_DAYS: i32 = 1;
static _LIFECYCLE_TRANSITION_NONCURRENT_DAYS: i32 = 1;
static GLOBAL_LMDB_ENV: OnceLock<Env> = OnceLock::new();
static GLOBAL_LMDB_DB: OnceLock<Database<I64<BigEndian>, LifecycleContentCodec>> = OnceLock::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_ahm_lifecyclecache_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}
// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();
// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
//lmdb env
// User home directory
/*if let Ok(home_dir) = env::var("HOME").or_else(|_| env::var("USERPROFILE")) {
let mut path = PathBuf::from(home_dir);
path.push(format!(".{DEFAULT_LOG_FILENAME}"));
path.push(DEFAULT_LOG_DIR);
if ensure_directory_writable(&path) {
//return path;
}
}*/
let test_lmdb_lifecycle_dir = "/tmp/lmdb_lifecycle".to_string();
let temp_dir = std::path::PathBuf::from(&test_lmdb_lifecycle_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
let lmdb_env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&test_lmdb_lifecycle_dir).unwrap() };
let bucket_name = format!("test-lc-cache-{}", "00000");
let mut wtxn = lmdb_env.write_txn().unwrap();
let db = match lmdb_env
.database_options()
.name(&format!("bucket_{bucket_name}"))
.types::<I64<BigEndian>, LifecycleContentCodec>()
.flags(DatabaseFlags::DUP_SORT)
//.dup_sort_comparator::<>()
.create(&mut wtxn)
{
Ok(db) => db,
Err(err) => {
panic!("lmdb error: {err}");
}
};
let _ = wtxn.commit();
let _ = GLOBAL_LMDB_ENV.set(lmdb_env);
let _ = GLOBAL_LMDB_DB.set(db);
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}
/// Test helper: Create a test bucket
#[allow(dead_code)]
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
println!("object_info1: {object_info:?}");
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}
fn ns_to_offset_datetime(ns: i128) -> Option<OffsetDateTime> {
OffsetDateTime::from_unix_timestamp_nanos(ns).ok()
}
fn convert_record_to_object_info(record: &LocalObjectRecord) -> ObjectInfo {
let usage = &record.usage;
ObjectInfo {
bucket: usage.bucket.clone(),
name: usage.object.clone(),
size: usage.total_size as i64,
delete_marker: !usage.has_live_object && usage.delete_markers_count > 0,
mod_time: usage.last_modified_ns.and_then(ns_to_offset_datetime),
..Default::default()
}
}
#[allow(dead_code)]
fn to_object_info(
bucket: &str,
object: &str,
total_size: i64,
delete_marker: bool,
mod_time: OffsetDateTime,
version_id: &str,
) -> ObjectInfo {
ObjectInfo {
bucket: bucket.to_string(),
name: object.to_string(),
size: total_size,
delete_marker,
mod_time: Some(mod_time),
version_id: Some(Uuid::parse_str(version_id).unwrap()),
..Default::default()
}
}
#[derive(Debug, PartialEq, Eq)]
enum LifecycleType {
ExpiryCurrent,
ExpiryNoncurrent,
TransitionCurrent,
TransitionNoncurrent,
}
#[derive(Debug, PartialEq, Eq)]
pub struct LifecycleContent {
ver_no: u8,
ver_id: String,
mod_time: OffsetDateTime,
type_: LifecycleType,
object_name: String,
}
pub struct LifecycleContentCodec;
impl BytesEncode<'_> for LifecycleContentCodec {
type EItem = LifecycleContent;
fn bytes_encode(lcc: &Self::EItem) -> Result<Cow<'_, [u8]>, BoxedError> {
let (ver_no_byte, ver_id_bytes, mod_timestamp_bytes, type_byte, object_name_bytes) = match lcc {
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
0,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::ExpiryNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
1,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionCurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
2,
object_name.clone().into_bytes(),
),
LifecycleContent {
ver_no,
ver_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name,
} => (
ver_no,
ver_id.clone().into_bytes(),
mod_time.unix_timestamp().to_be_bytes(),
3,
object_name.clone().into_bytes(),
),
};
let mut output = Vec::<u8>::new();
output.push(*ver_no_byte);
output.extend_from_slice(&ver_id_bytes);
output.extend_from_slice(&mod_timestamp_bytes);
output.push(type_byte);
output.extend_from_slice(&object_name_bytes);
Ok(Cow::Owned(output))
}
}
impl<'a> BytesDecode<'a> for LifecycleContentCodec {
type DItem = LifecycleContent;
fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
use std::mem::size_of;
let ver_no = match bytes.get(..size_of::<u8>()) {
Some(bytes) => bytes.try_into().map(u8::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract ver_no".into()),
};
let ver_id = match bytes.get(size_of::<u8>()..(36 + 1)) {
Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() },
None => return Err("invalid LifecycleContent: cannot extract ver_id".into()),
};
let mod_timestamp = match bytes.get((36 + 1)..(size_of::<i64>() + 36 + 1)) {
Some(bytes) => bytes.try_into().map(i64::from_be_bytes).unwrap(),
None => return Err("invalid LifecycleContent: cannot extract mod_time timestamp".into()),
};
let type_ = match bytes.get(size_of::<i64>() + 36 + 1) {
Some(&0) => LifecycleType::ExpiryCurrent,
Some(&1) => LifecycleType::ExpiryNoncurrent,
Some(&2) => LifecycleType::TransitionCurrent,
Some(&3) => LifecycleType::TransitionNoncurrent,
Some(_) => return Err("invalid LifecycleContent: invalid LifecycleType".into()),
None => return Err("invalid LifecycleContent: cannot extract LifecycleType".into()),
};
let object_name = match bytes.get((size_of::<i64>() + 36 + 1 + 1)..) {
Some(bytes) => unsafe { std::str::from_utf8_unchecked(bytes).to_string() },
None => return Err("invalid LifecycleContent: cannot extract object_name".into()),
};
Ok(LifecycleContent {
ver_no,
ver_id,
mod_time: OffsetDateTime::from_unix_timestamp(mod_timestamp).unwrap(),
type_,
object_name,
})
}
}
mod serial_tests {
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
//#[ignore]
async fn test_lifecycle_chche_build() {
let (_disk_paths, ecstore) = setup_test_env().await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-cache-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");
let scan_outcome = match local_scan::scan_and_persist_local_usage(ecstore.clone()).await {
Ok(outcome) => outcome,
Err(err) => {
warn!("Local usage scan failed: {}", err);
LocalScanOutcome::default()
}
};
let bucket_objects_map = &scan_outcome.bucket_objects;
let records = match bucket_objects_map.get(&bucket_name) {
Some(records) => records,
None => {
debug!("No local snapshot entries found for bucket {}; skipping lifecycle/integrity", bucket_name);
&vec![]
}
};
if let Some(lmdb_env) = GLOBAL_LMDB_ENV.get() {
if let Some(lmdb) = GLOBAL_LMDB_DB.get() {
let mut wtxn = lmdb_env.write_txn().unwrap();
/*if let Ok((lc_config, _)) = rustfs_ecstore::bucket::metadata_sys::get_lifecycle_config(bucket_name.as_str()).await {
if let Ok(object_info) = ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
let event = rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::eval_action_from_lifecycle(
&lc_config,
None,
None,
&object_info,
)
.await;
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::apply_expiry_on_non_transitioned_objects(
ecstore.clone(),
&object_info,
&event,
&rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc::Scanner,
)
.await;
expired = wait_for_object_absence(&ecstore, bucket_name.as_str(), object_name, Duration::from_secs(2)).await;
}
}*/
for record in records {
if !record.usage.has_live_object {
continue;
}
let object_info = convert_record_to_object_info(record);
println!("object_info2: {object_info:?}");
let mod_time = object_info.mod_time.unwrap_or(OffsetDateTime::now_utc());
let expiry_time = rustfs_ecstore::bucket::lifecycle::lifecycle::expected_expiry_time(mod_time, 1);
let version_id = if let Some(version_id) = object_info.version_id {
version_id.to_string()
} else {
"zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz".to_string()
};
lmdb.put(
&mut wtxn,
&expiry_time.unix_timestamp(),
&LifecycleContent {
ver_no: 0,
ver_id: version_id,
mod_time,
type_: LifecycleType::TransitionNoncurrent,
object_name: object_info.name,
},
)
.unwrap();
}
wtxn.commit().unwrap();
let mut wtxn = lmdb_env.write_txn().unwrap();
let iter = lmdb.iter_mut(&mut wtxn).unwrap();
//let _ = unsafe { iter.del_current().unwrap() };
for row in iter {
if let Ok(ref elm) = row {
let LifecycleContent {
ver_no,
ver_id,
mod_time,
type_,
object_name,
} = &elm.1;
println!("cache row:{ver_no} {ver_id} {mod_time} {type_:?} {object_name}");
//eval_inner(&oi.to_lifecycle_opts(), OffsetDateTime::now_utc()).await;
eval_inner(
&lifecycle::ObjectOpts {
name: oi.name.clone(),
user_tags: oi.user_tags.clone(),
version_id: oi.version_id.map(|v| v.to_string()).unwrap_or_default(),
mod_time: oi.mod_time,
size: oi.size as usize,
is_latest: oi.is_latest,
num_versions: oi.num_versions,
delete_marker: oi.delete_marker,
successor_mod_time: oi.successor_mod_time,
restore_ongoing: oi.restore_ongoing,
restore_expires: oi.restore_expires,
transition_status: oi.transitioned_object.status.clone(),
..Default::default()
},
OffsetDateTime::now_utc(),
)
.await;
}
println!("row:{row:?}");
}
//drop(iter);
wtxn.commit().unwrap();
}
}
println!("Lifecycle cache test completed");
}
}
async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event {
let mut events = Vec::<Event>::new();
info!(
"eval_inner: object={}, mod_time={:?}, now={:?}, is_latest={}, delete_marker={}",
obj.name, obj.mod_time, now, obj.is_latest, obj.delete_marker
);
if obj.mod_time.expect("err").unix_timestamp() == 0 {
info!("eval_inner: mod_time is 0, returning default event");
return Event::default();
}
if let Some(restore_expires) = obj.restore_expires {
if !restore_expires.unix_timestamp() == 0 && now.unix_timestamp() > restore_expires.unix_timestamp() {
let mut action = IlmAction::DeleteRestoredAction;
if !obj.is_latest {
action = IlmAction::DeleteRestoredVersionAction;
}
events.push(Event {
action,
due: Some(now),
rule_id: "".into(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
if let Some(ref lc_rules) = self.filter_rules(obj).await {
for rule in lc_rules.iter() {
if obj.expired_object_deletemarker() {
if let Some(expiration) = rule.expiration.as_ref() {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(now),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
if let Some(days) = expiration.days {
let expected_expiry = expected_expiry_time(obj.mod_time.unwrap(), days /*, date*/);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
break;
}
}
}
}
if obj.is_latest {
if let Some(ref expiration) = rule.expiration {
if let Some(expired_object_delete_marker) = expiration.expired_object_delete_marker {
if obj.delete_marker && expired_object_delete_marker {
let due = expiration.next_due(obj);
if let Some(due) = due {
if now.unix_timestamp() >= due.unix_timestamp() {
events.push(Event {
action: IlmAction::DelMarkerDeleteAllVersionsAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(due),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(newer_noncurrent_versions) = noncurrent_version_expiration.newer_noncurrent_versions {
if newer_noncurrent_versions > 0 {
continue;
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration {
if let Some(noncurrent_days) = noncurrent_version_expiration.noncurrent_days {
if noncurrent_days != 0 {
if let Some(successor_mod_time) = obj.successor_mod_time {
let expected_expiry = expected_expiry_time(successor_mod_time, noncurrent_days);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
events.push(Event {
action: IlmAction::DeleteVersionAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
}
}
}
}
}
if !obj.is_latest {
if let Some(ref noncurrent_version_transitions) = rule.noncurrent_version_transitions {
if let Some(ref storage_class) = noncurrent_version_transitions[0].storage_class {
if storage_class.as_str() != "" && !obj.delete_marker && obj.transition_status != TRANSITION_COMPLETE {
let due = rule.noncurrent_version_transitions.as_ref().unwrap()[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionVersionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: rule.noncurrent_version_transitions.as_ref().unwrap()[0]
.storage_class
.clone()
.unwrap()
.as_str()
.to_string(),
..Default::default()
});
}
}
}
}
}
}
info!(
"eval_inner: checking expiration condition - is_latest={}, delete_marker={}, version_id={:?}, condition_met={}",
obj.is_latest,
obj.delete_marker,
obj.version_id,
(obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker
);
// Allow expiration for latest objects OR non-versioned objects (empty version_id)
if (obj.is_latest || obj.version_id.is_empty()) && !obj.delete_marker {
info!("eval_inner: entering expiration check");
if let Some(ref expiration) = rule.expiration {
if let Some(ref date) = expiration.date {
let date0 = OffsetDateTime::from(date.clone());
if date0.unix_timestamp() != 0 && (now.unix_timestamp() >= date0.unix_timestamp()) {
info!("eval_inner: expiration by date - date0={:?}", date0);
events.push(Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(date0),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
});
}
} else if let Some(days) = expiration.days {
let expected_expiry: OffsetDateTime = expected_expiry_time(obj.mod_time.unwrap(), days);
info!(
"eval_inner: expiration check - days={}, obj_time={:?}, expiry_time={:?}, now={:?}, should_expire={}",
days,
obj.mod_time.expect("err!"),
expected_expiry,
now,
now.unix_timestamp() > expected_expiry.unix_timestamp()
);
if now.unix_timestamp() >= expected_expiry.unix_timestamp() {
info!("eval_inner: object should expire, adding DeleteAction");
let mut event = Event {
action: IlmAction::DeleteAction,
rule_id: rule.id.clone().expect("err!"),
due: Some(expected_expiry),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
storage_class: "".into(),
};
/*if rule.expiration.expect("err!").delete_all.val {
event.action = IlmAction::DeleteAllVersionsAction
}*/
events.push(event);
}
} else {
info!("eval_inner: expiration.days is None");
}
} else {
info!("eval_inner: rule.expiration is None");
}
if obj.transition_status != TRANSITION_COMPLETE {
if let Some(ref transitions) = rule.transitions {
let due = transitions[0].next_due(obj);
if let Some(due0) = due {
if now.unix_timestamp() == 0 || now.unix_timestamp() > due0.unix_timestamp() {
events.push(Event {
action: IlmAction::TransitionAction,
rule_id: rule.id.clone().expect("err!"),
due,
storage_class: transitions[0].storage_class.clone().expect("err!").as_str().to_string(),
noncurrent_days: 0,
newer_noncurrent_versions: 0,
});
}
}
}
}
}
}
}
if events.len() > 0 {
events.sort_by(|a, b| {
if now.unix_timestamp() > a.due.expect("err!").unix_timestamp()
&& now.unix_timestamp() > b.due.expect("err").unix_timestamp()
|| a.due.expect("err").unix_timestamp() == b.due.expect("err").unix_timestamp()
{
match a.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Less;
}
_ => (),
}
match b.action {
IlmAction::DeleteAllVersionsAction
| IlmAction::DelMarkerDeleteAllVersionsAction
| IlmAction::DeleteAction
| IlmAction::DeleteVersionAction => {
return Ordering::Greater;
}
_ => (),
}
return Ordering::Less;
}
if a.due.expect("err").unix_timestamp() < b.due.expect("err").unix_timestamp() {
return Ordering::Less;
}
return Ordering::Greater;
});
return events[0].clone();
}
Event::default()
}

View File

@@ -0,0 +1,444 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use rustfs_ahm::{heal::storage::ECStoreHealStorage, init_heal_manager};
use rustfs_ecstore::{
bucket::metadata::BUCKET_LIFECYCLE_CONFIG,
bucket::metadata_sys,
disk::endpoint::Endpoint,
endpoints::{EndpointServerPools, Endpoints, PoolEndpoints},
global::GLOBAL_TierConfigMgr,
store::ECStore,
store_api::{MakeBucketOptions, ObjectIO, ObjectOptions, PutObjReader, StorageAPI},
tier::tier_config::{TierConfig, TierMinIO, TierType},
};
use rustfs_scanner::scanner::init_data_scanner;
use serial_test::serial;
use std::{
path::PathBuf,
sync::{Arc, Once, OnceLock},
time::Duration,
};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use tracing::info;
static GLOBAL_ENV: OnceLock<(Vec<PathBuf>, Arc<ECStore>)> = OnceLock::new();
static INIT: Once = Once::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt::try_init();
});
}
/// Test helper: Create test environment with ECStore
async fn setup_test_env() -> (Vec<PathBuf>, Arc<ECStore>) {
init_tracing();
// Fast path: already initialized, just clone and return
if let Some((paths, ecstore)) = GLOBAL_ENV.get() {
return (paths.clone(), ecstore.clone());
}
// create temp dir as 4 disks with unique base dir
let test_base_dir = format!("/tmp/rustfs_scanner_lifecycle_test_{}", uuid::Uuid::new_v4());
let temp_dir = std::path::PathBuf::from(&test_base_dir);
if temp_dir.exists() {
fs::remove_dir_all(&temp_dir).await.ok();
}
fs::create_dir_all(&temp_dir).await.unwrap();
// create 4 disk dirs
let disk_paths = vec![
temp_dir.join("disk1"),
temp_dir.join("disk2"),
temp_dir.join("disk3"),
temp_dir.join("disk4"),
];
for disk_path in &disk_paths {
fs::create_dir_all(disk_path).await.unwrap();
}
// create EndpointServerPools
let mut endpoints = Vec::new();
for (i, disk_path) in disk_paths.iter().enumerate() {
let mut endpoint = Endpoint::try_from(disk_path.to_str().unwrap()).unwrap();
// set correct index
endpoint.set_pool_index(0);
endpoint.set_set_index(0);
endpoint.set_disk_index(i);
endpoints.push(endpoint);
}
let pool_endpoints = PoolEndpoints {
legacy: false,
set_count: 1,
drives_per_set: 4,
endpoints: Endpoints::from(endpoints),
cmd_line: "test".to_string(),
platform: format!("OS: {} | Arch: {}", std::env::consts::OS, std::env::consts::ARCH),
};
let endpoint_pools = EndpointServerPools(vec![pool_endpoints]);
// format disks (only first time)
rustfs_ecstore::store::init_local_disks(endpoint_pools.clone()).await.unwrap();
// create ECStore with dynamic port 0 (let OS assign) or fixed 9002 if free
let port = 9002; // for simplicity
let server_addr: std::net::SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
let ecstore = ECStore::new(server_addr, endpoint_pools, CancellationToken::new())
.await
.unwrap();
// init bucket metadata system
let buckets_list = ecstore
.list_bucket(&rustfs_ecstore::store_api::BucketOptions {
no_metadata: true,
..Default::default()
})
.await
.unwrap();
let buckets = buckets_list.into_iter().map(|v| v.name).collect();
rustfs_ecstore::bucket::metadata_sys::init_bucket_metadata_sys(ecstore.clone(), buckets).await;
// Initialize background expiry workers
rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::init_background_expiry(ecstore.clone()).await;
// Store in global once lock
let _ = GLOBAL_ENV.set((disk_paths.clone(), ecstore.clone()));
(disk_paths, ecstore)
}
/// Test helper: Create a test bucket
async fn create_test_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(bucket_name, &Default::default())
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Create a test lock bucket
async fn create_test_lock_bucket(ecstore: &Arc<ECStore>, bucket_name: &str) {
(**ecstore)
.make_bucket(
bucket_name,
&MakeBucketOptions {
lock_enabled: true,
versioning_enabled: true,
..Default::default()
},
)
.await
.expect("Failed to create test bucket");
info!("Created test bucket: {}", bucket_name);
}
/// Test helper: Upload test object
async fn upload_test_object(ecstore: &Arc<ECStore>, bucket: &str, object: &str, data: &[u8]) {
let mut reader = PutObjReader::from_vec(data.to_vec());
let object_info = (**ecstore)
.put_object(bucket, object, &mut reader, &ObjectOptions::default())
.await
.expect("Failed to upload test object");
info!("Uploaded test object: {}/{} ({} bytes)", bucket, object, object_info.size);
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Set bucket lifecycle configuration
async fn set_bucket_lifecycle_deletemarker(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Expiration>
<Days>0</Days>
<ExpiredObjectDeleteMarker>true</ExpiredObjectDeleteMarker>
</Expiration>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
#[allow(dead_code)]
async fn set_bucket_lifecycle_transition(bucket_name: &str) -> Result<(), Box<dyn std::error::Error>> {
// Create a simple lifecycle configuration XML with 0 days expiry for immediate testing
let lifecycle_xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<LifecycleConfiguration>
<Rule>
<ID>test-rule</ID>
<Status>Enabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<Transition>
<Days>0</Days>
<StorageClass>COLDTIER44</StorageClass>
</Transition>
</Rule>
<Rule>
<ID>test-rule2</ID>
<Status>Disabled</Status>
<Filter>
<Prefix>test/</Prefix>
</Filter>
<NoncurrentVersionTransition>
<NoncurrentDays>0</NoncurrentDays>
<StorageClass>COLDTIER44</StorageClass>
</NoncurrentVersionTransition>
</Rule>
</LifecycleConfiguration>"#;
metadata_sys::update(bucket_name, BUCKET_LIFECYCLE_CONFIG, lifecycle_xml.as_bytes().to_vec()).await?;
Ok(())
}
/// Test helper: Create a test tier
#[allow(dead_code)]
async fn create_test_tier(server: u32) {
let args = TierConfig {
version: "v1".to_string(),
tier_type: TierType::MinIO,
name: "COLDTIER44".to_string(),
s3: None,
aliyun: None,
tencent: None,
huaweicloud: None,
azure: None,
gcs: None,
r2: None,
rustfs: None,
minio: if server == 1 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "hello".to_string(),
endpoint: "http://39.105.198.204:9000".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else if server == 2 {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://m1ddns.pvtool.com:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
} else {
Some(TierMinIO {
access_key: "minioadmin".to_string(),
secret_key: "minioadmin".to_string(),
bucket: "mblock2".to_string(),
endpoint: "http://127.0.0.1:9020".to_string(),
prefix: format!("mypre{}/", uuid::Uuid::new_v4()),
region: "".to_string(),
..Default::default()
})
},
};
let mut tier_config_mgr = GLOBAL_TierConfigMgr.write().await;
if let Err(err) = tier_config_mgr.add(args, false).await {
println!("tier_config_mgr add failed, e: {err:?}");
panic!("tier add failed. {err}");
}
if let Err(e) = tier_config_mgr.save().await {
println!("tier_config_mgr save failed, e: {e:?}");
panic!("tier save failed");
}
println!("Created test tier: COLDTIER44");
}
/// Test helper: Check if object exists
async fn object_exists(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
match (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
Ok(info) => !info.delete_marker,
Err(_) => false,
}
}
/// Test helper: Check if object exists
#[allow(dead_code)]
async fn object_is_delete_marker(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
oi.delete_marker
} else {
println!("object_is_delete_marker is error");
panic!("object_is_delete_marker is error");
}
}
/// Test helper: Check if object exists
#[allow(dead_code)]
async fn object_is_transitioned(ecstore: &Arc<ECStore>, bucket: &str, object: &str) -> bool {
if let Ok(oi) = (**ecstore).get_object_info(bucket, object, &ObjectOptions::default()).await {
println!("oi: {oi:?}");
!oi.transitioned_object.status.is_empty()
} else {
println!("object_is_transitioned is error");
panic!("object_is_transitioned is error");
}
}
async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &str, timeout: Duration) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if !object_exists(ecstore, bucket, object).await {
return true;
}
if tokio::time::Instant::now() >= deadline {
return false;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
mod serial_tests {
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[serial]
#[ignore]
async fn test_lifecycle_transition_basic() {
let (_disk_paths, ecstore) = setup_test_env().await;
create_test_tier(2).await;
// Create test bucket and object
let suffix = uuid::Uuid::new_v4().simple().to_string();
let bucket_name = format!("test-lc-transition-{}", &suffix[..8]);
let object_name = "test/object.txt"; // Match the lifecycle rule prefix "test/"
let test_data = b"Hello, this is test data for lifecycle expiry!";
create_test_lock_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(
&ecstore,
bucket_name.as_str(),
object_name,
b"Hello, this is test data for lifecycle expiry 1111-11111111-1111 !",
)
.await;
//create_test_bucket(&ecstore, bucket_name.as_str()).await;
upload_test_object(&ecstore, bucket_name.as_str(), object_name, test_data).await;
// Verify object exists initially
assert!(object_exists(&ecstore, bucket_name.as_str(), object_name).await);
println!("✅ Object exists before lifecycle processing");
// Set lifecycle configuration with very short expiry (0 days = immediate expiry)
set_bucket_lifecycle_transition(bucket_name.as_str())
.await
.expect("Failed to set lifecycle configuration");
println!("✅ Lifecycle configuration set for bucket: {bucket_name}");
// Verify lifecycle configuration was set
match rustfs_ecstore::bucket::metadata_sys::get(bucket_name.as_str()).await {
Ok(bucket_meta) => {
assert!(bucket_meta.lifecycle_config.is_some());
println!("✅ Bucket metadata retrieved successfully");
}
Err(e) => {
println!("❌ Error retrieving bucket metadata: {e:?}");
}
}
let ctx = CancellationToken::new();
// Start scanner
let heal_storage = Arc::new(ECStoreHealStorage::new(ecstore.clone()));
init_heal_manager(heal_storage, None).await;
init_data_scanner(ctx.clone(), ecstore.clone()).await;
println!("✅ Scanner started");
// Wait for scanner to process lifecycle rules
tokio::time::sleep(Duration::from_secs(1200)).await;
// Check if object has been expired (deleted)
let check_result = object_is_transitioned(&ecstore, &bucket_name, object_name).await;
println!("Object exists after lifecycle processing: {check_result}");
if check_result {
println!("✅ Object was transitioned by lifecycle processing");
// Let's try to get object info to see its details
match ecstore
.get_object_info(bucket_name.as_str(), object_name, &rustfs_ecstore::store_api::ObjectOptions::default())
.await
{
Ok(obj_info) => {
println!(
"Object info: name={}, size={}, mod_time={:?}",
obj_info.name, obj_info.size, obj_info.mod_time
);
println!("Object info: transitioned_object={:?}", obj_info.transitioned_object);
}
Err(e) => {
println!("Error getting object info: {e:?}");
}
}
} else {
println!("❌ Object was not transitioned by lifecycle processing");
}
assert!(check_result);
println!("✅ Object successfully transitioned");
// Stop scanner
ctx.cancel();
println!("✅ Scanner stopped");
println!("Lifecycle transition basic test completed");
}
}

View File

@@ -1071,76 +1071,62 @@ impl S3 for FS {
version_id,
..
} = req.input.clone();
let rreq = rreq.unwrap();
/*if let Err(e) = un_escape_path(object) {
warn!("post restore object failed, e: {:?}", e);
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let rreq = rreq.ok_or_else(|| {
S3Error::with_message(S3ErrorCode::Custom("ErrValidRestoreObject".into()), "restore request is required")
})?;
let Some(store) = new_object_layer_fn() else {
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
};
/*if Err(err) = check_request_auth_type(req, policy::RestoreObjectAction, bucket, object) {
return Err(S3Error::with_message(S3ErrorCode::Custom("PostRestoreObjectFailed".into()), "post restore object failed"));
}*/
let version_id_str = version_id.unwrap_or_default();
let opts = post_restore_opts(&version_id_str, &bucket, &object)
.await
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrPostRestoreOpts".into()), "restore object failed."))?;
/*if req.content_length <= 0 {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrEmptyRequestBody".into()), "post restore object failed"));
}*/
let Ok(opts) = post_restore_opts(&version_id.unwrap(), &bucket, &object).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
));
};
let Ok(mut obj_info) = store.get_object_info(&bucket, &object, &opts).await else {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
));
};
let mut obj_info = store
.get_object_info(&bucket, &object, &opts)
.await
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "restore object failed."))?;
// Check if object is in a transitioned state
if obj_info.transitioned_object.status != lifecycle::TRANSITION_COMPLETE {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrInvalidTransitionedState".into()),
"restore object failed.",
));
}
//let mut api_err;
let mut _status_code = StatusCode::OK;
let mut already_restored = false;
if let Err(_err) = rreq.validate(store.clone()) {
//api_err = to_api_err(ErrMalformedXML);
//api_err.description = err.to_string();
// Validate restore request
rreq.validate(store.clone()).map_err(|_| {
S3Error::with_message(S3ErrorCode::Custom("ErrValidRestoreObject".into()), "restore object validation failed")
})?;
// Check if restore is already in progress
if obj_info.restore_ongoing && (rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT")) {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrEmptyRequestBody".into()),
"post restore object failed",
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"restore object failed.",
));
} else {
if obj_info.restore_ongoing && (rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT") {
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrObjectRestoreAlreadyInProgress".into()),
"post restore object failed",
));
}
if !obj_info.restore_ongoing && obj_info.restore_expires.unwrap().unix_timestamp() != 0 {
_status_code = StatusCode::ACCEPTED;
}
let mut already_restored = false;
if let Some(restore_expires) = obj_info.restore_expires {
if !obj_info.restore_ongoing && restore_expires.unix_timestamp() != 0 {
already_restored = true;
}
}
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap());
let restore_expiry = lifecycle::expected_expiry_time(OffsetDateTime::now_utc(), *rreq.days.as_ref().unwrap_or(&1));
let mut metadata = obj_info.user_defined.clone();
let mut header = HeaderMap::new();
let obj_info_ = obj_info.clone();
if rreq.type_.is_none() || rreq.type_.as_ref().unwrap().as_str() != "SELECT" {
if rreq.type_.as_ref().is_none_or(|t| t.as_str() != "SELECT") {
obj_info.metadata_only = true;
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap().to_string());
metadata.insert(AMZ_RESTORE_EXPIRY_DAYS.to_string(), rreq.days.unwrap_or(1).to_string());
metadata.insert(AMZ_RESTORE_REQUEST_DATE.to_string(), OffsetDateTime::now_utc().format(&Rfc3339).unwrap());
if already_restored {
metadata.insert(
@@ -1162,7 +1148,8 @@ impl S3 for FS {
);
}
obj_info.user_defined = metadata;
if let Err(_err) = store
store
.clone()
.copy_object(
&bucket,
@@ -1181,12 +1168,8 @@ impl S3 for FS {
},
)
.await
{
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrInvalidObjectState".into()),
"post restore object failed",
));
}
.map_err(|_| S3Error::with_message(S3ErrorCode::Custom("ErrCopyObject".into()), "restore object failed"))?;
if already_restored {
let output = RestoreObjectOutput {
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
@@ -1196,11 +1179,11 @@ impl S3 for FS {
}
}
let restore_object = Uuid::new_v4().to_string();
//if let Some(rreq) = rreq {
// Handle output location for SELECT requests
if let Some(output_location) = &rreq.output_location {
if let Some(s3) = &output_location.s3 {
if !s3.bucket_name.is_empty() {
let restore_object = Uuid::new_v4().to_string();
header.insert(
X_AMZ_RESTORE_OUTPUT_PATH,
format!("{}{}{}", s3.bucket_name, s3.prefix, restore_object).parse().unwrap(),
@@ -1208,86 +1191,40 @@ impl S3 for FS {
}
}
}
//}
/*send_event(EventArgs {
event_name: event::ObjectRestorePost,
bucket_name: bucket,
object: obj_info,
req_params: extract_req_params(r),
user_agent: req.user_agent(),
host: handlers::get_source_ip(r),
});*/
tokio::spawn(async move {
/*if rreq.select_parameters.is_some() {
let actual_size = obj_info_.get_actual_size();
if actual_size.is_err() {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
let object_rsc = s3select.new_object_read_seek_closer(
|offset: i64| -> (ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
return get_transitioned_object_reader(bucket, object, rs, r.Header,
obj_info, ObjectOptions {version_id: obj_info_.version_id});
},
actual_size.unwrap(),
);
if err = rreq.select_parameters.open(object_rsc); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
let encoded_error_response = encodeResponse(APIErrorResponse {
code: serr.ErrorCode(),
message: serr.ErrorMessage(),
bucket_name: bucket,
key: object,
resource: r.URL.Path,
request_id: w.Header().Get(xhttp.AmzRequestID),
host_id: globalDeploymentID(),
});
//writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
} else {
return Err(S3Error::with_message(S3ErrorCode::Custom("ErrInvalidObjectState".into()), "post restore object failed"));
}
return Ok(());
}
let nr = httptest.NewRecorder();
let rw = xhttp.NewResponseRecorder(nr);
rw.log_err_body = true;
rw.log_all_body = true;
rreq.select_parameters.evaluate(rw);
rreq.select_parameters.Close();
return Ok(S3Response::with_headers((StatusCode::OK, Body::empty()), header));
}*/
// Spawn restoration task in the background
let store_clone = store.clone();
let bucket_clone = bucket.clone();
let object_clone = object.clone();
let rreq_clone = rreq.clone();
let obj_info_clone = obj_info_.clone();
tokio::spawn(async move {
let opts = ObjectOptions {
transition: TransitionOptions {
restore_request: rreq,
restore_request: rreq_clone,
restore_expiry,
..Default::default()
},
version_id: obj_info_.version_id.map(|e| e.to_string()),
version_id: obj_info_clone.version_id.map(|e| e.to_string()),
..Default::default()
};
if let Err(err) = store.clone().restore_transitioned_object(&bucket, &object, &opts).await {
warn!("unable to restore transitioned bucket/object {}/{}: {}", bucket, object, err.to_string());
return Err(S3Error::with_message(
S3ErrorCode::Custom("ErrRestoreTransitionedObject".into()),
format!("unable to restore transitioned bucket/object {bucket}/{object}: {err}"),
));
}
/*send_event(EventArgs {
EventName: event.ObjectRestoreCompleted,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
});*/
Ok(())
if let Err(err) = store_clone
.restore_transitioned_object(&bucket_clone, &object_clone, &opts)
.await
{
warn!(
"unable to restore transitioned bucket/object {}/{}: {}",
bucket_clone,
object_clone,
err.to_string()
);
// Note: Errors from background tasks cannot be returned to client
// Consider adding to monitoring/metrics system
} else {
info!("successfully restored transitioned object: {}/{}", bucket_clone, object_clone);
}
});
let output = RestoreObjectOutput {

View File

@@ -0,0 +1 @@
ansible-playbook rustfs/tests/lifecycle_ansible_test.yml

View File

@@ -0,0 +1,15 @@
---
- hosts: webservers
remote_user: leafcolor
vars:
ansible_ssh_pass: "123456"
tasks:
- name: Configure S3 bucket lifecycle
community.aws.s3_lifecycle:
endpoint_url: "http://m1ddns.pvtool.com:9000"
access_key: "rustfsadmin"
secret_key: "rustfsadmin"
name: "mblock"
rule_id: "rule1"
state: present
expiration_days: "1"

View File

@@ -0,0 +1,120 @@
# Copyright 2024 RustFS Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import uuid
from fastapi import UploadFile
from minio import Minio
from minio.commonconfig import Filter
from minio.error import S3Error
from minio.lifecycleconfig import Expiration, LifecycleConfig, Rule
import os
from collections import namedtuple
Settings = namedtuple("Settings", ['oss_endpoint',
'oss_access_key',
'oss_secret_key',
'oss_bucket_name',
'oss_lifecycle_days',
'oss_secure',
'oss_region'])
settings = Settings(
oss_endpoint = "m1ddns.pvtool.com:9000",
oss_access_key = "rustfsadmin",
oss_secret_key = "rustfsadmin",
oss_bucket_name = "mblock99",
oss_lifecycle_days = 1,
oss_secure = False,
oss_region = ""
)
class OSS:
def __init__(self):
self.bucket_name = settings.oss_bucket_name
self.lifecycle_days = settings.oss_lifecycle_days
self.client = Minio(
endpoint=settings.oss_endpoint,
access_key=settings.oss_access_key,
secret_key=settings.oss_secret_key,
secure=settings.oss_secure,
region=settings.oss_region,
)
def new_uuid(self):
return str(uuid.uuid4())
def create_bucket(self):
found = self.client.bucket_exists(self.bucket_name)
if not found:
try:
self.client.make_bucket(self.bucket_name)
self.set_lifecycle_expiration(days=self.lifecycle_days)
print(f"Bucket {self.bucket_name} created successfully")
except S3Error as exc:
if exc.code == "BucketAlreadyOwnedByYou":
print(f"Bucket {self.bucket_name} already owned by you; continuing")
else:
raise
else:
print(f"Bucket {self.bucket_name} already exists")
def set_lifecycle_expiration(self,days: int = 1, prefix: str = "") -> None:
"""
设置按天自动过期删除MinIO 按天、每天巡检一次;<1天不生效
"""
rule_filter = Filter(prefix=prefix or "")
rule = Rule(
rule_id=f"expire-{prefix or 'all'}-{days}d",
status="Enabled",
rule_filter=rule_filter,
expiration=Expiration(days=int(days)),
)
cfg = LifecycleConfig([rule])
self.client.set_bucket_lifecycle(self.bucket_name, cfg)
def upload_file(self, file: UploadFile):
"""
上传文件到OSS返回文件的UUID
"""
ext = os.path.splitext(file.filename)[1]
uuid = self.new_uuid()
filename = f'{uuid}{ext}'
file.file.seek(0)
self.client.put_object(
self.bucket_name,
filename,
file.file,
length=-1,
part_size=10*1024*1024,
)
return filename
def get_presigned_url(self, filename: str):
"""
获取文件的预签名URL用于下载文件
"""
return self.client.presigned_get_object(
self.bucket_name, filename, expires=timedelta(days=self.lifecycle_days)
)
def get_oss():
"""
获取OSS实例
"""
return OSS()
if __name__ == "__main__":
oss = get_oss()
oss.create_bucket()